From 7f72caafe7688a5458f09d5c2cedcbb824a6aa7a Mon Sep 17 00:00:00 2001 From: jholdstock Date: Wed, 8 Jun 2022 10:26:54 +0100 Subject: [PATCH] Remove global vars from background.go While removing the globals from background.go it also made sense to clean up the use of dcrd RPC clients. Previously two seperate clients were maintained, one for making RPC calls and one for receiving notifications. These clients have been unified. --- background/background.go | 146 ++------------------------------------- rpc/client.go | 4 +- rpc/dcrd.go | 51 ++++++-------- rpc/dcrwallet.go | 5 +- rpc/notifs.go | 61 ++++++++++++++++ vspd.go | 77 ++++++++++++++++++--- 6 files changed, 162 insertions(+), 182 deletions(-) create mode 100644 rpc/notifs.go diff --git a/background/background.go b/background/background.go index 14d0660..1aef6d8 100644 --- a/background/background.go +++ b/background/background.go @@ -5,72 +5,26 @@ package background import ( - "context" - "encoding/json" "errors" "strings" - "sync" - "time" "github.com/decred/vspd/database" "github.com/decred/vspd/rpc" "github.com/jrick/wsrpc/v2" ) -var ( - db *database.VspDatabase - dcrdRPC rpc.DcrdConnect - walletRPC rpc.WalletConnect - notifierClosed chan struct{} -) - -type NotificationHandler struct { - ShutdownWg *sync.WaitGroup -} - const ( - // consistencyInterval is the time period between wallet consistency checks. - consistencyInterval = 30 * time.Minute + // requiredConfs is the number of confirmations required to consider a // ticket purchase or a fee transaction to be final. requiredConfs = 6 ) -// Notify is called every time a block notification is received from dcrd. -// Notify is never called concurrently. Notify should not return an error -// because that will cause the client to close and no further notifications will -// be received until a new connection is established. -func (n *NotificationHandler) Notify(method string, params json.RawMessage) error { - n.ShutdownWg.Add(1) - defer n.ShutdownWg.Done() - - if method != "blockconnected" { - return nil - } - - header, err := rpc.ParseBlockConnectedNotification(params) - if err != nil { - log.Errorf("Failed to parse dcrd block notification: %v", err) - return nil - } - - log.Debugf("Block notification %d (%s)", header.Height, header.BlockHash().String()) - - blockConnected() - - return nil -} - -func (n *NotificationHandler) Close() error { - close(notifierClosed) - return nil -} - -// blockConnected is called once when vspd starts up, and once each time a +// BlockConnected is called once when vspd starts up, and once each time a // blockconnected notification is received from dcrd. -func blockConnected() { +func BlockConnected(dcrdRPC rpc.DcrdConnect, walletRPC rpc.WalletConnect, db *database.VspDatabase) { - const funcName = "blockConnected" + const funcName = "BlockConnected" dcrdClient, _, err := dcrdRPC.Client() if err != nil { @@ -326,98 +280,12 @@ func blockConnected() { } -func connectNotifier(shutdownCtx context.Context, dcrdWithNotifs rpc.DcrdConnect) error { - notifierClosed = make(chan struct{}) - - dcrdClient, _, err := dcrdWithNotifs.Client() - if err != nil { - return err - } - - err = dcrdClient.NotifyBlocks() - if err != nil { - return err - } - - log.Info("Subscribed for dcrd block notifications") - - // Wait until context is done (vspd is shutting down), or until the - // notifier is closed. - select { - case <-shutdownCtx.Done(): - // A shutdown signal has been received - close the client with the - // notification handler to prevent further notifications from being - // received. - dcrdWithNotifs.Close() - return nil - case <-notifierClosed: - log.Warnf("dcrd notifier closed") - return nil - } -} - -func Start(shutdownCtx context.Context, wg *sync.WaitGroup, vdb *database.VspDatabase, drpc rpc.DcrdConnect, - dcrdWithNotif rpc.DcrdConnect, wrpc rpc.WalletConnect) { - - db = vdb - dcrdRPC = drpc - walletRPC = wrpc - - // Run the block connected handler now to catch up with any blocks mined - // while vspd was shut down. - blockConnected() - - // Run voting wallet consistency check now to ensure all wallets are up to - // date. - checkWalletConsistency() - - // Run voting wallet consistency check periodically. - wg.Add(1) - go func() { - consistencyLoop: - for { - select { - case <-shutdownCtx.Done(): - break consistencyLoop - case <-time.After(consistencyInterval): - checkWalletConsistency() - } - } - log.Debugf("Consistency checker stopped") - wg.Done() - }() - - // Loop forever attempting to create a connection to the dcrd server for - // notifications. - wg.Add(1) - go func() { - notifierLoop: - for { - err := connectNotifier(shutdownCtx, dcrdWithNotif) - if err != nil { - log.Errorf("dcrd connect error: %v", err) - } - - // If context is done (vspd is shutting down), return, - // otherwise wait 15 seconds and try to reconnect. - select { - case <-shutdownCtx.Done(): - break notifierLoop - case <-time.After(15 * time.Second): - } - - } - log.Debugf("Notification connector stopped") - wg.Done() - }() -} - -// checkWalletConsistency will retrieve all votable tickets from the database +// CheckWalletConsistency will retrieve all votable tickets from the database // and ensure they are all added to voting wallets with the correct vote // choices. -func checkWalletConsistency() { +func CheckWalletConsistency(dcrdRPC rpc.DcrdConnect, walletRPC rpc.WalletConnect, db *database.VspDatabase) { - const funcName = "checkWalletConsistency" + const funcName = "CheckWalletConsistency" log.Info("Checking voting wallet consistency") diff --git a/rpc/client.go b/rpc/client.go index 1f34d80..d3ac789 100644 --- a/rpc/client.go +++ b/rpc/client.go @@ -37,7 +37,7 @@ type client struct { notifier wsrpc.Notifier } -func setup(user, pass, addr string, cert []byte, n wsrpc.Notifier) *client { +func setup(user, pass, addr string, cert []byte) *client { // Create TLS options. pool := x509.NewCertPool() @@ -63,7 +63,7 @@ func setup(user, pass, addr string, cert []byte, n wsrpc.Notifier) *client { var mu sync.Mutex var c *wsrpc.Client fullAddr := "wss://" + addr + "/ws" - return &client{&mu, c, fullAddr, tlsOpt, authOpt, n} + return &client{&mu, c, fullAddr, tlsOpt, authOpt, nil} } func (c *client) Close() { diff --git a/rpc/dcrd.go b/rpc/dcrd.go index 224b07a..b183bd1 100644 --- a/rpc/dcrd.go +++ b/rpc/dcrd.go @@ -5,10 +5,8 @@ package rpc import ( - "bytes" "context" "encoding/hex" - "encoding/json" "errors" "fmt" @@ -45,15 +43,25 @@ type DcrdConnect struct { params *chaincfg.Params } -func SetupDcrd(user, pass, addr string, cert []byte, n wsrpc.Notifier, params *chaincfg.Params) DcrdConnect { +func SetupDcrd(user, pass, addr string, cert []byte, params *chaincfg.Params) DcrdConnect { return DcrdConnect{ - client: setup(user, pass, addr, cert, n), + client: setup(user, pass, addr, cert), params: params, } } +// BlockConnectedHandler attaches a blockconnected notification handler to the +// dcrd client. Every time a notification is received, the header of the +// connected block is sent to the provided channel. +func (d *DcrdConnect) BlockConnectedHandler(blockConnected chan *wire.BlockHeader) { + d.client.notifier = &blockConnectedHandler{ + blockConnected: blockConnected, + } +} + func (d *DcrdConnect) Close() { d.client.Close() + log.Debug("dcrd client closed") } // Client creates a new DcrdRPC client instance. Returns an error if dialing @@ -62,7 +70,7 @@ func (d *DcrdConnect) Client() (*DcrdRPC, string, error) { ctx := context.TODO() c, newConnection, err := d.client.dial(ctx) if err != nil { - return nil, d.client.addr, fmt.Errorf("dcrd connection error: %w", err) + return nil, d.client.addr, fmt.Errorf("dcrd dial error: %w", err) } // If this is a reused connection, we don't need to validate the dcrd config @@ -116,6 +124,16 @@ func (d *DcrdConnect) Client() (*DcrdRPC, string, error) { return nil, d.client.addr, errors.New("dcrd does not have transaction index enabled (--txindex)") } + // Request blockconnected notifications. + if d.client.notifier != nil { + err = c.Call(ctx, "notifyblocks", nil) + if err != nil { + return nil, d.client.addr, fmt.Errorf("notifyblocks failed: %w", err) + } + } + + log.Debugf("Connected to dcrd") + return &DcrdRPC{c, ctx}, d.client.addr, nil } @@ -210,26 +228,3 @@ func (c *DcrdRPC) ExistsLiveTicket(ticketHash string) (bool, error) { return bitset.Bytes(existsBytes).Get(0), nil } - -// ParseBlockConnectedNotification extracts the block header from a -// blockconnected JSON-RPC notification. -func ParseBlockConnectedNotification(params json.RawMessage) (*wire.BlockHeader, error) { - var notif []string - err := json.Unmarshal(params, ¬if) - if err != nil { - return nil, fmt.Errorf("json unmarshal error: %w", err) - } - - if len(notif) == 0 { - return nil, errors.New("notification is empty") - } - - rawHeader := notif[0] - var header wire.BlockHeader - err = header.Deserialize(hex.NewDecoder(bytes.NewReader([]byte(rawHeader)))) - if err != nil { - return nil, fmt.Errorf("error creating block header from bytes: %w", err) - } - - return &header, nil -} diff --git a/rpc/dcrwallet.go b/rpc/dcrwallet.go index f423e6c..8e7f5b9 100644 --- a/rpc/dcrwallet.go +++ b/rpc/dcrwallet.go @@ -34,7 +34,7 @@ func SetupWallet(user, pass, addrs []string, cert [][]byte, params *chaincfg.Par clients := make([]*client, len(addrs)) for i := 0; i < len(addrs); i++ { - clients[i] = setup(user[i], pass[i], addrs[i], cert[i], nil) + clients[i] = setup(user[i], pass[i], addrs[i], cert[i]) } return WalletConnect{ @@ -47,6 +47,7 @@ func (w *WalletConnect) Close() { for _, client := range w.clients { client.Close() } + log.Debug("dcrwallet clients closed") } // Clients loops over each wallet and tries to establish a connection. It @@ -61,7 +62,7 @@ func (w *WalletConnect) Clients() ([]*WalletRPC, []string) { c, newConnection, err := connect.dial(ctx) if err != nil { - log.Errorf("dcrwallet connection error: %v", err) + log.Errorf("dcrwallet dial error: %v", err) failedConnections = append(failedConnections, connect.addr) continue } diff --git a/rpc/notifs.go b/rpc/notifs.go new file mode 100644 index 0000000..70f5394 --- /dev/null +++ b/rpc/notifs.go @@ -0,0 +1,61 @@ +package rpc + +import ( + "bytes" + "encoding/hex" + "encoding/json" + "errors" + "fmt" + + "github.com/decred/dcrd/wire" +) + +type blockConnectedHandler struct { + blockConnected chan *wire.BlockHeader +} + +// Notify is called every time a notification is received from dcrd client. +// A wsrpc.Client will never call Notify concurrently. Notify should not return +// an error because that will cause the client to close and no further +// notifications will be received until a new connection is established. +func (n *blockConnectedHandler) Notify(method string, msg json.RawMessage) error { + if method != "blockconnected" { + return nil + } + + header, err := parseBlockConnected(msg) + if err != nil { + log.Errorf("Failed to parse dcrd block notification: %v", err) + return nil + } + + n.blockConnected <- header + + return nil +} + +func (n *blockConnectedHandler) Close() error { + return nil +} + +// parseBlockConnected extracts the block header from a +// blockconnected JSON-RPC notification. +func parseBlockConnected(msg json.RawMessage) (*wire.BlockHeader, error) { + var notif []string + err := json.Unmarshal(msg, ¬if) + if err != nil { + return nil, fmt.Errorf("json unmarshal error: %w", err) + } + + if len(notif) == 0 { + return nil, errors.New("notification is empty") + } + + var header wire.BlockHeader + err = header.Deserialize(hex.NewDecoder(bytes.NewReader([]byte(notif[0])))) + if err != nil { + return nil, fmt.Errorf("error creating block header from bytes: %w", err) + } + + return &header, nil +} diff --git a/vspd.go b/vspd.go index eddc70b..367750a 100644 --- a/vspd.go +++ b/vspd.go @@ -10,7 +10,9 @@ import ( "os" "runtime" "sync" + "time" + "github.com/decred/dcrd/wire" "github.com/decred/vspd/background" "github.com/decred/vspd/database" "github.com/decred/vspd/rpc" @@ -24,6 +26,9 @@ import ( // the database is deleted. const maxVoteChangeRecords = 10 +// consistencyInterval is the time period between wallet consistency checks. +const consistencyInterval = 30 * time.Minute + func main() { // Create a context that is cancelled when a shutdown request is received // through an interrupt signal. @@ -81,13 +86,51 @@ func run(shutdownCtx context.Context) int { // Create RPC client for local dcrd instance (used for broadcasting and // checking the status of fee transactions). - dcrd := rpc.SetupDcrd(cfg.DcrdUser, cfg.DcrdPass, cfg.DcrdHost, cfg.dcrdCert, nil, cfg.netParams.Params) + dcrd := rpc.SetupDcrd(cfg.DcrdUser, cfg.DcrdPass, cfg.DcrdHost, cfg.dcrdCert, cfg.netParams.Params) defer dcrd.Close() // Create RPC client for remote dcrwallet instance (used for voting). wallets := rpc.SetupWallet(cfg.walletUsers, cfg.walletPasswords, cfg.walletHosts, cfg.walletCerts, cfg.netParams.Params) defer wallets.Close() + // Create a channel to receive blockConnected notifications from dcrd. + notifChan := make(chan *wire.BlockHeader) + shutdownWg.Add(1) + go func() { + for { + select { + case <-shutdownCtx.Done(): + shutdownWg.Done() + return + case header := <-notifChan: + log.Debugf("Block notification %d (%s)", header.Height, header.BlockHash().String()) + background.BlockConnected(dcrd, wallets, db) + } + } + }() + + // Attach notification listener to dcrd client. + dcrd.BlockConnectedHandler(notifChan) + + // Loop forever attempting ensuring a dcrd connection is available, so + // notifications are received. + shutdownWg.Add(1) + go func() { + for { + select { + case <-shutdownCtx.Done(): + shutdownWg.Done() + return + case <-time.After(time.Second * 15): + // Ensure dcrd client is still connected. + _, _, err := dcrd.Client() + if err != nil { + log.Errorf("dcrd connect error: %v", err) + } + } + } + }() + // Ensure all data in database is present and up-to-date. err = db.CheckIntegrity(dcrd) if err != nil { @@ -95,6 +138,28 @@ func run(shutdownCtx context.Context) int { log.Errorf("Could not check database integrity: %v", err) } + // Run the block connected handler now to catch up with any blocks mined + // while vspd was shut down. + background.BlockConnected(dcrd, wallets, db) + + // Run voting wallet consistency check now to ensure all wallets are up to + // date. + background.CheckWalletConsistency(dcrd, wallets, db) + + // Run voting wallet consistency check periodically. + shutdownWg.Add(1) + go func() { + for { + select { + case <-shutdownCtx.Done(): + shutdownWg.Done() + return + case <-time.After(consistencyInterval): + background.CheckWalletConsistency(dcrd, wallets, db) + } + } + }() + // Create and start webapi server. apiCfg := webapi.Config{ VSPFee: cfg.VSPFee, @@ -118,16 +183,6 @@ func run(shutdownCtx context.Context) int { return 1 } - // Create a dcrd client with a blockconnected notification handler. - notifHandler := background.NotificationHandler{ShutdownWg: &shutdownWg} - dcrdWithNotifs := rpc.SetupDcrd(cfg.DcrdUser, cfg.DcrdPass, - cfg.DcrdHost, cfg.dcrdCert, ¬ifHandler, cfg.netParams.Params) - defer dcrdWithNotifs.Close() - - // Start background process which will continually attempt to reconnect to - // dcrd if the connection drops. - background.Start(shutdownCtx, &shutdownWg, db, dcrd, dcrdWithNotifs, wallets) - // Wait for shutdown tasks to complete before running deferred tasks and // returning. shutdownWg.Wait()