Remove global vars from background.go

While removing the globals from background.go it also made sense to clean up the use of dcrd RPC clients. Previously two seperate clients were maintained, one for making RPC calls and one for receiving notifications. These clients have been unified.
This commit is contained in:
jholdstock 2022-06-08 10:26:54 +01:00 committed by Jamie Holdstock
parent 01b0df2d7a
commit 7f72caafe7
6 changed files with 162 additions and 182 deletions

View File

@ -5,72 +5,26 @@
package background package background
import ( import (
"context"
"encoding/json"
"errors" "errors"
"strings" "strings"
"sync"
"time"
"github.com/decred/vspd/database" "github.com/decred/vspd/database"
"github.com/decred/vspd/rpc" "github.com/decred/vspd/rpc"
"github.com/jrick/wsrpc/v2" "github.com/jrick/wsrpc/v2"
) )
var (
db *database.VspDatabase
dcrdRPC rpc.DcrdConnect
walletRPC rpc.WalletConnect
notifierClosed chan struct{}
)
type NotificationHandler struct {
ShutdownWg *sync.WaitGroup
}
const ( const (
// consistencyInterval is the time period between wallet consistency checks.
consistencyInterval = 30 * time.Minute
// requiredConfs is the number of confirmations required to consider a // requiredConfs is the number of confirmations required to consider a
// ticket purchase or a fee transaction to be final. // ticket purchase or a fee transaction to be final.
requiredConfs = 6 requiredConfs = 6
) )
// Notify is called every time a block notification is received from dcrd. // BlockConnected is called once when vspd starts up, and once each time a
// Notify is never called concurrently. Notify should not return an error
// because that will cause the client to close and no further notifications will
// be received until a new connection is established.
func (n *NotificationHandler) Notify(method string, params json.RawMessage) error {
n.ShutdownWg.Add(1)
defer n.ShutdownWg.Done()
if method != "blockconnected" {
return nil
}
header, err := rpc.ParseBlockConnectedNotification(params)
if err != nil {
log.Errorf("Failed to parse dcrd block notification: %v", err)
return nil
}
log.Debugf("Block notification %d (%s)", header.Height, header.BlockHash().String())
blockConnected()
return nil
}
func (n *NotificationHandler) Close() error {
close(notifierClosed)
return nil
}
// blockConnected is called once when vspd starts up, and once each time a
// blockconnected notification is received from dcrd. // blockconnected notification is received from dcrd.
func blockConnected() { func BlockConnected(dcrdRPC rpc.DcrdConnect, walletRPC rpc.WalletConnect, db *database.VspDatabase) {
const funcName = "blockConnected" const funcName = "BlockConnected"
dcrdClient, _, err := dcrdRPC.Client() dcrdClient, _, err := dcrdRPC.Client()
if err != nil { if err != nil {
@ -326,98 +280,12 @@ func blockConnected() {
} }
func connectNotifier(shutdownCtx context.Context, dcrdWithNotifs rpc.DcrdConnect) error { // CheckWalletConsistency will retrieve all votable tickets from the database
notifierClosed = make(chan struct{})
dcrdClient, _, err := dcrdWithNotifs.Client()
if err != nil {
return err
}
err = dcrdClient.NotifyBlocks()
if err != nil {
return err
}
log.Info("Subscribed for dcrd block notifications")
// Wait until context is done (vspd is shutting down), or until the
// notifier is closed.
select {
case <-shutdownCtx.Done():
// A shutdown signal has been received - close the client with the
// notification handler to prevent further notifications from being
// received.
dcrdWithNotifs.Close()
return nil
case <-notifierClosed:
log.Warnf("dcrd notifier closed")
return nil
}
}
func Start(shutdownCtx context.Context, wg *sync.WaitGroup, vdb *database.VspDatabase, drpc rpc.DcrdConnect,
dcrdWithNotif rpc.DcrdConnect, wrpc rpc.WalletConnect) {
db = vdb
dcrdRPC = drpc
walletRPC = wrpc
// Run the block connected handler now to catch up with any blocks mined
// while vspd was shut down.
blockConnected()
// Run voting wallet consistency check now to ensure all wallets are up to
// date.
checkWalletConsistency()
// Run voting wallet consistency check periodically.
wg.Add(1)
go func() {
consistencyLoop:
for {
select {
case <-shutdownCtx.Done():
break consistencyLoop
case <-time.After(consistencyInterval):
checkWalletConsistency()
}
}
log.Debugf("Consistency checker stopped")
wg.Done()
}()
// Loop forever attempting to create a connection to the dcrd server for
// notifications.
wg.Add(1)
go func() {
notifierLoop:
for {
err := connectNotifier(shutdownCtx, dcrdWithNotif)
if err != nil {
log.Errorf("dcrd connect error: %v", err)
}
// If context is done (vspd is shutting down), return,
// otherwise wait 15 seconds and try to reconnect.
select {
case <-shutdownCtx.Done():
break notifierLoop
case <-time.After(15 * time.Second):
}
}
log.Debugf("Notification connector stopped")
wg.Done()
}()
}
// checkWalletConsistency will retrieve all votable tickets from the database
// and ensure they are all added to voting wallets with the correct vote // and ensure they are all added to voting wallets with the correct vote
// choices. // choices.
func checkWalletConsistency() { func CheckWalletConsistency(dcrdRPC rpc.DcrdConnect, walletRPC rpc.WalletConnect, db *database.VspDatabase) {
const funcName = "checkWalletConsistency" const funcName = "CheckWalletConsistency"
log.Info("Checking voting wallet consistency") log.Info("Checking voting wallet consistency")

View File

@ -37,7 +37,7 @@ type client struct {
notifier wsrpc.Notifier notifier wsrpc.Notifier
} }
func setup(user, pass, addr string, cert []byte, n wsrpc.Notifier) *client { func setup(user, pass, addr string, cert []byte) *client {
// Create TLS options. // Create TLS options.
pool := x509.NewCertPool() pool := x509.NewCertPool()
@ -63,7 +63,7 @@ func setup(user, pass, addr string, cert []byte, n wsrpc.Notifier) *client {
var mu sync.Mutex var mu sync.Mutex
var c *wsrpc.Client var c *wsrpc.Client
fullAddr := "wss://" + addr + "/ws" fullAddr := "wss://" + addr + "/ws"
return &client{&mu, c, fullAddr, tlsOpt, authOpt, n} return &client{&mu, c, fullAddr, tlsOpt, authOpt, nil}
} }
func (c *client) Close() { func (c *client) Close() {

View File

@ -5,10 +5,8 @@
package rpc package rpc
import ( import (
"bytes"
"context" "context"
"encoding/hex" "encoding/hex"
"encoding/json"
"errors" "errors"
"fmt" "fmt"
@ -45,15 +43,25 @@ type DcrdConnect struct {
params *chaincfg.Params params *chaincfg.Params
} }
func SetupDcrd(user, pass, addr string, cert []byte, n wsrpc.Notifier, params *chaincfg.Params) DcrdConnect { func SetupDcrd(user, pass, addr string, cert []byte, params *chaincfg.Params) DcrdConnect {
return DcrdConnect{ return DcrdConnect{
client: setup(user, pass, addr, cert, n), client: setup(user, pass, addr, cert),
params: params, params: params,
} }
} }
// BlockConnectedHandler attaches a blockconnected notification handler to the
// dcrd client. Every time a notification is received, the header of the
// connected block is sent to the provided channel.
func (d *DcrdConnect) BlockConnectedHandler(blockConnected chan *wire.BlockHeader) {
d.client.notifier = &blockConnectedHandler{
blockConnected: blockConnected,
}
}
func (d *DcrdConnect) Close() { func (d *DcrdConnect) Close() {
d.client.Close() d.client.Close()
log.Debug("dcrd client closed")
} }
// Client creates a new DcrdRPC client instance. Returns an error if dialing // Client creates a new DcrdRPC client instance. Returns an error if dialing
@ -62,7 +70,7 @@ func (d *DcrdConnect) Client() (*DcrdRPC, string, error) {
ctx := context.TODO() ctx := context.TODO()
c, newConnection, err := d.client.dial(ctx) c, newConnection, err := d.client.dial(ctx)
if err != nil { if err != nil {
return nil, d.client.addr, fmt.Errorf("dcrd connection error: %w", err) return nil, d.client.addr, fmt.Errorf("dcrd dial error: %w", err)
} }
// If this is a reused connection, we don't need to validate the dcrd config // If this is a reused connection, we don't need to validate the dcrd config
@ -116,6 +124,16 @@ func (d *DcrdConnect) Client() (*DcrdRPC, string, error) {
return nil, d.client.addr, errors.New("dcrd does not have transaction index enabled (--txindex)") return nil, d.client.addr, errors.New("dcrd does not have transaction index enabled (--txindex)")
} }
// Request blockconnected notifications.
if d.client.notifier != nil {
err = c.Call(ctx, "notifyblocks", nil)
if err != nil {
return nil, d.client.addr, fmt.Errorf("notifyblocks failed: %w", err)
}
}
log.Debugf("Connected to dcrd")
return &DcrdRPC{c, ctx}, d.client.addr, nil return &DcrdRPC{c, ctx}, d.client.addr, nil
} }
@ -210,26 +228,3 @@ func (c *DcrdRPC) ExistsLiveTicket(ticketHash string) (bool, error) {
return bitset.Bytes(existsBytes).Get(0), nil return bitset.Bytes(existsBytes).Get(0), nil
} }
// ParseBlockConnectedNotification extracts the block header from a
// blockconnected JSON-RPC notification.
func ParseBlockConnectedNotification(params json.RawMessage) (*wire.BlockHeader, error) {
var notif []string
err := json.Unmarshal(params, &notif)
if err != nil {
return nil, fmt.Errorf("json unmarshal error: %w", err)
}
if len(notif) == 0 {
return nil, errors.New("notification is empty")
}
rawHeader := notif[0]
var header wire.BlockHeader
err = header.Deserialize(hex.NewDecoder(bytes.NewReader([]byte(rawHeader))))
if err != nil {
return nil, fmt.Errorf("error creating block header from bytes: %w", err)
}
return &header, nil
}

View File

@ -34,7 +34,7 @@ func SetupWallet(user, pass, addrs []string, cert [][]byte, params *chaincfg.Par
clients := make([]*client, len(addrs)) clients := make([]*client, len(addrs))
for i := 0; i < len(addrs); i++ { for i := 0; i < len(addrs); i++ {
clients[i] = setup(user[i], pass[i], addrs[i], cert[i], nil) clients[i] = setup(user[i], pass[i], addrs[i], cert[i])
} }
return WalletConnect{ return WalletConnect{
@ -47,6 +47,7 @@ func (w *WalletConnect) Close() {
for _, client := range w.clients { for _, client := range w.clients {
client.Close() client.Close()
} }
log.Debug("dcrwallet clients closed")
} }
// Clients loops over each wallet and tries to establish a connection. It // Clients loops over each wallet and tries to establish a connection. It
@ -61,7 +62,7 @@ func (w *WalletConnect) Clients() ([]*WalletRPC, []string) {
c, newConnection, err := connect.dial(ctx) c, newConnection, err := connect.dial(ctx)
if err != nil { if err != nil {
log.Errorf("dcrwallet connection error: %v", err) log.Errorf("dcrwallet dial error: %v", err)
failedConnections = append(failedConnections, connect.addr) failedConnections = append(failedConnections, connect.addr)
continue continue
} }

61
rpc/notifs.go Normal file
View File

@ -0,0 +1,61 @@
package rpc
import (
"bytes"
"encoding/hex"
"encoding/json"
"errors"
"fmt"
"github.com/decred/dcrd/wire"
)
type blockConnectedHandler struct {
blockConnected chan *wire.BlockHeader
}
// Notify is called every time a notification is received from dcrd client.
// A wsrpc.Client will never call Notify concurrently. Notify should not return
// an error because that will cause the client to close and no further
// notifications will be received until a new connection is established.
func (n *blockConnectedHandler) Notify(method string, msg json.RawMessage) error {
if method != "blockconnected" {
return nil
}
header, err := parseBlockConnected(msg)
if err != nil {
log.Errorf("Failed to parse dcrd block notification: %v", err)
return nil
}
n.blockConnected <- header
return nil
}
func (n *blockConnectedHandler) Close() error {
return nil
}
// parseBlockConnected extracts the block header from a
// blockconnected JSON-RPC notification.
func parseBlockConnected(msg json.RawMessage) (*wire.BlockHeader, error) {
var notif []string
err := json.Unmarshal(msg, &notif)
if err != nil {
return nil, fmt.Errorf("json unmarshal error: %w", err)
}
if len(notif) == 0 {
return nil, errors.New("notification is empty")
}
var header wire.BlockHeader
err = header.Deserialize(hex.NewDecoder(bytes.NewReader([]byte(notif[0]))))
if err != nil {
return nil, fmt.Errorf("error creating block header from bytes: %w", err)
}
return &header, nil
}

77
vspd.go
View File

@ -10,7 +10,9 @@ import (
"os" "os"
"runtime" "runtime"
"sync" "sync"
"time"
"github.com/decred/dcrd/wire"
"github.com/decred/vspd/background" "github.com/decred/vspd/background"
"github.com/decred/vspd/database" "github.com/decred/vspd/database"
"github.com/decred/vspd/rpc" "github.com/decred/vspd/rpc"
@ -24,6 +26,9 @@ import (
// the database is deleted. // the database is deleted.
const maxVoteChangeRecords = 10 const maxVoteChangeRecords = 10
// consistencyInterval is the time period between wallet consistency checks.
const consistencyInterval = 30 * time.Minute
func main() { func main() {
// Create a context that is cancelled when a shutdown request is received // Create a context that is cancelled when a shutdown request is received
// through an interrupt signal. // through an interrupt signal.
@ -81,13 +86,51 @@ func run(shutdownCtx context.Context) int {
// Create RPC client for local dcrd instance (used for broadcasting and // Create RPC client for local dcrd instance (used for broadcasting and
// checking the status of fee transactions). // checking the status of fee transactions).
dcrd := rpc.SetupDcrd(cfg.DcrdUser, cfg.DcrdPass, cfg.DcrdHost, cfg.dcrdCert, nil, cfg.netParams.Params) dcrd := rpc.SetupDcrd(cfg.DcrdUser, cfg.DcrdPass, cfg.DcrdHost, cfg.dcrdCert, cfg.netParams.Params)
defer dcrd.Close() defer dcrd.Close()
// Create RPC client for remote dcrwallet instance (used for voting). // Create RPC client for remote dcrwallet instance (used for voting).
wallets := rpc.SetupWallet(cfg.walletUsers, cfg.walletPasswords, cfg.walletHosts, cfg.walletCerts, cfg.netParams.Params) wallets := rpc.SetupWallet(cfg.walletUsers, cfg.walletPasswords, cfg.walletHosts, cfg.walletCerts, cfg.netParams.Params)
defer wallets.Close() defer wallets.Close()
// Create a channel to receive blockConnected notifications from dcrd.
notifChan := make(chan *wire.BlockHeader)
shutdownWg.Add(1)
go func() {
for {
select {
case <-shutdownCtx.Done():
shutdownWg.Done()
return
case header := <-notifChan:
log.Debugf("Block notification %d (%s)", header.Height, header.BlockHash().String())
background.BlockConnected(dcrd, wallets, db)
}
}
}()
// Attach notification listener to dcrd client.
dcrd.BlockConnectedHandler(notifChan)
// Loop forever attempting ensuring a dcrd connection is available, so
// notifications are received.
shutdownWg.Add(1)
go func() {
for {
select {
case <-shutdownCtx.Done():
shutdownWg.Done()
return
case <-time.After(time.Second * 15):
// Ensure dcrd client is still connected.
_, _, err := dcrd.Client()
if err != nil {
log.Errorf("dcrd connect error: %v", err)
}
}
}
}()
// Ensure all data in database is present and up-to-date. // Ensure all data in database is present and up-to-date.
err = db.CheckIntegrity(dcrd) err = db.CheckIntegrity(dcrd)
if err != nil { if err != nil {
@ -95,6 +138,28 @@ func run(shutdownCtx context.Context) int {
log.Errorf("Could not check database integrity: %v", err) log.Errorf("Could not check database integrity: %v", err)
} }
// Run the block connected handler now to catch up with any blocks mined
// while vspd was shut down.
background.BlockConnected(dcrd, wallets, db)
// Run voting wallet consistency check now to ensure all wallets are up to
// date.
background.CheckWalletConsistency(dcrd, wallets, db)
// Run voting wallet consistency check periodically.
shutdownWg.Add(1)
go func() {
for {
select {
case <-shutdownCtx.Done():
shutdownWg.Done()
return
case <-time.After(consistencyInterval):
background.CheckWalletConsistency(dcrd, wallets, db)
}
}
}()
// Create and start webapi server. // Create and start webapi server.
apiCfg := webapi.Config{ apiCfg := webapi.Config{
VSPFee: cfg.VSPFee, VSPFee: cfg.VSPFee,
@ -118,16 +183,6 @@ func run(shutdownCtx context.Context) int {
return 1 return 1
} }
// Create a dcrd client with a blockconnected notification handler.
notifHandler := background.NotificationHandler{ShutdownWg: &shutdownWg}
dcrdWithNotifs := rpc.SetupDcrd(cfg.DcrdUser, cfg.DcrdPass,
cfg.DcrdHost, cfg.dcrdCert, &notifHandler, cfg.netParams.Params)
defer dcrdWithNotifs.Close()
// Start background process which will continually attempt to reconnect to
// dcrd if the connection drops.
background.Start(shutdownCtx, &shutdownWg, db, dcrd, dcrdWithNotifs, wallets)
// Wait for shutdown tasks to complete before running deferred tasks and // Wait for shutdown tasks to complete before running deferred tasks and
// returning. // returning.
shutdownWg.Wait() shutdownWg.Wait()