Close RPC connections after the web server is stopped.

Previously all of the shutdown tasks were running concurrently, which meant the RPC connections be closed before the webserver is finished using them.
This commit is contained in:
jholdstock 2020-06-10 12:07:46 +01:00 committed by David Hill
parent a304fc9890
commit 72c16ad2c7
4 changed files with 73 additions and 68 deletions

11
main.go
View File

@ -60,12 +60,12 @@ func run(ctx context.Context) error {
// Create RPC client for local dcrd instance (used for broadcasting and
// checking the status of fee transactions).
dcrd := rpc.SetupDcrd(ctx, &shutdownWg, cfg.DcrdUser, cfg.DcrdPass,
cfg.DcrdHost, cfg.dcrdCert, nil)
dcrd := rpc.SetupDcrd(cfg.DcrdUser, cfg.DcrdPass, cfg.DcrdHost, cfg.dcrdCert, nil)
defer dcrd.Close()
// Create RPC client for remote dcrwallet instance (used for voting).
wallets := rpc.SetupWallet(ctx, &shutdownWg, cfg.WalletUser, cfg.WalletPass,
cfg.WalletHosts, cfg.walletCert)
wallets := rpc.SetupWallet(cfg.WalletUser, cfg.WalletPass, cfg.WalletHosts, cfg.walletCert)
defer wallets.Close()
// Create and start webapi server.
apiCfg := webapi.Config{
@ -85,8 +85,9 @@ func run(ctx context.Context) error {
}
// Create a dcrd client with a blockconnected notification handler.
dcrdWithNotifs := rpc.SetupDcrd(ctx, &shutdownWg, cfg.DcrdUser, cfg.DcrdPass,
dcrdWithNotifs := rpc.SetupDcrd(cfg.DcrdUser, cfg.DcrdPass,
cfg.DcrdHost, cfg.dcrdCert, &background.NotificationHandler{})
defer dcrdWithNotifs.Close()
// Start background process which will continually attempt to reconnect to
// dcrd if the connection drops.

View File

@ -22,16 +22,18 @@ type Caller interface {
Call(ctx context.Context, method string, res interface{}, args ...interface{}) error
}
// connect dials and returns a connected RPC client. A boolean indicates whether
// this connection is new (true), or if it is an existing connection which is
// being reused (false).
type connect func() (Caller, bool, error)
// client wraps a wsrpc.Client, as well as all of the connection details
// required to make a new client if the existing client is closed.
type client struct {
mu *sync.Mutex
client *wsrpc.Client
addr string
tlsOpt wsrpc.Option
authOpt wsrpc.Option
notifier wsrpc.Notifier
}
// setup accepts RPC connection details, creates an RPC client, and returns a
// function which can be called to access the client. The returned function will
// try to handle any client disconnects by attempting to reconnect, but will
// return an error if a new connection cannot be established.
func setup(ctx context.Context, shutdownWg *sync.WaitGroup, user, pass, addr string, cert []byte, n wsrpc.Notifier) connect {
func setup(user, pass, addr string, cert []byte, n wsrpc.Notifier) *client {
// Create TLS options.
pool := x509.NewCertPool()
@ -42,53 +44,51 @@ func setup(ctx context.Context, shutdownWg *sync.WaitGroup, user, pass, addr str
// Create authentication options.
authOpt := wsrpc.WithBasicAuth(user, pass)
fullAddr := "wss://" + addr + "/ws"
var mu sync.Mutex
var c *wsrpc.Client
// Add the graceful shutdown to the waitgroup.
shutdownWg.Add(1)
go func() {
// Wait until shutdown is signaled before shutting down.
<-ctx.Done()
return &client{&mu, c, addr, tlsOpt, authOpt, n}
}
if c != nil {
select {
case <-c.Done():
log.Tracef("RPC already closed (%s)", addr)
func (c *client) Close() {
if c.client != nil {
select {
case <-c.client.Done():
log.Tracef("RPC already closed (%s)", c.addr)
default:
if err := c.Close(); err != nil {
log.Errorf("Failed to close RPC (%s): %v", addr, err)
} else {
log.Tracef("RPC closed (%s)", addr)
}
default:
if err := c.client.Close(); err != nil {
log.Errorf("Failed to close RPC (%s): %v", c.addr, err)
} else {
log.Tracef("RPC closed (%s)", c.addr)
}
}
shutdownWg.Done()
}()
return func() (Caller, bool, error) {
defer mu.Unlock()
mu.Lock()
if c != nil {
select {
case <-c.Done():
log.Debugf("RPC client %s errored (%v); reconnecting...", addr, c.Err())
c = nil
default:
return c, false, nil
}
}
var err error
c, err = wsrpc.Dial(ctx, fullAddr, tlsOpt, authOpt, wsrpc.WithNotifier(n))
if err != nil {
return nil, false, err
}
return c, true, nil
}
}
// dial will return a connect rpc client if one exists, or attempt to create a
// new one if not. A
// boolean indicates whether this connection is new (true), or if it is an
// existing connection which is being reused (false).
func (c *client) dial(ctx context.Context) (Caller, bool, error) {
defer c.mu.Unlock()
c.mu.Lock()
if c.client != nil {
select {
case <-c.client.Done():
log.Debugf("RPC client %s errored (%v); reconnecting...", c.addr, c.client.Err())
c.client = nil
default:
return c.client, false, nil
}
}
var err error
fullAddr := "wss://" + c.addr + "/ws"
c.client, err = wsrpc.Dial(ctx, fullAddr, c.tlsOpt, c.authOpt, wsrpc.WithNotifier(c.notifier))
if err != nil {
return nil, false, err
}
return c.client, true, nil
}

View File

@ -6,7 +6,6 @@ import (
"errors"
"fmt"
"strings"
"sync"
"github.com/decred/dcrd/blockchain/stake/v3"
"github.com/decred/dcrd/chaincfg/v3"
@ -27,17 +26,18 @@ type DcrdRPC struct {
ctx context.Context
}
type DcrdConnect connect
type DcrdConnect struct {
*client
}
func SetupDcrd(ctx context.Context, shutdownWg *sync.WaitGroup, user, pass, addr string, cert []byte, n wsrpc.Notifier) DcrdConnect {
return DcrdConnect(setup(ctx, shutdownWg, user, pass, addr, cert, n))
func SetupDcrd(user, pass, addr string, cert []byte, n wsrpc.Notifier) DcrdConnect {
return DcrdConnect{setup(user, pass, addr, cert, n)}
}
// Client creates a new DcrdRPC client instance. Returns an error if dialing
// dcrd fails or if dcrd is misconfigured.
func (d *DcrdConnect) Client(ctx context.Context, netParams *chaincfg.Params) (*DcrdRPC, error) {
c, newConnection, err := connect(*d)()
c, newConnection, err := d.dial(ctx)
if err != nil {
return nil, fmt.Errorf("dcrd connection error: %v", err)
}

View File

@ -2,7 +2,6 @@ package rpc
import (
"context"
"sync"
wallettypes "decred.org/dcrwallet/rpc/jsonrpc/types"
"github.com/decred/dcrd/chaincfg/v3"
@ -21,19 +20,24 @@ type WalletRPC struct {
ctx context.Context
}
type WalletConnect []connect
type WalletConnect []*client
func SetupWallet(ctx context.Context, shutdownWg *sync.WaitGroup, user, pass string, addrs []string, cert []byte) WalletConnect {
func SetupWallet(user, pass string, addrs []string, cert []byte) WalletConnect {
walletConnect := make(WalletConnect, len(addrs))
for i := 0; i < len(addrs); i++ {
walletConnect[i] = setup(ctx, shutdownWg, user, pass,
addrs[i], cert, nil)
walletConnect[i] = setup(user, pass, addrs[i], cert, nil)
}
return walletConnect
}
func (w *WalletConnect) Close() {
for _, connect := range []*client(*w) {
connect.Close()
}
}
// Clients loops over each wallet and tries to establish a connection. It
// increments a count of failed connections if a connection cannot be
// established, or if the wallet is misconfigured.
@ -41,9 +45,9 @@ func (w *WalletConnect) Clients(ctx context.Context, netParams *chaincfg.Params)
walletClients := make([]*WalletRPC, 0)
failedConnections := 0
for _, connect := range []connect(*w) {
for _, connect := range []*client(*w) {
c, newConnection, err := connect()
c, newConnection, err := connect.dial(ctx)
if err != nil {
log.Errorf("dcrwallet connection error: %v", err)
failedConnections++