Clean up use of shutdown context.

- Rename all instances to "shutdownCtx" to be really explicit. This context is special in that it can be closed at any time without warning, so it should be obvious to the reader.
- Don't use shutdownCtx in RPC clients. Clients should not stop working immediately when shutdown is signalled, they need to keep working while the process is shutting down.
This commit is contained in:
jholdstock 2022-03-31 13:44:52 +01:00 committed by Jamie Holdstock
parent e0b0630248
commit 75026f5e91
9 changed files with 36 additions and 39 deletions

View File

@ -72,9 +72,7 @@ func blockConnected() {
const funcName = "blockConnected" const funcName = "blockConnected"
ctx := context.Background() dcrdClient, _, err := dcrdRPC.Client()
dcrdClient, _, err := dcrdRPC.Client(ctx)
if err != nil { if err != nil {
log.Errorf("%s: %v", funcName, err) log.Errorf("%s: %v", funcName, err)
return return
@ -168,7 +166,7 @@ func blockConnected() {
log.Errorf("%s: db.GetUnconfirmedFees error: %v", funcName, err) log.Errorf("%s: db.GetUnconfirmedFees error: %v", funcName, err)
} }
walletClients, failedConnections := walletRPC.Clients(ctx) walletClients, failedConnections := walletRPC.Clients()
if len(walletClients) == 0 { if len(walletClients) == 0 {
log.Errorf("%s: Could not connect to any wallets", funcName) log.Errorf("%s: Could not connect to any wallets", funcName)
return return
@ -331,7 +329,7 @@ func blockConnected() {
func connectNotifier(shutdownCtx context.Context, dcrdWithNotifs rpc.DcrdConnect) error { func connectNotifier(shutdownCtx context.Context, dcrdWithNotifs rpc.DcrdConnect) error {
notifierClosed = make(chan struct{}) notifierClosed = make(chan struct{})
dcrdClient, _, err := dcrdWithNotifs.Client(shutdownCtx) dcrdClient, _, err := dcrdWithNotifs.Client()
if err != nil { if err != nil {
return err return err
} }
@ -423,15 +421,13 @@ func checkWalletConsistency() {
log.Info("Checking voting wallet consistency") log.Info("Checking voting wallet consistency")
ctx := context.Background() dcrdClient, _, err := dcrdRPC.Client()
dcrdClient, _, err := dcrdRPC.Client(ctx)
if err != nil { if err != nil {
log.Errorf("%s: %v", funcName, err) log.Errorf("%s: %v", funcName, err)
return return
} }
walletClients, failedConnections := walletRPC.Clients(ctx) walletClients, failedConnections := walletRPC.Clients()
if len(walletClients) == 0 { if len(walletClients) == 0 {
log.Errorf("%s: Could not connect to any wallets", funcName) log.Errorf("%s: Could not connect to any wallets", funcName)
return return

View File

@ -172,7 +172,8 @@ func CreateNew(dbFile, feeXPub string) error {
// Open initializes and returns an open database. An error is returned if no // Open initializes and returns an open database. An error is returned if no
// database file is found at the provided path. // database file is found at the provided path.
func Open(ctx context.Context, shutdownWg *sync.WaitGroup, dbFile string, backupInterval time.Duration, maxVoteChangeRecords int) (*VspDatabase, error) { func Open(shutdownCtx context.Context, shutdownWg *sync.WaitGroup, dbFile string,
backupInterval time.Duration, maxVoteChangeRecords int) (*VspDatabase, error) {
// Error if db file does not exist. This is needed because bolt.Open will // Error if db file does not exist. This is needed because bolt.Open will
// silently create a new empty database if the file does not exist. A new // silently create a new empty database if the file does not exist. A new
@ -211,7 +212,7 @@ func Open(ctx context.Context, shutdownWg *sync.WaitGroup, dbFile string, backup
if err != nil { if err != nil {
log.Errorf("Failed to write database backup: %v", err) log.Errorf("Failed to write database backup: %v", err)
} }
case <-ctx.Done(): case <-shutdownCtx.Done():
shutdownWg.Done() shutdownWg.Done()
return return
} }
@ -385,7 +386,7 @@ func (vdb *VspDatabase) BackupDB(w http.ResponseWriter) error {
// CheckIntegrity will ensure that all data in the database is present and up to // CheckIntegrity will ensure that all data in the database is present and up to
// date. // date.
func (vdb *VspDatabase) CheckIntegrity(ctx context.Context, dcrd rpc.DcrdConnect) error { func (vdb *VspDatabase) CheckIntegrity(dcrd rpc.DcrdConnect) error {
// Ensure all confirmed tickets have a purchase height. // Ensure all confirmed tickets have a purchase height.
// This is necessary because of an old bug which, in some circumstances, // This is necessary because of an old bug which, in some circumstances,
@ -400,7 +401,7 @@ func (vdb *VspDatabase) CheckIntegrity(ctx context.Context, dcrd rpc.DcrdConnect
return nil return nil
} }
dcrdClient, _, err := dcrd.Client(ctx) dcrdClient, _, err := dcrd.Client()
if err != nil { if err != nil {
return err return err
} }

View File

@ -83,9 +83,8 @@ func (c *client) Close() {
} }
// dial will return a connect rpc client if one exists, or attempt to create a // dial will return a connect rpc client if one exists, or attempt to create a
// new one if not. A // new one if not. A boolean indicates whether this connection is new (true), or
// boolean indicates whether this connection is new (true), or if it is an // if it is an existing connection which is being reused (false).
// existing connection which is being reused (false).
func (c *client) dial(ctx context.Context) (Caller, bool, error) { func (c *client) dial(ctx context.Context) (Caller, bool, error) {
c.mu.Lock() c.mu.Lock()
defer c.mu.Unlock() defer c.mu.Unlock()

View File

@ -58,7 +58,8 @@ func (d *DcrdConnect) Close() {
// Client creates a new DcrdRPC client instance. Returns an error if dialing // Client creates a new DcrdRPC client instance. Returns an error if dialing
// dcrd fails or if dcrd is misconfigured. // dcrd fails or if dcrd is misconfigured.
func (d *DcrdConnect) Client(ctx context.Context) (*DcrdRPC, string, error) { func (d *DcrdConnect) Client() (*DcrdRPC, string, error) {
ctx := context.Background()
c, newConnection, err := d.client.dial(ctx) c, newConnection, err := d.client.dial(ctx)
if err != nil { if err != nil {
return nil, d.client.addr, fmt.Errorf("dcrd connection error: %w", err) return nil, d.client.addr, fmt.Errorf("dcrd connection error: %w", err)

View File

@ -52,7 +52,8 @@ func (w *WalletConnect) Close() {
// Clients loops over each wallet and tries to establish a connection. It // Clients loops over each wallet and tries to establish a connection. It
// increments a count of failed connections if a connection cannot be // increments a count of failed connections if a connection cannot be
// established, or if the wallet is misconfigured. // established, or if the wallet is misconfigured.
func (w *WalletConnect) Clients(ctx context.Context) ([]*WalletRPC, []string) { func (w *WalletConnect) Clients() ([]*WalletRPC, []string) {
ctx := context.Background()
walletClients := make([]*WalletRPC, 0) walletClients := make([]*WalletRPC, 0)
failedConnections := make([]string, 0) failedConnections := make([]string, 0)

16
vspd.go
View File

@ -28,11 +28,11 @@ const maxVoteChangeRecords = 10
func main() { func main() {
// Create a context that is cancelled when a shutdown request is received // Create a context that is cancelled when a shutdown request is received
// through an interrupt signal. // through an interrupt signal.
ctx := withShutdownCancel(context.Background()) shutdownCtx := withShutdownCancel(context.Background())
go shutdownListener() go shutdownListener()
// Run until error is returned, or shutdown is requested. // Run until error is returned, or shutdown is requested.
if err := run(ctx); err != nil && !errors.Is(err, context.Canceled) { if err := run(shutdownCtx); err != nil && !errors.Is(err, context.Canceled) {
os.Exit(1) os.Exit(1)
} }
} }
@ -41,7 +41,7 @@ func main() {
// is responsible for parsing the config, creating a dcrwallet RPC client, // is responsible for parsing the config, creating a dcrwallet RPC client,
// opening the database, starting the webserver, and stopping all started // opening the database, starting the webserver, and stopping all started
// services when the context is cancelled. // services when the context is cancelled.
func run(ctx context.Context) error { func run(shutdownCtx context.Context) error {
// Load config file and parse CLI args. // Load config file and parse CLI args.
cfg, err := loadConfig() cfg, err := loadConfig()
@ -73,7 +73,7 @@ func run(ctx context.Context) error {
defer log.Criticalf("Shutdown complete") defer log.Criticalf("Shutdown complete")
// Open database. // Open database.
db, err := database.Open(ctx, &shutdownWg, cfg.dbPath, cfg.BackupInterval, maxVoteChangeRecords) db, err := database.Open(shutdownCtx, &shutdownWg, cfg.dbPath, cfg.BackupInterval, maxVoteChangeRecords)
if err != nil { if err != nil {
log.Errorf("Database error: %v", err) log.Errorf("Database error: %v", err)
requestShutdown() requestShutdown()
@ -92,7 +92,7 @@ func run(ctx context.Context) error {
defer wallets.Close() defer wallets.Close()
// Ensure all data in database is present and up-to-date. // Ensure all data in database is present and up-to-date.
err = db.CheckIntegrity(ctx, dcrd) err = db.CheckIntegrity(dcrd)
if err != nil { if err != nil {
// vspd should still start if this fails, so just log an error. // vspd should still start if this fails, so just log an error.
log.Errorf("Could not check database integrity: %v", err) log.Errorf("Could not check database integrity: %v", err)
@ -112,7 +112,7 @@ func run(ctx context.Context) error {
MaxVoteChangeRecords: maxVoteChangeRecords, MaxVoteChangeRecords: maxVoteChangeRecords,
VspdVersion: version.String(), VspdVersion: version.String(),
} }
err = webapi.Start(ctx, requestShutdown, &shutdownWg, cfg.Listen, db, err = webapi.Start(shutdownCtx, requestShutdown, &shutdownWg, cfg.Listen, db,
dcrd, wallets, apiCfg) dcrd, wallets, apiCfg)
if err != nil { if err != nil {
log.Errorf("Failed to initialize webapi: %v", err) log.Errorf("Failed to initialize webapi: %v", err)
@ -129,11 +129,11 @@ func run(ctx context.Context) error {
// Start background process which will continually attempt to reconnect to // Start background process which will continually attempt to reconnect to
// dcrd if the connection drops. // dcrd if the connection drops.
background.Start(ctx, &shutdownWg, db, dcrd, dcrdWithNotifs, wallets) background.Start(shutdownCtx, &shutdownWg, db, dcrd, dcrdWithNotifs, wallets)
// Wait for shutdown tasks to complete before running deferred tasks and // Wait for shutdown tasks to complete before running deferred tasks and
// returning. // returning.
shutdownWg.Wait() shutdownWg.Wait()
return ctx.Err() return shutdownCtx.Err()
} }

View File

@ -5,7 +5,6 @@
package webapi package webapi
import ( import (
"context"
"errors" "errors"
"sync" "sync"
"time" "time"
@ -56,8 +55,8 @@ func newCache(signPubKey string) *cache {
// update will use the provided database and RPC connections to update the // update will use the provided database and RPC connections to update the
// dynamic values in the cache. // dynamic values in the cache.
func (c *cache) update(ctx context.Context, db *database.VspDatabase, func (c *cache) update(db *database.VspDatabase, dcrd rpc.DcrdConnect,
dcrd rpc.DcrdConnect, wallets rpc.WalletConnect) error { wallets rpc.WalletConnect) error {
dbSize, err := db.Size() dbSize, err := db.Size()
if err != nil { if err != nil {
@ -71,7 +70,7 @@ func (c *cache) update(ctx context.Context, db *database.VspDatabase,
} }
// Get latest best block height. // Get latest best block height.
dcrdClient, _, err := dcrd.Client(ctx) dcrdClient, _, err := dcrd.Client()
if err != nil { if err != nil {
return err return err
} }
@ -85,7 +84,7 @@ func (c *cache) update(ctx context.Context, db *database.VspDatabase,
return errors.New("dcr node reports a network ticket pool size of zero") return errors.New("dcr node reports a network ticket pool size of zero")
} }
clients, failedConnections := wallets.Clients(ctx) clients, failedConnections := wallets.Clients()
if len(clients) == 0 { if len(clients) == 0 {
log.Error("Could not connect to any wallets") log.Error("Could not connect to any wallets")
} else if len(failedConnections) > 0 { } else if len(failedConnections) > 0 {

View File

@ -71,7 +71,7 @@ func (s *Server) requireAdmin(c *gin.Context) {
// downstream handlers to make use of. // downstream handlers to make use of.
func withDcrdClient(dcrd rpc.DcrdConnect) gin.HandlerFunc { func withDcrdClient(dcrd rpc.DcrdConnect) gin.HandlerFunc {
return func(c *gin.Context) { return func(c *gin.Context) {
client, hostname, err := dcrd.Client(c) client, hostname, err := dcrd.Client()
// Don't handle the error here, add it to the context and let downstream // Don't handle the error here, add it to the context and let downstream
// handlers decide what to do with it. // handlers decide what to do with it.
c.Set(dcrdKey, client) c.Set(dcrdKey, client)
@ -85,7 +85,7 @@ func withDcrdClient(dcrd rpc.DcrdConnect) gin.HandlerFunc {
// must handle the case where no wallet clients are connected. // must handle the case where no wallet clients are connected.
func withWalletClients(wallets rpc.WalletConnect) gin.HandlerFunc { func withWalletClients(wallets rpc.WalletConnect) gin.HandlerFunc {
return func(c *gin.Context) { return func(c *gin.Context) {
clients, failedConnections := wallets.Clients(c) clients, failedConnections := wallets.Clients()
if len(clients) == 0 { if len(clients) == 0 {
log.Error("Could not connect to any wallets") log.Error("Could not connect to any wallets")
} else if len(failedConnections) > 0 { } else if len(failedConnections) > 0 {

View File

@ -72,7 +72,7 @@ type Server struct {
signPubKey ed25519.PublicKey signPubKey ed25519.PublicKey
} }
func Start(ctx context.Context, requestShutdown func(), shutdownWg *sync.WaitGroup, func Start(shutdownCtx context.Context, requestShutdown func(), shutdownWg *sync.WaitGroup,
listen string, vdb *database.VspDatabase, dcrd rpc.DcrdConnect, wallets rpc.WalletConnect, config Config) error { listen string, vdb *database.VspDatabase, dcrd rpc.DcrdConnect, wallets rpc.WalletConnect, config Config) error {
s := &Server{ s := &Server{
@ -90,7 +90,7 @@ func Start(ctx context.Context, requestShutdown func(), shutdownWg *sync.WaitGro
// Populate cached VSP stats before starting webserver. // Populate cached VSP stats before starting webserver.
s.cache = newCache(base64.StdEncoding.EncodeToString(s.signPubKey)) s.cache = newCache(base64.StdEncoding.EncodeToString(s.signPubKey))
err = s.cache.update(ctx, vdb, dcrd, wallets) err = s.cache.update(vdb, dcrd, wallets)
if err != nil { if err != nil {
log.Errorf("Could not initialize VSP stats cache: %v", err) log.Errorf("Could not initialize VSP stats cache: %v", err)
} }
@ -118,7 +118,7 @@ func Start(ctx context.Context, requestShutdown func(), shutdownWg *sync.WaitGro
// Create TCP listener. // Create TCP listener.
var listenConfig net.ListenConfig var listenConfig net.ListenConfig
listener, err := listenConfig.Listen(ctx, "tcp", listen) listener, err := listenConfig.Listen(shutdownCtx, "tcp", listen)
if err != nil { if err != nil {
return err return err
} }
@ -134,7 +134,7 @@ func Start(ctx context.Context, requestShutdown func(), shutdownWg *sync.WaitGro
shutdownWg.Add(1) shutdownWg.Add(1)
go func() { go func() {
// Wait until shutdown is signaled before shutting down. // Wait until shutdown is signaled before shutting down.
<-ctx.Done() <-shutdownCtx.Done()
log.Debug("Stopping webserver...") log.Debug("Stopping webserver...")
// Give the webserver 5 seconds to finish what it is doing. // Give the webserver 5 seconds to finish what it is doing.
@ -171,11 +171,11 @@ func Start(ctx context.Context, requestShutdown func(), shutdownWg *sync.WaitGro
go func() { go func() {
for { for {
select { select {
case <-ctx.Done(): case <-shutdownCtx.Done():
shutdownWg.Done() shutdownWg.Done()
return return
case <-time.After(refresh): case <-time.After(refresh):
err := s.cache.update(ctx, vdb, dcrd, wallets) err := s.cache.update(vdb, dcrd, wallets)
if err != nil { if err != nil {
log.Errorf("Failed to update cached VSP stats: %v", err) log.Errorf("Failed to update cached VSP stats: %v", err)
} }