This is an unfortunate workaround which is necessary because an instance of dcrwallet will only recognise a ticket as revoked if it has done the revoke itself. If another wallet broadcasts the revoke then the ticket will forever be reported as missed/expired. This causes problems for vspd when a wallet outside of the vspd deployment revokes a ticket (eg. a user's ticketbuyer wallet). I'm happy to include this slightly dirty workaround because this should no longer be an issue when the auto-revoke work in DCP-0009 lands.
577 lines
17 KiB
Go
577 lines
17 KiB
Go
// Copyright (c) 2020-2021 The Decred developers
|
|
// Use of this source code is governed by an ISC
|
|
// license that can be found in the LICENSE file.
|
|
|
|
package background
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"errors"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/decred/dcrd/chaincfg/v3"
|
|
"github.com/decred/vspd/database"
|
|
"github.com/decred/vspd/rpc"
|
|
"github.com/jrick/wsrpc/v2"
|
|
)
|
|
|
|
var (
|
|
db *database.VspDatabase
|
|
dcrdRPC rpc.DcrdConnect
|
|
walletRPC rpc.WalletConnect
|
|
netParams *chaincfg.Params
|
|
notifierClosed chan struct{}
|
|
)
|
|
|
|
// NotificationHandler processes notifications through its Notify method and
// is informed of the notifier shutting down through its Close method.
type NotificationHandler struct {
	// ShutdownWg is incremented for the duration of every Notify call,
	// presumably so that shutdown can wait for an in-flight notification to
	// finish being handled.
	ShutdownWg *sync.WaitGroup
}
|
|
|
|
const (
	// requiredConfs is how many confirmations a ticket purchase or a fee
	// transaction needs before it is treated as final.
	requiredConfs = 6

	// consistencyInterval is how long to wait between two consecutive wallet
	// consistency checks.
	consistencyInterval = 30 * time.Minute
)
|
|
|
|
// Notify is called every time a block notification is received from dcrd.
|
|
// Notify is never called concurrently. Notify should not return an error
|
|
// because that will cause the client to close and no further notifications will
|
|
// be received until a new connection is established.
|
|
func (n *NotificationHandler) Notify(method string, params json.RawMessage) error {
|
|
n.ShutdownWg.Add(1)
|
|
defer n.ShutdownWg.Done()
|
|
|
|
if method != "blockconnected" {
|
|
return nil
|
|
}
|
|
|
|
header, err := rpc.ParseBlockConnectedNotification(params)
|
|
if err != nil {
|
|
log.Errorf("Failed to parse dcrd block notification: %v", err)
|
|
return nil
|
|
}
|
|
|
|
log.Debugf("Block notification %d (%s)", header.Height, header.BlockHash().String())
|
|
|
|
blockConnected()
|
|
|
|
return nil
|
|
}
|
|
|
|
// blockConnected is called once when vspd starts up, and once each time a
|
|
// blockconnected notification is received from dcrd.
|
|
func blockConnected() {
|
|
|
|
const funcName = "blockConnected"
|
|
|
|
ctx := context.Background()
|
|
|
|
dcrdClient, _, err := dcrdRPC.Client(ctx, netParams)
|
|
if err != nil {
|
|
log.Errorf("%s: %v", funcName, err)
|
|
return
|
|
}
|
|
|
|
// Step 1/4: Update the database with any tickets which now have 6+
|
|
// confirmations.
|
|
|
|
unconfirmed, err := db.GetUnconfirmedTickets()
|
|
if err != nil {
|
|
log.Errorf("%s: db.GetUnconfirmedTickets error: %v", funcName, err)
|
|
}
|
|
|
|
for _, ticket := range unconfirmed {
|
|
tktTx, err := dcrdClient.GetRawTransaction(ticket.Hash)
|
|
if err != nil {
|
|
// ErrNoTxInfo here probably indicates a tx which was never mined
|
|
// and has been removed from the mempool. For example, a ticket
|
|
// purchase tx close to an sdiff change, or a ticket purchase tx
|
|
// which expired. Remove it from the db.
|
|
var e *wsrpc.Error
|
|
if errors.As(err, &e) && e.Code == rpc.ErrNoTxInfo {
|
|
log.Infof("%s: Removing unconfirmed ticket from db - no information available "+
|
|
"about transaction (ticketHash=%s)", funcName, ticket.Hash)
|
|
|
|
err = db.DeleteTicket(ticket)
|
|
if err != nil {
|
|
log.Errorf("%s: db.DeleteTicket error (ticketHash=%s): %v",
|
|
funcName, ticket.Hash, err)
|
|
}
|
|
|
|
// This will not error if an alternate signing address does not
|
|
// exist for ticket.
|
|
err = db.DeleteAltSignAddr(ticket.Hash)
|
|
if err != nil {
|
|
log.Errorf("%s: db.DeleteAltSignAddr error (ticketHash=%s): %v",
|
|
funcName, ticket.Hash, err)
|
|
}
|
|
} else {
|
|
log.Errorf("%s: dcrd.GetRawTransaction for ticket failed (ticketHash=%s): %v",
|
|
funcName, ticket.Hash, err)
|
|
}
|
|
|
|
continue
|
|
}
|
|
|
|
if tktTx.Confirmations >= requiredConfs {
|
|
ticket.PurchaseHeight = tktTx.BlockHeight
|
|
ticket.Confirmed = true
|
|
err = db.UpdateTicket(ticket)
|
|
if err != nil {
|
|
log.Errorf("%s: db.UpdateTicket error, failed to set ticket as confirmed (ticketHash=%s): %v",
|
|
funcName, ticket.Hash, err)
|
|
continue
|
|
}
|
|
|
|
log.Infof("%s: Ticket confirmed (ticketHash=%s)", funcName, ticket.Hash)
|
|
}
|
|
}
|
|
|
|
// Step 2/4: Broadcast fee tx for tickets which are confirmed.
|
|
|
|
pending, err := db.GetPendingFees()
|
|
if err != nil {
|
|
log.Errorf("%s: db.GetPendingFees error: %v", funcName, err)
|
|
}
|
|
|
|
for _, ticket := range pending {
|
|
err = dcrdClient.SendRawTransaction(ticket.FeeTxHex)
|
|
if err != nil {
|
|
log.Errorf("%s: dcrd.SendRawTransaction for fee tx failed (ticketHash=%s): %v",
|
|
funcName, ticket.Hash, err)
|
|
ticket.FeeTxStatus = database.FeeError
|
|
} else {
|
|
log.Infof("%s: Fee tx broadcast for ticket (ticketHash=%s, feeHash=%s)",
|
|
funcName, ticket.Hash, ticket.FeeTxHash)
|
|
ticket.FeeTxStatus = database.FeeBroadcast
|
|
}
|
|
|
|
err = db.UpdateTicket(ticket)
|
|
if err != nil {
|
|
log.Errorf("%s: db.UpdateTicket error, failed to set fee tx as broadcast (ticketHash=%s): %v",
|
|
funcName, ticket.Hash, err)
|
|
}
|
|
}
|
|
|
|
// Step 3/4: Add tickets with confirmed fees to voting wallets.
|
|
|
|
unconfirmedFees, err := db.GetUnconfirmedFees()
|
|
if err != nil {
|
|
log.Errorf("%s: db.GetUnconfirmedFees error: %v", funcName, err)
|
|
}
|
|
|
|
walletClients, failedConnections := walletRPC.Clients(ctx, netParams)
|
|
if len(walletClients) == 0 {
|
|
log.Errorf("%s: Could not connect to any wallets", funcName)
|
|
return
|
|
}
|
|
if len(failedConnections) > 0 {
|
|
log.Errorf("%s: Failed to connect to %d wallet(s), proceeding with only %d",
|
|
funcName, len(failedConnections), len(walletClients))
|
|
}
|
|
|
|
for _, ticket := range unconfirmedFees {
|
|
feeTx, err := dcrdClient.GetRawTransaction(ticket.FeeTxHash)
|
|
if err != nil {
|
|
log.Errorf("%s: dcrd.GetRawTransaction for fee tx failed (feeTxHash=%s, ticketHash=%s): %v",
|
|
funcName, ticket.FeeTxHash, ticket.Hash, err)
|
|
|
|
ticket.FeeTxStatus = database.FeeError
|
|
err = db.UpdateTicket(ticket)
|
|
if err != nil {
|
|
log.Errorf("%s: db.UpdateTicket error, failed to set fee tx status to error (ticketHash=%s): %v",
|
|
funcName, ticket.Hash, err)
|
|
}
|
|
continue
|
|
}
|
|
|
|
// If fee is confirmed, update the database and add ticket to voting
|
|
// wallets.
|
|
if feeTx.Confirmations >= requiredConfs {
|
|
// We no longer need the hex once the tx is confirmed on-chain.
|
|
ticket.FeeTxHex = ""
|
|
ticket.FeeTxStatus = database.FeeConfirmed
|
|
err = db.UpdateTicket(ticket)
|
|
if err != nil {
|
|
log.Errorf("%s: db.UpdateTicket error, failed to set fee tx as confirmed (ticketHash=%s): %v",
|
|
funcName, ticket.Hash, err)
|
|
continue
|
|
}
|
|
log.Infof("%s: Fee tx confirmed (ticketHash=%s)", funcName, ticket.Hash)
|
|
|
|
// Add ticket to the voting wallet.
|
|
|
|
rawTicket, err := dcrdClient.GetRawTransaction(ticket.Hash)
|
|
if err != nil {
|
|
log.Errorf("%s: dcrd.GetRawTransaction for ticket failed (ticketHash=%s): %v",
|
|
funcName, ticket.Hash, err)
|
|
continue
|
|
}
|
|
for _, walletClient := range walletClients {
|
|
err = walletClient.AddTicketForVoting(ticket.VotingWIF, rawTicket.BlockHash, rawTicket.Hex)
|
|
if err != nil {
|
|
log.Errorf("%s: dcrwallet.AddTicketForVoting error (wallet=%s, ticketHash=%s): %v",
|
|
funcName, walletClient.String(), ticket.Hash, err)
|
|
continue
|
|
}
|
|
|
|
// Set consensus vote choices on voting wallets.
|
|
for agenda, choice := range ticket.VoteChoices {
|
|
err = walletClient.SetVoteChoice(agenda, choice, ticket.Hash)
|
|
if err != nil {
|
|
if strings.Contains(err.Error(), "no agenda with ID") {
|
|
log.Warnf("%s: Removing invalid agenda from ticket vote choices (ticketHash=%s, agenda=%s)",
|
|
funcName, ticket.Hash, agenda)
|
|
delete(ticket.VoteChoices, agenda)
|
|
err = db.UpdateTicket(ticket)
|
|
if err != nil {
|
|
log.Errorf("%s: db.UpdateTicket error, failed to remove invalid agenda (ticketHash=%s): %v",
|
|
funcName, ticket.Hash, err)
|
|
}
|
|
} else {
|
|
log.Errorf("%s: dcrwallet.SetVoteChoice error (wallet=%s, ticketHash=%s): %v",
|
|
funcName, walletClient.String(), ticket.Hash, err)
|
|
}
|
|
}
|
|
}
|
|
log.Infof("%s: Ticket added to voting wallet (wallet=%s, ticketHash=%s)",
|
|
funcName, walletClient.String(), ticket.Hash)
|
|
}
|
|
}
|
|
}
|
|
|
|
// Step 4/4: Set ticket outcome in database if any tickets are voted/revoked.
|
|
|
|
// Ticket status needs to be checked on every wallet. This is because only
|
|
// one of the voting wallets will actually succeed in voting/revoking
|
|
// tickets (the others will get errors like "tx already exists"). Only the
|
|
// successful wallet will have the most up-to-date ticket status, the others
|
|
// will be outdated.
|
|
for _, walletClient := range walletClients {
|
|
votableTickets, err := db.GetVotableTickets()
|
|
if err != nil {
|
|
log.Errorf("%s: db.GetVotableTickets failed: %v", funcName, err)
|
|
continue
|
|
}
|
|
|
|
// If the database has no votable tickets, there is nothing more to do
|
|
if len(votableTickets) == 0 {
|
|
break
|
|
}
|
|
|
|
// Find the oldest block height from confirmed tickets.
|
|
oldestHeight := findOldestHeight(votableTickets)
|
|
|
|
ticketInfo, err := walletClient.TicketInfo(oldestHeight)
|
|
if err != nil {
|
|
log.Errorf("%s: dcrwallet.TicketInfo failed (startHeight=%d, wallet=%s): %v",
|
|
funcName, oldestHeight, walletClient.String(), err)
|
|
continue
|
|
}
|
|
|
|
for _, dbTicket := range votableTickets {
|
|
tInfo, ok := ticketInfo[dbTicket.Hash]
|
|
if !ok {
|
|
log.Warnf("%s: TicketInfo response did not include expected ticket (wallet=%s, ticketHash=%s)",
|
|
funcName, walletClient.String(), dbTicket.Hash)
|
|
continue
|
|
}
|
|
|
|
switch tInfo.Status {
|
|
case "missed", "expired", "revoked":
|
|
dbTicket.Outcome = database.Revoked
|
|
case "voted":
|
|
dbTicket.Outcome = database.Voted
|
|
default:
|
|
// Skip to next ticket.
|
|
continue
|
|
}
|
|
|
|
err = db.UpdateTicket(dbTicket)
|
|
if err != nil {
|
|
log.Errorf("%s: db.UpdateTicket error, failed to set ticket outcome (ticketHash=%s): %v",
|
|
funcName, dbTicket.Hash, err)
|
|
continue
|
|
}
|
|
|
|
log.Infof("%s: Ticket no longer votable: outcome=%s, ticketHash=%s", funcName,
|
|
dbTicket.Outcome, dbTicket.Hash)
|
|
}
|
|
}
|
|
|
|
}
|
|
|
|
func (n *NotificationHandler) Close() error {
|
|
close(notifierClosed)
|
|
return nil
|
|
}
|
|
|
|
func connectNotifier(shutdownCtx context.Context, dcrdWithNotifs rpc.DcrdConnect) error {
|
|
notifierClosed = make(chan struct{})
|
|
|
|
dcrdClient, _, err := dcrdWithNotifs.Client(shutdownCtx, netParams)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
err = dcrdClient.NotifyBlocks()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
log.Info("Subscribed for dcrd block notifications")
|
|
|
|
// Wait until context is done (vspd is shutting down), or until the
|
|
// notifier is closed.
|
|
select {
|
|
case <-shutdownCtx.Done():
|
|
// A shutdown signal has been received - close the client with the
|
|
// notification handler to prevent further notifications from being
|
|
// received.
|
|
dcrdWithNotifs.Close()
|
|
return nil
|
|
case <-notifierClosed:
|
|
log.Warnf("dcrd notifier closed")
|
|
return nil
|
|
}
|
|
}
|
|
|
|
func Start(shutdownCtx context.Context, wg *sync.WaitGroup, vdb *database.VspDatabase, drpc rpc.DcrdConnect,
|
|
dcrdWithNotif rpc.DcrdConnect, wrpc rpc.WalletConnect, p *chaincfg.Params) {
|
|
|
|
db = vdb
|
|
dcrdRPC = drpc
|
|
walletRPC = wrpc
|
|
netParams = p
|
|
|
|
// Run the block connected handler now to catch up with any blocks mined
|
|
// while vspd was shut down.
|
|
blockConnected()
|
|
|
|
// Run voting wallet consistency check now to ensure all wallets are up to
|
|
// date.
|
|
checkWalletConsistency()
|
|
|
|
// Run voting wallet consistency check periodically.
|
|
wg.Add(1)
|
|
go func() {
|
|
ticker := time.NewTicker(consistencyInterval)
|
|
consistencyLoop:
|
|
for {
|
|
select {
|
|
case <-shutdownCtx.Done():
|
|
ticker.Stop()
|
|
break consistencyLoop
|
|
case <-ticker.C:
|
|
checkWalletConsistency()
|
|
}
|
|
}
|
|
log.Debugf("Consistency checker stopped")
|
|
wg.Done()
|
|
}()
|
|
|
|
// Loop forever attempting to create a connection to the dcrd server for
|
|
// notifications.
|
|
wg.Add(1)
|
|
go func() {
|
|
notifierLoop:
|
|
for {
|
|
err := connectNotifier(shutdownCtx, dcrdWithNotif)
|
|
if err != nil {
|
|
log.Errorf("dcrd connect error: %v", err)
|
|
}
|
|
|
|
// If context is done (vspd is shutting down), return,
|
|
// otherwise wait 15 seconds and try to reconnect.
|
|
select {
|
|
case <-shutdownCtx.Done():
|
|
break notifierLoop
|
|
case <-time.After(15 * time.Second):
|
|
}
|
|
|
|
}
|
|
log.Debugf("Notification connector stopped")
|
|
wg.Done()
|
|
}()
|
|
}
|
|
|
|
// checkWalletConsistency will retrieve all votable tickets from the database
|
|
// and ensure they are all added to voting wallets with the correct vote
|
|
// choices.
|
|
func checkWalletConsistency() {
|
|
|
|
const funcName = "checkWalletConsistency"
|
|
|
|
log.Info("Checking voting wallet consistency")
|
|
|
|
ctx := context.Background()
|
|
|
|
dcrdClient, _, err := dcrdRPC.Client(ctx, netParams)
|
|
if err != nil {
|
|
log.Errorf("%s: %v", funcName, err)
|
|
return
|
|
}
|
|
|
|
walletClients, failedConnections := walletRPC.Clients(ctx, netParams)
|
|
if len(walletClients) == 0 {
|
|
log.Errorf("%s: Could not connect to any wallets", funcName)
|
|
return
|
|
}
|
|
if len(failedConnections) > 0 {
|
|
log.Errorf("%s: Failed to connect to %d wallet(s), proceeding with only %d",
|
|
funcName, len(failedConnections), len(walletClients))
|
|
}
|
|
|
|
// Step 1/2: Check all tickets are added to all voting wallets.
|
|
|
|
votableTickets, err := db.GetVotableTickets()
|
|
if err != nil {
|
|
log.Errorf("%s: db.GetVotableTickets failed: %v", funcName, err)
|
|
return
|
|
}
|
|
|
|
// If the database has no votable tickets, there is nothing more to do
|
|
if len(votableTickets) == 0 {
|
|
return
|
|
}
|
|
|
|
// Find the oldest block height from confirmed tickets.
|
|
oldestHeight := findOldestHeight(votableTickets)
|
|
|
|
// Iterate over each wallet and add any missing tickets.
|
|
for _, walletClient := range walletClients {
|
|
// Get all tickets the wallet is aware of.
|
|
walletTickets, err := walletClient.TicketInfo(oldestHeight)
|
|
if err != nil {
|
|
log.Errorf("%s: dcrwallet.TicketInfo failed (startHeight=%d, wallet=%s): %v",
|
|
funcName, oldestHeight, walletClient.String(), err)
|
|
continue
|
|
}
|
|
|
|
// If missing tickets are added, set a flag and keep track of the
|
|
// earliest purchase height.
|
|
var added bool
|
|
var minHeight int64
|
|
for _, dbTicket := range votableTickets {
|
|
// If wallet already knows this ticket, skip to the next one.
|
|
_, exists := walletTickets[dbTicket.Hash]
|
|
if exists {
|
|
continue
|
|
}
|
|
|
|
log.Debugf("%s: Adding missing ticket (wallet=%s, ticketHash=%s)",
|
|
funcName, walletClient.String(), dbTicket.Hash)
|
|
|
|
rawTicket, err := dcrdClient.GetRawTransaction(dbTicket.Hash)
|
|
if err != nil {
|
|
log.Errorf("%s: dcrd.GetRawTransaction error: %v", funcName, err)
|
|
continue
|
|
}
|
|
|
|
err = walletClient.AddTicketForVoting(dbTicket.VotingWIF, rawTicket.BlockHash, rawTicket.Hex)
|
|
if err != nil {
|
|
log.Errorf("%s: dcrwallet.AddTicketForVoting error (wallet=%s, ticketHash=%s): %v",
|
|
funcName, walletClient.String(), dbTicket.Hash, err)
|
|
continue
|
|
}
|
|
|
|
added = true
|
|
if minHeight == 0 || minHeight > rawTicket.BlockHeight {
|
|
minHeight = rawTicket.BlockHeight
|
|
}
|
|
}
|
|
|
|
// Perform a rescan if any missing tickets were added to this wallet.
|
|
if added {
|
|
log.Infof("%s: Performing a rescan on wallet %s (fromHeight=%d)",
|
|
funcName, walletClient.String(), minHeight)
|
|
err = walletClient.RescanFrom(minHeight)
|
|
if err != nil {
|
|
log.Errorf("%s: dcrwallet.RescanFrom failed (wallet=%s): %v",
|
|
funcName, walletClient.String(), err)
|
|
continue
|
|
}
|
|
}
|
|
}
|
|
|
|
// Step 2/2: Ensure vote choices are set correctly for all tickets on
|
|
// all wallets.
|
|
|
|
for _, walletClient := range walletClients {
|
|
// Get all tickets the wallet is aware of.
|
|
walletTickets, err := walletClient.TicketInfo(oldestHeight)
|
|
if err != nil {
|
|
log.Errorf("%s: dcrwallet.TicketInfo failed (startHeight=%d, wallet=%s): %v",
|
|
funcName, oldestHeight, walletClient.String(), err)
|
|
continue
|
|
}
|
|
|
|
for _, dbTicket := range votableTickets {
|
|
// All tickets should be added to all wallets at this point, so log
|
|
// a warning if any are still missing.
|
|
walletTicket, exists := walletTickets[dbTicket.Hash]
|
|
if !exists {
|
|
log.Warnf("%s: Ticket missing from voting wallet (wallet=%s, ticketHash=%s)",
|
|
funcName, walletClient.String(), dbTicket.Hash)
|
|
continue
|
|
}
|
|
|
|
// Check if consensus vote choices match
|
|
for dbAgenda, dbChoice := range dbTicket.VoteChoices {
|
|
match := false
|
|
for _, walletChoice := range walletTicket.Choices {
|
|
if walletChoice.AgendaID == dbAgenda && walletChoice.ChoiceID == dbChoice {
|
|
match = true
|
|
}
|
|
}
|
|
|
|
// Skip to next agenda if db and wallet are matching.
|
|
if match {
|
|
continue
|
|
}
|
|
|
|
log.Debugf("%s: Updating incorrect consensus vote choices (wallet=%s, agenda=%s, ticketHash=%s)",
|
|
funcName, walletClient.String(), dbAgenda, dbTicket.Hash)
|
|
|
|
// If db and wallet are not matching, update wallet with correct
|
|
// choice.
|
|
err = walletClient.SetVoteChoice(dbAgenda, dbChoice, dbTicket.Hash)
|
|
if err != nil {
|
|
if strings.Contains(err.Error(), "no agenda with ID") {
|
|
log.Warnf("%s: Removing invalid agenda from ticket vote choices (ticketHash=%s, agenda=%s)",
|
|
funcName, dbTicket.Hash, dbAgenda)
|
|
delete(dbTicket.VoteChoices, dbAgenda)
|
|
err = db.UpdateTicket(dbTicket)
|
|
if err != nil {
|
|
log.Errorf("%s: db.UpdateTicket error, failed to remove invalid agenda (ticketHash=%s): %v",
|
|
funcName, dbTicket.Hash, err)
|
|
}
|
|
} else {
|
|
log.Errorf("%s: dcrwallet.SetVoteChoice error (wallet=%s, ticketHash=%s): %v",
|
|
funcName, walletClient.String(), dbTicket.Hash, err)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
func findOldestHeight(tickets []database.Ticket) int64 {
|
|
var oldestHeight int64
|
|
for _, ticket := range tickets {
|
|
// skip unconfirmed tickets
|
|
if ticket.PurchaseHeight == 0 {
|
|
continue
|
|
}
|
|
if oldestHeight == 0 || oldestHeight > ticket.PurchaseHeight {
|
|
oldestHeight = ticket.PurchaseHeight
|
|
}
|
|
}
|
|
return oldestHeight
|
|
}
|