Run background tasks immediately on startup.
This commit is contained in:
parent
eeaefab280
commit
e3d7ab6e75
@ -11,14 +11,16 @@ import (
|
||||
"github.com/decred/vspd/rpc"
|
||||
)
|
||||
|
||||
type NotificationHandler struct {
|
||||
Ctx context.Context
|
||||
Db *database.VspDatabase
|
||||
Wallets rpc.WalletConnect
|
||||
NetParams *chaincfg.Params
|
||||
closed chan struct{}
|
||||
dcrdClient *rpc.DcrdRPC
|
||||
}
|
||||
var (
|
||||
ctx context.Context
|
||||
db *database.VspDatabase
|
||||
dcrdRPC rpc.DcrdConnect
|
||||
walletRPC rpc.WalletConnect
|
||||
netParams *chaincfg.Params
|
||||
notifierClosed chan struct{}
|
||||
)
|
||||
|
||||
type NotificationHandler struct{}
|
||||
|
||||
const (
|
||||
// requiredConfs is the number of confirmations required to consider a
|
||||
@ -43,23 +45,36 @@ func (n *NotificationHandler) Notify(method string, params json.RawMessage) erro
|
||||
|
||||
log.Debugf("Block notification %d (%s)", header.Height, header.BlockHash().String())
|
||||
|
||||
blockConnected()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func blockConnected() {
|
||||
|
||||
dcrdClient, err := dcrdRPC.Client(ctx, netParams)
|
||||
if err != nil {
|
||||
log.Error(err)
|
||||
return
|
||||
}
|
||||
|
||||
// Step 1/3: Update the database with any tickets which now have 6+
|
||||
// confirmations.
|
||||
|
||||
unconfirmed, err := n.Db.GetUnconfirmedTickets()
|
||||
unconfirmed, err := db.GetUnconfirmedTickets()
|
||||
if err != nil {
|
||||
log.Errorf("GetUnconfirmedTickets error: %v", err)
|
||||
}
|
||||
|
||||
for _, ticket := range unconfirmed {
|
||||
tktTx, err := n.dcrdClient.GetRawTransaction(ticket.Hash)
|
||||
tktTx, err := dcrdClient.GetRawTransaction(ticket.Hash)
|
||||
if err != nil {
|
||||
log.Errorf("GetRawTransaction error: %v", err)
|
||||
continue
|
||||
}
|
||||
if tktTx.Confirmations >= requiredConfs {
|
||||
ticket.Confirmed = true
|
||||
err = n.Db.UpdateTicket(ticket)
|
||||
err = db.UpdateTicket(ticket)
|
||||
if err != nil {
|
||||
log.Errorf("UpdateTicket error: %v", err)
|
||||
continue
|
||||
@ -71,13 +86,13 @@ func (n *NotificationHandler) Notify(method string, params json.RawMessage) erro
|
||||
|
||||
// Step 2/3: Broadcast fee tx for tickets which are confirmed.
|
||||
|
||||
pending, err := n.Db.GetPendingFees()
|
||||
pending, err := db.GetPendingFees()
|
||||
if err != nil {
|
||||
log.Errorf("GetPendingFees error: %v", err)
|
||||
}
|
||||
|
||||
for _, ticket := range pending {
|
||||
feeTxHash, err := n.dcrdClient.SendRawTransaction(ticket.FeeTxHex)
|
||||
feeTxHash, err := dcrdClient.SendRawTransaction(ticket.FeeTxHex)
|
||||
if err != nil {
|
||||
// TODO: SendRawTransaction can return a "transcation already
|
||||
// exists" error, which isnt necessarily a problem here.
|
||||
@ -86,7 +101,7 @@ func (n *NotificationHandler) Notify(method string, params json.RawMessage) erro
|
||||
}
|
||||
|
||||
ticket.FeeTxHash = feeTxHash
|
||||
err = n.Db.UpdateTicket(ticket)
|
||||
err = db.UpdateTicket(ticket)
|
||||
if err != nil {
|
||||
log.Errorf("UpdateTicket error: %v", err)
|
||||
continue
|
||||
@ -96,23 +111,23 @@ func (n *NotificationHandler) Notify(method string, params json.RawMessage) erro
|
||||
|
||||
// Step 3/3: Add tickets with confirmed fees to voting wallets.
|
||||
|
||||
unconfirmedFees, err := n.Db.GetUnconfirmedFees()
|
||||
unconfirmedFees, err := db.GetUnconfirmedFees()
|
||||
if err != nil {
|
||||
log.Errorf("GetUnconfirmedFees error: %v", err)
|
||||
// If this fails, there is nothing more we can do. Return.
|
||||
return nil
|
||||
return
|
||||
}
|
||||
|
||||
// If there are no confirmed fees, there is nothing more to do. Return.
|
||||
if len(unconfirmedFees) == 0 {
|
||||
return nil
|
||||
return
|
||||
}
|
||||
|
||||
walletClients, failedConnections := n.Wallets.Clients(n.Ctx, n.NetParams)
|
||||
walletClients, failedConnections := walletRPC.Clients(ctx, netParams)
|
||||
if len(walletClients) == 0 {
|
||||
// If no wallet clients, there is nothing more we can do. Return.
|
||||
log.Error("Could not connect to any wallets")
|
||||
return nil
|
||||
return
|
||||
}
|
||||
if failedConnections > 0 {
|
||||
log.Errorf("Failed to connect to %d wallet(s), proceeding with only %d",
|
||||
@ -120,7 +135,7 @@ func (n *NotificationHandler) Notify(method string, params json.RawMessage) erro
|
||||
}
|
||||
|
||||
for _, ticket := range unconfirmedFees {
|
||||
feeTx, err := n.dcrdClient.GetRawTransaction(ticket.FeeTxHash)
|
||||
feeTx, err := dcrdClient.GetRawTransaction(ticket.FeeTxHash)
|
||||
if err != nil {
|
||||
log.Errorf("GetRawTransaction error: %v", err)
|
||||
continue
|
||||
@ -130,16 +145,16 @@ func (n *NotificationHandler) Notify(method string, params json.RawMessage) erro
|
||||
// wallets.
|
||||
if feeTx.Confirmations >= requiredConfs {
|
||||
ticket.FeeConfirmed = true
|
||||
err = n.Db.UpdateTicket(ticket)
|
||||
err = db.UpdateTicket(ticket)
|
||||
if err != nil {
|
||||
log.Errorf("UpdateTicket error: %v", err)
|
||||
return nil
|
||||
return
|
||||
}
|
||||
log.Debugf("Fee tx confirmed for ticket: ticketHash=%s", ticket.Hash)
|
||||
|
||||
// Add ticket to the voting wallet.
|
||||
|
||||
rawTicket, err := n.dcrdClient.GetRawTransaction(ticket.Hash)
|
||||
rawTicket, err := dcrdClient.GetRawTransaction(ticket.Hash)
|
||||
if err != nil {
|
||||
log.Errorf("GetRawTransaction error: %v", err)
|
||||
continue
|
||||
@ -173,23 +188,22 @@ func (n *NotificationHandler) Notify(method string, params json.RawMessage) erro
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (n *NotificationHandler) Close() error {
|
||||
close(n.closed)
|
||||
close(notifierClosed)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (n *NotificationHandler) connect(dcrdConnect rpc.DcrdConnect) error {
|
||||
var err error
|
||||
n.dcrdClient, err = dcrdConnect.Client(n.Ctx, n.NetParams)
|
||||
func connectNotifier(dcrdWithNotifs rpc.DcrdConnect) error {
|
||||
notifierClosed = make(chan struct{})
|
||||
|
||||
dcrdClient, err := dcrdWithNotifs.Client(ctx, netParams)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
err = n.dcrdClient.NotifyBlocks()
|
||||
err = dcrdClient.NotifyBlocks()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@ -199,34 +213,43 @@ func (n *NotificationHandler) connect(dcrdConnect rpc.DcrdConnect) error {
|
||||
// Wait until context is done (vspd is shutting down), or until the
|
||||
// notifier is closed.
|
||||
select {
|
||||
case <-n.Ctx.Done():
|
||||
return n.Ctx.Err()
|
||||
case <-n.closed:
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
case <-notifierClosed:
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
func Start(n *NotificationHandler, dcrdConnect rpc.DcrdConnect) {
|
||||
func Start(c context.Context, vdb *database.VspDatabase, drpc rpc.DcrdConnect,
|
||||
dcrdWithNotif rpc.DcrdConnect, wrpc rpc.WalletConnect, p *chaincfg.Params) {
|
||||
|
||||
// Loop forever attempting to create a connection to the dcrd server.
|
||||
ctx = c
|
||||
db = vdb
|
||||
dcrdRPC = drpc
|
||||
walletRPC = wrpc
|
||||
netParams = p
|
||||
|
||||
// Run the block connected handler now to catch up with any blocks mined
|
||||
// while vspd was shut down.
|
||||
blockConnected()
|
||||
|
||||
// Loop forever attempting to create a connection to the dcrd server for
|
||||
// notifications.
|
||||
go func() {
|
||||
for {
|
||||
n.closed = make(chan struct{})
|
||||
|
||||
err := n.connect(dcrdConnect)
|
||||
err := connectNotifier(dcrdWithNotif)
|
||||
if err != nil {
|
||||
log.Errorf("dcrd connect error: %v", err)
|
||||
|
||||
// If context is done (vspd is shutting down), return,
|
||||
// otherwise wait 15 seconds and to reconnect.
|
||||
// otherwise wait 15 seconds and try to reconnect.
|
||||
select {
|
||||
case <-n.Ctx.Done():
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case <-time.After(15 * time.Second):
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
}()
|
||||
}
|
||||
|
||||
23
main.go
23
main.go
@ -87,21 +87,6 @@ func run(ctx context.Context) error {
|
||||
return err
|
||||
}
|
||||
|
||||
// Create a dcrd client with an attached notification handler which will run
|
||||
// in the background.
|
||||
notifHandler := &background.NotificationHandler{
|
||||
Ctx: ctx,
|
||||
Db: db,
|
||||
Wallets: wallets,
|
||||
NetParams: cfg.netParams.Params,
|
||||
}
|
||||
dcrdWithNotifHandler := rpc.SetupDcrd(ctx, &shutdownWg, cfg.DcrdUser, cfg.DcrdPass,
|
||||
cfg.DcrdHost, cfg.dcrdCert, notifHandler)
|
||||
|
||||
// Start background process which will continually attempt to reconnect to
|
||||
// dcrd if the connection drops.
|
||||
background.Start(notifHandler, dcrdWithNotifHandler)
|
||||
|
||||
// Create and start webapi server.
|
||||
apiCfg := webapi.Config{
|
||||
VSPFee: cfg.VSPFee,
|
||||
@ -119,6 +104,14 @@ func run(ctx context.Context) error {
|
||||
return err
|
||||
}
|
||||
|
||||
// Create a dcrd client with a blockconnected notification handler.
|
||||
dcrdWithNotifs := rpc.SetupDcrd(ctx, &shutdownWg, cfg.DcrdUser, cfg.DcrdPass,
|
||||
cfg.DcrdHost, cfg.dcrdCert, &background.NotificationHandler{})
|
||||
|
||||
// Start background process which will continually attempt to reconnect to
|
||||
// dcrd if the connection drops.
|
||||
background.Start(ctx, db, dcrd, dcrdWithNotifs, wallets, cfg.netParams.Params)
|
||||
|
||||
// Wait for shutdown tasks to complete before returning.
|
||||
shutdownWg.Wait()
|
||||
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user