Simplify dcrd and dcrwallet client creation.

This commit is contained in:
jholdstock 2020-06-02 13:51:53 +01:00 committed by David Hill
parent fb6ea54f15
commit 7da79c7561
8 changed files with 173 additions and 156 deletions

View File

@ -14,7 +14,7 @@ import (
type NotificationHandler struct { type NotificationHandler struct {
Ctx context.Context Ctx context.Context
Db *database.VspDatabase Db *database.VspDatabase
WalletConnect []rpc.Connect Wallets rpc.WalletConnect
NetParams *chaincfg.Params NetParams *chaincfg.Params
closed chan struct{} closed chan struct{}
dcrdClient *rpc.DcrdRPC dcrdClient *rpc.DcrdRPC
@ -108,21 +108,12 @@ func (n *NotificationHandler) Notify(method string, params json.RawMessage) erro
return nil return nil
} }
walletClients := make([]*rpc.WalletRPC, len(n.WalletConnect)) walletClients, err := n.Wallets.Clients(n.Ctx, n.NetParams)
for i := 0; i < len(n.WalletConnect); i++ {
walletConn, err := n.WalletConnect[i]()
if err != nil { if err != nil {
log.Errorf("dcrwallet connection error: %v", err) log.Error(err)
// If this fails, there is nothing more we can do. Return. // If this fails, there is nothing more we can do. Return.
return nil return nil
} }
walletClients[i], err = rpc.WalletClient(n.Ctx, walletConn, n.NetParams)
if err != nil {
log.Errorf("dcrwallet client error: %v", err)
// If this fails, there is nothing more we can do. Return.
return nil
}
}
for _, ticket := range unconfirmedFees { for _, ticket := range unconfirmedFees {
feeTx, err := n.dcrdClient.GetRawTransaction(ticket.FeeTxHash) feeTx, err := n.dcrdClient.GetRawTransaction(ticket.FeeTxHash)
@ -186,12 +177,9 @@ func (n *NotificationHandler) Close() error {
return nil return nil
} }
func (n *NotificationHandler) connect(dcrdConnect rpc.Connect) error { func (n *NotificationHandler) connect(dcrdConnect rpc.DcrdConnect) error {
dcrdConn, err := dcrdConnect() var err error
if err != nil { n.dcrdClient, err = dcrdConnect.Client(n.Ctx, n.NetParams)
return err
}
n.dcrdClient, err = rpc.DcrdClient(n.Ctx, dcrdConn, n.NetParams)
if err != nil { if err != nil {
return err return err
} }
@ -213,7 +201,7 @@ func (n *NotificationHandler) connect(dcrdConnect rpc.Connect) error {
} }
} }
func Start(n *NotificationHandler, dcrdConnect rpc.Connect) { func Start(n *NotificationHandler, dcrdConnect rpc.DcrdConnect) {
// Loop forever attempting to create a connection to the dcrd server. // Loop forever attempting to create a connection to the dcrd server.
go func() { go func() {

40
main.go
View File

@ -64,56 +64,38 @@ func run(ctx context.Context) error {
// Create RPC client for local dcrd instance (used for broadcasting and // Create RPC client for local dcrd instance (used for broadcasting and
// checking the status of fee transactions). // checking the status of fee transactions).
// Dial once just to validate config. dcrd := rpc.SetupDcrd(ctx, &shutdownWg, cfg.DcrdUser, cfg.DcrdPass,
dcrdConnect := rpc.Setup(ctx, &shutdownWg, cfg.DcrdUser, cfg.DcrdPass,
cfg.DcrdHost, cfg.dcrdCert, nil) cfg.DcrdHost, cfg.dcrdCert, nil)
dcrdConn, err := dcrdConnect() // Dial once just to validate config.
_, err = dcrd.Client(ctx, cfg.netParams.Params)
if err != nil { if err != nil {
log.Errorf("dcrd connection error: %v", err) log.Error(err)
requestShutdown()
shutdownWg.Wait()
return err
}
_, err = rpc.DcrdClient(ctx, dcrdConn, cfg.netParams.Params)
if err != nil {
log.Errorf("dcrd client error: %v", err)
requestShutdown() requestShutdown()
shutdownWg.Wait() shutdownWg.Wait()
return err return err
} }
// Create RPC client for remote dcrwallet instance (used for voting). // Create RPC client for remote dcrwallet instance (used for voting).
wallets := rpc.SetupWallet(ctx, &shutdownWg, cfg.WalletUser, cfg.WalletPass,
cfg.WalletHosts, cfg.walletCert)
// Dial once just to validate config. // Dial once just to validate config.
walletConnect := make([]rpc.Connect, len(cfg.WalletHosts)) _, err = wallets.Clients(ctx, cfg.netParams.Params)
walletConn := make([]rpc.Caller, len(cfg.WalletHosts))
for i := 0; i < len(cfg.WalletHosts); i++ {
walletConnect[i] = rpc.Setup(ctx, &shutdownWg, cfg.WalletUser, cfg.WalletPass,
cfg.WalletHosts[i], cfg.walletCert, nil)
walletConn[i], err = walletConnect[i]()
if err != nil { if err != nil {
log.Errorf("dcrwallet connection error: %v", err) log.Error(err)
requestShutdown() requestShutdown()
shutdownWg.Wait() shutdownWg.Wait()
return err return err
} }
_, err = rpc.WalletClient(ctx, walletConn[i], cfg.netParams.Params)
if err != nil {
log.Errorf("dcrwallet client error: %v", err)
requestShutdown()
shutdownWg.Wait()
return err
}
}
// Create a dcrd client with an attached notification handler which will run // Create a dcrd client with an attached notification handler which will run
// in the background. // in the background.
notifHandler := &background.NotificationHandler{ notifHandler := &background.NotificationHandler{
Ctx: ctx, Ctx: ctx,
Db: db, Db: db,
WalletConnect: walletConnect, Wallets: wallets,
NetParams: cfg.netParams.Params, NetParams: cfg.netParams.Params,
} }
dcrdWithNotifHandler := rpc.Setup(ctx, &shutdownWg, cfg.DcrdUser, cfg.DcrdPass, dcrdWithNotifHandler := rpc.SetupDcrd(ctx, &shutdownWg, cfg.DcrdUser, cfg.DcrdPass,
cfg.DcrdHost, cfg.dcrdCert, notifHandler) cfg.DcrdHost, cfg.dcrdCert, notifHandler)
// Start background process which will continually attempt to reconnect to // Start background process which will continually attempt to reconnect to
@ -129,7 +111,7 @@ func run(ctx context.Context) error {
VspClosed: cfg.VspClosed, VspClosed: cfg.VspClosed,
} }
err = webapi.Start(ctx, shutdownRequestChannel, &shutdownWg, cfg.Listen, db, err = webapi.Start(ctx, shutdownRequestChannel, &shutdownWg, cfg.Listen, db,
dcrdConnect, walletConnect, cfg.WebServerDebug, cfg.FeeXPub, apiCfg) dcrd, wallets, cfg.WebServerDebug, cfg.FeeXPub, apiCfg)
if err != nil { if err != nil {
log.Errorf("Failed to initialize webapi: %v", err) log.Errorf("Failed to initialize webapi: %v", err)
requestShutdown() requestShutdown()

View File

@ -22,14 +22,16 @@ type Caller interface {
Call(ctx context.Context, method string, res interface{}, args ...interface{}) error Call(ctx context.Context, method string, res interface{}, args ...interface{}) error
} }
// Connect dials and returns a connected RPC client. // connect dials and returns a connected RPC client. A boolean indicates whether
type Connect func() (Caller, error) // this connection is new (true), or if it is an existing connection which is
// being reused (false).
type connect func() (Caller, bool, error)
// Setup accepts RPC connection details, creates an RPC client, and returns a // setup accepts RPC connection details, creates an RPC client, and returns a
// function which can be called to access the client. The returned function will // function which can be called to access the client. The returned function will
// try to handle any client disconnects by attempting to reconnect, but will // try to handle any client disconnects by attempting to reconnect, but will
// return an error if a new connection cannot be established. // return an error if a new connection cannot be established.
func Setup(ctx context.Context, shutdownWg *sync.WaitGroup, user, pass, addr string, cert []byte, n wsrpc.Notifier) Connect { func setup(ctx context.Context, shutdownWg *sync.WaitGroup, user, pass, addr string, cert []byte, n wsrpc.Notifier) connect {
// Create TLS options. // Create TLS options.
pool := x509.NewCertPool() pool := x509.NewCertPool()
@ -67,7 +69,7 @@ func Setup(ctx context.Context, shutdownWg *sync.WaitGroup, user, pass, addr str
shutdownWg.Done() shutdownWg.Done()
}() }()
return func() (Caller, error) { return func() (Caller, bool, error) {
defer mu.Unlock() defer mu.Unlock()
mu.Lock() mu.Lock()
@ -77,16 +79,16 @@ func Setup(ctx context.Context, shutdownWg *sync.WaitGroup, user, pass, addr str
log.Debugf("RPC client %s errored (%v); reconnecting...", addr, c.Err()) log.Debugf("RPC client %s errored (%v); reconnecting...", addr, c.Err())
c = nil c = nil
default: default:
return c, nil return c, false, nil
} }
} }
var err error var err error
c, err = wsrpc.Dial(ctx, fullAddr, tlsOpt, authOpt, wsrpc.WithNotifier(n)) c, err = wsrpc.Dial(ctx, fullAddr, tlsOpt, authOpt, wsrpc.WithNotifier(n))
if err != nil { if err != nil {
return nil, err return nil, false, err
} }
return c, nil return c, true, nil
} }
} }

View File

@ -5,12 +5,14 @@ import (
"encoding/hex" "encoding/hex"
"errors" "errors"
"fmt" "fmt"
"sync"
"github.com/decred/dcrd/blockchain/stake/v3" "github.com/decred/dcrd/blockchain/stake/v3"
"github.com/decred/dcrd/chaincfg/v3" "github.com/decred/dcrd/chaincfg/v3"
dcrdtypes "github.com/decred/dcrd/rpc/jsonrpc/types/v2" dcrdtypes "github.com/decred/dcrd/rpc/jsonrpc/types/v2"
"github.com/decred/dcrd/wire" "github.com/decred/dcrd/wire"
"github.com/jrick/bitset" "github.com/jrick/bitset"
"github.com/jrick/wsrpc/v2"
) )
const ( const (
@ -24,18 +26,36 @@ type DcrdRPC struct {
ctx context.Context ctx context.Context
} }
// DcrdClient creates a new DcrdRPC client instance from a caller. type DcrdConnect connect
func DcrdClient(ctx context.Context, c Caller, netParams *chaincfg.Params) (*DcrdRPC, error) {
func SetupDcrd(ctx context.Context, shutdownWg *sync.WaitGroup, user, pass, addr string, cert []byte, n wsrpc.Notifier) DcrdConnect {
return DcrdConnect(setup(ctx, shutdownWg, user, pass, addr, cert, n))
}
// Client creates a new DcrdRPC client instance. Returns an error if dialing
// dcrd fails or if dcrd is misconfigured.
func (d *DcrdConnect) Client(ctx context.Context, netParams *chaincfg.Params) (*DcrdRPC, error) {
c, newConnection, err := connect(*d)()
if err != nil {
return nil, fmt.Errorf("dcrd connection error: %v", err)
}
// If this is a reused connection, we don't need to validate the dcrd config
// again.
if !newConnection {
return &DcrdRPC{c, ctx}, nil
}
// Verify dcrd is at the required api version. // Verify dcrd is at the required api version.
var verMap map[string]dcrdtypes.VersionResult var verMap map[string]dcrdtypes.VersionResult
err := c.Call(ctx, "version", &verMap) err = c.Call(ctx, "version", &verMap)
if err != nil { if err != nil {
return nil, fmt.Errorf("version check failed: %v", err) return nil, fmt.Errorf("dcrd version check failed: %v", err)
} }
dcrdVersion, exists := verMap["dcrdjsonrpcapi"] dcrdVersion, exists := verMap["dcrdjsonrpcapi"]
if !exists { if !exists {
return nil, fmt.Errorf("version response missing 'dcrdjsonrpcapi'") return nil, fmt.Errorf("dcrd version response missing 'dcrdjsonrpcapi'")
} }
if dcrdVersion.VersionString != requiredDcrdVersion { if dcrdVersion.VersionString != requiredDcrdVersion {
return nil, fmt.Errorf("wrong dcrd RPC version: got %s, expected %s", return nil, fmt.Errorf("wrong dcrd RPC version: got %s, expected %s",
@ -46,7 +66,7 @@ func DcrdClient(ctx context.Context, c Caller, netParams *chaincfg.Params) (*Dcr
var netID wire.CurrencyNet var netID wire.CurrencyNet
err = c.Call(ctx, "getcurrentnet", &netID) err = c.Call(ctx, "getcurrentnet", &netID)
if err != nil { if err != nil {
return nil, fmt.Errorf("getcurrentnet check failed: %v", err) return nil, fmt.Errorf("dcrd getcurrentnet check failed: %v", err)
} }
if netID != netParams.Net { if netID != netParams.Net {
return nil, fmt.Errorf("dcrd running on %s, expected %s", netID, netParams.Net) return nil, fmt.Errorf("dcrd running on %s, expected %s", netID, netParams.Net)
@ -56,7 +76,7 @@ func DcrdClient(ctx context.Context, c Caller, netParams *chaincfg.Params) (*Dcr
var info dcrdtypes.InfoChainResult var info dcrdtypes.InfoChainResult
err = c.Call(ctx, "getinfo", &info) err = c.Call(ctx, "getinfo", &info)
if err != nil { if err != nil {
return nil, fmt.Errorf("getinfo check failed: %v", err) return nil, fmt.Errorf("dcrd getinfo check failed: %v", err)
} }
if !info.TxIndex { if !info.TxIndex {
return nil, errors.New("dcrd does not have transaction index enabled (--txindex)") return nil, errors.New("dcrd does not have transaction index enabled (--txindex)")

View File

@ -3,6 +3,7 @@ package rpc
import ( import (
"context" "context"
"fmt" "fmt"
"sync"
wallettypes "decred.org/dcrwallet/rpc/jsonrpc/types" wallettypes "decred.org/dcrwallet/rpc/jsonrpc/types"
"github.com/decred/dcrd/chaincfg/v3" "github.com/decred/dcrd/chaincfg/v3"
@ -21,12 +22,41 @@ type WalletRPC struct {
ctx context.Context ctx context.Context
} }
// WalletClient creates a new WalletRPC client instance from a caller. type WalletConnect []connect
func WalletClient(ctx context.Context, c Caller, netParams *chaincfg.Params) (*WalletRPC, error) {
func SetupWallet(ctx context.Context, shutdownWg *sync.WaitGroup, user, pass string, addrs []string, cert []byte) WalletConnect {
walletConnect := make(WalletConnect, len(addrs))
for i := 0; i < len(addrs); i++ {
walletConnect[i] = setup(ctx, shutdownWg, user, pass,
addrs[i], cert, nil)
}
return walletConnect
}
// Clients creates an array of new WalletRPC client instances. Returns an error
// if dialing any wallet fails, or if any wallet is misconfigured.
func (w *WalletConnect) Clients(ctx context.Context, netParams *chaincfg.Params) ([]*WalletRPC, error) {
walletClients := make([]*WalletRPC, len(*w))
for i := 0; i < len(*w); i++ {
c, newConnection, err := []connect(*w)[i]()
if err != nil {
return nil, fmt.Errorf("dcrwallet connection error: %v", err)
}
// If this is a reused connection, we don't need to validate the
// dcrwallet config again.
if !newConnection {
walletClients[i] = &WalletRPC{c, ctx}
continue
}
// Verify dcrwallet is at the required api version. // Verify dcrwallet is at the required api version.
var verMap map[string]dcrdtypes.VersionResult var verMap map[string]dcrdtypes.VersionResult
err := c.Call(ctx, "version", &verMap) err = c.Call(ctx, "version", &verMap)
if err != nil { if err != nil {
return nil, fmt.Errorf("version check on dcrwallet '%s' failed: %v", return nil, fmt.Errorf("version check on dcrwallet '%s' failed: %v",
c.String(), err) c.String(), err)
@ -48,6 +78,12 @@ func WalletClient(ctx context.Context, c Caller, netParams *chaincfg.Params) (*W
return nil, fmt.Errorf("walletinfo check on dcrwallet '%s' failed: %v", return nil, fmt.Errorf("walletinfo check on dcrwallet '%s' failed: %v",
c.String(), err) c.String(), err)
} }
// TODO: The following 3 checks should probably just log a warning/error and
// not return.
// addtransaction and setvotechoice can still be used with a locked wallet.
// importprivkey will fail if wallet is locked.
if !walletInfo.Voting { if !walletInfo.Voting {
return nil, fmt.Errorf("wallet '%s' has voting disabled", c.String()) return nil, fmt.Errorf("wallet '%s' has voting disabled", c.String())
} }
@ -70,7 +106,11 @@ func WalletClient(ctx context.Context, c Caller, netParams *chaincfg.Params) (*W
c.String(), netID, netParams.Net) c.String(), netID, netParams.Net)
} }
return &WalletRPC{c, ctx}, nil walletClients[i] = &WalletRPC{c, ctx}
}
return walletClients, nil
} }
func (c *WalletRPC) AddTransaction(blockHash, txHex string) error { func (c *WalletRPC) AddTransaction(blockHash, txHex string) error {

View File

@ -16,43 +16,28 @@ type ticketHashRequest struct {
// context for downstream handlers to make use of. // context for downstream handlers to make use of.
func withDcrdClient() gin.HandlerFunc { func withDcrdClient() gin.HandlerFunc {
return func(c *gin.Context) { return func(c *gin.Context) {
dcrdConn, err := dcrdConnect() client, err := dcrd.Client(c, cfg.NetParams)
if err != nil { if err != nil {
log.Errorf("dcrd connection error: %v", err) log.Error(err)
sendErrorResponse("dcrd RPC error", http.StatusInternalServerError, c)
return
}
dcrdClient, err := rpc.DcrdClient(c, dcrdConn, cfg.NetParams)
if err != nil {
log.Errorf("dcrd client error: %v", err)
sendErrorResponse("dcrd RPC error", http.StatusInternalServerError, c) sendErrorResponse("dcrd RPC error", http.StatusInternalServerError, c)
return return
} }
c.Set("DcrdClient", dcrdClient) c.Set("DcrdClient", client)
} }
} }
// withWalletClient middleware adds a voting wallet client to the request // withWalletClients middleware adds a voting wallet clients to the request
// context for downstream handlers to make use of. // context for downstream handlers to make use of.
func withWalletClient() gin.HandlerFunc { func withWalletClients() gin.HandlerFunc {
return func(c *gin.Context) { return func(c *gin.Context) {
walletClient := make([]*rpc.WalletRPC, len(walletConnect)) clients, err := wallets.Clients(c, cfg.NetParams)
for i := 0; i < len(walletConnect); i++ {
walletConn, err := walletConnect[i]()
if err != nil { if err != nil {
log.Errorf("dcrwallet connection error: %v", err) log.Error(err)
sendErrorResponse("dcrwallet RPC error", http.StatusInternalServerError, c) sendErrorResponse("dcrwallet RPC error", http.StatusInternalServerError, c)
return return
} }
walletClient[i], err = rpc.WalletClient(c, walletConn, cfg.NetParams) c.Set("WalletClients", clients)
if err != nil {
log.Errorf("dcrwallet client error: %v", err)
sendErrorResponse("dcrwallet RPC error", http.StatusInternalServerError, c)
return
}
}
c.Set("WalletClient", walletClient)
} }
} }

View File

@ -17,7 +17,7 @@ func setVoteChoices(c *gin.Context) {
rawRequest := c.MustGet("RawRequest").([]byte) rawRequest := c.MustGet("RawRequest").([]byte)
ticket := c.MustGet("Ticket").(database.Ticket) ticket := c.MustGet("Ticket").(database.Ticket)
knownTicket := c.MustGet("KnownTicket").(bool) knownTicket := c.MustGet("KnownTicket").(bool)
walletClients := c.MustGet("WalletClient").([]*rpc.WalletRPC) walletClients := c.MustGet("WalletClients").([]*rpc.WalletRPC)
if !knownTicket { if !knownTicket {
log.Warnf("Invalid ticket from %s", c.ClientIP()) log.Warnf("Invalid ticket from %s", c.ClientIP())

View File

@ -34,19 +34,19 @@ const (
var cfg Config var cfg Config
var db *database.VspDatabase var db *database.VspDatabase
var dcrdConnect rpc.Connect var dcrd rpc.DcrdConnect
var walletConnect []rpc.Connect var wallets rpc.WalletConnect
var addrGen *addressGenerator var addrGen *addressGenerator
var signPrivKey ed25519.PrivateKey var signPrivKey ed25519.PrivateKey
var signPubKey ed25519.PublicKey var signPubKey ed25519.PublicKey
func Start(ctx context.Context, requestShutdownChan chan struct{}, shutdownWg *sync.WaitGroup, func Start(ctx context.Context, requestShutdownChan chan struct{}, shutdownWg *sync.WaitGroup,
listen string, vdb *database.VspDatabase, dConnect rpc.Connect, wConnect []rpc.Connect, debugMode bool, feeXPub string, config Config) error { listen string, vdb *database.VspDatabase, dConnect rpc.DcrdConnect, wConnect rpc.WalletConnect, debugMode bool, feeXPub string, config Config) error {
cfg = config cfg = config
db = vdb db = vdb
dcrdConnect = dConnect dcrd = dConnect
walletConnect = wConnect wallets = wConnect
var err error var err error
@ -184,7 +184,7 @@ func router(debugMode bool) *gin.Engine {
// These API routes access dcrd and the voting wallets, and they need // These API routes access dcrd and the voting wallets, and they need
// authentication. // authentication.
both := router.Group("/api").Use( both := router.Group("/api").Use(
withDcrdClient(), withWalletClient(), vspAuth(), withDcrdClient(), withWalletClients(), vspAuth(),
) )
both.POST("/setvotechoices", setVoteChoices) both.POST("/setvotechoices", setVoteChoices)