From 7dbee5b6c24f3da8b9c2587b8b1cc9777094b58f Mon Sep 17 00:00:00 2001 From: Jamie Holdstock Date: Mon, 18 May 2020 15:19:35 +0100 Subject: [PATCH] Add dcrwallet RPC client (#25) --- config.go | 54 +++++++++++++++++++++++++++++++-- log.go | 4 +++ main.go | 17 ++++++++--- rpc/client.go | 77 +++++++++++++++++++++++++++++++++++++++++++++++ rpc/log.go | 26 ++++++++++++++++ webapi/methods.go | 27 ++++++++++++----- webapi/server.go | 10 +++--- 7 files changed, 197 insertions(+), 18 deletions(-) create mode 100644 rpc/client.go create mode 100644 rpc/log.go diff --git a/config.go b/config.go index e883994..9dfa460 100644 --- a/config.go +++ b/config.go @@ -6,6 +6,7 @@ import ( "errors" "fmt" "io/ioutil" + "net" "os" "os/user" "path/filepath" @@ -24,6 +25,7 @@ var ( defaultHomeDir = dcrutil.AppDataDir("dcrvsp", false) defaultConfigFilename = "dcrvsp.conf" defaultConfigFile = filepath.Join(defaultHomeDir, defaultConfigFilename) + defaultWalletHost = "127.0.0.1" ) // config defines the configuration options for the VSP. @@ -34,11 +36,16 @@ type config struct { VSPFee float64 `long:"vspfee" ini-name:"vspfee" description:"The fee percentage charged for VSP use. eg. 0.01 (1%), 0.05 (5%)."` HomeDir string `long:"homedir" ini-name:"homedir" no-ini:"true" description:"Path to application home directory. Used for storing VSP database and logs."` ConfigFile string `long:"configfile" ini-name:"configfile" no-ini:"true" description:"Path to configuration file."` + WalletHost string `long:"wallethost" ini-name:"wallethost" description:"The ip:port to establish a JSON-RPC connection with dcrwallet."` + WalletUser string `long:"walletuser" ini-name:"walletuser" description:"Username for dcrwallet RPC connections."` + WalletPass string `long:"walletpass" ini-name:"walletpass" description:"Password for dcrwallet RPC connections."` + WalletCert string `long:"walletcert" ini-name:"walletcert" description:"The dcrwallet RPC certificate file."` signKey ed25519.PrivateKey pubKey ed25519.PublicKey dbPath string netParams *netParams + dcrwCert []byte } // fileExists reports whether the named file or directory exists. @@ -103,6 +110,16 @@ func cleanAndExpandPath(path string) string { return filepath.Join(homeDir, path) } +// normalizeAddress returns addr with the passed default port appended if +// there is not already a port specified. +func normalizeAddress(addr, defaultPort string) string { + _, _, err := net.SplitHostPort(addr) + if err != nil { + return net.JoinHostPort(addr, defaultPort) + } + return addr +} + // loadConfig initializes and parses the config using a config file and command // line options. // @@ -125,6 +142,7 @@ func loadConfig() (*config, error) { VSPFee: defaultVSPFee, HomeDir: defaultHomeDir, ConfigFile: defaultConfigFile, + WalletHost: defaultWalletHost, } // Pre-parse the command line options to see if an alternative config @@ -134,7 +152,6 @@ func loadConfig() (*config, error) { preCfg := cfg preParser := flags.NewParser(&preCfg, flags.HelpFlag) - _, err := preParser.Parse() if err != nil { if e, ok := err.(*flags.Error); ok && e.Type != flags.ErrHelp { @@ -189,7 +206,7 @@ func loadConfig() (*config, error) { } // Load additional config from file. - parser := flags.NewParser(&preCfg, flags.Default) + parser := flags.NewParser(&cfg, flags.Default) err = flags.NewIniParser(parser).ParseFile(preCfg.ConfigFile) if err != nil { @@ -216,6 +233,39 @@ func loadConfig() (*config, error) { cfg.netParams = &simNetParams } + // Ensure the dcrwallet RPC username is set. + if cfg.WalletUser == "" { + str := "%s: the walletuser option is not set" + err := fmt.Errorf(str, funcName) + return nil, err + } + + // Ensure the dcrwallet RPC password is set. + if cfg.WalletPass == "" { + str := "%s: the walletpass option is not set" + err := fmt.Errorf(str, funcName) + return nil, err + } + + // Ensure the dcrwallet RPC cert path is set. + if cfg.WalletCert == "" { + str := "%s: the walletcert option is not set" + err := fmt.Errorf(str, funcName) + return nil, err + } + + // Add default port for the active network if there is no port specified. + cfg.WalletHost = normalizeAddress(cfg.WalletHost, cfg.netParams.WalletRPCServerPort) + + // Load dcrwallet RPC certificate. + cfg.WalletCert = cleanAndExpandPath(cfg.WalletCert) + cfg.dcrwCert, err = ioutil.ReadFile(cfg.WalletCert) + if err != nil { + str := "%s: failed to read dcrwallet cert file: %s" + err := fmt.Errorf(str, funcName, err) + return nil, err + } + // Create the data directory. dataDir := filepath.Join(cfg.HomeDir, "data", cfg.netParams.Name) err = os.MkdirAll(dataDir, 0700) diff --git a/log.go b/log.go index 2251417..c6a6e10 100644 --- a/log.go +++ b/log.go @@ -9,6 +9,7 @@ import ( "github.com/jrick/logrotate/rotator" "github.com/jholdstock/dcrvsp/database" + "github.com/jholdstock/dcrvsp/rpc" "github.com/jholdstock/dcrvsp/webapi" ) @@ -42,12 +43,14 @@ var ( log = backendLog.Logger("VSP") dbLog = backendLog.Logger(" DB") apiLog = backendLog.Logger("API") + rpcLog = backendLog.Logger("RPC") ) // Initialize package-global logger variables. func init() { database.UseLogger(dbLog) webapi.UseLogger(apiLog) + rpc.UseLogger(rpcLog) } // subsystemLoggers maps each subsystem identifier to its associated logger. @@ -55,6 +58,7 @@ var subsystemLoggers = map[string]slog.Logger{ "VSP": log, " DB": dbLog, "API": apiLog, + "RPC": rpcLog, } // initLogRotator initializes the logging rotater to write logs to logFile and diff --git a/main.go b/main.go index a9207dc..ad038f6 100644 --- a/main.go +++ b/main.go @@ -8,8 +8,8 @@ import ( "sync" "github.com/jholdstock/dcrvsp/database" + "github.com/jholdstock/dcrvsp/rpc" "github.com/jholdstock/dcrvsp/webapi" - "github.com/jrick/wsrpc/v2" ) func main() { @@ -34,7 +34,7 @@ func run(ctx context.Context) error { cfg, err := loadConfig() if err != nil { // Don't use logger here because it may not be initialised. - fmt.Fprintf(os.Stderr, "Config error: %v", err) + fmt.Fprintf(os.Stderr, "Config error: %v\n", err) return err } @@ -50,8 +50,15 @@ func run(ctx context.Context) error { return err } - // TODO: Create real RPC client. - var rpc *wsrpc.Client + // Create dcrwallet RPC client. + walletRPC := rpc.Setup(ctx, &shutdownWg, cfg.WalletUser, cfg.WalletPass, cfg.WalletHost, cfg.dcrwCert) + _, err = walletRPC() + if err != nil { + log.Errorf("dcrwallet RPC error: %v", err) + requestShutdown() + shutdownWg.Wait() + return err + } // Create and start webapi server. apiCfg := webapi.Config{ @@ -63,7 +70,7 @@ func run(ctx context.Context) error { // TODO: Make releaseMode properly configurable. Release mode enables very // detailed webserver logging and live reloading of HTML templates. releaseMode := true - err = webapi.Start(ctx, shutdownRequestChannel, &shutdownWg, cfg.Listen, db, rpc, releaseMode, apiCfg) + err = webapi.Start(ctx, shutdownRequestChannel, &shutdownWg, cfg.Listen, db, walletRPC, releaseMode, apiCfg) if err != nil { log.Errorf("Failed to initialise webapi: %v", err) requestShutdown() diff --git a/rpc/client.go b/rpc/client.go new file mode 100644 index 0000000..518a973 --- /dev/null +++ b/rpc/client.go @@ -0,0 +1,77 @@ +package rpc + +import ( + "context" + "crypto/tls" + "crypto/x509" + "sync" + + "github.com/jrick/wsrpc/v2" +) + +type Client func() (*wsrpc.Client, error) + +// Setup accepts RPC connection details, creates an RPC client, and returns a +// function which can be called to access the client. The returned function will +// try to handle any client disconnects by attempting to reconnect, but will +// return an error if a new connection cannot be established. +func Setup(ctx context.Context, shutdownWg *sync.WaitGroup, user, pass, addr string, cert []byte) Client { + + // Create TLS options. + pool := x509.NewCertPool() + pool.AppendCertsFromPEM(cert) + tc := &tls.Config{RootCAs: pool} + tlsOpt := wsrpc.WithTLSConfig(tc) + + // Create authentication options. + authOpt := wsrpc.WithBasicAuth(user, pass) + + var mu sync.Mutex + var c *wsrpc.Client + + // Add the graceful shutdown to the waitgroup. + shutdownWg.Add(1) + go func() { + // Wait until shutdown is signaled before shutting down. + <-ctx.Done() + + if c != nil { + select { + case <-c.Done(): + log.Debugf("RPC already closed (%s)", addr) + + default: + log.Debugf("Closing RPC (%s)...", addr) + if err := c.Close(); err != nil { + log.Errorf("Failed to close RPC (%s): %v", addr, err) + } else { + log.Debugf("RPC closed (%s)", addr) + } + } + } + shutdownWg.Done() + }() + + return func() (*wsrpc.Client, error) { + defer mu.Unlock() + mu.Lock() + + if c != nil { + select { + case <-c.Done(): + log.Infof("RPC client errored (%v); reconnecting...", c.Err()) + c = nil + default: + return c, nil + } + } + + fullAddr := "wss://" + addr + "/ws" + c, err := wsrpc.Dial(ctx, fullAddr, tlsOpt, authOpt) + if err != nil { + return nil, err + } + log.Infof("Dialed RPC websocket %v", addr) + return c, nil + } +} diff --git a/rpc/log.go b/rpc/log.go new file mode 100644 index 0000000..2229194 --- /dev/null +++ b/rpc/log.go @@ -0,0 +1,26 @@ +package rpc + +import ( + "github.com/decred/slog" +) + +// log is a logger that is initialized with no output filters. This +// means the package will not perform any logging by default until the caller +// requests it. +var log slog.Logger + +// The default amount of logging is none. +func init() { + DisableLog() +} + +// DisableLog disables all library log output. Logging output is disabled +// by default until UseLogger is called. +func DisableLog() { + log = slog.Disabled +} + +// UseLogger uses a specified Logger to output package logging info. +func UseLogger(logger slog.Logger) { + log = logger +} diff --git a/webapi/methods.go b/webapi/methods.go index 7664ab3..3a7fa11 100644 --- a/webapi/methods.go +++ b/webapi/methods.go @@ -111,8 +111,14 @@ func feeAddress(c *gin.Context) { ctx := c.Request.Context() + walletClient, err := walletRPC() + if err != nil { + c.AbortWithError(http.StatusBadRequest, errors.New("wallet RPC error")) + return + } + var resp dcrdtypes.TxRawResult - err = nodeConnection.Call(ctx, "getrawtransaction", &resp, txHash.String(), true) + err = walletClient.Call(ctx, "getrawtransaction", &resp, txHash.String(), true) if err != nil { c.AbortWithError(http.StatusBadRequest, errors.New("unknown transaction")) return @@ -165,14 +171,14 @@ func feeAddress(c *gin.Context) { // txrules.StakePoolTicketFee, and store them in the database // for processing by payfee var blockHeader dcrdtypes.GetBlockHeaderVerboseResult - err = nodeConnection.Call(ctx, "getblockheader", &blockHeader, resp.BlockHash, true) + err = walletClient.Call(ctx, "getblockheader", &blockHeader, resp.BlockHash, true) if err != nil { c.AbortWithError(http.StatusInternalServerError, errors.New("RPC server error")) return } var newAddress string - err = nodeConnection.Call(ctx, "getnewaddress", &newAddress, "fees") + err = walletClient.Call(ctx, "getnewaddress", &newAddress, "fees") if err != nil { c.AbortWithError(http.StatusInternalServerError, errors.New("unable to generate fee address")) return @@ -345,19 +351,26 @@ findAddress: // PayFee2 is copied from the stakepoold implementation in #625 func PayFee2(ctx context.Context, ticketHash *chainhash.Hash, votingWIF *dcrutil.WIF, feeTx *wire.MsgTx) (string, error) { var resp dcrdtypes.TxRawResult - err := nodeConnection.Call(ctx, "getrawtransaction", &resp, ticketHash.String(), true) + + walletClient, err := walletRPC() + if err != nil { + fmt.Printf("PayFee: wallet RPC error: %v", err) + return "", errors.New("RPC server error") + } + + err = walletClient.Call(ctx, "getrawtransaction", &resp, ticketHash.String(), true) if err != nil { fmt.Printf("PayFee: getrawtransaction: %v", err) return "", errors.New("RPC server error") } - err = nodeConnection.Call(ctx, "addticket", nil, resp.Hex) + err = walletClient.Call(ctx, "addticket", nil, resp.Hex) if err != nil { fmt.Printf("PayFee: addticket: %v", err) return "", errors.New("RPC server error") } - err = nodeConnection.Call(ctx, "importprivkey", nil, votingWIF.String(), "imported", false, 0) + err = walletClient.Call(ctx, "importprivkey", nil, votingWIF.String(), "imported", false, 0) if err != nil { fmt.Printf("PayFee: importprivkey: %v", err) return "", errors.New("RPC server error") @@ -372,7 +385,7 @@ func PayFee2(ctx context.Context, ticketHash *chainhash.Hash, votingWIF *dcrutil } var res string - err = nodeConnection.Call(ctx, "sendrawtransaction", &res, hex.NewEncoder(feeTxBuf), false) + err = walletClient.Call(ctx, "sendrawtransaction", &res, hex.NewEncoder(feeTxBuf), false) if err != nil { fmt.Printf("PayFee: sendrawtransaction: %v", err) return "", errors.New("transaction failed to send") diff --git a/webapi/server.go b/webapi/server.go index 4e69105..7ae687f 100644 --- a/webapi/server.go +++ b/webapi/server.go @@ -8,10 +8,11 @@ import ( "sync" "time" + "github.com/jholdstock/dcrvsp/database" + "github.com/jholdstock/dcrvsp/rpc" + "github.com/decred/dcrd/chaincfg/v3" "github.com/gin-gonic/gin" - "github.com/jholdstock/dcrvsp/database" - "github.com/jrick/wsrpc/v2" ) type Config struct { @@ -23,10 +24,10 @@ type Config struct { var cfg Config var db *database.VspDatabase -var nodeConnection *wsrpc.Client +var walletRPC rpc.Client func Start(ctx context.Context, requestShutdownChan chan struct{}, shutdownWg *sync.WaitGroup, - listen string, db *database.VspDatabase, nodeConnection *wsrpc.Client, releaseMode bool, config Config) error { + listen string, db *database.VspDatabase, wRPC rpc.Client, releaseMode bool, config Config) error { // Create TCP listener. var listenConfig net.ListenConfig @@ -73,6 +74,7 @@ func Start(ctx context.Context, requestShutdownChan chan struct{}, shutdownWg *s }() cfg = config + walletRPC = wRPC return nil }