From 75026f5e91593123a8c0880546d2ce73eed005df Mon Sep 17 00:00:00 2001 From: jholdstock Date: Thu, 31 Mar 2022 13:44:52 +0100 Subject: [PATCH] Clean up use of shutdown context. - Rename all instances to "shutdownCtx" to be really explicit. This context is special in that it can be closed at any time without warning, so it should be obvious to the reader. - Don't use shutdownCtx in RPC clients. Clients should not stop working immediately when shutdown is signalled, they need to keep working while the process is shutting down. --- background/background.go | 14 +++++--------- database/database.go | 9 +++++---- rpc/client.go | 5 ++--- rpc/dcrd.go | 3 ++- rpc/dcrwallet.go | 3 ++- vspd.go | 16 ++++++++-------- webapi/cache.go | 9 ++++----- webapi/middleware.go | 4 ++-- webapi/webapi.go | 12 ++++++------ 9 files changed, 36 insertions(+), 39 deletions(-) diff --git a/background/background.go b/background/background.go index 821f95d..14d0660 100644 --- a/background/background.go +++ b/background/background.go @@ -72,9 +72,7 @@ func blockConnected() { const funcName = "blockConnected" - ctx := context.Background() - - dcrdClient, _, err := dcrdRPC.Client(ctx) + dcrdClient, _, err := dcrdRPC.Client() if err != nil { log.Errorf("%s: %v", funcName, err) return @@ -168,7 +166,7 @@ func blockConnected() { log.Errorf("%s: db.GetUnconfirmedFees error: %v", funcName, err) } - walletClients, failedConnections := walletRPC.Clients(ctx) + walletClients, failedConnections := walletRPC.Clients() if len(walletClients) == 0 { log.Errorf("%s: Could not connect to any wallets", funcName) return @@ -331,7 +329,7 @@ func blockConnected() { func connectNotifier(shutdownCtx context.Context, dcrdWithNotifs rpc.DcrdConnect) error { notifierClosed = make(chan struct{}) - dcrdClient, _, err := dcrdWithNotifs.Client(shutdownCtx) + dcrdClient, _, err := dcrdWithNotifs.Client() if err != nil { return err } @@ -423,15 +421,13 @@ func checkWalletConsistency() { log.Info("Checking voting wallet consistency") - ctx := context.Background() - - dcrdClient, _, err := dcrdRPC.Client(ctx) + dcrdClient, _, err := dcrdRPC.Client() if err != nil { log.Errorf("%s: %v", funcName, err) return } - walletClients, failedConnections := walletRPC.Clients(ctx) + walletClients, failedConnections := walletRPC.Clients() if len(walletClients) == 0 { log.Errorf("%s: Could not connect to any wallets", funcName) return diff --git a/database/database.go b/database/database.go index 93c28c1..0881809 100644 --- a/database/database.go +++ b/database/database.go @@ -172,7 +172,8 @@ func CreateNew(dbFile, feeXPub string) error { // Open initializes and returns an open database. An error is returned if no // database file is found at the provided path. -func Open(ctx context.Context, shutdownWg *sync.WaitGroup, dbFile string, backupInterval time.Duration, maxVoteChangeRecords int) (*VspDatabase, error) { +func Open(shutdownCtx context.Context, shutdownWg *sync.WaitGroup, dbFile string, + backupInterval time.Duration, maxVoteChangeRecords int) (*VspDatabase, error) { // Error if db file does not exist. This is needed because bolt.Open will // silently create a new empty database if the file does not exist. A new @@ -211,7 +212,7 @@ func Open(ctx context.Context, shutdownWg *sync.WaitGroup, dbFile string, backup if err != nil { log.Errorf("Failed to write database backup: %v", err) } - case <-ctx.Done(): + case <-shutdownCtx.Done(): shutdownWg.Done() return } @@ -385,7 +386,7 @@ func (vdb *VspDatabase) BackupDB(w http.ResponseWriter) error { // CheckIntegrity will ensure that all data in the database is present and up to // date. -func (vdb *VspDatabase) CheckIntegrity(ctx context.Context, dcrd rpc.DcrdConnect) error { +func (vdb *VspDatabase) CheckIntegrity(dcrd rpc.DcrdConnect) error { // Ensure all confirmed tickets have a purchase height. // This is necessary because of an old bug which, in some circumstances, @@ -400,7 +401,7 @@ func (vdb *VspDatabase) CheckIntegrity(ctx context.Context, dcrd rpc.DcrdConnect return nil } - dcrdClient, _, err := dcrd.Client(ctx) + dcrdClient, _, err := dcrd.Client() if err != nil { return err } diff --git a/rpc/client.go b/rpc/client.go index 4c28577..1f34d80 100644 --- a/rpc/client.go +++ b/rpc/client.go @@ -83,9 +83,8 @@ func (c *client) Close() { } // dial will return a connect rpc client if one exists, or attempt to create a -// new one if not. A -// boolean indicates whether this connection is new (true), or if it is an -// existing connection which is being reused (false). +// new one if not. A boolean indicates whether this connection is new (true), or +// if it is an existing connection which is being reused (false). func (c *client) dial(ctx context.Context) (Caller, bool, error) { c.mu.Lock() defer c.mu.Unlock() diff --git a/rpc/dcrd.go b/rpc/dcrd.go index ab23896..f6eafeb 100644 --- a/rpc/dcrd.go +++ b/rpc/dcrd.go @@ -58,7 +58,8 @@ func (d *DcrdConnect) Close() { // Client creates a new DcrdRPC client instance. Returns an error if dialing // dcrd fails or if dcrd is misconfigured. -func (d *DcrdConnect) Client(ctx context.Context) (*DcrdRPC, string, error) { +func (d *DcrdConnect) Client() (*DcrdRPC, string, error) { + ctx := context.Background() c, newConnection, err := d.client.dial(ctx) if err != nil { return nil, d.client.addr, fmt.Errorf("dcrd connection error: %w", err) diff --git a/rpc/dcrwallet.go b/rpc/dcrwallet.go index 3b95ad0..fa15895 100644 --- a/rpc/dcrwallet.go +++ b/rpc/dcrwallet.go @@ -52,7 +52,8 @@ func (w *WalletConnect) Close() { // Clients loops over each wallet and tries to establish a connection. It // increments a count of failed connections if a connection cannot be // established, or if the wallet is misconfigured. -func (w *WalletConnect) Clients(ctx context.Context) ([]*WalletRPC, []string) { +func (w *WalletConnect) Clients() ([]*WalletRPC, []string) { + ctx := context.Background() walletClients := make([]*WalletRPC, 0) failedConnections := make([]string, 0) diff --git a/vspd.go b/vspd.go index 054e3f0..858dd1d 100644 --- a/vspd.go +++ b/vspd.go @@ -28,11 +28,11 @@ const maxVoteChangeRecords = 10 func main() { // Create a context that is cancelled when a shutdown request is received // through an interrupt signal. - ctx := withShutdownCancel(context.Background()) + shutdownCtx := withShutdownCancel(context.Background()) go shutdownListener() // Run until error is returned, or shutdown is requested. - if err := run(ctx); err != nil && !errors.Is(err, context.Canceled) { + if err := run(shutdownCtx); err != nil && !errors.Is(err, context.Canceled) { os.Exit(1) } } @@ -41,7 +41,7 @@ func main() { // is responsible for parsing the config, creating a dcrwallet RPC client, // opening the database, starting the webserver, and stopping all started // services when the context is cancelled. -func run(ctx context.Context) error { +func run(shutdownCtx context.Context) error { // Load config file and parse CLI args. cfg, err := loadConfig() @@ -73,7 +73,7 @@ func run(ctx context.Context) error { defer log.Criticalf("Shutdown complete") // Open database. - db, err := database.Open(ctx, &shutdownWg, cfg.dbPath, cfg.BackupInterval, maxVoteChangeRecords) + db, err := database.Open(shutdownCtx, &shutdownWg, cfg.dbPath, cfg.BackupInterval, maxVoteChangeRecords) if err != nil { log.Errorf("Database error: %v", err) requestShutdown() @@ -92,7 +92,7 @@ func run(ctx context.Context) error { defer wallets.Close() // Ensure all data in database is present and up-to-date. - err = db.CheckIntegrity(ctx, dcrd) + err = db.CheckIntegrity(dcrd) if err != nil { // vspd should still start if this fails, so just log an error. log.Errorf("Could not check database integrity: %v", err) @@ -112,7 +112,7 @@ func run(ctx context.Context) error { MaxVoteChangeRecords: maxVoteChangeRecords, VspdVersion: version.String(), } - err = webapi.Start(ctx, requestShutdown, &shutdownWg, cfg.Listen, db, + err = webapi.Start(shutdownCtx, requestShutdown, &shutdownWg, cfg.Listen, db, dcrd, wallets, apiCfg) if err != nil { log.Errorf("Failed to initialize webapi: %v", err) @@ -129,11 +129,11 @@ func run(ctx context.Context) error { // Start background process which will continually attempt to reconnect to // dcrd if the connection drops. - background.Start(ctx, &shutdownWg, db, dcrd, dcrdWithNotifs, wallets) + background.Start(shutdownCtx, &shutdownWg, db, dcrd, dcrdWithNotifs, wallets) // Wait for shutdown tasks to complete before running deferred tasks and // returning. shutdownWg.Wait() - return ctx.Err() + return shutdownCtx.Err() } diff --git a/webapi/cache.go b/webapi/cache.go index e19adab..3dd97a6 100644 --- a/webapi/cache.go +++ b/webapi/cache.go @@ -5,7 +5,6 @@ package webapi import ( - "context" "errors" "sync" "time" @@ -56,8 +55,8 @@ func newCache(signPubKey string) *cache { // update will use the provided database and RPC connections to update the // dynamic values in the cache. -func (c *cache) update(ctx context.Context, db *database.VspDatabase, - dcrd rpc.DcrdConnect, wallets rpc.WalletConnect) error { +func (c *cache) update(db *database.VspDatabase, dcrd rpc.DcrdConnect, + wallets rpc.WalletConnect) error { dbSize, err := db.Size() if err != nil { @@ -71,7 +70,7 @@ func (c *cache) update(ctx context.Context, db *database.VspDatabase, } // Get latest best block height. - dcrdClient, _, err := dcrd.Client(ctx) + dcrdClient, _, err := dcrd.Client() if err != nil { return err } @@ -85,7 +84,7 @@ func (c *cache) update(ctx context.Context, db *database.VspDatabase, return errors.New("dcr node reports a network ticket pool size of zero") } - clients, failedConnections := wallets.Clients(ctx) + clients, failedConnections := wallets.Clients() if len(clients) == 0 { log.Error("Could not connect to any wallets") } else if len(failedConnections) > 0 { diff --git a/webapi/middleware.go b/webapi/middleware.go index 8bcf61b..8828796 100644 --- a/webapi/middleware.go +++ b/webapi/middleware.go @@ -71,7 +71,7 @@ func (s *Server) requireAdmin(c *gin.Context) { // downstream handlers to make use of. func withDcrdClient(dcrd rpc.DcrdConnect) gin.HandlerFunc { return func(c *gin.Context) { - client, hostname, err := dcrd.Client(c) + client, hostname, err := dcrd.Client() // Don't handle the error here, add it to the context and let downstream // handlers decide what to do with it. c.Set(dcrdKey, client) @@ -85,7 +85,7 @@ func withDcrdClient(dcrd rpc.DcrdConnect) gin.HandlerFunc { // must handle the case where no wallet clients are connected. func withWalletClients(wallets rpc.WalletConnect) gin.HandlerFunc { return func(c *gin.Context) { - clients, failedConnections := wallets.Clients(c) + clients, failedConnections := wallets.Clients() if len(clients) == 0 { log.Error("Could not connect to any wallets") } else if len(failedConnections) > 0 { diff --git a/webapi/webapi.go b/webapi/webapi.go index 6c18ef7..0857132 100644 --- a/webapi/webapi.go +++ b/webapi/webapi.go @@ -72,7 +72,7 @@ type Server struct { signPubKey ed25519.PublicKey } -func Start(ctx context.Context, requestShutdown func(), shutdownWg *sync.WaitGroup, +func Start(shutdownCtx context.Context, requestShutdown func(), shutdownWg *sync.WaitGroup, listen string, vdb *database.VspDatabase, dcrd rpc.DcrdConnect, wallets rpc.WalletConnect, config Config) error { s := &Server{ @@ -90,7 +90,7 @@ func Start(ctx context.Context, requestShutdown func(), shutdownWg *sync.WaitGro // Populate cached VSP stats before starting webserver. s.cache = newCache(base64.StdEncoding.EncodeToString(s.signPubKey)) - err = s.cache.update(ctx, vdb, dcrd, wallets) + err = s.cache.update(vdb, dcrd, wallets) if err != nil { log.Errorf("Could not initialize VSP stats cache: %v", err) } @@ -118,7 +118,7 @@ func Start(ctx context.Context, requestShutdown func(), shutdownWg *sync.WaitGro // Create TCP listener. var listenConfig net.ListenConfig - listener, err := listenConfig.Listen(ctx, "tcp", listen) + listener, err := listenConfig.Listen(shutdownCtx, "tcp", listen) if err != nil { return err } @@ -134,7 +134,7 @@ func Start(ctx context.Context, requestShutdown func(), shutdownWg *sync.WaitGro shutdownWg.Add(1) go func() { // Wait until shutdown is signaled before shutting down. - <-ctx.Done() + <-shutdownCtx.Done() log.Debug("Stopping webserver...") // Give the webserver 5 seconds to finish what it is doing. @@ -171,11 +171,11 @@ func Start(ctx context.Context, requestShutdown func(), shutdownWg *sync.WaitGro go func() { for { select { - case <-ctx.Done(): + case <-shutdownCtx.Done(): shutdownWg.Done() return case <-time.After(refresh): - err := s.cache.update(ctx, vdb, dcrd, wallets) + err := s.cache.update(vdb, dcrd, wallets) if err != nil { log.Errorf("Failed to update cached VSP stats: %v", err) }