diff --git a/background/background.go b/background/background.go index 14d0660..1aef6d8 100644 --- a/background/background.go +++ b/background/background.go @@ -5,72 +5,26 @@ package background import ( - "context" - "encoding/json" "errors" "strings" - "sync" - "time" "github.com/decred/vspd/database" "github.com/decred/vspd/rpc" "github.com/jrick/wsrpc/v2" ) -var ( - db *database.VspDatabase - dcrdRPC rpc.DcrdConnect - walletRPC rpc.WalletConnect - notifierClosed chan struct{} -) - -type NotificationHandler struct { - ShutdownWg *sync.WaitGroup -} - const ( - // consistencyInterval is the time period between wallet consistency checks. - consistencyInterval = 30 * time.Minute + // requiredConfs is the number of confirmations required to consider a // ticket purchase or a fee transaction to be final. requiredConfs = 6 ) -// Notify is called every time a block notification is received from dcrd. -// Notify is never called concurrently. Notify should not return an error -// because that will cause the client to close and no further notifications will -// be received until a new connection is established. -func (n *NotificationHandler) Notify(method string, params json.RawMessage) error { - n.ShutdownWg.Add(1) - defer n.ShutdownWg.Done() - - if method != "blockconnected" { - return nil - } - - header, err := rpc.ParseBlockConnectedNotification(params) - if err != nil { - log.Errorf("Failed to parse dcrd block notification: %v", err) - return nil - } - - log.Debugf("Block notification %d (%s)", header.Height, header.BlockHash().String()) - - blockConnected() - - return nil -} - -func (n *NotificationHandler) Close() error { - close(notifierClosed) - return nil -} - -// blockConnected is called once when vspd starts up, and once each time a +// BlockConnected is called once when vspd starts up, and once each time a // blockconnected notification is received from dcrd. -func blockConnected() { +func BlockConnected(dcrdRPC rpc.DcrdConnect, walletRPC rpc.WalletConnect, db *database.VspDatabase) { - const funcName = "blockConnected" + const funcName = "BlockConnected" dcrdClient, _, err := dcrdRPC.Client() if err != nil { @@ -326,98 +280,12 @@ func blockConnected() { } -func connectNotifier(shutdownCtx context.Context, dcrdWithNotifs rpc.DcrdConnect) error { - notifierClosed = make(chan struct{}) - - dcrdClient, _, err := dcrdWithNotifs.Client() - if err != nil { - return err - } - - err = dcrdClient.NotifyBlocks() - if err != nil { - return err - } - - log.Info("Subscribed for dcrd block notifications") - - // Wait until context is done (vspd is shutting down), or until the - // notifier is closed. - select { - case <-shutdownCtx.Done(): - // A shutdown signal has been received - close the client with the - // notification handler to prevent further notifications from being - // received. - dcrdWithNotifs.Close() - return nil - case <-notifierClosed: - log.Warnf("dcrd notifier closed") - return nil - } -} - -func Start(shutdownCtx context.Context, wg *sync.WaitGroup, vdb *database.VspDatabase, drpc rpc.DcrdConnect, - dcrdWithNotif rpc.DcrdConnect, wrpc rpc.WalletConnect) { - - db = vdb - dcrdRPC = drpc - walletRPC = wrpc - - // Run the block connected handler now to catch up with any blocks mined - // while vspd was shut down. - blockConnected() - - // Run voting wallet consistency check now to ensure all wallets are up to - // date. - checkWalletConsistency() - - // Run voting wallet consistency check periodically. - wg.Add(1) - go func() { - consistencyLoop: - for { - select { - case <-shutdownCtx.Done(): - break consistencyLoop - case <-time.After(consistencyInterval): - checkWalletConsistency() - } - } - log.Debugf("Consistency checker stopped") - wg.Done() - }() - - // Loop forever attempting to create a connection to the dcrd server for - // notifications. - wg.Add(1) - go func() { - notifierLoop: - for { - err := connectNotifier(shutdownCtx, dcrdWithNotif) - if err != nil { - log.Errorf("dcrd connect error: %v", err) - } - - // If context is done (vspd is shutting down), return, - // otherwise wait 15 seconds and try to reconnect. - select { - case <-shutdownCtx.Done(): - break notifierLoop - case <-time.After(15 * time.Second): - } - - } - log.Debugf("Notification connector stopped") - wg.Done() - }() -} - -// checkWalletConsistency will retrieve all votable tickets from the database +// CheckWalletConsistency will retrieve all votable tickets from the database // and ensure they are all added to voting wallets with the correct vote // choices. -func checkWalletConsistency() { +func CheckWalletConsistency(dcrdRPC rpc.DcrdConnect, walletRPC rpc.WalletConnect, db *database.VspDatabase) { - const funcName = "checkWalletConsistency" + const funcName = "CheckWalletConsistency" log.Info("Checking voting wallet consistency") diff --git a/rpc/client.go b/rpc/client.go index 1f34d80..d3ac789 100644 --- a/rpc/client.go +++ b/rpc/client.go @@ -37,7 +37,7 @@ type client struct { notifier wsrpc.Notifier } -func setup(user, pass, addr string, cert []byte, n wsrpc.Notifier) *client { +func setup(user, pass, addr string, cert []byte) *client { // Create TLS options. pool := x509.NewCertPool() @@ -63,7 +63,7 @@ func setup(user, pass, addr string, cert []byte, n wsrpc.Notifier) *client { var mu sync.Mutex var c *wsrpc.Client fullAddr := "wss://" + addr + "/ws" - return &client{&mu, c, fullAddr, tlsOpt, authOpt, n} + return &client{&mu, c, fullAddr, tlsOpt, authOpt, nil} } func (c *client) Close() { diff --git a/rpc/dcrd.go b/rpc/dcrd.go index 224b07a..b183bd1 100644 --- a/rpc/dcrd.go +++ b/rpc/dcrd.go @@ -5,10 +5,8 @@ package rpc import ( - "bytes" "context" "encoding/hex" - "encoding/json" "errors" "fmt" @@ -45,15 +43,25 @@ type DcrdConnect struct { params *chaincfg.Params } -func SetupDcrd(user, pass, addr string, cert []byte, n wsrpc.Notifier, params *chaincfg.Params) DcrdConnect { +func SetupDcrd(user, pass, addr string, cert []byte, params *chaincfg.Params) DcrdConnect { return DcrdConnect{ - client: setup(user, pass, addr, cert, n), + client: setup(user, pass, addr, cert), params: params, } } +// BlockConnectedHandler attaches a blockconnected notification handler to the +// dcrd client. Every time a notification is received, the header of the +// connected block is sent to the provided channel. +func (d *DcrdConnect) BlockConnectedHandler(blockConnected chan *wire.BlockHeader) { + d.client.notifier = &blockConnectedHandler{ + blockConnected: blockConnected, + } +} + func (d *DcrdConnect) Close() { d.client.Close() + log.Debug("dcrd client closed") } // Client creates a new DcrdRPC client instance. Returns an error if dialing @@ -62,7 +70,7 @@ func (d *DcrdConnect) Client() (*DcrdRPC, string, error) { ctx := context.TODO() c, newConnection, err := d.client.dial(ctx) if err != nil { - return nil, d.client.addr, fmt.Errorf("dcrd connection error: %w", err) + return nil, d.client.addr, fmt.Errorf("dcrd dial error: %w", err) } // If this is a reused connection, we don't need to validate the dcrd config @@ -116,6 +124,16 @@ func (d *DcrdConnect) Client() (*DcrdRPC, string, error) { return nil, d.client.addr, errors.New("dcrd does not have transaction index enabled (--txindex)") } + // Request blockconnected notifications. + if d.client.notifier != nil { + err = c.Call(ctx, "notifyblocks", nil) + if err != nil { + return nil, d.client.addr, fmt.Errorf("notifyblocks failed: %w", err) + } + } + + log.Debugf("Connected to dcrd") + return &DcrdRPC{c, ctx}, d.client.addr, nil } @@ -210,26 +228,3 @@ func (c *DcrdRPC) ExistsLiveTicket(ticketHash string) (bool, error) { return bitset.Bytes(existsBytes).Get(0), nil } - -// ParseBlockConnectedNotification extracts the block header from a -// blockconnected JSON-RPC notification. -func ParseBlockConnectedNotification(params json.RawMessage) (*wire.BlockHeader, error) { - var notif []string - err := json.Unmarshal(params, ¬if) - if err != nil { - return nil, fmt.Errorf("json unmarshal error: %w", err) - } - - if len(notif) == 0 { - return nil, errors.New("notification is empty") - } - - rawHeader := notif[0] - var header wire.BlockHeader - err = header.Deserialize(hex.NewDecoder(bytes.NewReader([]byte(rawHeader)))) - if err != nil { - return nil, fmt.Errorf("error creating block header from bytes: %w", err) - } - - return &header, nil -} diff --git a/rpc/dcrwallet.go b/rpc/dcrwallet.go index f423e6c..8e7f5b9 100644 --- a/rpc/dcrwallet.go +++ b/rpc/dcrwallet.go @@ -34,7 +34,7 @@ func SetupWallet(user, pass, addrs []string, cert [][]byte, params *chaincfg.Par clients := make([]*client, len(addrs)) for i := 0; i < len(addrs); i++ { - clients[i] = setup(user[i], pass[i], addrs[i], cert[i], nil) + clients[i] = setup(user[i], pass[i], addrs[i], cert[i]) } return WalletConnect{ @@ -47,6 +47,7 @@ func (w *WalletConnect) Close() { for _, client := range w.clients { client.Close() } + log.Debug("dcrwallet clients closed") } // Clients loops over each wallet and tries to establish a connection. It @@ -61,7 +62,7 @@ func (w *WalletConnect) Clients() ([]*WalletRPC, []string) { c, newConnection, err := connect.dial(ctx) if err != nil { - log.Errorf("dcrwallet connection error: %v", err) + log.Errorf("dcrwallet dial error: %v", err) failedConnections = append(failedConnections, connect.addr) continue } diff --git a/rpc/notifs.go b/rpc/notifs.go new file mode 100644 index 0000000..70f5394 --- /dev/null +++ b/rpc/notifs.go @@ -0,0 +1,61 @@ +package rpc + +import ( + "bytes" + "encoding/hex" + "encoding/json" + "errors" + "fmt" + + "github.com/decred/dcrd/wire" +) + +type blockConnectedHandler struct { + blockConnected chan *wire.BlockHeader +} + +// Notify is called every time a notification is received from dcrd client. +// A wsrpc.Client will never call Notify concurrently. Notify should not return +// an error because that will cause the client to close and no further +// notifications will be received until a new connection is established. +func (n *blockConnectedHandler) Notify(method string, msg json.RawMessage) error { + if method != "blockconnected" { + return nil + } + + header, err := parseBlockConnected(msg) + if err != nil { + log.Errorf("Failed to parse dcrd block notification: %v", err) + return nil + } + + n.blockConnected <- header + + return nil +} + +func (n *blockConnectedHandler) Close() error { + return nil +} + +// parseBlockConnected extracts the block header from a +// blockconnected JSON-RPC notification. +func parseBlockConnected(msg json.RawMessage) (*wire.BlockHeader, error) { + var notif []string + err := json.Unmarshal(msg, ¬if) + if err != nil { + return nil, fmt.Errorf("json unmarshal error: %w", err) + } + + if len(notif) == 0 { + return nil, errors.New("notification is empty") + } + + var header wire.BlockHeader + err = header.Deserialize(hex.NewDecoder(bytes.NewReader([]byte(notif[0])))) + if err != nil { + return nil, fmt.Errorf("error creating block header from bytes: %w", err) + } + + return &header, nil +} diff --git a/vspd.go b/vspd.go index eddc70b..367750a 100644 --- a/vspd.go +++ b/vspd.go @@ -10,7 +10,9 @@ import ( "os" "runtime" "sync" + "time" + "github.com/decred/dcrd/wire" "github.com/decred/vspd/background" "github.com/decred/vspd/database" "github.com/decred/vspd/rpc" @@ -24,6 +26,9 @@ import ( // the database is deleted. const maxVoteChangeRecords = 10 +// consistencyInterval is the time period between wallet consistency checks. +const consistencyInterval = 30 * time.Minute + func main() { // Create a context that is cancelled when a shutdown request is received // through an interrupt signal. @@ -81,13 +86,51 @@ func run(shutdownCtx context.Context) int { // Create RPC client for local dcrd instance (used for broadcasting and // checking the status of fee transactions). - dcrd := rpc.SetupDcrd(cfg.DcrdUser, cfg.DcrdPass, cfg.DcrdHost, cfg.dcrdCert, nil, cfg.netParams.Params) + dcrd := rpc.SetupDcrd(cfg.DcrdUser, cfg.DcrdPass, cfg.DcrdHost, cfg.dcrdCert, cfg.netParams.Params) defer dcrd.Close() // Create RPC client for remote dcrwallet instance (used for voting). wallets := rpc.SetupWallet(cfg.walletUsers, cfg.walletPasswords, cfg.walletHosts, cfg.walletCerts, cfg.netParams.Params) defer wallets.Close() + // Create a channel to receive blockConnected notifications from dcrd. + notifChan := make(chan *wire.BlockHeader) + shutdownWg.Add(1) + go func() { + for { + select { + case <-shutdownCtx.Done(): + shutdownWg.Done() + return + case header := <-notifChan: + log.Debugf("Block notification %d (%s)", header.Height, header.BlockHash().String()) + background.BlockConnected(dcrd, wallets, db) + } + } + }() + + // Attach notification listener to dcrd client. + dcrd.BlockConnectedHandler(notifChan) + + // Loop forever attempting ensuring a dcrd connection is available, so + // notifications are received. + shutdownWg.Add(1) + go func() { + for { + select { + case <-shutdownCtx.Done(): + shutdownWg.Done() + return + case <-time.After(time.Second * 15): + // Ensure dcrd client is still connected. + _, _, err := dcrd.Client() + if err != nil { + log.Errorf("dcrd connect error: %v", err) + } + } + } + }() + // Ensure all data in database is present and up-to-date. err = db.CheckIntegrity(dcrd) if err != nil { @@ -95,6 +138,28 @@ func run(shutdownCtx context.Context) int { log.Errorf("Could not check database integrity: %v", err) } + // Run the block connected handler now to catch up with any blocks mined + // while vspd was shut down. + background.BlockConnected(dcrd, wallets, db) + + // Run voting wallet consistency check now to ensure all wallets are up to + // date. + background.CheckWalletConsistency(dcrd, wallets, db) + + // Run voting wallet consistency check periodically. + shutdownWg.Add(1) + go func() { + for { + select { + case <-shutdownCtx.Done(): + shutdownWg.Done() + return + case <-time.After(consistencyInterval): + background.CheckWalletConsistency(dcrd, wallets, db) + } + } + }() + // Create and start webapi server. apiCfg := webapi.Config{ VSPFee: cfg.VSPFee, @@ -118,16 +183,6 @@ func run(shutdownCtx context.Context) int { return 1 } - // Create a dcrd client with a blockconnected notification handler. - notifHandler := background.NotificationHandler{ShutdownWg: &shutdownWg} - dcrdWithNotifs := rpc.SetupDcrd(cfg.DcrdUser, cfg.DcrdPass, - cfg.DcrdHost, cfg.dcrdCert, ¬ifHandler, cfg.netParams.Params) - defer dcrdWithNotifs.Close() - - // Start background process which will continually attempt to reconnect to - // dcrd if the connection drops. - background.Start(shutdownCtx, &shutdownWg, db, dcrd, dcrdWithNotifs, wallets) - // Wait for shutdown tasks to complete before running deferred tasks and // returning. shutdownWg.Wait()