vspd: Introduce vspd struct.

Storing all of the essential resources such as db, rpcs and logger in a
vspd struct enables vspd functions to be called without passing a bunch
of parameters unnecessarily. This will be increasingly useful later when
further changes are introduced.
This commit is contained in:
jholdstock 2023-08-23 09:53:02 +01:00 committed by Jamie Holdstock
parent c039dc86cb
commit 4419ae3a6e
2 changed files with 168 additions and 149 deletions

View File

@ -8,7 +8,6 @@ import (
"errors" "errors"
"strings" "strings"
"github.com/decred/slog"
"github.com/decred/vspd/database" "github.com/decred/vspd/database"
"github.com/decred/vspd/rpc" "github.com/decred/vspd/rpc"
"github.com/jrick/wsrpc/v2" "github.com/jrick/wsrpc/v2"
@ -22,22 +21,21 @@ const (
// blockConnected is called once when vspd starts up, and once each time a // blockConnected is called once when vspd starts up, and once each time a
// blockconnected notification is received from dcrd. // blockconnected notification is received from dcrd.
func blockConnected(dcrdRPC rpc.DcrdConnect, walletRPC rpc.WalletConnect, db *database.VspDatabase, log slog.Logger) { func (v *vspd) blockConnected() {
const funcName = "blockConnected" const funcName = "blockConnected"
dcrdClient, _, err := dcrdRPC.Client() dcrdClient, _, err := v.dcrd.Client()
if err != nil { if err != nil {
log.Errorf("%s: %v", funcName, err) v.log.Errorf("%s: %v", funcName, err)
return return
} }
// Step 1/4: Update the database with any tickets which now have 6+ // Step 1/4: Update the database with any tickets which now have 6+
// confirmations. // confirmations.
unconfirmed, err := db.GetUnconfirmedTickets() unconfirmed, err := v.db.GetUnconfirmedTickets()
if err != nil { if err != nil {
log.Errorf("%s: db.GetUnconfirmedTickets error: %v", funcName, err) v.log.Errorf("%s: db.GetUnconfirmedTickets error: %v", funcName, err)
} }
for _, ticket := range unconfirmed { for _, ticket := range unconfirmed {
@ -49,24 +47,24 @@ func blockConnected(dcrdRPC rpc.DcrdConnect, walletRPC rpc.WalletConnect, db *da
// which expired. Remove it from the db. // which expired. Remove it from the db.
var e *wsrpc.Error var e *wsrpc.Error
if errors.As(err, &e) && e.Code == rpc.ErrNoTxInfo { if errors.As(err, &e) && e.Code == rpc.ErrNoTxInfo {
log.Infof("%s: Removing unconfirmed ticket from db - no information available "+ v.log.Infof("%s: Removing unconfirmed ticket from db - no information available "+
"about transaction (ticketHash=%s)", funcName, ticket.Hash) "about transaction (ticketHash=%s)", funcName, ticket.Hash)
err = db.DeleteTicket(ticket) err = v.db.DeleteTicket(ticket)
if err != nil { if err != nil {
log.Errorf("%s: db.DeleteTicket error (ticketHash=%s): %v", v.log.Errorf("%s: db.DeleteTicket error (ticketHash=%s): %v",
funcName, ticket.Hash, err) funcName, ticket.Hash, err)
} }
// This will not error if an alternate signing address does not // This will not error if an alternate signing address does not
// exist for ticket. // exist for ticket.
err = db.DeleteAltSignAddr(ticket.Hash) err = v.db.DeleteAltSignAddr(ticket.Hash)
if err != nil { if err != nil {
log.Errorf("%s: db.DeleteAltSignAddr error (ticketHash=%s): %v", v.log.Errorf("%s: db.DeleteAltSignAddr error (ticketHash=%s): %v",
funcName, ticket.Hash, err) funcName, ticket.Hash, err)
} }
} else { } else {
log.Errorf("%s: dcrd.GetRawTransaction for ticket failed (ticketHash=%s): %v", v.log.Errorf("%s: dcrd.GetRawTransaction for ticket failed (ticketHash=%s): %v",
funcName, ticket.Hash, err) funcName, ticket.Hash, err)
} }
@ -76,70 +74,70 @@ func blockConnected(dcrdRPC rpc.DcrdConnect, walletRPC rpc.WalletConnect, db *da
if tktTx.Confirmations >= requiredConfs { if tktTx.Confirmations >= requiredConfs {
ticket.PurchaseHeight = tktTx.BlockHeight ticket.PurchaseHeight = tktTx.BlockHeight
ticket.Confirmed = true ticket.Confirmed = true
err = db.UpdateTicket(ticket) err = v.db.UpdateTicket(ticket)
if err != nil { if err != nil {
log.Errorf("%s: db.UpdateTicket error, failed to set ticket as confirmed (ticketHash=%s): %v", v.log.Errorf("%s: db.UpdateTicket error, failed to set ticket as confirmed (ticketHash=%s): %v",
funcName, ticket.Hash, err) funcName, ticket.Hash, err)
continue continue
} }
log.Infof("%s: Ticket confirmed (ticketHash=%s)", funcName, ticket.Hash) v.log.Infof("%s: Ticket confirmed (ticketHash=%s)", funcName, ticket.Hash)
} }
} }
// Step 2/4: Broadcast fee tx for tickets which are confirmed. // Step 2/4: Broadcast fee tx for tickets which are confirmed.
pending, err := db.GetPendingFees() pending, err := v.db.GetPendingFees()
if err != nil { if err != nil {
log.Errorf("%s: db.GetPendingFees error: %v", funcName, err) v.log.Errorf("%s: db.GetPendingFees error: %v", funcName, err)
} }
for _, ticket := range pending { for _, ticket := range pending {
err = dcrdClient.SendRawTransaction(ticket.FeeTxHex) err = dcrdClient.SendRawTransaction(ticket.FeeTxHex)
if err != nil { if err != nil {
log.Errorf("%s: dcrd.SendRawTransaction for fee tx failed (ticketHash=%s): %v", v.log.Errorf("%s: dcrd.SendRawTransaction for fee tx failed (ticketHash=%s): %v",
funcName, ticket.Hash, err) funcName, ticket.Hash, err)
ticket.FeeTxStatus = database.FeeError ticket.FeeTxStatus = database.FeeError
} else { } else {
log.Infof("%s: Fee tx broadcast for ticket (ticketHash=%s, feeHash=%s)", v.log.Infof("%s: Fee tx broadcast for ticket (ticketHash=%s, feeHash=%s)",
funcName, ticket.Hash, ticket.FeeTxHash) funcName, ticket.Hash, ticket.FeeTxHash)
ticket.FeeTxStatus = database.FeeBroadcast ticket.FeeTxStatus = database.FeeBroadcast
} }
err = db.UpdateTicket(ticket) err = v.db.UpdateTicket(ticket)
if err != nil { if err != nil {
log.Errorf("%s: db.UpdateTicket error, failed to set fee tx as broadcast (ticketHash=%s): %v", v.log.Errorf("%s: db.UpdateTicket error, failed to set fee tx as broadcast (ticketHash=%s): %v",
funcName, ticket.Hash, err) funcName, ticket.Hash, err)
} }
} }
// Step 3/4: Add tickets with confirmed fees to voting wallets. // Step 3/4: Add tickets with confirmed fees to voting wallets.
unconfirmedFees, err := db.GetUnconfirmedFees() unconfirmedFees, err := v.db.GetUnconfirmedFees()
if err != nil { if err != nil {
log.Errorf("%s: db.GetUnconfirmedFees error: %v", funcName, err) v.log.Errorf("%s: db.GetUnconfirmedFees error: %v", funcName, err)
} }
walletClients, failedConnections := walletRPC.Clients() walletClients, failedConnections := v.wallets.Clients()
if len(walletClients) == 0 { if len(walletClients) == 0 {
log.Errorf("%s: Could not connect to any wallets", funcName) v.log.Errorf("%s: Could not connect to any wallets", funcName)
return return
} }
if len(failedConnections) > 0 { if len(failedConnections) > 0 {
log.Errorf("%s: Failed to connect to %d wallet(s), proceeding with only %d", v.log.Errorf("%s: Failed to connect to %d wallet(s), proceeding with only %d",
funcName, len(failedConnections), len(walletClients)) funcName, len(failedConnections), len(walletClients))
} }
for _, ticket := range unconfirmedFees { for _, ticket := range unconfirmedFees {
feeTx, err := dcrdClient.GetRawTransaction(ticket.FeeTxHash) feeTx, err := dcrdClient.GetRawTransaction(ticket.FeeTxHash)
if err != nil { if err != nil {
log.Errorf("%s: dcrd.GetRawTransaction for fee tx failed (feeTxHash=%s, ticketHash=%s): %v", v.log.Errorf("%s: dcrd.GetRawTransaction for fee tx failed (feeTxHash=%s, ticketHash=%s): %v",
funcName, ticket.FeeTxHash, ticket.Hash, err) funcName, ticket.FeeTxHash, ticket.Hash, err)
ticket.FeeTxStatus = database.FeeError ticket.FeeTxStatus = database.FeeError
err = db.UpdateTicket(ticket) err = v.db.UpdateTicket(ticket)
if err != nil { if err != nil {
log.Errorf("%s: db.UpdateTicket error, failed to set fee tx status to error (ticketHash=%s): %v", v.log.Errorf("%s: db.UpdateTicket error, failed to set fee tx status to error (ticketHash=%s): %v",
funcName, ticket.Hash, err) funcName, ticket.Hash, err)
} }
continue continue
@ -151,26 +149,26 @@ func blockConnected(dcrdRPC rpc.DcrdConnect, walletRPC rpc.WalletConnect, db *da
// We no longer need the hex once the tx is confirmed on-chain. // We no longer need the hex once the tx is confirmed on-chain.
ticket.FeeTxHex = "" ticket.FeeTxHex = ""
ticket.FeeTxStatus = database.FeeConfirmed ticket.FeeTxStatus = database.FeeConfirmed
err = db.UpdateTicket(ticket) err = v.db.UpdateTicket(ticket)
if err != nil { if err != nil {
log.Errorf("%s: db.UpdateTicket error, failed to set fee tx as confirmed (ticketHash=%s): %v", v.log.Errorf("%s: db.UpdateTicket error, failed to set fee tx as confirmed (ticketHash=%s): %v",
funcName, ticket.Hash, err) funcName, ticket.Hash, err)
continue continue
} }
log.Infof("%s: Fee tx confirmed (ticketHash=%s)", funcName, ticket.Hash) v.log.Infof("%s: Fee tx confirmed (ticketHash=%s)", funcName, ticket.Hash)
// Add ticket to the voting wallet. // Add ticket to the voting wallet.
rawTicket, err := dcrdClient.GetRawTransaction(ticket.Hash) rawTicket, err := dcrdClient.GetRawTransaction(ticket.Hash)
if err != nil { if err != nil {
log.Errorf("%s: dcrd.GetRawTransaction for ticket failed (ticketHash=%s): %v", v.log.Errorf("%s: dcrd.GetRawTransaction for ticket failed (ticketHash=%s): %v",
funcName, ticket.Hash, err) funcName, ticket.Hash, err)
continue continue
} }
for _, walletClient := range walletClients { for _, walletClient := range walletClients {
err = walletClient.AddTicketForVoting(ticket.VotingWIF, rawTicket.BlockHash, rawTicket.Hex) err = walletClient.AddTicketForVoting(ticket.VotingWIF, rawTicket.BlockHash, rawTicket.Hex)
if err != nil { if err != nil {
log.Errorf("%s: dcrwallet.AddTicketForVoting error (wallet=%s, ticketHash=%s): %v", v.log.Errorf("%s: dcrwallet.AddTicketForVoting error (wallet=%s, ticketHash=%s): %v",
funcName, walletClient.String(), ticket.Hash, err) funcName, walletClient.String(), ticket.Hash, err)
continue continue
} }
@ -180,16 +178,16 @@ func blockConnected(dcrdRPC rpc.DcrdConnect, walletRPC rpc.WalletConnect, db *da
err = walletClient.SetVoteChoice(agenda, choice, ticket.Hash) err = walletClient.SetVoteChoice(agenda, choice, ticket.Hash)
if err != nil { if err != nil {
if strings.Contains(err.Error(), "no agenda with ID") { if strings.Contains(err.Error(), "no agenda with ID") {
log.Warnf("%s: Removing invalid agenda from ticket vote choices (ticketHash=%s, agenda=%s)", v.log.Warnf("%s: Removing invalid agenda from ticket vote choices (ticketHash=%s, agenda=%s)",
funcName, ticket.Hash, agenda) funcName, ticket.Hash, agenda)
delete(ticket.VoteChoices, agenda) delete(ticket.VoteChoices, agenda)
err = db.UpdateTicket(ticket) err = v.db.UpdateTicket(ticket)
if err != nil { if err != nil {
log.Errorf("%s: db.UpdateTicket error, failed to remove invalid agenda (ticketHash=%s): %v", v.log.Errorf("%s: db.UpdateTicket error, failed to remove invalid agenda (ticketHash=%s): %v",
funcName, ticket.Hash, err) funcName, ticket.Hash, err)
} }
} else { } else {
log.Errorf("%s: dcrwallet.SetVoteChoice error (wallet=%s, ticketHash=%s): %v", v.log.Errorf("%s: dcrwallet.SetVoteChoice error (wallet=%s, ticketHash=%s): %v",
funcName, walletClient.String(), ticket.Hash, err) funcName, walletClient.String(), ticket.Hash, err)
} }
} }
@ -199,7 +197,7 @@ func blockConnected(dcrdRPC rpc.DcrdConnect, walletRPC rpc.WalletConnect, db *da
for tspend, policy := range ticket.TSpendPolicy { for tspend, policy := range ticket.TSpendPolicy {
err = walletClient.SetTSpendPolicy(tspend, policy, ticket.Hash) err = walletClient.SetTSpendPolicy(tspend, policy, ticket.Hash)
if err != nil { if err != nil {
log.Errorf("%s: dcrwallet.SetTSpendPolicy failed (wallet=%s, ticketHash=%s): %v", v.log.Errorf("%s: dcrwallet.SetTSpendPolicy failed (wallet=%s, ticketHash=%s): %v",
funcName, walletClient.String(), ticket.Hash, err) funcName, walletClient.String(), ticket.Hash, err)
} }
} }
@ -208,12 +206,12 @@ func blockConnected(dcrdRPC rpc.DcrdConnect, walletRPC rpc.WalletConnect, db *da
for key, policy := range ticket.TreasuryPolicy { for key, policy := range ticket.TreasuryPolicy {
err = walletClient.SetTreasuryPolicy(key, policy, ticket.Hash) err = walletClient.SetTreasuryPolicy(key, policy, ticket.Hash)
if err != nil { if err != nil {
log.Errorf("%s: dcrwallet.SetTreasuryPolicy failed (wallet=%s, ticketHash=%s): %v", v.log.Errorf("%s: dcrwallet.SetTreasuryPolicy failed (wallet=%s, ticketHash=%s): %v",
funcName, walletClient.String(), ticket.Hash, err) funcName, walletClient.String(), ticket.Hash, err)
} }
} }
log.Infof("%s: Ticket added to voting wallet (wallet=%s, ticketHash=%s)", v.log.Infof("%s: Ticket added to voting wallet (wallet=%s, ticketHash=%s)",
funcName, walletClient.String(), ticket.Hash) funcName, walletClient.String(), ticket.Hash)
} }
} }
@ -227,9 +225,9 @@ func blockConnected(dcrdRPC rpc.DcrdConnect, walletRPC rpc.WalletConnect, db *da
// successful wallet will have the most up-to-date ticket status, the others // successful wallet will have the most up-to-date ticket status, the others
// will be outdated. // will be outdated.
for _, walletClient := range walletClients { for _, walletClient := range walletClients {
votableTickets, err := db.GetVotableTickets() votableTickets, err := v.db.GetVotableTickets()
if err != nil { if err != nil {
log.Errorf("%s: db.GetVotableTickets failed: %v", funcName, err) v.log.Errorf("%s: db.GetVotableTickets failed: %v", funcName, err)
continue continue
} }
@ -243,7 +241,7 @@ func blockConnected(dcrdRPC rpc.DcrdConnect, walletRPC rpc.WalletConnect, db *da
ticketInfo, err := walletClient.TicketInfo(oldestHeight) ticketInfo, err := walletClient.TicketInfo(oldestHeight)
if err != nil { if err != nil {
log.Errorf("%s: dcrwallet.TicketInfo failed (startHeight=%d, wallet=%s): %v", v.log.Errorf("%s: dcrwallet.TicketInfo failed (startHeight=%d, wallet=%s): %v",
funcName, oldestHeight, walletClient.String(), err) funcName, oldestHeight, walletClient.String(), err)
continue continue
} }
@ -251,7 +249,7 @@ func blockConnected(dcrdRPC rpc.DcrdConnect, walletRPC rpc.WalletConnect, db *da
for _, dbTicket := range votableTickets { for _, dbTicket := range votableTickets {
tInfo, ok := ticketInfo[dbTicket.Hash] tInfo, ok := ticketInfo[dbTicket.Hash]
if !ok { if !ok {
log.Warnf("%s: TicketInfo response did not include expected ticket (wallet=%s, ticketHash=%s)", v.log.Warnf("%s: TicketInfo response did not include expected ticket (wallet=%s, ticketHash=%s)",
funcName, walletClient.String(), dbTicket.Hash) funcName, walletClient.String(), dbTicket.Hash)
continue continue
} }
@ -266,14 +264,14 @@ func blockConnected(dcrdRPC rpc.DcrdConnect, walletRPC rpc.WalletConnect, db *da
continue continue
} }
err = db.UpdateTicket(dbTicket) err = v.db.UpdateTicket(dbTicket)
if err != nil { if err != nil {
log.Errorf("%s: db.UpdateTicket error, failed to set ticket outcome (ticketHash=%s): %v", v.log.Errorf("%s: db.UpdateTicket error, failed to set ticket outcome (ticketHash=%s): %v",
funcName, dbTicket.Hash, err) funcName, dbTicket.Hash, err)
continue continue
} }
log.Infof("%s: Ticket no longer votable: outcome=%s, ticketHash=%s", funcName, v.log.Infof("%s: Ticket no longer votable: outcome=%s, ticketHash=%s", funcName,
dbTicket.Outcome, dbTicket.Hash) dbTicket.Outcome, dbTicket.Hash)
} }
} }
@ -283,33 +281,32 @@ func blockConnected(dcrdRPC rpc.DcrdConnect, walletRPC rpc.WalletConnect, db *da
// checkWalletConsistency will retrieve all votable tickets from the database // checkWalletConsistency will retrieve all votable tickets from the database
// and ensure they are all added to voting wallets with the correct vote // and ensure they are all added to voting wallets with the correct vote
// choices. // choices.
func checkWalletConsistency(dcrdRPC rpc.DcrdConnect, walletRPC rpc.WalletConnect, db *database.VspDatabase, log slog.Logger) { func (v *vspd) checkWalletConsistency() {
const funcName = "checkWalletConsistency" const funcName = "checkWalletConsistency"
log.Info("Checking voting wallet consistency") v.log.Info("Checking voting wallet consistency")
dcrdClient, _, err := dcrdRPC.Client() dcrdClient, _, err := v.dcrd.Client()
if err != nil { if err != nil {
log.Errorf("%s: %v", funcName, err) v.log.Errorf("%s: %v", funcName, err)
return return
} }
walletClients, failedConnections := walletRPC.Clients() walletClients, failedConnections := v.wallets.Clients()
if len(walletClients) == 0 { if len(walletClients) == 0 {
log.Errorf("%s: Could not connect to any wallets", funcName) v.log.Errorf("%s: Could not connect to any wallets", funcName)
return return
} }
if len(failedConnections) > 0 { if len(failedConnections) > 0 {
log.Errorf("%s: Failed to connect to %d wallet(s), proceeding with only %d", v.log.Errorf("%s: Failed to connect to %d wallet(s), proceeding with only %d",
funcName, len(failedConnections), len(walletClients)) funcName, len(failedConnections), len(walletClients))
} }
// Step 1/2: Check all tickets are added to all voting wallets. // Step 1/2: Check all tickets are added to all voting wallets.
votableTickets, err := db.GetVotableTickets() votableTickets, err := v.db.GetVotableTickets()
if err != nil { if err != nil {
log.Errorf("%s: db.GetVotableTickets failed: %v", funcName, err) v.log.Errorf("%s: db.GetVotableTickets failed: %v", funcName, err)
return return
} }
@ -326,7 +323,7 @@ func checkWalletConsistency(dcrdRPC rpc.DcrdConnect, walletRPC rpc.WalletConnect
// Get all tickets the wallet is aware of. // Get all tickets the wallet is aware of.
walletTickets, err := walletClient.TicketInfo(oldestHeight) walletTickets, err := walletClient.TicketInfo(oldestHeight)
if err != nil { if err != nil {
log.Errorf("%s: dcrwallet.TicketInfo failed (startHeight=%d, wallet=%s): %v", v.log.Errorf("%s: dcrwallet.TicketInfo failed (startHeight=%d, wallet=%s): %v",
funcName, oldestHeight, walletClient.String(), err) funcName, oldestHeight, walletClient.String(), err)
continue continue
} }
@ -342,18 +339,18 @@ func checkWalletConsistency(dcrdRPC rpc.DcrdConnect, walletRPC rpc.WalletConnect
continue continue
} }
log.Debugf("%s: Adding missing ticket (wallet=%s, ticketHash=%s)", v.log.Debugf("%s: Adding missing ticket (wallet=%s, ticketHash=%s)",
funcName, walletClient.String(), dbTicket.Hash) funcName, walletClient.String(), dbTicket.Hash)
rawTicket, err := dcrdClient.GetRawTransaction(dbTicket.Hash) rawTicket, err := dcrdClient.GetRawTransaction(dbTicket.Hash)
if err != nil { if err != nil {
log.Errorf("%s: dcrd.GetRawTransaction error: %v", funcName, err) v.log.Errorf("%s: dcrd.GetRawTransaction error: %v", funcName, err)
continue continue
} }
err = walletClient.AddTicketForVoting(dbTicket.VotingWIF, rawTicket.BlockHash, rawTicket.Hex) err = walletClient.AddTicketForVoting(dbTicket.VotingWIF, rawTicket.BlockHash, rawTicket.Hex)
if err != nil { if err != nil {
log.Errorf("%s: dcrwallet.AddTicketForVoting error (wallet=%s, ticketHash=%s): %v", v.log.Errorf("%s: dcrwallet.AddTicketForVoting error (wallet=%s, ticketHash=%s): %v",
funcName, walletClient.String(), dbTicket.Hash, err) funcName, walletClient.String(), dbTicket.Hash, err)
continue continue
} }
@ -366,11 +363,11 @@ func checkWalletConsistency(dcrdRPC rpc.DcrdConnect, walletRPC rpc.WalletConnect
// Perform a rescan if any missing tickets were added to this wallet. // Perform a rescan if any missing tickets were added to this wallet.
if added { if added {
log.Infof("%s: Performing a rescan on wallet %s (fromHeight=%d)", v.log.Infof("%s: Performing a rescan on wallet %s (fromHeight=%d)",
funcName, walletClient.String(), minHeight) funcName, walletClient.String(), minHeight)
err = walletClient.RescanFrom(minHeight) err = walletClient.RescanFrom(minHeight)
if err != nil { if err != nil {
log.Errorf("%s: dcrwallet.RescanFrom failed (wallet=%s): %v", v.log.Errorf("%s: dcrwallet.RescanFrom failed (wallet=%s): %v",
funcName, walletClient.String(), err) funcName, walletClient.String(), err)
continue continue
} }
@ -384,7 +381,7 @@ func checkWalletConsistency(dcrdRPC rpc.DcrdConnect, walletRPC rpc.WalletConnect
// Get all tickets the wallet is aware of. // Get all tickets the wallet is aware of.
walletTickets, err := walletClient.TicketInfo(oldestHeight) walletTickets, err := walletClient.TicketInfo(oldestHeight)
if err != nil { if err != nil {
log.Errorf("%s: dcrwallet.TicketInfo failed (startHeight=%d, wallet=%s): %v", v.log.Errorf("%s: dcrwallet.TicketInfo failed (startHeight=%d, wallet=%s): %v",
funcName, oldestHeight, walletClient.String(), err) funcName, oldestHeight, walletClient.String(), err)
continue continue
} }
@ -394,7 +391,7 @@ func checkWalletConsistency(dcrdRPC rpc.DcrdConnect, walletRPC rpc.WalletConnect
// a warning if any are still missing. // a warning if any are still missing.
walletTicket, exists := walletTickets[dbTicket.Hash] walletTicket, exists := walletTickets[dbTicket.Hash]
if !exists { if !exists {
log.Warnf("%s: Ticket missing from voting wallet (wallet=%s, ticketHash=%s)", v.log.Warnf("%s: Ticket missing from voting wallet (wallet=%s, ticketHash=%s)",
funcName, walletClient.String(), dbTicket.Hash) funcName, walletClient.String(), dbTicket.Hash)
continue continue
} }
@ -413,7 +410,7 @@ func checkWalletConsistency(dcrdRPC rpc.DcrdConnect, walletRPC rpc.WalletConnect
continue continue
} }
log.Debugf("%s: Updating incorrect consensus vote choices (wallet=%s, agenda=%s, ticketHash=%s)", v.log.Debugf("%s: Updating incorrect consensus vote choices (wallet=%s, agenda=%s, ticketHash=%s)",
funcName, walletClient.String(), dbAgenda, dbTicket.Hash) funcName, walletClient.String(), dbAgenda, dbTicket.Hash)
// If db and wallet are not matching, update wallet with correct // If db and wallet are not matching, update wallet with correct
@ -421,16 +418,16 @@ func checkWalletConsistency(dcrdRPC rpc.DcrdConnect, walletRPC rpc.WalletConnect
err = walletClient.SetVoteChoice(dbAgenda, dbChoice, dbTicket.Hash) err = walletClient.SetVoteChoice(dbAgenda, dbChoice, dbTicket.Hash)
if err != nil { if err != nil {
if strings.Contains(err.Error(), "no agenda with ID") { if strings.Contains(err.Error(), "no agenda with ID") {
log.Warnf("%s: Removing invalid agenda from ticket vote choices (ticketHash=%s, agenda=%s)", v.log.Warnf("%s: Removing invalid agenda from ticket vote choices (ticketHash=%s, agenda=%s)",
funcName, dbTicket.Hash, dbAgenda) funcName, dbTicket.Hash, dbAgenda)
delete(dbTicket.VoteChoices, dbAgenda) delete(dbTicket.VoteChoices, dbAgenda)
err = db.UpdateTicket(dbTicket) err = v.db.UpdateTicket(dbTicket)
if err != nil { if err != nil {
log.Errorf("%s: db.UpdateTicket error, failed to remove invalid agenda (ticketHash=%s): %v", v.log.Errorf("%s: db.UpdateTicket error, failed to remove invalid agenda (ticketHash=%s): %v",
funcName, dbTicket.Hash, err) funcName, dbTicket.Hash, err)
} }
} else { } else {
log.Errorf("%s: dcrwallet.SetVoteChoice error (wallet=%s, ticketHash=%s): %v", v.log.Errorf("%s: dcrwallet.SetVoteChoice error (wallet=%s, ticketHash=%s): %v",
funcName, walletClient.String(), dbTicket.Hash, err) funcName, walletClient.String(), dbTicket.Hash, err)
} }
} }

View File

@ -12,6 +12,7 @@ import (
"time" "time"
"github.com/decred/dcrd/wire" "github.com/decred/dcrd/wire"
"github.com/decred/slog"
"github.com/decred/vspd/database" "github.com/decred/vspd/database"
"github.com/decred/vspd/rpc" "github.com/decred/vspd/rpc"
"github.com/decred/vspd/version" "github.com/decred/vspd/version"
@ -28,89 +29,110 @@ const maxVoteChangeRecords = 10
const consistencyInterval = 30 * time.Minute const consistencyInterval = 30 * time.Minute
func main() { func main() {
// Run until an exit code is returned.
os.Exit(run())
}
// run is the main startup and teardown logic performed by the main package. It
// is responsible for parsing the config, creating dcrd and dcrwallet RPC clients,
// opening the database, starting the webserver, and stopping all started
// services when a shutdown is requested.
func run() int {
// Load config file and parse CLI args. // Load config file and parse CLI args.
cfg, err := loadConfig() cfg, err := loadConfig()
if err != nil { if err != nil {
fmt.Fprintf(os.Stderr, "Config error: %v\n", err) fmt.Fprintf(os.Stderr, "loadConfig error: %v\n", err)
return 1 os.Exit(1)
}
vspd, err := newVspd(cfg)
if err != nil {
fmt.Fprintf(os.Stderr, "newVspd error: %v\n", err)
os.Exit(1)
}
// Run until an exit code is returned.
os.Exit(vspd.run())
}
type vspd struct {
cfg *config
log slog.Logger
db *database.VspDatabase
dcrd rpc.DcrdConnect
wallets rpc.WalletConnect
}
// newVspd creates the essential resources required by vspd - a database, logger
// and RPC clients - then returns an instance of vspd which is ready to be run.
func newVspd(cfg *config) (*vspd, error) {
// Open database.
db, err := database.Open(cfg.dbPath, cfg.logger(" DB"), maxVoteChangeRecords)
if err != nil {
return nil, fmt.Errorf("failed to open database: %w", err)
} }
log := cfg.logger("VSP") log := cfg.logger("VSP")
dbLog := cfg.logger(" DB")
apiLog := cfg.logger("API")
rpcLog := cfg.logger("RPC") rpcLog := cfg.logger("RPC")
// Show version at startup.
log.Criticalf("Version %s (Go version %s %s/%s)", version.String(), runtime.Version(),
runtime.GOOS, runtime.GOARCH)
if cfg.netParams == &mainNetParams &&
version.PreRelease != "" {
log.Warnf("")
log.Warnf("\tWARNING: This is a pre-release version of vspd which should not be used on mainnet.")
log.Warnf("")
}
if cfg.VspClosed {
log.Warnf("")
log.Warnf("\tWARNING: Config --vspclosed is set. This will prevent vspd from accepting new tickets.")
log.Warnf("")
}
defer log.Criticalf("Shutdown complete")
// Open database.
db, err := database.Open(cfg.dbPath, dbLog, maxVoteChangeRecords)
if err != nil {
log.Errorf("Database error: %v", err)
return 1
}
const writeBackup = true
defer db.Close(writeBackup)
// Create a context that is cancelled when a shutdown request is received
// through an interrupt signal.
shutdownCtx := shutdownListener(log)
// WaitGroup for services to signal when they have shutdown cleanly.
var shutdownWg sync.WaitGroup
db.WritePeriodicBackups(shutdownCtx, &shutdownWg, cfg.BackupInterval)
// Create RPC client for local dcrd instance (used for broadcasting and // Create RPC client for local dcrd instance (used for broadcasting and
// checking the status of fee transactions). // checking the status of fee transactions).
dcrd := rpc.SetupDcrd(cfg.DcrdUser, cfg.DcrdPass, cfg.DcrdHost, cfg.dcrdCert, cfg.netParams.Params, rpcLog) dcrd := rpc.SetupDcrd(cfg.DcrdUser, cfg.DcrdPass, cfg.DcrdHost, cfg.dcrdCert, cfg.netParams.Params, rpcLog)
defer dcrd.Close()
// Create RPC client for remote dcrwallet instance (used for voting). // Create RPC client for remote dcrwallet instances (used for voting).
wallets := rpc.SetupWallet(cfg.walletUsers, cfg.walletPasswords, cfg.walletHosts, cfg.walletCerts, cfg.netParams.Params, rpcLog) wallets := rpc.SetupWallet(cfg.walletUsers, cfg.walletPasswords, cfg.walletHosts, cfg.walletCerts, cfg.netParams.Params, rpcLog)
defer wallets.Close()
v := &vspd{
cfg: cfg,
log: log,
db: db,
dcrd: dcrd,
wallets: wallets,
}
return v, nil
}
// run starts all of vspds background services including the web server, and
// stops all started services when a shutdown is requested.
func (v *vspd) run() int {
v.log.Criticalf("Version %s (Go version %s %s/%s)", version.String(), runtime.Version(),
runtime.GOOS, runtime.GOARCH)
if v.cfg.netParams == &mainNetParams &&
version.PreRelease != "" {
v.log.Warnf("")
v.log.Warnf("\tWARNING: This is a pre-release version of vspd which should not be used on mainnet.")
v.log.Warnf("")
}
if v.cfg.VspClosed {
v.log.Warnf("")
v.log.Warnf("\tWARNING: Config --vspclosed is set. This will prevent vspd from accepting new tickets.")
v.log.Warnf("")
}
// Defer shutdown tasks.
defer v.log.Criticalf("Shutdown complete")
const writeBackup = true
defer v.db.Close(writeBackup)
defer v.dcrd.Close()
defer v.wallets.Close()
// Create a context that is cancelled when a shutdown request is received
// through an interrupt signal.
shutdownCtx := shutdownListener(v.log)
// WaitGroup for services to signal when they have shutdown cleanly.
var shutdownWg sync.WaitGroup
v.db.WritePeriodicBackups(shutdownCtx, &shutdownWg, v.cfg.BackupInterval)
// Ensure all data in database is present and up-to-date. // Ensure all data in database is present and up-to-date.
err = db.CheckIntegrity(dcrd) err := v.db.CheckIntegrity(v.dcrd)
if err != nil { if err != nil {
// vspd should still start if this fails, so just log an error. // vspd should still start if this fails, so just log an error.
log.Errorf("Could not check database integrity: %v", err) v.log.Errorf("Could not check database integrity: %v", err)
} }
// Run the block connected handler now to catch up with any blocks mined // Run the block connected handler now to catch up with any blocks mined
// while vspd was shut down. // while vspd was shut down.
blockConnected(dcrd, wallets, db, log) v.blockConnected()
// Run voting wallet consistency check now to ensure all wallets are up to // Run voting wallet consistency check now to ensure all wallets are up to
// date. // date.
checkWalletConsistency(dcrd, wallets, db, log) v.checkWalletConsistency()
// Run voting wallet consistency check periodically. // Run voting wallet consistency check periodically.
shutdownWg.Add(1) shutdownWg.Add(1)
@ -121,29 +143,29 @@ func run() int {
shutdownWg.Done() shutdownWg.Done()
return return
case <-time.After(consistencyInterval): case <-time.After(consistencyInterval):
checkWalletConsistency(dcrd, wallets, db, log) v.checkWalletConsistency()
} }
} }
}() }()
// Create and start webapi server. // Create and start webapi server.
apiCfg := webapi.Config{ apiCfg := webapi.Config{
VSPFee: cfg.VSPFee, VSPFee: v.cfg.VSPFee,
NetParams: cfg.netParams.Params, NetParams: v.cfg.netParams.Params,
BlockExplorerURL: cfg.netParams.blockExplorerURL, BlockExplorerURL: v.cfg.netParams.blockExplorerURL,
SupportEmail: cfg.SupportEmail, SupportEmail: v.cfg.SupportEmail,
VspClosed: cfg.VspClosed, VspClosed: v.cfg.VspClosed,
VspClosedMsg: cfg.VspClosedMsg, VspClosedMsg: v.cfg.VspClosedMsg,
AdminPass: cfg.AdminPass, AdminPass: v.cfg.AdminPass,
Debug: cfg.WebServerDebug, Debug: v.cfg.WebServerDebug,
Designation: cfg.Designation, Designation: v.cfg.Designation,
MaxVoteChangeRecords: maxVoteChangeRecords, MaxVoteChangeRecords: maxVoteChangeRecords,
VspdVersion: version.String(), VspdVersion: version.String(),
} }
err = webapi.Start(shutdownCtx, requestShutdown, &shutdownWg, cfg.Listen, db, apiLog, err = webapi.Start(shutdownCtx, requestShutdown, &shutdownWg, v.cfg.Listen, v.db, v.cfg.logger("API"),
dcrd, wallets, apiCfg) v.dcrd, v.wallets, apiCfg)
if err != nil { if err != nil {
log.Errorf("Failed to initialize webapi: %v", err) v.log.Errorf("Failed to initialize webapi: %v", err)
requestShutdown() requestShutdown()
shutdownWg.Wait() shutdownWg.Wait()
return 1 return 1
@ -159,14 +181,14 @@ func run() int {
shutdownWg.Done() shutdownWg.Done()
return return
case header := <-notifChan: case header := <-notifChan:
log.Debugf("Block notification %d (%s)", header.Height, header.BlockHash().String()) v.log.Debugf("Block notification %d (%s)", header.Height, header.BlockHash().String())
blockConnected(dcrd, wallets, db, log) v.blockConnected()
} }
} }
}() }()
// Attach notification listener to dcrd client. // Attach notification listener to dcrd client.
dcrd.BlockConnectedHandler(notifChan) v.dcrd.BlockConnectedHandler(notifChan)
// Loop forever attempting ensuring a dcrd connection is available, so // Loop forever attempting ensuring a dcrd connection is available, so
// notifications are received. // notifications are received.
@ -179,9 +201,9 @@ func run() int {
return return
case <-time.After(time.Second * 15): case <-time.After(time.Second * 15):
// Ensure dcrd client is still connected. // Ensure dcrd client is still connected.
_, _, err := dcrd.Client() _, _, err := v.dcrd.Client()
if err != nil { if err != nil {
log.Errorf("dcrd connect error: %v", err) v.log.Errorf("dcrd connect error: %v", err)
} }
} }
} }