vspd: Consolidate background task timers.

Using a single select loop for background tasks removes a lot of
duplicated boilerplate code and helps to simplify shutdown logic. This
does reduce the amount of things which can run in parallel, but that
isn't of concern for vspd. The web server still runs in its own
goroutine so its responsiveness won't be affected.
This commit is contained in:
jholdstock 2023-09-07 19:18:45 +01:00 committed by Jamie Holdstock
parent cfc34a3adc
commit 8df00752c0
2 changed files with 35 additions and 65 deletions

View File

@ -118,11 +118,6 @@ func (v *vspd) run() int {
// through an interrupt signal. // through an interrupt signal.
shutdownCtx := shutdownListener(v.log) shutdownCtx := shutdownListener(v.log)
// WaitGroup for services to signal when they have shutdown cleanly.
var shutdownWg sync.WaitGroup
v.db.WritePeriodicBackups(shutdownCtx, &shutdownWg, v.cfg.BackupInterval)
// Run database integrity checks to ensure all data in database is present // Run database integrity checks to ensure all data in database is present
// and up-to-date. // and up-to-date.
err := v.checkDatabaseIntegrity() err := v.checkDatabaseIntegrity()
@ -139,19 +134,8 @@ func (v *vspd) run() int {
// date. // date.
v.checkWalletConsistency() v.checkWalletConsistency()
// Run voting wallet consistency check periodically. // WaitGroup for services to signal when they have shutdown cleanly.
shutdownWg.Add(1) var shutdownWg sync.WaitGroup
go func() {
for {
select {
case <-shutdownCtx.Done():
shutdownWg.Done()
return
case <-time.After(consistencyInterval):
v.checkWalletConsistency()
}
}
}()
// Create and start webapi server. // Create and start webapi server.
apiCfg := webapi.Config{ apiCfg := webapi.Config{
@ -176,36 +160,46 @@ func (v *vspd) run() int {
return 1 return 1
} }
// Start handling blockConnected notifications from dcrd. // Start all background tasks and notification handlers.
shutdownWg.Add(1) shutdownWg.Add(1)
go func() { go func() {
for { backupTicker := time.NewTicker(v.cfg.BackupInterval)
select { defer backupTicker.Stop()
case <-shutdownCtx.Done(): consistencyTicker := time.NewTicker(consistencyInterval)
shutdownWg.Done() defer consistencyTicker.Stop()
return dcrdTicker := time.NewTicker(dcrdInterval)
case header := <-v.blockNotifChan: defer dcrdTicker.Stop()
v.log.Debugf("Block notification %d (%s)", header.Height, header.BlockHash().String())
v.blockConnected()
}
}
}()
// Loop forever attempting ensuring a dcrd connection is available, so
// notifications are received.
shutdownWg.Add(1)
go func() {
for { for {
select { select {
case <-shutdownCtx.Done():
shutdownWg.Done() // Periodically write a database backup file.
return case <-backupTicker.C:
case <-time.After(dcrdInterval): err := v.db.WriteHotBackupFile()
// Ensure dcrd client is still connected. if err != nil {
v.log.Errorf("Failed to write database backup: %v", err)
}
// Run voting wallet consistency check periodically.
case <-consistencyTicker.C:
v.checkWalletConsistency()
// Ensure dcrd client is connected so notifications are received.
case <-dcrdTicker.C:
_, _, err := v.dcrd.Client() _, _, err := v.dcrd.Client()
if err != nil { if err != nil {
v.log.Errorf("dcrd connect error: %v", err) v.log.Errorf("dcrd connect error: %v", err)
} }
// Handle blockconnected notifications from dcrd.
case header := <-v.blockNotifChan:
v.log.Debugf("Block notification %d (%s)", header.Height, header.BlockHash().String())
v.blockConnected()
// Handle shutdown request.
case <-shutdownCtx.Done():
shutdownWg.Done()
return
} }
} }
}() }()

View File

@ -5,7 +5,6 @@
package database package database
import ( import (
"context"
"crypto/ed25519" "crypto/ed25519"
"crypto/rand" "crypto/rand"
"fmt" "fmt"
@ -61,9 +60,9 @@ const (
// backupMtx should be held when writing to the database backup file. // backupMtx should be held when writing to the database backup file.
var backupMtx sync.Mutex var backupMtx sync.Mutex
// writeHotBackupFile writes a backup of the database file while the database // WriteHotBackupFile writes a backup of the database file while the database
// is still open. // is still open.
func (vdb *VspDatabase) writeHotBackupFile() error { func (vdb *VspDatabase) WriteHotBackupFile() error {
backupMtx.Lock() backupMtx.Lock()
defer backupMtx.Unlock() defer backupMtx.Unlock()
@ -216,29 +215,6 @@ func Open(dbFile string, log slog.Logger, maxVoteChangeRecords int) (*VspDatabas
return vdb, nil return vdb, nil
} }
// WritePeriodicBackups starts a goroutine to periodically write a database backup file.
// It can be stopped by cancelling the provided context, and uses the provided
// WaitGroup to signal that it has finished.
func (vdb *VspDatabase) WritePeriodicBackups(shutdownCtx context.Context, shutdownWg *sync.WaitGroup,
backupInterval time.Duration) {
shutdownWg.Add(1)
go func() {
for {
select {
case <-time.After(backupInterval):
err := vdb.writeHotBackupFile()
if err != nil {
vdb.log.Errorf("Failed to write database backup: %v", err)
}
case <-shutdownCtx.Done():
shutdownWg.Done()
return
}
}
}()
}
// Close will close the database and, if requested, make a copy of the database // Close will close the database and, if requested, make a copy of the database
// to the backup location. // to the backup location.
func (vdb *VspDatabase) Close(writeBackup bool) { func (vdb *VspDatabase) Close(writeBackup bool) {