Store each ticket in its own DB bucket.

**NOTE: This contains a backwards incompatible database migration, so if you plan to test it, please make a copy of your database first.**

Moves tickets from a single database bucket containing JSON encoded strings, to a bucket for each ticket.

This change is to preemptively deal with scaling issues seen with databases containing tens of thousands of tickets.
This commit is contained in:
Jamie Holdstock 2021-05-24 17:45:47 +08:00 committed by GitHub
parent 136e389f95
commit 20cb546e74
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 282 additions and 66 deletions

View File

@ -85,6 +85,16 @@ func writeHotBackupFile(db *bolt.DB) error {
return err
}
func int64ToBytes(i int64) []byte {
bytes := make([]byte, 8)
binary.LittleEndian.PutUint64(bytes, uint64(i))
return bytes
}
func bytesToInt64(bytes []byte) int64 {
return int64(binary.LittleEndian.Uint64(bytes))
}
func uint32ToBytes(i uint32) []byte {
bytes := make([]byte, 4)
binary.LittleEndian.PutUint32(bytes, i)

View File

@ -39,37 +39,53 @@ const (
Voted TicketOutcome = "voted"
)
// Ticket is serialized to json and stored in bbolt db. The json keys are
// deliberately kept short because they are duplicated many times in the db.
// The keys used to store ticket values in the database.
var (
hashK = []byte("Hash")
purchaseHeightK = []byte("PurchaseHeight")
commitmentAddressK = []byte("CommitmentAddress")
feeAddressIndexK = []byte("FeeAddressIndex")
feeAddressK = []byte("FeeAddress")
feeAmountK = []byte("FeeAmount")
feeExpirationK = []byte("FeeExpiration")
confirmedK = []byte("Confirmed")
votingWIFK = []byte("VotingWIF")
voteChoicesK = []byte("VoteChoices")
feeTxHexK = []byte("FeeTxHex")
feeTxHashK = []byte("FeeTxHash")
feeTxStatusK = []byte("FeeTxStatus")
outcomeK = []byte("Outcome")
)
type Ticket struct {
Hash string `json:"hsh"`
PurchaseHeight int64 `json:"phgt"`
CommitmentAddress string `json:"cmtaddr"`
FeeAddressIndex uint32 `json:"faddridx"`
FeeAddress string `json:"faddr"`
FeeAmount int64 `json:"famt"`
FeeExpiration int64 `json:"fexp"`
Hash string
PurchaseHeight int64
CommitmentAddress string
FeeAddressIndex uint32
FeeAddress string
FeeAmount int64
FeeExpiration int64
// Confirmed will be set when the ticket has 6+ confirmations.
Confirmed bool `json:"conf"`
Confirmed bool
// VotingWIF is set in /payfee.
VotingWIF string `json:"vwif"`
VotingWIF string
// VoteChoices is initially set in /payfee, but can be updated in
// /setvotechoices.
VoteChoices map[string]string `json:"vchces"`
VoteChoices map[string]string
// FeeTxHex and FeeTxHash will be set when the fee tx has been received.
FeeTxHex string `json:"fhex"`
FeeTxHash string `json:"fhsh"`
FeeTxHex string
FeeTxHash string
// FeeTxStatus indicates the current state of the fee transaction.
FeeTxStatus FeeStatus `json:"fsts"`
FeeTxStatus FeeStatus
// Outcome is set once a ticket is either voted or revoked. An empty outcome
// indicates that a ticket is still votable.
Outcome TicketOutcome `json:"otcme"`
Outcome TicketOutcome
}
func (t *Ticket) FeeExpired() bool {
@ -86,22 +102,19 @@ func (vdb *VspDatabase) InsertNewTicket(ticket Ticket) error {
return vdb.db.Update(func(tx *bolt.Tx) error {
ticketBkt := tx.Bucket(vspBktK).Bucket(ticketBktK)
hashBytes := []byte(ticket.Hash)
if ticketBkt.Get(hashBytes) != nil {
return fmt.Errorf("ticket already exists with hash %s", ticket.Hash)
// Create a bucket for the new ticket. Returns an error if bucket
// already exists.
newTicketBkt, err := ticketBkt.CreateBucket([]byte(ticket.Hash))
if err != nil {
return fmt.Errorf("could not create bucket for ticket: %w", err)
}
// Error if a ticket already exists with the same fee address.
err := ticketBkt.ForEach(func(k, v []byte) error {
var t Ticket
err := json.Unmarshal(v, &t)
if err != nil {
return fmt.Errorf("could not unmarshal ticket: %w", err)
}
err = ticketBkt.ForEach(func(k, v []byte) error {
tbkt := ticketBkt.Bucket(k)
if t.FeeAddress == ticket.FeeAddress {
return fmt.Errorf("ticket with fee address %s already exists", t.FeeAddress)
if string(tbkt.Get(feeAddressK)) == ticket.FeeAddress {
return fmt.Errorf("ticket with fee address %s already exists", ticket.FeeAddress)
}
return nil
@ -110,15 +123,104 @@ func (vdb *VspDatabase) InsertNewTicket(ticket Ticket) error {
return err
}
ticketBytes, err := json.Marshal(ticket)
err = putTicketInBucket(newTicketBkt, ticket)
if err != nil {
return fmt.Errorf("could not marshal ticket: %w", err)
return fmt.Errorf("putting ticket in bucket failed: %w", err)
}
return ticketBkt.Put(hashBytes, ticketBytes)
return nil
})
}
// putTicketInBucket encodes each of the fields of the provided ticket as a byte
// array, and stores them as values within the provided db bucket.
func putTicketInBucket(bkt *bolt.Bucket, ticket Ticket) error {
var err error
if err = bkt.Put(hashK, []byte(ticket.Hash)); err != nil {
return err
}
if err = bkt.Put(commitmentAddressK, []byte(ticket.CommitmentAddress)); err != nil {
return err
}
if err = bkt.Put(feeAddressK, []byte(ticket.FeeAddress)); err != nil {
return err
}
if err = bkt.Put(votingWIFK, []byte(ticket.VotingWIF)); err != nil {
return err
}
if err = bkt.Put(feeTxHexK, []byte(ticket.FeeTxHex)); err != nil {
return err
}
if err = bkt.Put(feeTxHashK, []byte(ticket.FeeTxHash)); err != nil {
return err
}
if err = bkt.Put(feeTxStatusK, []byte(ticket.FeeTxStatus)); err != nil {
return err
}
if err = bkt.Put(outcomeK, []byte(ticket.Outcome)); err != nil {
return err
}
if err = bkt.Put(purchaseHeightK, int64ToBytes(ticket.PurchaseHeight)); err != nil {
return err
}
if err = bkt.Put(feeAddressIndexK, uint32ToBytes(ticket.FeeAddressIndex)); err != nil {
return err
}
if err = bkt.Put(feeAmountK, int64ToBytes(ticket.FeeAmount)); err != nil {
return err
}
if err = bkt.Put(feeExpirationK, int64ToBytes(ticket.FeeExpiration)); err != nil {
return err
}
confirmed := []byte{0}
if ticket.Confirmed {
confirmed = []byte{1}
}
if err = bkt.Put(confirmedK, confirmed); err != nil {
return err
}
jsonVoteChoices, err := json.Marshal(ticket.VoteChoices)
if err != nil {
return err
}
return bkt.Put(voteChoicesK, jsonVoteChoices)
}
func getTicketFromBkt(bkt *bolt.Bucket) (Ticket, error) {
var ticket Ticket
ticket.Hash = string(bkt.Get(hashK))
ticket.CommitmentAddress = string(bkt.Get(commitmentAddressK))
ticket.FeeAddress = string(bkt.Get(feeAddressK))
ticket.VotingWIF = string(bkt.Get(votingWIFK))
ticket.FeeTxHex = string(bkt.Get(feeTxHexK))
ticket.FeeTxHash = string(bkt.Get(feeTxHashK))
ticket.FeeTxStatus = FeeStatus(bkt.Get(feeTxStatusK))
ticket.Outcome = TicketOutcome(bkt.Get(outcomeK))
ticket.PurchaseHeight = bytesToInt64(bkt.Get(purchaseHeightK))
ticket.FeeAddressIndex = bytesToUint32(bkt.Get(feeAddressIndexK))
ticket.FeeAmount = bytesToInt64(bkt.Get(feeAmountK))
ticket.FeeExpiration = bytesToInt64(bkt.Get(feeExpirationK))
// TODO is this dodgy?
if bkt.Get(confirmedK)[0] == byte(1) {
ticket.Confirmed = true
}
voteChoices := make(map[string]string)
err := json.Unmarshal(bkt.Get(voteChoicesK), &voteChoices)
if err != nil {
return ticket, err
}
ticket.VoteChoices = voteChoices
return ticket, nil
}
func (vdb *VspDatabase) DeleteTicket(ticket Ticket) error {
vdb.ticketsMtx.Lock()
defer vdb.ticketsMtx.Unlock()
@ -126,7 +228,7 @@ func (vdb *VspDatabase) DeleteTicket(ticket Ticket) error {
return vdb.db.Update(func(tx *bolt.Tx) error {
ticketBkt := tx.Bucket(vspBktK).Bucket(ticketBktK)
err := ticketBkt.Delete([]byte(ticket.Hash))
err := ticketBkt.DeleteBucket([]byte(ticket.Hash))
if err != nil {
return fmt.Errorf("could not delete ticket: %w", err)
}
@ -142,18 +244,13 @@ func (vdb *VspDatabase) UpdateTicket(ticket Ticket) error {
return vdb.db.Update(func(tx *bolt.Tx) error {
ticketBkt := tx.Bucket(vspBktK).Bucket(ticketBktK)
hashBytes := []byte(ticket.Hash)
bkt := ticketBkt.Bucket([]byte(ticket.Hash))
if ticketBkt.Get(hashBytes) == nil {
if bkt == nil {
return fmt.Errorf("ticket does not exist with hash %s", ticket.Hash)
}
ticketBytes, err := json.Marshal(ticket)
if err != nil {
return fmt.Errorf("could not marshal ticket: %w", err)
}
return ticketBkt.Put(hashBytes, ticketBytes)
return putTicketInBucket(bkt, ticket)
})
}
@ -164,16 +261,16 @@ func (vdb *VspDatabase) GetTicketByHash(ticketHash string) (Ticket, bool, error)
var ticket Ticket
var found bool
err := vdb.db.View(func(tx *bolt.Tx) error {
ticketBkt := tx.Bucket(vspBktK).Bucket(ticketBktK)
ticketBkt := tx.Bucket(vspBktK).Bucket(ticketBktK).Bucket([]byte(ticketHash))
ticketBytes := ticketBkt.Get([]byte(ticketHash))
if ticketBytes == nil {
if ticketBkt == nil {
return nil
}
err := json.Unmarshal(ticketBytes, &ticket)
var err error
ticket, err = getTicketFromBkt(ticketBkt)
if err != nil {
return fmt.Errorf("could not unmarshal ticket: %w", err)
return fmt.Errorf("could not get ticket: %w", err)
}
found = true
@ -185,8 +282,7 @@ func (vdb *VspDatabase) GetTicketByHash(ticketHash string) (Ticket, bool, error)
}
// CountTickets returns the total number of voted, revoked, and currently voting
// tickets. Requires deserializing every ticket in the db so should be used
// sparingly.
// tickets. This func iterates over every ticket so should be used sparingly.
func (vdb *VspDatabase) CountTickets() (int64, int64, int64, error) {
vdb.ticketsMtx.RLock()
defer vdb.ticketsMtx.RUnlock()
@ -196,14 +292,10 @@ func (vdb *VspDatabase) CountTickets() (int64, int64, int64, error) {
ticketBkt := tx.Bucket(vspBktK).Bucket(ticketBktK)
return ticketBkt.ForEach(func(k, v []byte) error {
var ticket Ticket
err := json.Unmarshal(v, &ticket)
if err != nil {
return fmt.Errorf("could not unmarshal ticket: %w", err)
}
tBkt := ticketBkt.Bucket(k)
if ticket.FeeTxStatus == FeeConfirmed {
switch ticket.Outcome {
if FeeStatus(tBkt.Get(feeTxStatusK)) == FeeConfirmed {
switch TicketOutcome(tBkt.Get(outcomeK)) {
case Voted:
voted++
case Revoked:
@ -270,10 +362,9 @@ func (vdb *VspDatabase) filterTickets(filter func(Ticket) bool) ([]Ticket, error
ticketBkt := tx.Bucket(vspBktK).Bucket(ticketBktK)
return ticketBkt.ForEach(func(k, v []byte) error {
var ticket Ticket
err := json.Unmarshal(v, &ticket)
ticket, err := getTicketFromBkt(ticketBkt.Bucket(k))
if err != nil {
return fmt.Errorf("could not unmarshal ticket: %w", err)
return fmt.Errorf("could not get ticket: %w", err)
}
if filter(ticket) {

View File

@ -159,12 +159,14 @@ func testUpdateTicket(t *testing.T) {
// Update ticket with new values.
ticket.FeeAmount = ticket.FeeAmount + 1
ticket.FeeExpiration = ticket.FeeExpiration + 1
ticket.VoteChoices = map[string]string{"New agenda": "New value"}
err = db.UpdateTicket(ticket)
if err != nil {
t.Fatalf("error updating ticket: %v", err)
}
// Retrieve ticket from database.
// Retrieve updated ticket from database.
retrieved, found, err := db.GetTicketByHash(ticket.Hash)
if err != nil {
t.Fatalf("error retrieving ticket by ticket hash: %v", err)
@ -174,7 +176,8 @@ func testUpdateTicket(t *testing.T) {
}
if ticket.FeeAmount != retrieved.FeeAmount ||
ticket.FeeExpiration != retrieved.FeeExpiration {
ticket.FeeExpiration != retrieved.FeeExpiration ||
!reflect.DeepEqual(retrieved.VoteChoices, ticket.VoteChoices) {
t.Fatal("retrieved ticket value didnt match expected")
}

View File

@ -19,7 +19,7 @@ func removeOldFeeTxUpgrade(db *bolt.DB) error {
count := 0
err := ticketBkt.ForEach(func(k, v []byte) error {
// Deserialize the old ticket.
var ticket Ticket
var ticket v1Ticket
err := json.Unmarshal(v, &ticket)
if err != nil {
return fmt.Errorf("could not unmarshal ticket: %w", err)
@ -51,7 +51,7 @@ func removeOldFeeTxUpgrade(db *bolt.DB) error {
// Update database version.
err = vspBkt.Put(versionK, uint32ToBytes(removeOldFeeTxVersion))
if err != nil {
return err
return fmt.Errorf("failed to update db version: %w", err)
}
return nil

86
database/upgrade_v3.go Normal file
View File

@ -0,0 +1,86 @@
package database
import (
"encoding/json"
"fmt"
bolt "go.etcd.io/bbolt"
)
func ticketBucketUpgrade(db *bolt.DB) error {
log.Infof("Upgrading database to version %d", ticketBucketVersion)
// Run the upgrade in a single database transaction so it can be safely
// rolled back if an error is encountered.
err := db.Update(func(tx *bolt.Tx) error {
vspBkt := tx.Bucket(vspBktK)
ticketBkt := vspBkt.Bucket(ticketBktK)
// Count tickets so migration progress can be logged.
todo := 0
err := ticketBkt.ForEach(func(k, v []byte) error {
todo++
return nil
})
if err != nil {
return fmt.Errorf("could not count tickets: %w", err)
}
done := 0
const batchSize = 2000
err = ticketBkt.ForEach(func(k, v []byte) error {
// Deserialize the old ticket.
var ticket v1Ticket
err := json.Unmarshal(v, &ticket)
if err != nil {
return fmt.Errorf("could not unmarshal ticket: %w", err)
}
// Delete the old ticket.
err = ticketBkt.Delete(k)
if err != nil {
return fmt.Errorf("could not delete ticket: %w", err)
}
// Insert the new ticket.
newBkt, err := ticketBkt.CreateBucket(k)
if err != nil {
return fmt.Errorf("could not create new ticket bucket: %w", err)
}
err = putTicketInBucket(newBkt, Ticket(ticket))
if err != nil {
return fmt.Errorf("could not put new ticket in bucket: %w", err)
}
done++
if done%batchSize == 0 {
log.Infof("Migrated %d/%d tickets", done, todo)
}
return nil
})
if err != nil {
return err
}
if done > 0 && done%batchSize != 0 {
log.Infof("Migrated %d/%d tickets", done, todo)
}
// Update database version.
err = vspBkt.Put(versionK, uint32ToBytes(ticketBucketVersion))
if err != nil {
return fmt.Errorf("failed to update db version: %w", err)
}
return nil
})
if err != nil {
return err
}
log.Info("Upgrade completed")
return nil
}

View File

@ -16,16 +16,41 @@ const (
// need to keep these, and they take up a lot of space.
removeOldFeeTxVersion = 2
// ticketBucketVersion changes the way tickets are stored. Previously they
// were stored as JSON encoded strings in a single bucket. This upgrade
// moves each ticket into its own bucket and does away with JSON encoding.
ticketBucketVersion = 3
// latestVersion is the latest version of the database that is understood by
// vspd. Databases with recorded versions higher than this will fail to open
// (meaning any upgrades prevent reverting to older software).
latestVersion = removeOldFeeTxVersion
latestVersion = ticketBucketVersion
)
// upgrades maps between old database versions and the upgrade function to
// upgrade the database to the next version.
var upgrades = []func(tx *bolt.DB) error{
initialVersion: removeOldFeeTxUpgrade,
removeOldFeeTxVersion: ticketBucketUpgrade,
}
// v1Ticket has the json tags required to unmarshal tickets stored in the
// v1 database format.
type v1Ticket struct {
Hash string `json:"hsh"`
PurchaseHeight int64 `json:"phgt"`
CommitmentAddress string `json:"cmtaddr"`
FeeAddressIndex uint32 `json:"faddridx"`
FeeAddress string `json:"faddr"`
FeeAmount int64 `json:"famt"`
FeeExpiration int64 `json:"fexp"`
Confirmed bool `json:"conf"`
VotingWIF string `json:"vwif"`
VoteChoices map[string]string `json:"vchces"`
FeeTxHex string `json:"fhex"`
FeeTxHash string `json:"fhsh"`
FeeTxStatus FeeStatus `json:"fsts"`
Outcome TicketOutcome `json:"otcme"`
}
// Upgrade will update the database to the latest known version.

View File

@ -89,9 +89,10 @@ func feeAddress(c *gin.Context) {
ticketHash := request.TicketHash
// Respond early if we already have the fee tx for this ticket.
if ticket.FeeTxStatus == database.FeeReceieved ||
if knownTicket &&
(ticket.FeeTxStatus == database.FeeReceieved ||
ticket.FeeTxStatus == database.FeeBroadcast ||
ticket.FeeTxStatus == database.FeeConfirmed {
ticket.FeeTxStatus == database.FeeConfirmed) {
log.Warnf("%s: Fee tx already received (clientIP=%s, ticketHash=%s)",
funcName, c.ClientIP(), ticket.Hash)
sendError(errFeeAlreadyReceived, c)