From 90f42caf66c9b3189c3c752c5aecb67cce7a96d3 Mon Sep 17 00:00:00 2001 From: Josh Rickmar Date: Wed, 27 Sep 2017 11:28:37 -0400 Subject: [PATCH] Begin abstraction of wallet syncing out of the wallet package. This change begins the removal of the dcrd JSON-RPC client dependency from the wallet package and type. All notification processing to keep the wallet up to date has been moved to the chain package. This change is being performed to make switching to other network syncing backends easier in the future. Most importantly, it is required for any kind of support for both RPC and SPV network backends. To avoid some churn, the wallet must still be associated with an object that provides basic network operations such as loading data filters, fetching headers, and transaction publishing. This object is now an interface type rather than the concrete RPC client type. At the moment, this interface is currently too narrow to allow an SPV backend as a drop in replacement, but the goal is to remove methods from it until this becomes possible. There are still some features in the wallet package which do require the RPC client. These features either take the RPC client as a method parameter or assert that the network backend is the RPC client. This change also uses this API break as an opportunity to begin passing around context.Context for request-scoped cancellation and timeouts. The methods of the network backend interface all take a context, and while it is not used by the backend, they will be used for any future implementations. --- apperrors/code_string.go | 6 +- apperrors/error.go | 12 ++ chain/backendwrapper.go | 118 +++++++++++++ chain/chain.go | 130 +++++++------- chain/sync.go | 237 +++++++++++++++++++++++++ dcrwallet.go | 307 ++++++++++++++++++-------------- ipc.go | 6 +- log.go | 8 +- rpc/legacyrpc/methods.go | 24 ++- rpc/legacyrpc/server.go | 29 +-- rpc/rpcserver/server.go | 127 ++++++++------ signal.go | 103 +++++------ wallet/addresses.go | 16 +- wallet/chainntfns.go | 189 +++++++------------- wallet/createtx.go | 77 +++++--- wallet/network.go | 55 ++++++ wallet/rescan.go | 135 +++++--------- wallet/sync.go | 38 ++-- wallet/txrules/rules.go | 15 ++ wallet/udb/txmined.go | 36 ++++ wallet/wallet.go | 370 ++++++++------------------------------- 21 files changed, 1102 insertions(+), 936 deletions(-) create mode 100644 chain/backendwrapper.go create mode 100644 chain/sync.go create mode 100644 wallet/network.go diff --git a/apperrors/code_string.go b/apperrors/code_string.go index 71fa36224..6ecfc7aa6 100644 --- a/apperrors/code_string.go +++ b/apperrors/code_string.go @@ -1,12 +1,12 @@ -// Code generated by "stringer -type=Code"; DO NOT EDIT +// Code generated by "stringer -type=Code"; DO NOT EDIT. package apperrors import "fmt" -const _Code_name = "ErrDatabaseErrUpgradeErrKeyChainErrCryptoErrInvalidKeyTypeErrNoExistErrAlreadyExistsErrCoinTypeTooHighErrAccountNumTooHighErrLockedErrWatchingOnlyErrInvalidAccountErrAddressNotFoundErrAccountNotFoundErrDuplicateAddressErrDuplicateAccountErrTooManyAddressesErrWrongPassphraseErrWrongNetErrCallBackBreakErrEmptyPassphraseErrCreateAddressErrMetaPoolIdxNoExistErrBranchErrDataErrInputErrValueNoExistsErrDoubleSpendErrNeedsUpgradeErrUnknownVersionErrIsClosedErrDuplicateErrSStxNotFoundErrSSGensNotFoundErrSSRtxsNotFoundErrPoolUserTicketsNotFoundErrPoolUserInvalTcktsNotFoundErrBadPoolUserAddrErrUnimplementedErrExceedsGapLimitErrExhaustedAccount" +const _Code_name = "ErrDatabaseErrUpgradeErrKeyChainErrCryptoErrInvalidKeyTypeErrNoExistErrAlreadyExistsErrCoinTypeTooHighErrAccountNumTooHighErrLockedErrWatchingOnlyErrInvalidAccountErrAddressNotFoundErrAccountNotFoundErrDuplicateAddressErrDuplicateAccountErrTooManyAddressesErrWrongPassphraseErrWrongNetErrCallBackBreakErrEmptyPassphraseErrCreateAddressErrMetaPoolIdxNoExistErrBranchErrDataErrInputErrValueNoExistsErrDoubleSpendErrNeedsUpgradeErrUnknownVersionErrIsClosedErrDuplicateErrSStxNotFoundErrSSGensNotFoundErrSSRtxsNotFoundErrPoolUserTicketsNotFoundErrPoolUserInvalTcktsNotFoundErrBadPoolUserAddrErrUnimplementedErrExceedsGapLimitErrExhaustedAccountErrDisconnectedErrUnsupportedErrHighFees" -var _Code_index = [...]uint16{0, 11, 21, 32, 41, 58, 68, 84, 102, 122, 131, 146, 163, 181, 199, 218, 237, 256, 274, 285, 301, 319, 335, 356, 365, 372, 380, 396, 410, 425, 442, 453, 465, 480, 497, 514, 540, 569, 587, 603, 621, 640} +var _Code_index = [...]uint16{0, 11, 21, 32, 41, 58, 68, 84, 102, 122, 131, 146, 163, 181, 199, 218, 237, 256, 274, 285, 301, 319, 335, 356, 365, 372, 380, 396, 410, 425, 442, 453, 465, 480, 497, 514, 540, 569, 587, 603, 621, 640, 655, 669, 680} func (i Code) String() string { if i < 0 || i >= Code(len(_Code_index)-1) { diff --git a/apperrors/error.go b/apperrors/error.go index 82e9ed8a6..c3e275820 100644 --- a/apperrors/error.go +++ b/apperrors/error.go @@ -191,6 +191,18 @@ const ( // ErrExhaustedAccount indicates that all possible addresses for an account // have been derived and no more can be created. ErrExhaustedAccount + + // ErrDisconnected indicates that the operation could not be completed due + // to being disconnected from the Decred network. + ErrDisconnected + + // ErrUnsupported indicates the operation is unsupported. + ErrUnsupported + + // ErrHighFees indicates the transaction pays much more fee that should be + // required, and an error is returned to prevent an accident where a + // significant amount of value is lost to fees. + ErrHighFees ) // E describes an application-level error. An error code is provided to diff --git a/chain/backendwrapper.go b/chain/backendwrapper.go new file mode 100644 index 000000000..f8aaef890 --- /dev/null +++ b/chain/backendwrapper.go @@ -0,0 +1,118 @@ +// Copyright (c) 2017 The Decred developers +// Use of this source code is governed by an ISC +// license that can be found in the LICENSE file. + +package chain + +import ( + "context" + "encoding/hex" + + "github.com/decred/dcrd/chaincfg/chainhash" + "github.com/decred/dcrd/dcrutil" + rpcclient "github.com/decred/dcrd/rpcclient" + "github.com/decred/dcrd/wire" + "github.com/decred/dcrwallet/apperrors" + "github.com/decred/dcrwallet/wallet" + "github.com/jrick/bitset" +) + +type rpcBackend struct { + rpcClient *rpcclient.Client +} + +var _ wallet.NetworkBackend = (*rpcBackend)(nil) + +// BackendFromRPCClient creates a wallet network backend from an RPC client. +func BackendFromRPCClient(rpcClient *rpcclient.Client) wallet.NetworkBackend { + return &rpcBackend{rpcClient} +} + +// RPCClientFromBackend returns the RPC client used to create a wallet network +// backend. This errors if the backend was not created using +// BackendFromRPCClient. +func RPCClientFromBackend(n wallet.NetworkBackend) (*rpcclient.Client, error) { + b, ok := n.(*rpcBackend) + if !ok { + return nil, apperrors.New(apperrors.ErrUnsupported, + "this operation requires the network backend to be the consensus RPC server") + } + return b.rpcClient, nil +} + +func (b *rpcBackend) GetHeaders(ctx context.Context, blockLocators []chainhash.Hash, hashStop *chainhash.Hash) ([][]byte, error) { + r, err := b.rpcClient.GetHeaders(blockLocators, hashStop) + if err != nil { + return nil, err + } + headers := make([][]byte, 0, len(r.Headers)) + for _, hexHeader := range r.Headers { + header, err := hex.DecodeString(hexHeader) + if err != nil { + return nil, err + } + headers = append(headers, header) + } + return headers, nil +} + +func (b *rpcBackend) LoadTxFilter(ctx context.Context, reload bool, addrs []dcrutil.Address, outpoints []wire.OutPoint) error { + return b.rpcClient.LoadTxFilter(reload, addrs, outpoints) +} + +func (b *rpcBackend) PublishTransaction(ctx context.Context, tx *wire.MsgTx) error { + // High fees are hardcoded and allowed here since transactions created by + // the wallet perform their own high fee check if high fees are disabled. + // This matches the lack of any high fee checking when publishing + // transactions over the wire protocol. + _, err := b.rpcClient.SendRawTransaction(tx, true) + return err +} + +func (b *rpcBackend) AddressesUsed(ctx context.Context, addrs []dcrutil.Address) (bitset.Bytes, error) { + hexBitSet, err := b.rpcClient.ExistsAddresses(addrs) + if err != nil { + return nil, err + } + return hex.DecodeString(hexBitSet) +} + +func (b *rpcBackend) Rescan(ctx context.Context, blocks []chainhash.Hash) ([]*wallet.RescannedBlock, error) { + r, err := b.rpcClient.Rescan(blocks) + if err != nil { + return nil, err + } + discoveredData := make([]*wallet.RescannedBlock, 0, len(r.DiscoveredData)) + for _, d := range r.DiscoveredData { + blockHash, err := chainhash.NewHashFromStr(d.Hash) + if err != nil { + return nil, err + } + txs := make([][]byte, 0, len(d.Transactions)) + for _, txHex := range d.Transactions { + tx, err := hex.DecodeString(txHex) + if err != nil { + return nil, err + } + txs = append(txs, tx) + } + rescannedBlock := &wallet.RescannedBlock{ + BlockHash: *blockHash, + Transactions: txs, + } + discoveredData = append(discoveredData, rescannedBlock) + } + return discoveredData, nil +} + +func (b *rpcBackend) StakeDifficulty(ctx context.Context) (dcrutil.Amount, error) { + r, err := b.rpcClient.GetStakeDifficulty() + if err != nil { + return 0, err + } + return dcrutil.NewAmount(r.NextStakeDifficulty) +} + +func (b *rpcBackend) GetBlockHash(ctx context.Context, height int32) (*chainhash.Hash, error) { + return b.rpcClient.GetBlockHash(int64(height)) +} diff --git a/chain/chain.go b/chain/chain.go index 5a383bd36..1aa0638da 100644 --- a/chain/chain.go +++ b/chain/chain.go @@ -65,7 +65,6 @@ func NewRPCClient(chainParams *chaincfg.Params, connect, user, pass string, cert quit: make(chan struct{}), } ntfnCallbacks := &dcrrpcclient.NotificationHandlers{ - OnClientConnected: client.onClientConnect, OnBlockConnected: client.onBlockConnected, OnBlockDisconnected: client.onBlockDisconnected, OnRelevantTxAccepted: client.onRelevantTxAccepted, @@ -88,7 +87,7 @@ func NewRPCClient(chainParams *chaincfg.Params, connect, user, pass string, cert // function gives up, and therefore will not block forever waiting for the // connection to be established to a server that may not exist. func (c *RPCClient) Start(ctx context.Context, retry bool) (err error) { - err = c.Connect(ctx, retry) + err = c.Client.Connect(ctx, retry) if err != nil { return err } @@ -100,7 +99,7 @@ func (c *RPCClient) Start(ctx context.Context, retry bool) (err error) { }() // Verify that the server is running on the expected network. - net, err := c.GetCurrentNet() + net, err := c.Client.GetCurrentNet() if err != nil { return err } @@ -110,7 +109,7 @@ func (c *RPCClient) Start(ctx context.Context, retry bool) (err error) { // Ensure the RPC server has a compatible API version. var serverAPI semver - versions, err := c.Version() + versions, err := c.Client.Version() if err == nil { versionResult := versions["dcrdjsonrpcapi"] serverAPI = semver{ @@ -165,89 +164,78 @@ func (c *RPCClient) WaitForShutdown() { // dcrrpcclient callbacks, which isn't very Go-like and doesn't allow // blocking client calls. type ( - // ClientConnected is a notification for when a client connection is - // opened or reestablished to the chain server. - ClientConnected struct{} - - // BlockConnected is a notification for a newly-attached block to the + // blockConnected is a notification for a newly-attached block to the // best chain. - BlockConnected struct { - BlockHeader []byte - Transactions [][]byte + blockConnected struct { + blockHeader []byte + transactions [][]byte } - // BlockDisconnected is a notifcation that the block described by the - // BlockStamp was reorganized out of the best chain. - BlockDisconnected struct { - BlockHeader []byte + // blockDisconnected is a notifcation that the block described by the header + // was reorganized out of the best chain. + blockDisconnected struct { + blockHeader []byte } - // RelevantTxAccepted is a notification that a transaction accepted by + // relevantTxAccepted is a notification that a transaction accepted by // mempool passed the client's transaction filter. - RelevantTxAccepted struct { - Transaction []byte + relevantTxAccepted struct { + transaction []byte } - // Reorganization is a notification that a reorg has happen with the new + // reorganization is a notification that a reorg has happen with the new // old and new tip included. - Reorganization struct { - OldHash *chainhash.Hash - OldHeight int64 - NewHash *chainhash.Hash - NewHeight int64 + reorganization struct { + oldHash *chainhash.Hash + oldHeight int64 + newHash *chainhash.Hash + newHeight int64 } - // WinningTickets is a notification with the winning tickets (and the + // winningTickets is a notification with the winning tickets (and the // block they are in. - WinningTickets struct { - BlockHash *chainhash.Hash - BlockHeight int64 - Tickets []*chainhash.Hash + winningTickets struct { + blockHash *chainhash.Hash + blockHeight int64 + tickets []*chainhash.Hash } - // MissedTickets is a notifcation for tickets that have been missed. - MissedTickets struct { - BlockHash *chainhash.Hash - BlockHeight int64 - Tickets []*chainhash.Hash + // missedTickets is a notifcation for tickets that have been missed. + missedTickets struct { + blockHash *chainhash.Hash + blockHeight int64 + tickets []*chainhash.Hash } - // StakeDifficulty is a notification for the current stake difficulty. - StakeDifficulty struct { - BlockHash *chainhash.Hash - BlockHeight int64 - StakeDiff int64 + // stakeDifficulty is a notification for the current stake difficulty. + stakeDifficulty struct { + blockHash *chainhash.Hash + blockHeight int64 + stakeDiff int64 } ) -// Notifications returns a channel of parsed notifications sent by the remote +// notifications returns a channel of parsed notifications sent by the remote // decred RPC server. This channel must be continually read or the process // may abort for running out memory, as unread notifications are queued for // later reads. -func (c *RPCClient) Notifications() <-chan interface{} { +func (c *RPCClient) notifications() <-chan interface{} { return c.dequeueNotification } -// NotificationsVoting returns a channel of parsed voting notifications sent +// notificationsVoting returns a channel of parsed voting notifications sent // by the remote RPC server. This channel must be continually read or the // process may abort for running out memory, as unread notifications are // queued for later reads. -func (c *RPCClient) NotificationsVoting() <-chan interface{} { +func (c *RPCClient) notificationsVoting() <-chan interface{} { return c.dequeueVotingNotification } -func (c *RPCClient) onClientConnect() { - select { - case c.enqueueNotification <- ClientConnected{}: - case <-c.quit: - } -} - func (c *RPCClient) onBlockConnected(header []byte, transactions [][]byte) { select { - case c.enqueueNotification <- BlockConnected{ - BlockHeader: header, - Transactions: transactions, + case c.enqueueNotification <- blockConnected{ + blockHeader: header, + transactions: transactions, }: case <-c.quit: } @@ -255,8 +243,8 @@ func (c *RPCClient) onBlockConnected(header []byte, transactions [][]byte) { func (c *RPCClient) onBlockDisconnected(header []byte) { select { - case c.enqueueNotification <- BlockDisconnected{ - BlockHeader: header, + case c.enqueueNotification <- blockDisconnected{ + blockHeader: header, }: case <-c.quit: } @@ -264,8 +252,8 @@ func (c *RPCClient) onBlockDisconnected(header []byte) { func (c *RPCClient) onRelevantTxAccepted(transaction []byte) { select { - case c.enqueueNotification <- RelevantTxAccepted{ - Transaction: transaction, + case c.enqueueNotification <- relevantTxAccepted{ + transaction: transaction, }: case <-c.quit: } @@ -276,7 +264,7 @@ func (c *RPCClient) onRelevantTxAccepted(transaction []byte) { func (c *RPCClient) onReorganization(oldHash *chainhash.Hash, oldHeight int32, newHash *chainhash.Hash, newHeight int32) { select { - case c.enqueueNotification <- Reorganization{ + case c.enqueueNotification <- reorganization{ oldHash, int64(oldHeight), newHash, @@ -290,10 +278,10 @@ func (c *RPCClient) onReorganization(oldHash *chainhash.Hash, oldHeight int32, // downstream to the notifications queue. func (c *RPCClient) onWinningTickets(hash *chainhash.Hash, height int64, tickets []*chainhash.Hash) { select { - case c.enqueueVotingNotification <- WinningTickets{ - BlockHash: hash, - BlockHeight: height, - Tickets: tickets, + case c.enqueueVotingNotification <- winningTickets{ + blockHash: hash, + blockHeight: height, + tickets: tickets, }: case <-c.quit: } @@ -302,25 +290,25 @@ func (c *RPCClient) onWinningTickets(hash *chainhash.Hash, height int64, tickets // onSpentAndMissedTickets handles missed tickets notifications data and passes // it downstream to the notifications queue. func (c *RPCClient) onSpentAndMissedTickets(blockHash *chainhash.Hash, height int64, sdiff int64, tickets map[chainhash.Hash]bool) { - var missedTickets []*chainhash.Hash + var missed []*chainhash.Hash // Copy the missed ticket hashes to a slice. for ticketHash, spent := range tickets { if !spent { ticketHash := ticketHash - missedTickets = append(missedTickets, &ticketHash) + missed = append(missed, &ticketHash) } } - if len(missedTickets) == 0 { + if len(missed) == 0 { return } select { - case c.enqueueNotification <- MissedTickets{ - BlockHash: blockHash, - BlockHeight: height, - Tickets: missedTickets, + case c.enqueueNotification <- missedTickets{ + blockHash: blockHash, + blockHeight: height, + tickets: missed, }: case <-c.quit: } @@ -333,7 +321,7 @@ func (c *RPCClient) onStakeDifficulty(hash *chainhash.Hash, stakeDiff int64) { select { - case c.enqueueNotification <- StakeDifficulty{ + case c.enqueueNotification <- stakeDifficulty{ hash, height, stakeDiff, diff --git a/chain/sync.go b/chain/sync.go new file mode 100644 index 000000000..58bf3e3e0 --- /dev/null +++ b/chain/sync.go @@ -0,0 +1,237 @@ +// Copyright (c) 2017 The Decred developers +// Use of this source code is governed by an ISC +// license that can be found in the LICENSE file. + +package chain + +import ( + "context" + "fmt" + + "github.com/decred/dcrwallet/apperrors" + "github.com/decred/dcrwallet/wallet" + "golang.org/x/sync/errgroup" +) + +// RPCSyncer implements wallet synchronization services by processing +// notifications from a dcrd JSON-RPC server. +type RPCSyncer struct { + wallet *wallet.Wallet + rpcClient *RPCClient +} + +// NewRPCSyncer creates an RPCSyncer that will sync the wallet using the RPC +// client. +func NewRPCSyncer(w *wallet.Wallet, rpcClient *RPCClient) *RPCSyncer { + return &RPCSyncer{w, rpcClient} +} + +// Run synchronizes the wallet, returning when synchronization fails or the +// context is cancelled. If startupSync is true, all synchronization tasks +// needed to fully register the wallet for notifications and synchronize it with +// the dcrd server are performed. Otherwise, it will listen for notifications +// but not register for any updates. +func (s *RPCSyncer) Run(ctx context.Context, startupSync bool) error { + // TODO: handling of voting notifications should be done sequentially with + // every other notification (voters must know the blocks they are voting + // on). Until then, a couple notification processing goroutines must be + // started and errors merged. + g, ctx := errgroup.WithContext(ctx) + g.Go(func() error { + if startupSync { + err := s.startupSync(ctx) + if err != nil { + return err + } + } + return s.handleNotifications(ctx) + }) + g.Go(func() error { + return s.handleVoteNotifications(ctx) + }) + g.Go(func() error { + err := s.rpcClient.NotifySpentAndMissedTickets() + if err != nil { + return fmt.Errorf("failed to register for spentandmissedtickets "+ + "notifications: %v", err) + } + + if s.wallet.VotingEnabled() { + // Request notifications for winning tickets. + err := s.rpcClient.NotifyWinningTickets() + if err != nil { + return fmt.Errorf("failed to register for winningtickets "+ + "notifications: %v", err) + } + + vb := s.wallet.VoteBits() + log.Infof("Wallet voting enabled: vote bits = %#04x, "+ + "extended vote bits = %x", vb.Bits, vb.ExtendedBits) + log.Infof("Please ensure your wallet remains unlocked so it may vote") + } + + return nil + }) + return g.Wait() +} + +func (s *RPCSyncer) handleNotifications(ctx context.Context) error { + // connectingBlocks keeps track of whether any blocks have been successfully + // attached to the main chain. Once any blocks have attached, if a future + // block fails to attach, the error is fatal. Otherwise, errors are logged. + connectingBlocks := false + + c := s.rpcClient.notifications() + for { + select { + case <-ctx.Done(): + return ctx.Err() + + case n, ok := <-c: + if !ok { + return apperrors.New(apperrors.ErrDisconnected, "RPC client disconnected") + } + + var notificationName string + var err error + nonFatal := false + switch n := n.(type) { + case blockConnected: + notificationName = "blockconnected" + err = s.wallet.ConnectBlock(n.blockHeader, n.transactions) + if err == nil { + connectingBlocks = true + } + nonFatal = !connectingBlocks + + case blockDisconnected: + continue // These notifications are ignored + + case reorganization: + notificationName = "reorganizing" + err = s.wallet.StartReorganize(n.oldHash, n.newHash, n.oldHeight, n.newHeight) + + case relevantTxAccepted: + notificationName = "relevanttxaccepted" + err = s.wallet.AcceptMempoolTx(n.transaction) + nonFatal = true + + case missedTickets: + notificationName = "spentandmissedtickets" + err = s.wallet.RevokeOwnedTickets(n.tickets) + nonFatal = true + + default: + log.Warnf("Notification handler received unknown notification type %T", n) + continue + } + + switch { + case err == nil: + case nonFatal: + log.Errorf("Failed to process consensus server notification "+ + "(name: `%s`, detail: `%v`)", notificationName, err) + default: + return fmt.Errorf("failed to process consensus server notification "+ + "(name: `%s`, detail: `%v`)", notificationName, err) + } + } + } +} + +func (s *RPCSyncer) handleVoteNotifications(ctx context.Context) error { + c := s.rpcClient.notificationsVoting() + for { + select { + case <-ctx.Done(): + return ctx.Err() + + case n, ok := <-c: + if !ok { + return apperrors.New(apperrors.ErrDisconnected, "RPC client disconnected") + } + + var notificationName string + var err error + switch n := n.(type) { + case winningTickets: + notificationName = "winningtickets" + err = s.wallet.VoteOnOwnedTickets(n.tickets, n.blockHash, int32(n.blockHeight)) + default: + log.Warnf("Voting handler received unknown notification type %T", n) + } + if err != nil { + log.Errorf("Failed to handle %s notification: %v", notificationName, err) + } + } + } +} + +// startupSync brings the wallet up to date with the current chain server +// connection. It creates a rescan request and blocks until the rescan has +// finished. +func (s *RPCSyncer) startupSync(ctx context.Context) error { + // Request notifications for connected and disconnected blocks. + err := s.rpcClient.NotifyBlocks() + if err != nil { + return err + } + + n := BackendFromRPCClient(s.rpcClient.Client) + + // Discover any addresses for this wallet that have not yet been created. + err = s.wallet.DiscoverActiveAddresses(n, !s.wallet.Locked()) + if err != nil { + return err + } + + // Load transaction filters with all active addresses and watched outpoints. + err = s.wallet.LoadActiveDataFilters(n) + if err != nil { + return err + } + + // Fetch headers for unseen blocks in the main chain, determine whether a + // rescan is necessary, and when to begin it. + fetchedHeaderCount, rescanStart, _, _, _, err := s.wallet.FetchHeaders(n) + if err != nil { + return err + } + + // Rescan when necessary. + if fetchedHeaderCount != 0 { + err := s.wallet.Rescan(ctx, n, &rescanStart) + if err != nil { + return err + } + } + + unminedTxs, err := s.wallet.UnminedTransactions() + if err != nil { + log.Errorf("Cannot load unmined transactions for resending: %v", err) + unminedTxs = nil + } + for _, tx := range unminedTxs { + txHash := tx.TxHash() + err := n.PublishTransaction(ctx, tx) + if err != nil { + // TODO: Transactions should be removed if this is a double spend. + log.Tracef("Could not resend transaction %v: %v", &txHash, err) + continue + } + log.Tracef("Resent unmined transaction %v", &txHash) + } + + _, err = s.rpcClient.RawRequest("rebroadcastwinners", nil) + if err != nil { + return err + } + _, err = s.rpcClient.RawRequest("rebroadcastmissed", nil) + if err != nil { + return err + } + + log.Infof("Blockchain sync completed, wallet ready for general usage.") + + return nil +} diff --git a/dcrwallet.go b/dcrwallet.go index 52ad5f40b..e6fdc0a51 100644 --- a/dcrwallet.go +++ b/dcrwallet.go @@ -15,7 +15,6 @@ import ( "os" "runtime" "runtime/pprof" - "sync" "time" "github.com/decred/dcrd/chaincfg" @@ -33,21 +32,33 @@ var ( ) func main() { - // Use all processor cores. - runtime.GOMAXPROCS(runtime.NumCPU()) + // Create a context that is cancelled when a shutdown request is received + // through an interrupt signal or an RPC request. + ctx := withShutdownCancel(context.Background()) + go shutdownListener() - // Work around defer not working after os.Exit. - if err := walletMain(); err != nil { + // Run the wallet until permanent failure or shutdown is requested. + if err := run(ctx); err != nil && err != context.Canceled { os.Exit(1) } } -// walletMain is a work-around main function that is required since deferred -// functions (such as log flushing) are not called with calls to os.Exit. -// Instead, main runs this function and checks for a non-nil error, at which -// point any defers have already run, and if the error is non-nil, the program -// can be exited with an error exit status. -func walletMain() error { +// done returns whether the context's Done channel was closed due to +// cancellation or exceeded deadline. +func done(ctx context.Context) bool { + select { + case <-ctx.Done(): + return true + default: + return false + } +} + +// run is the main startup and teardown logic performed by the main package. It +// is responsible for parsing the config, starting RPC servers, loading and +// syncing the wallet (if necessary), and stopping all started services when the +// context is cancelled. +func run(ctx context.Context) error { // Load configuration and parse command line. This function also // initializes logging and configures it accordingly. tcfg, _, err := loadConfig() @@ -64,7 +75,19 @@ func walletMain() error { // Show version at startup. log.Infof("Version %s (Go version %s)", version(), runtime.Version()) + // Read IPC messages from the read end of a pipe created and passed by the + // parent process, if any. When this pipe is closed, shutdown is + // initialized. + if cfg.PipeRx != nil { + go serviceControlPipeRx(uintptr(*cfg.PipeRx)) + } + + // Run the pprof profiler if enabled. if len(cfg.Profile) > 0 { + if done(ctx) { + return ctx.Err() + } + profileRedirect := http.RedirectHandler("/debug/pprof", http.StatusSeeOther) http.Handle("/", profileRedirect) for _, listenAddr := range cfg.Profile { @@ -81,6 +104,10 @@ func walletMain() error { // Write mem profile if requested. if cfg.MemProfile != "" { + if done(ctx) { + return ctx.Err() + } + f, err := os.Create(cfg.MemProfile) if err != nil { log.Errorf("Unable to create cpu profile: %v", err) @@ -94,6 +121,13 @@ func walletMain() error { }() } + if done(ctx) { + return ctx.Err() + } + + // Create the loader which is used to load and unload the wallet. If + // --noinitialload is not set, this function is responsible for loading the + // wallet. Otherwise, loading is deferred so it can be performed over RPC. dbDir := networkDir(cfg.AppDataDir.Value, activeNet.Params) stakeOptions := &ldr.StakeOptions{ VotingEnabled: cfg.EnableVoting, @@ -107,6 +141,18 @@ func walletMain() error { loader := ldr.NewLoader(activeNet.Params, dbDir, stakeOptions, cfg.AddrIdxScanLen, cfg.AllowHighFees, cfg.RelayFee.ToCoin()) + // Stop any services started by the loader after the shutdown procedure is + // initialized and this function returns. + defer func() { + err := loader.UnloadWallet() + if err != nil && err != ldr.ErrWalletNotLoaded { + log.Errorf("Failed to close wallet: %v", err) + } else if err == nil { + log.Infof("Closed wallet") + } + }() + + // Open the wallet when --noinitialload was not set. passphrase := []byte{} if !cfg.NoInitialLoad { walletPass := []byte(cfg.WalletPass) @@ -125,14 +171,22 @@ func walletMain() error { } } - // Load the wallet database. It must have been created already - // or this will return an appropriate error. + if done(ctx) { + return ctx.Err() + } + + // Load the wallet. It must have been created already or this will + // return an appropriate error. w, err := loader.OpenExistingWallet(walletPass) if err != nil { - log.Errorf("Open failed: %v", err) + log.Errorf("Failed to open wallet: %v", err) return err } + if done(ctx) { + return ctx.Err() + } + // TODO(jrick): I think that this prompt should be removed // entirely instead of enabling it when --noinitialload is // unset. It can be replaced with an RPC request (either @@ -142,86 +196,78 @@ func walletMain() error { // // Until then, since --noinitialload users are expecting to use // the wallet only over RPC, disable this feature for them. - if !cfg.NoInitialLoad { - if cfg.Pass != "" { - passphrase = []byte(cfg.Pass) - w.SetInitiallyUnlocked(true) - var unlockAfter <-chan time.Time - err = w.Unlock(passphrase, unlockAfter) - if err != nil { - log.Errorf("Incorrect passphrase in pass config setting.") - return err - } - } else { - passphrase = startPromptPass(w) + if cfg.Pass != "" { + err = w.Unlock([]byte(cfg.Pass), nil) + if err != nil { + log.Errorf("Incorrect passphrase in pass config setting.") + return err } + w.SetInitiallyUnlocked(true) + } else { + passphrase = startPromptPass(w) } } - // Create and start HTTP server to serve wallet client connections. - // This will be updated with the wallet and chain server RPC client - // created below after each is created. - rpcs, legacyRPCServer, err := startRPCServers(loader) + if done(ctx) { + return ctx.Err() + } + + // Create and start the RPC servers to serve wallet client connections. If + // any of the servers can not be started, it will be nil. If none of them + // can be started, this errors since at least one server must run for the + // wallet to be useful. + // + // Servers will be associated with a loaded wallet if it has already been + // loaded, or after it is loaded later on. + gRPCServer, jsonRPCServer, err := startRPCServers(loader) if err != nil { log.Errorf("Unable to create RPC servers: %v", err) return err } - - // Create and start chain RPC client so it's ready to connect to - // the wallet when loaded later. - if !cfg.NoInitialLoad { - go rpcClientConnectLoop(passphrase, legacyRPCServer, loader) - } - - // Start wallet and voting gRPC services after a wallet is loaded if the - // gRPC server was created. - if rpcs != nil { + if gRPCServer != nil { + // Start wallet and voting gRPC services after a wallet is loaded. loader.RunAfterLoad(func(w *wallet.Wallet) { - rpcserver.StartWalletService(rpcs, w) - rpcserver.StartVotingService(rpcs, w) + rpcserver.StartWalletService(gRPCServer, w) + rpcserver.StartVotingService(gRPCServer, w) }) + defer func() { + log.Warn("Stopping gRPC server...") + gRPCServer.Stop() + log.Info("gRPC server shutdown") + }() } - - if cfg.PipeRx != nil { - go serviceControlPipeRx(uintptr(*cfg.PipeRx)) + if jsonRPCServer != nil { + go func() { + for range jsonRPCServer.RequestProcessShutdown() { + requestShutdown() + } + }() + defer func() { + log.Warn("Stopping JSON-RPC server...") + jsonRPCServer.Stop() + log.Info("JSON-RPC server shutdown") + }() } - // Add interrupt handlers to shutdown the various process components - // before exiting. Interrupt handlers run in LIFO order, so the wallet - // (which should be closed last) is added first. - addInterruptHandler(func() { - // The only possible err here is ErrTicketBuyerStopped, which can be - // safely ignored. - _ = loader.StopTicketPurchase() - err := loader.UnloadWallet() - if err != nil && err != ldr.ErrWalletNotLoaded { - log.Errorf("Failed to close wallet: %v", err) + // Stop the ticket buyer (if running) on shutdown. This returns an error + // that can be ignored when the ticket buyer was never started. + defer loader.StopTicketPurchase() + + // When not running with --noinitialload, it is the main package's + // responsibility to connect the loaded wallet to the dcrd RPC server for + // wallet synchronization. This function blocks until cancelled. + if !cfg.NoInitialLoad { + if done(ctx) { + return ctx.Err() } - }) - if rpcs != nil { - addInterruptHandler(func() { - // TODO: Does this need to wait for the grpc server to - // finish up any requests? - log.Warn("Stopping RPC server...") - rpcs.Stop() - log.Info("RPC server shutdown") - }) - } - if legacyRPCServer != nil { - addInterruptHandler(func() { - log.Warn("Stopping legacy RPC server...") - legacyRPCServer.Stop() - log.Info("Legacy RPC server shutdown") - }) - go func() { - <-legacyRPCServer.RequestProcessShutdown() - simulateInterrupt() - }() + + rpcClientConnectLoop(ctx, passphrase, jsonRPCServer, loader) } - <-interruptHandlersDone - log.Info("Shutdown complete") - return nil + // Wait until shutdown is signaled before returning and running deferred + // shutdown tasks. + <-ctx.Done() + return ctx.Err() } // startPromptPass prompts the user for a password to unlock their wallet in @@ -295,76 +341,65 @@ func startPromptPass(w *wallet.Wallet) []byte { } } -// rpcClientConnectLoop continuously attempts a connection to the consensus RPC -// server. When a connection is established, the client is used to sync the -// loaded wallet, either immediately or when loaded at a later time. +// rpcClientConnectLoop loops forever, attempting to create a connection to the +// consensus RPC server. If this connection succeeds, the RPC client is used as +// the loaded wallet's network backend and used to keep the wallet synchronized +// to the network. If/when the RPC connection is lost, the wallet is +// disassociated from the client and a new connection is attempmted. // -// The legacy RPC is optional. If set, the connected RPC client will be +// The JSON-RPC server is optional. If set, the connected RPC client will be // associated with the server for RPC passthrough and to enable additional // methods. -func rpcClientConnectLoop(passphrase []byte, legacyRPCServer *legacyrpc.Server, loader *ldr.Loader) { +// +// This function panics if the wallet has not already been loaded. +func rpcClientConnectLoop(ctx context.Context, passphrase []byte, jsonRPCServer *legacyrpc.Server, loader *ldr.Loader) { + w, ok := loader.LoadedWallet() + if !ok { + panic("rpcClientConnectLoop: called without loaded wallet") + } + certs := readCAFile() for { - chainClient, err := startChainRPC(certs) + chainClient, err := startChainRPC(ctx, certs) if err != nil { - log.Errorf("Unable to open connection to consensus RPC server: %v", err) - time.Sleep(30 * time.Second) - continue + return } - // Rather than inlining this logic directly into the loader - // callback, a function variable is used to avoid running any of - // this after the client disconnects by setting it to nil. This - // prevents the callback from associating a wallet loaded at a - // later time with a client that has already disconnected. A - // mutex is used to make this concurrent safe. - associateRPCClient := func(w *wallet.Wallet) { - w.SynchronizeRPC(chainClient) - if legacyRPCServer != nil { - legacyRPCServer.SetChainServer(chainClient) - } - loader.SetChainClient(chainClient.Client) - if cfg.EnableTicketBuyer { - err = loader.StartTicketPurchase(passphrase, &cfg.tbCfg) - if err != nil { - log.Errorf("Unable to start ticket buyer: %v", err) - } - } + n := chain.BackendFromRPCClient(chainClient.Client) + w.SetNetworkBackend(n) + if jsonRPCServer != nil { + jsonRPCServer.SetChainServer(chainClient) } - mu := new(sync.Mutex) - loader.RunAfterLoad(func(w *wallet.Wallet) { - mu.Lock() - associate := associateRPCClient - mu.Unlock() - if associate != nil { - associate(w) - } - }) + loader.SetChainClient(chainClient.Client) - chainClient.WaitForShutdown() - // The only possible err here is ErrTicketBuyerStopped, which can be - // safely ignored. - _ = loader.StopTicketPurchase() - - mu.Lock() - associateRPCClient = nil - mu.Unlock() - - loadedWallet, ok := loader.LoadedWallet() - if ok { - // Do not attempt a reconnect when the wallet was - // explicitly stopped. - if loadedWallet.ShuttingDown() { - return + if cfg.EnableTicketBuyer { + err = loader.StartTicketPurchase(passphrase, &cfg.tbCfg) + if err != nil { + log.Errorf("Unable to start ticket buyer: %v", err) } + } + + // Run wallet synchronization until it is cancelled or errors. If the + // context was cancelled, return immediately instead of trying to + // reconnect. + syncer := chain.NewRPCSyncer(w, chainClient) + err = syncer.Run(ctx, true) + if err == context.Canceled { + return + } + if err != nil { + syncLog.Errorf("Wallet synchronization stopped: %v", err) + } - // TODO: Rework the wallet so changing the RPC client - // does not require stopping and restarting everything. - loadedWallet.Stop() - loadedWallet.WaitForShutdown() - loadedWallet.Start() + // Disassociate the RPC client from all subsystems until reconnection + // occurs. + w.SetNetworkBackend(nil) + if jsonRPCServer != nil { + jsonRPCServer.SetChainServer(nil) } + loader.SetChainClient(nil) + loader.StopTicketPurchase() } } @@ -391,13 +426,13 @@ func readCAFile() []byte { // services. This function uses the RPC options from the global config and // there is no recovery in case the server is not available or if there is an // authentication error. Instead, all requests to the client will simply error. -func startChainRPC(certs []byte) (*chain.RPCClient, error) { +func startChainRPC(ctx context.Context, certs []byte) (*chain.RPCClient, error) { log.Infof("Attempting RPC client connection to %v", cfg.RPCConnect) rpcc, err := chain.NewRPCClient(activeNet.Params, cfg.RPCConnect, cfg.DcrdUsername, cfg.DcrdPassword, certs, cfg.DisableClientTLS) if err != nil { return nil, err } - err = rpcc.Start(context.TODO(), true) + err = rpcc.Start(ctx, true) return rpcc, err } diff --git a/ipc.go b/ipc.go index 56a0997ac..4b9aff6b1 100644 --- a/ipc.go +++ b/ipc.go @@ -1,4 +1,5 @@ // Copyright (c) 2016 The btcsuite developers +// Copyright (c) 2017 The Decred developers // Use of this source code is governed by an ISC // license that can be found in the LICENSE file. @@ -38,8 +39,5 @@ func serviceControlPipeRx(fd uintptr) { } } - select { - case shutdownRequestChannel <- struct{}{}: - default: - } + shutdownRequestChannel <- struct{}{} } diff --git a/log.go b/log.go index 41ea60b26..52b5ff06f 100644 --- a/log.go +++ b/log.go @@ -54,7 +54,7 @@ var ( loaderLog = backendLog.Logger("LODR") walletLog = backendLog.Logger("WLLT") tkbyLog = backendLog.Logger("TKBY") - chainLog = backendLog.Logger("CHNS") + syncLog = backendLog.Logger("SYNC") grpcLog = backendLog.Logger("GRPC") legacyRPCLog = backendLog.Logger("RPCS") ) @@ -65,8 +65,8 @@ func init() { wallet.UseLogger(walletLog) udb.UseLogger(walletLog) ticketbuyer.UseLogger(tkbyLog) - chain.UseLogger(chainLog) - dcrrpcclient.UseLogger(chainLog) + chain.UseLogger(syncLog) + dcrrpcclient.UseLogger(syncLog) rpcserver.UseLogger(grpcLog) legacyrpc.UseLogger(legacyRPCLog) } @@ -77,7 +77,7 @@ var subsystemLoggers = map[string]btclog.Logger{ "LODR": loaderLog, "WLLT": walletLog, "TKBY": tkbyLog, - "CHNS": chainLog, + "SYNC": syncLog, "GRPC": grpcLog, "RPCS": legacyRPCLog, } diff --git a/rpc/legacyrpc/methods.go b/rpc/legacyrpc/methods.go index e5c23f633..e5db6bb2f 100644 --- a/rpc/legacyrpc/methods.go +++ b/rpc/legacyrpc/methods.go @@ -7,6 +7,7 @@ package legacyrpc import ( "bytes" + "context" "encoding/base64" "encoding/binary" "encoding/hex" @@ -27,6 +28,7 @@ import ( "github.com/decred/dcrd/txscript" "github.com/decred/dcrd/wire" "github.com/decred/dcrwallet/apperrors" + "github.com/decred/dcrwallet/chain" "github.com/decred/dcrwallet/loader" "github.com/decred/dcrwallet/wallet" "github.com/decred/dcrwallet/wallet/txrules" @@ -912,7 +914,8 @@ func importPrivKey(icmd interface{}, w *wallet.Wallet, chainClient *dcrrpcclient } if rescan { - w.RescanFromHeight(chainClient, scanFrom) + n := chain.BackendFromRPCClient(chainClient) + go w.RescanFromHeight(context.Background(), n, scanFrom) } return nil, err @@ -946,7 +949,8 @@ func importScript(icmd interface{}, w *wallet.Wallet, chainClient *dcrrpcclient. } if rescan { - w.RescanFromHeight(chainClient, int32(scanFrom)) + n := chain.BackendFromRPCClient(chainClient) + go w.RescanFromHeight(context.Background(), n, int32(scanFrom)) } return nil, nil @@ -2172,7 +2176,8 @@ func redeemMultiSigOuts(icmd interface{}, w *wallet.Wallet, chainClient *dcrrpcc // until the rescan completes or exits with an error. func rescanWallet(icmd interface{}, w *wallet.Wallet, chainClient *dcrrpcclient.Client) (interface{}, error) { cmd := icmd.(*dcrjson.RescanWalletCmd) - err := <-w.RescanFromHeight(chainClient, int32(*cmd.BeginHeight)) + n := chain.BackendFromRPCClient(chainClient) + err := w.RescanFromHeight(context.TODO(), n, int32(*cmd.BeginHeight)) return nil, err } @@ -3139,13 +3144,16 @@ func version(icmd interface{}, w *wallet.Wallet, chainClient *dcrrpcclient.Clien // is connected and fails to ping, the function will still return that the // daemon is disconnected. func walletInfo(icmd interface{}, w *wallet.Wallet, l *loader.Loader) (interface{}, error) { - chainClient := w.ChainClient() - connected := chainClient != nil + n, err := w.NetworkBackend() + connected := err == nil if connected { - err := chainClient.Ping() + chainClient, err := chain.RPCClientFromBackend(n) if err != nil { - log.Warnf("Ping failed on connected daemon client: %v", err) - connected = false + err := chainClient.Ping() + if err != nil { + log.Warnf("Ping failed on connected daemon client: %v", err) + connected = false + } } } diff --git a/rpc/legacyrpc/server.go b/rpc/legacyrpc/server.go index 862421b46..d56223672 100644 --- a/rpc/legacyrpc/server.go +++ b/rpc/legacyrpc/server.go @@ -201,8 +201,7 @@ func (s *Server) serve(lis net.Listener) { } // Stop gracefully shuts down the rpc server by stopping and disconnecting all -// clients, disconnecting the chain server connection, and closing the wallet's -// account files. This blocks until shutdown completes. +// clients. This blocks until shutdown completes. func (s *Server) Stop() { s.quitMtx.Lock() select { @@ -212,18 +211,6 @@ func (s *Server) Stop() { default: } - // Stop the connected wallet and chain server, if any. - wallet, ok := s.walletLoader.LoadedWallet() - if ok { - wallet.Stop() - } - s.handlerMu.Lock() - chainClient := s.chainClient - s.handlerMu.Unlock() - if chainClient != nil { - chainClient.Stop() - } - // Stop all the listeners. for _, listener := range s.listeners { err := listener.Close() @@ -237,15 +224,6 @@ func (s *Server) Stop() { close(s.quit) s.quitMtx.Unlock() - // First wait for the wallet and chain server to stop, if they - // were ever set. - if wallet != nil { - wallet.WaitForShutdown() - } - if chainClient != nil { - chainClient.WaitForShutdown() - } - // Wait for all remaining goroutines to exit. s.wg.Wait() } @@ -640,10 +618,7 @@ func (s *Server) postClientRPC(w http.ResponseWriter, r *http.Request) { } func (s *Server) requestProcessShutdown() { - select { - case s.requestShutdownChan <- struct{}{}: - default: - } + s.requestShutdownChan <- struct{}{} } // RequestProcessShutdown returns a channel that is sent to when an authorized diff --git a/rpc/rpcserver/server.go b/rpc/rpcserver/server.go index 61e0a6938..2aadb8133 100644 --- a/rpc/rpcserver/server.go +++ b/rpc/rpcserver/server.go @@ -93,6 +93,10 @@ func errorCode(err error) codes.Code { return codes.NotFound case apperrors.ErrInput: return codes.InvalidArgument + case apperrors.ErrUnsupported: + return codes.Unimplemented + case apperrors.ErrDisconnected: + return codes.Unavailable } err = e.Err @@ -271,15 +275,15 @@ func (s *walletServer) checkReady() bool { return atomic.LoadUint32(&s.ready) != 0 } -// requireChainClient checks whether the wallet has been associated with the +// requireNetworkBackend checks whether the wallet has been associated with the // consensus server RPC client, returning a gRPC error when it is not. -func (s *walletServer) requireChainClient() (*dcrrpcclient.Client, error) { - chainClient := s.wallet.ChainClient() - if chainClient == nil { +func (s *walletServer) requireNetworkBackend() (wallet.NetworkBackend, error) { + n, err := s.wallet.NetworkBackend() + if err != nil { return nil, status.Errorf(codes.FailedPrecondition, "wallet is not associated with a consensus server RPC client") } - return chainClient, nil + return n, nil } func (s *walletServer) Ping(ctx context.Context, req *pb.PingRequest) (*pb.PingResponse, error) { @@ -339,7 +343,7 @@ func (s *walletServer) RenameAccount(ctx context.Context, req *pb.RenameAccountR } func (s *walletServer) Rescan(req *pb.RescanRequest, svr pb.WalletService_RescanServer) error { - chainClient, err := s.requireChainClient() + n, err := s.requireNetworkBackend() if err != nil { return err } @@ -349,35 +353,25 @@ func (s *walletServer) Rescan(req *pb.RescanRequest, svr pb.WalletService_Rescan } progress := make(chan wallet.RescanProgress, 1) - cancel := make(chan struct{}) - go s.wallet.RescanProgressFromHeight(chainClient, req.BeginHeight, progress, cancel) + go s.wallet.RescanProgressFromHeight(svr.Context(), n, req.BeginHeight, progress) - ctxDone := svr.Context().Done() - for { - select { - case p, ok := <-progress: - if !ok { - // finished or cancelled rescan without error - select { - case <-cancel: - return status.Errorf(codes.Canceled, "rescan canceled") - default: - return nil - } - } - if p.Err != nil { - return translateError(p.Err) - } - resp := &pb.RescanResponse{RescannedThrough: p.ScannedThrough} - err := svr.Send(resp) - if err != nil { - return translateError(err) - } - case <-ctxDone: - close(cancel) - ctxDone = nil + for p := range progress { + if p.Err != nil { + return translateError(p.Err) + } + resp := &pb.RescanResponse{RescannedThrough: p.ScannedThrough} + err := svr.Send(resp) + if err != nil { + return translateError(err) } } + // finished or cancelled rescan without error + select { + case <-svr.Context().Done(): + return status.Errorf(codes.Canceled, "rescan canceled") + default: + return nil + } } func (s *walletServer) NextAccount(ctx context.Context, req *pb.NextAccountRequest) ( @@ -496,7 +490,7 @@ func (s *walletServer) ImportPrivateKey(ctx context.Context, req *pb.ImportPriva "Passed a rescan height without rescan set") } - chainClient, err := s.requireChainClient() + n, err := s.requireNetworkBackend() if err != nil { return nil, err } @@ -507,7 +501,7 @@ func (s *walletServer) ImportPrivateKey(ctx context.Context, req *pb.ImportPriva } if req.Rescan { - s.wallet.RescanFromHeight(chainClient, req.ScanFrom) + go s.wallet.RescanFromHeight(context.Background(), n, req.ScanFrom) } return &pb.ImportPrivateKeyResponse{}, nil @@ -561,7 +555,7 @@ func (s *walletServer) ImportScript(ctx context.Context, "Passed a rescan height without rescan set") } - chainClient, err := s.requireChainClient() + n, err := s.requireNetworkBackend() if err != nil { return nil, err } @@ -572,7 +566,7 @@ func (s *walletServer) ImportScript(ctx context.Context, } if req.Rescan { - s.wallet.RescanFromHeight(chainClient, req.ScanFrom) + go s.wallet.RescanFromHeight(context.Background(), n, req.ScanFrom) } p2sh, err := dcrutil.NewAddressScriptHash(req.Script, s.wallet.ChainParams()) @@ -610,34 +604,39 @@ func (s *walletServer) Balance(ctx context.Context, req *pb.BalanceRequest) ( func (s *walletServer) TicketPrice(ctx context.Context, req *pb.TicketPriceRequest) (*pb.TicketPriceResponse, error) { - chainClient, err := s.requireChainClient() + n, err := s.requireNetworkBackend() if err != nil { return nil, err } - - tp, err := s.wallet.StakeDifficulty() + chainClient, err := chain.RPCClientFromBackend(n) if err != nil { - return nil, status.Errorf(codes.FailedPrecondition, - "Failed to query stake difficulty: %s", err.Error()) + return nil, translateError(err) } + ticketPrice, err := s.wallet.StakeDifficulty() + if err != nil { + return nil, translateError(err) + } _, blockHeight, err := chainClient.GetBestBlock() if err != nil { - return nil, status.Errorf(codes.FailedPrecondition, - "Failed to query block height: %s", err.Error()) + return nil, translateError(err) } return &pb.TicketPriceResponse{ - TicketPrice: int64(tp), + TicketPrice: int64(ticketPrice), Height: int32(blockHeight), }, nil } func (s *walletServer) StakeInfo(ctx context.Context, req *pb.StakeInfoRequest) (*pb.StakeInfoResponse, error) { - chainClient, err := s.requireChainClient() + n, err := s.requireNetworkBackend() if err != nil { return nil, err } + chainClient, err := chain.RPCClientFromBackend(n) + if err != nil { + return nil, translateError(err) + } si, err := s.wallet.StakeInfo(chainClient) if err != nil { @@ -1067,7 +1066,7 @@ func (s *walletServer) CreateSignature(ctx context.Context, req *pb.CreateSignat func (s *walletServer) PublishTransaction(ctx context.Context, req *pb.PublishTransactionRequest) ( *pb.PublishTransactionResponse, error) { - client, err := s.requireChainClient() + n, err := s.requireNetworkBackend() if err != nil { return nil, err } @@ -1079,7 +1078,7 @@ func (s *walletServer) PublishTransaction(ctx context.Context, req *pb.PublishTr "Bytes do not represent a valid raw transaction: %v", err) } - txHash, err := s.wallet.PublishTransaction(&msgTx, req.SignedTransaction, client) + txHash, err := s.wallet.PublishTransaction(&msgTx, req.SignedTransaction, n) if err != nil { return nil, translateError(err) } @@ -1174,10 +1173,14 @@ func (s *walletServer) PurchaseTickets(ctx context.Context, } func (s *walletServer) RevokeTickets(ctx context.Context, req *pb.RevokeTicketsRequest) (*pb.RevokeTicketsResponse, error) { - chainClient, err := s.requireChainClient() + n, err := s.requireNetworkBackend() if err != nil { return nil, err } + chainClient, err := chain.RPCClientFromBackend(n) + if err != nil { + return nil, translateError(err) + } lock := make(chan time.Time, 1) defer func() { @@ -1198,12 +1201,12 @@ func (s *walletServer) RevokeTickets(ctx context.Context, req *pb.RevokeTicketsR func (s *walletServer) LoadActiveDataFilters(ctx context.Context, req *pb.LoadActiveDataFiltersRequest) ( *pb.LoadActiveDataFiltersResponse, error) { - chainClient, err := s.requireChainClient() + n, err := s.requireNetworkBackend() if err != nil { return nil, err } - err = s.wallet.LoadActiveDataFilters(chainClient) + err = s.wallet.LoadActiveDataFilters(n) if err != nil { return nil, translateError(err) } @@ -1685,9 +1688,12 @@ func (s *loaderServer) StartConsensusRpc(ctx context.Context, req *pb.StartConse // Error if the wallet is already syncing with the network. wallet, walletLoaded := s.loader.LoadedWallet() - if walletLoaded && wallet.SynchronizingToNetwork() { - return nil, status.Errorf(codes.FailedPrecondition, - "wallet is loaded and already synchronizing") + if walletLoaded { + _, err := wallet.NetworkBackend() + if err == nil { + return nil, status.Errorf(codes.FailedPrecondition, + "wallet is loaded and already synchronizing") + } } rpcClient, err := chain.NewRPCClient(s.activeNet.Params, networkAddress, req.Username, @@ -1743,7 +1749,8 @@ func (s *loaderServer) DiscoverAddresses(ctx context.Context, req *pb.DiscoverAd } } - err := wallet.DiscoverActiveAddresses(chainClient.Client, req.DiscoverAccounts) + n := chain.BackendFromRPCClient(chainClient.Client) + err := wallet.DiscoverActiveAddresses(n, req.DiscoverAccounts) if err != nil { return nil, translateError(err) } @@ -1771,7 +1778,14 @@ func (s *loaderServer) SubscribeToBlockNotifications(ctx context.Context, req *p return nil, translateError(err) } - wallet.AssociateConsensusRPC(chainClient) + // TODO: instead of running the syncer in the background indefinitely, + // deprecate this RPC and introduce two new RPCs, one to subscribe to the + // notifications and one to perform the synchronization task. This would be + // a backwards-compatible way to improve error handling and provide more + // control over how long the synchronization task runs. + syncer := chain.NewRPCSyncer(wallet, chainClient) + go syncer.Run(context.Background(), false) + wallet.SetNetworkBackend(chain.BackendFromRPCClient(chainClient.Client)) return &pb.SubscribeToBlockNotificationsResponse{}, nil } @@ -1790,9 +1804,10 @@ func (s *loaderServer) FetchHeaders(ctx context.Context, req *pb.FetchHeadersReq if chainClient == nil { return nil, status.Errorf(codes.FailedPrecondition, "Consensus server RPC client has not been loaded") } + n := chain.BackendFromRPCClient(chainClient.Client) fetchedHeaderCount, rescanFrom, rescanFromHeight, - mainChainTipBlockHash, mainChainTipBlockHeight, err := wallet.FetchHeaders(chainClient.Client) + mainChainTipBlockHash, mainChainTipBlockHeight, err := wallet.FetchHeaders(n) if err != nil { return nil, translateError(err) } diff --git a/signal.go b/signal.go index 79a4002bf..872418330 100644 --- a/signal.go +++ b/signal.go @@ -5,6 +5,7 @@ package main import ( + "context" "os" "os/signal" ) @@ -13,80 +14,58 @@ import ( // subsystems using the same code paths as when an interrupt signal is received. var shutdownRequestChannel = make(chan struct{}) -// interruptChannel is used to receive SIGINT (Ctrl+C) signals. -var interruptChannel chan os.Signal - -// addHandlerChannel is used to add an interrupt handler to the list of handlers -// to be invoked on SIGINT (Ctrl+C) signals. -var addHandlerChannel = make(chan func()) - -// interruptHandlersDone is closed after all interrupt handlers run the first -// time an interrupt is signaled. -var interruptHandlersDone = make(chan struct{}) - -var simulateInterruptChannel = make(chan struct{}, 1) +// shutdownSignaled is closed whenever shutdown is invoked through an interrupt +// signal or from an JSON-RPC stop request. Any contexts created using +// withShutdownChannel are cancelled when this is closed. +var shutdownSignaled = make(chan struct{}) // signals defines the signals that are handled to do a clean shutdown. // Conditional compilation is used to also include SIGTERM on Unix. var signals = []os.Signal{os.Interrupt} -// simulateInterrupt requests invoking the clean termination process by an -// internal component instead of a SIGINT. -func simulateInterrupt() { - select { - case simulateInterruptChannel <- struct{}{}: - default: - } +// withShutdownCancel creates a copy of a context that is cancelled whenever +// shutdown is invoked through an interrupt signal or from an JSON-RPC stop +// request. +func withShutdownCancel(ctx context.Context) context.Context { + ctx, cancel := context.WithCancel(ctx) + go func() { + <-shutdownSignaled + cancel() + }() + return ctx } -// mainInterruptHandler listens for SIGINT (Ctrl+C) signals on the -// interruptChannel and invokes the registered interruptCallbacks accordingly. -// It also listens for callback registration. It must be run as a goroutine. -func mainInterruptHandler() { - // interruptCallbacks is a list of callbacks to invoke when a - // SIGINT (Ctrl+C) is received. - var interruptCallbacks []func() - invokeCallbacks := func() { - // run handlers in LIFO order. - for i := range interruptCallbacks { - idx := len(interruptCallbacks) - 1 - i - interruptCallbacks[idx]() - } - close(interruptHandlersDone) +// requestShutdown signals for starting the clean shutdown of the process +// through an internal component (such as through the JSON-RPC stop request). +func requestShutdown() { + shutdownRequestChannel <- struct{}{} +} + +// shutdownListener listens for shutdown requests and cancels all contexts +// created from withShutdownCancel. This function never returns and is intended +// to be spawned in a new goroutine. +func shutdownListener() { + interruptChannel := make(chan os.Signal, 1) + signal.Notify(interruptChannel, signals...) + + // Listen for the initial shutdown signal + select { + case sig := <-interruptChannel: + log.Infof("Received signal (%s). Shutting down...", sig) + case <-shutdownRequestChannel: + log.Info("Shutdown requested. Shutting down...") } + // Cancel all contexts created from withShutdownCancel. + close(shutdownSignaled) + + // Listen for any more shutdown signals and log that shutdown has already + // been signaled. for { select { - case sig := <-interruptChannel: - log.Infof("Received signal (%s). Shutting down...", sig) - invokeCallbacks() - return + case <-interruptChannel: case <-shutdownRequestChannel: - log.Info("Shutdown requested. Shutting down...") - invokeCallbacks() - return - - case <-simulateInterruptChannel: - log.Info("Received shutdown request. Shutting down...") - invokeCallbacks() - return - - case handler := <-addHandlerChannel: - interruptCallbacks = append(interruptCallbacks, handler) } + log.Info("Shutdown signaled. Already shutting down...") } } - -// addInterruptHandler adds a handler to call when a SIGINT (Ctrl+C) is -// received. -func addInterruptHandler(handler func()) { - // Create the channel and start the main interrupt handler which invokes - // all other callbacks and exits if not already done. - if interruptChannel == nil { - interruptChannel = make(chan os.Signal, 1) - signal.Notify(interruptChannel, signals...) - go mainInterruptHandler() - } - - addHandlerChannel <- handler -} diff --git a/wallet/addresses.go b/wallet/addresses.go index 0f34b163a..d9881963b 100644 --- a/wallet/addresses.go +++ b/wallet/addresses.go @@ -5,6 +5,8 @@ package wallet import ( + "context" + "github.com/decred/dcrd/chaincfg" "github.com/decred/dcrd/dcrutil" "github.com/decred/dcrd/hdkeychain" @@ -198,8 +200,8 @@ func (w *Wallet) nextAddress(persist persistReturnedChildFunc, account, branch u if alb.cursor%uint32(w.gapLimit) != 0 { break } - chainClient := w.ChainClient() - if chainClient == nil { + n, err := w.NetworkBackend() + if err != nil { break } addrs, err := deriveChildAddresses(alb.branchXpub, @@ -207,7 +209,7 @@ func (w *Wallet) nextAddress(persist persistReturnedChildFunc, account, branch u if err != nil { return nil, err } - err = chainClient.LoadTxFilter(false, addrs, nil) + err = n.LoadTxFilter(context.TODO(), false, addrs, nil) if err != nil { return nil, err } @@ -289,7 +291,7 @@ func (w *Wallet) watchFutureAddresses(dbtx walletdb.ReadTx) error { gapLimit := uint32(w.gapLimit) - client, err := w.requireChainClient() + n, err := w.NetworkBackend() if err != nil { return err } @@ -380,7 +382,7 @@ func (w *Wallet) watchFutureAddresses(dbtx walletdb.ReadTx) error { } go func() { - errs <- client.LoadTxFilter(false, addrs, nil) + errs <- n.LoadTxFilter(context.TODO(), false, addrs, nil) }() } @@ -479,7 +481,7 @@ func (w *Wallet) ExtendWatchedAddresses(account, branch, child uint32) error { return err } - if client := w.ChainClient(); client != nil { + if n, err := w.NetworkBackend(); err == nil { gapLimit := uint32(w.gapLimit) lastWatched := lastUsed + gapLimit if child <= lastWatched { @@ -492,7 +494,7 @@ func (w *Wallet) ExtendWatchedAddresses(account, branch, child uint32) error { if err != nil { return err } - err = client.LoadTxFilter(false, addrs, nil) + err = n.LoadTxFilter(context.TODO(), false, addrs, nil) if err != nil { return err } diff --git a/wallet/chainntfns.go b/wallet/chainntfns.go index 808063262..3016cc68f 100644 --- a/wallet/chainntfns.go +++ b/wallet/chainntfns.go @@ -7,6 +7,7 @@ package wallet import ( "bytes" + "context" "errors" "fmt" "time" @@ -17,90 +18,11 @@ import ( "github.com/decred/dcrd/txscript" "github.com/decred/dcrd/wire" "github.com/decred/dcrwallet/apperrors" - "github.com/decred/dcrwallet/chain" "github.com/decred/dcrwallet/wallet/txrules" "github.com/decred/dcrwallet/wallet/udb" "github.com/decred/dcrwallet/walletdb" ) -func (w *Wallet) handleConsensusRPCNotifications(chainClient *chain.RPCClient) { - for n := range chainClient.Notifications() { - var notificationName string - var err error - switch n := n.(type) { - case chain.ClientConnected: - log.Infof("The client has successfully connected to dcrd and " + - "is now handling websocket notifications") - case chain.BlockConnected: - notificationName = "blockconnected" - err = w.onBlockConnected(n.BlockHeader, n.Transactions) - if err == nil { - err = walletdb.View(w.db, func(tx walletdb.ReadTx) error { - return w.watchFutureAddresses(tx) - }) - } - case chain.Reorganization: - notificationName = "reorganizing" - err = w.handleReorganizing(n.OldHash, n.NewHash, n.OldHeight, n.NewHeight) - case chain.RelevantTxAccepted: - notificationName = "relevanttxaccepted" - err = walletdb.Update(w.db, func(dbtx walletdb.ReadWriteTx) error { - return w.processSerializedTransaction(dbtx, n.Transaction, nil, nil) - }) - if err == nil { - err = walletdb.View(w.db, func(tx walletdb.ReadTx) error { - return w.watchFutureAddresses(tx) - }) - } - case chain.MissedTickets: - notificationName = "spentandmissedtickets" - err = w.handleMissedTickets(n.BlockHash, int32(n.BlockHeight), n.Tickets) - } - if err != nil { - log.Errorf("Failed to process consensus server notification "+ - "(name: `%s`, detail: `%v`)", notificationName, err) - } - } -} - -// AssociateConsensusRPC associates the wallet with the consensus JSON-RPC -// server and begins handling all notifications in a background goroutine. Any -// previously associated client, if it is a different instance than the passed -// client, is stopped. -func (w *Wallet) AssociateConsensusRPC(chainClient *chain.RPCClient) { - w.chainClientLock.Lock() - defer w.chainClientLock.Unlock() - if w.chainClient != nil { - if w.chainClient != chainClient { - w.chainClient.Stop() - } - } - - w.chainClient = chainClient - - w.wg.Add(1) - go func() { - w.handleConsensusRPCNotifications(chainClient) - w.wg.Done() - }() -} - -// handleChainNotifications is the major chain notification handler that -// receives websocket notifications about the blockchain. -func (w *Wallet) handleChainNotifications(chainClient *chain.RPCClient) { - // At the moment there is no recourse if the rescan fails for - // some reason, however, the wallet will not be marked synced - // and many methods will error early since the wallet is known - // to be out of date. - err := w.syncWithChain(chainClient.Client) - if err != nil && !w.ShuttingDown() { - log.Warnf("Unable to synchronize wallet to chain: %v", err) - } - - w.handleConsensusRPCNotifications(chainClient) - w.wg.Done() -} - func (w *Wallet) extendMainChain(dbtx walletdb.ReadWriteTx, block *udb.BlockHeaderData, transactions [][]byte) error { txmgrNs := dbtx.ReadWriteBucket(wtxmgrNamespaceKey) @@ -211,11 +133,23 @@ func copyHeaderSliceToArray(array *udb.RawBlockHeader, slice []byte) error { return nil } -// onBlockConnected is the entry point for processing chain server -// blockconnected notifications. -func (w *Wallet) onBlockConnected(serializedBlockHeader []byte, transactions [][]byte) error { +// ConnectBlock attaches a block and relevant wallet transactions to the +// wallet's main chain or side chain depending on whether the wallet is +// reorganizing. +func (w *Wallet) ConnectBlock(serializedBlockHeader []byte, transactions [][]byte) (err error) { + defer func() { + if err == nil { + err := walletdb.View(w.db, func(tx walletdb.ReadTx) error { + return w.watchFutureAddresses(tx) + }) + if err != nil { + log.Errorf("Failed to watch for future address usage: %v", err) + } + } + }() + var blockHeader wire.BlockHeader - err := blockHeader.Deserialize(bytes.NewReader(serializedBlockHeader)) + err = blockHeader.Deserialize(bytes.NewReader(serializedBlockHeader)) if err != nil { return err } @@ -299,10 +233,10 @@ func (w *Wallet) onBlockConnected(serializedBlockHeader []byte, transactions [][ return nil } -// handleReorganizing handles a blockchain reorganization notification. It -// sets the chain server to indicate that currently the wallet state is in -// reorganizing, and what the final block of the reorganization is by hash. -func (w *Wallet) handleReorganizing(oldHash, newHash *chainhash.Hash, oldHeight, newHeight int64) error { +// StartReorganize sets the wallet to a reorganizing state where all attached +// blocks will attach to a sidechain until the final block is reached, at which +// point a chain switch occurs. +func (w *Wallet) StartReorganize(oldHash, newHash *chainhash.Hash, oldHeight, newHeight int64) error { w.reorganizingLock.Lock() if w.reorganizing { reorganizeToHash := w.reorganizeToHash @@ -403,6 +337,22 @@ func (w *Wallet) evaluateStakePoolTicket(rec *udb.TxRecord, return true, nil } +// AcceptMempoolTx adds a relevant unmined transaction to the wallet. +func (w *Wallet) AcceptMempoolTx(serializedTx []byte) error { + err := walletdb.Update(w.db, func(dbtx walletdb.ReadWriteTx) error { + return w.processSerializedTransaction(dbtx, serializedTx, nil, nil) + }) + if err == nil { + err := walletdb.View(w.db, func(tx walletdb.ReadTx) error { + return w.watchFutureAddresses(tx) + }) + if err != nil { + log.Errorf("Failed to watch for future address usage: %v", err) + } + } + return err +} + func (w *Wallet) processSerializedTransaction(dbtx walletdb.ReadWriteTx, serializedTx []byte, serializedHeader *udb.RawBlockHeader, blockMeta *udb.BlockMeta) error { @@ -655,10 +605,10 @@ func (w *Wallet) processTransactionRecord(dbtx walletdb.ReadWriteTx, rec *udb.Tx return err } } else { - chainClient := w.ChainClient() - if chainClient != nil { - err := chainClient.LoadTxFilter(false, - []dcrutil.Address{mscriptaddr.Address()}, nil) + if n, err := w.NetworkBackend(); err == nil { + addr := mscriptaddr.Address() + err := n.LoadTxFilter(context.TODO(), + false, []dcrutil.Address{addr}, nil) if err != nil { return err } @@ -822,26 +772,6 @@ func (w *Wallet) processTransactionRecord(dbtx walletdb.ReadWriteTx, rec *udb.Tx return nil } -func (w *Wallet) handleChainVotingNotifications(chainClient *chain.RPCClient) { - for n := range chainClient.NotificationsVoting() { - var err error - strErrType := "" - - switch n := n.(type) { - case chain.WinningTickets: - err = w.handleWinningTickets(n.BlockHash, int32(n.BlockHeight), n.Tickets) - strErrType = "WinningTickets" - default: - err = fmt.Errorf("voting handler received unknown ntfn type") - } - if err != nil { - log.Errorf("Cannot handle chain server voting "+ - "notification %v: %v", strErrType, err) - } - } - w.wg.Done() -} - // selectOwnedTickets returns a slice of tickets hashes from the tickets // argument that are owned by the wallet. // @@ -857,16 +787,18 @@ func selectOwnedTickets(w *Wallet, dbtx walletdb.ReadTx, tickets []*chainhash.Ha return owned } -// handleWinningTickets receives a list of hashes and some block information -// and submits it to the wstakemgr to handle SSGen production. -func (w *Wallet) handleWinningTickets(blockHash *chainhash.Hash, - blockHeight int32, winningTicketHashes []*chainhash.Hash) error { +// VoteOnOwnedTickets creates and publishes vote transactions for all owned +// tickets in the winningTicketHashes slice if wallet voting is enabled. The +// vote is only valid when voting on the block described by the passed block +// hash and height. +func (w *Wallet) VoteOnOwnedTickets(winningTicketHashes []*chainhash.Hash, + blockHash *chainhash.Hash, blockHeight int32) error { if !w.votingEnabled || blockHeight < int32(w.chainParams.StakeValidationHeight)-1 { return nil } - chainClient, err := w.requireChainClient() + n, err := w.NetworkBackend() if err != nil { return err } @@ -950,8 +882,7 @@ func (w *Wallet) handleWinningTickets(blockHash *chainhash.Hash, if err != nil { return err } - _, err = chainClient.SendRawTransaction(vote, true) - return err + return n.PublishTransaction(context.TODO(), vote) }) if err != nil { log.Errorf("Failed to send vote for ticket hash %v: %v", @@ -966,16 +897,10 @@ func (w *Wallet) handleWinningTickets(blockHash *chainhash.Hash, return nil } -// handleMissedTickets receives a list of hashes and some block information -// and submits it to the wstakemgr to handle SSRtx production. -func (w *Wallet) handleMissedTickets(blockHash *chainhash.Hash, blockHeight int32, - missedTicketHashes []*chainhash.Hash) error { - - if blockHeight < int32(w.chainParams.StakeValidationHeight)-1 { - return nil - } - - chainClient, err := w.requireChainClient() +// RevokeOwnedTickets revokes any owned tickets specified in the +// missedTicketHashes slice. +func (w *Wallet) RevokeOwnedTickets(missedTicketHashes []*chainhash.Hash) error { + n, err := w.NetworkBackend() if err != nil { return err } @@ -1029,6 +954,11 @@ func (w *Wallet) handleMissedTickets(blockHash *chainhash.Hash, blockHeight int3 ticketHash, err) continue } + err = w.checkHighFees(dcrutil.Amount(ticketPurchase.TxOut[0].Value), revocation) + if err != nil { + log.Errorf("Revocation pays exceedingly high fees") + continue + } revocations[i] = revocation } return nil @@ -1053,8 +983,7 @@ func (w *Wallet) handleMissedTickets(blockHash *chainhash.Hash, blockHeight int3 if err != nil { return err } - _, err = chainClient.SendRawTransaction(revocation, true) - return err + return n.PublishTransaction(context.TODO(), revocation) }) if err != nil { log.Errorf("Failed to send revocation %v for ticket hash %v: %v", diff --git a/wallet/createtx.go b/wallet/createtx.go index bd54eb078..a567c8a93 100644 --- a/wallet/createtx.go +++ b/wallet/createtx.go @@ -6,10 +6,10 @@ package wallet import ( + "context" "encoding/binary" "errors" "fmt" - dcrrpcclient "github.com/decred/dcrd/rpcclient" "time" "github.com/decred/dcrd/blockchain" @@ -364,18 +364,30 @@ func (w *Wallet) insertMultisigOutIntoTxMgr(ns walletdb.ReadWriteBucket, msgTx * return w.TxStore.AddMultisigOut(ns, rec, nil, index) } +// checkHighFees performs a high fee check if enabled and possible, returning an +// error if the transaction pays high fees. +func (w *Wallet) checkHighFees(totalInput dcrutil.Amount, tx *wire.MsgTx) error { + if w.AllowHighFees { + return nil + } + if !txrules.PaysHighFees(totalInput, tx) { + return nil + } + return apperrors.New(apperrors.ErrHighFees, "transaction pays exceedingly high fees") +} + // txToOutputs creates a transaction, selecting previous outputs from an account // with no less than minconf confirmations, and creates a signed transaction // that pays to each of the outputs. func (w *Wallet) txToOutputs(outputs []*wire.TxOut, account uint32, minconf int32, randomizeChangeIdx bool) (*txauthor.AuthoredTx, error) { - chainClient, err := w.requireChainClient() + n, err := w.NetworkBackend() if err != nil { return nil, err } - return w.txToOutputsInternal(outputs, account, minconf, chainClient, + return w.txToOutputsInternal(outputs, account, minconf, n, randomizeChangeIdx, w.RelayFee()) } @@ -391,7 +403,7 @@ func (w *Wallet) txToOutputs(outputs []*wire.TxOut, account uint32, minconf int3 // into the database, rather than delegating this work to the caller as // btcwallet does. func (w *Wallet) txToOutputsInternal(outputs []*wire.TxOut, account uint32, minconf int32, - chainClient *dcrrpcclient.Client, randomizeChangeIdx bool, txFee dcrutil.Amount) (*txauthor.AuthoredTx, error) { + n NetworkBackend, randomizeChangeIdx bool, txFee dcrutil.Amount) (*txauthor.AuthoredTx, error) { var atx *txauthor.AuthoredTx var changeSourceUpdates []func(walletdb.ReadWriteTx) error @@ -445,6 +457,11 @@ func (w *Wallet) txToOutputsInternal(outputs []*wire.TxOut, account uint32, minc " %v from imported account into default account.", changeAmount) } + err = w.checkHighFees(atx.TotalInput, atx.Tx) + if err != nil { + return nil, err + } + rec, err := udb.NewTxRecordFromMsgTx(atx.Tx, time.Now()) if err != nil { return nil, err @@ -467,8 +484,7 @@ func (w *Wallet) txToOutputsInternal(outputs []*wire.TxOut, account uint32, minc return err } - _, err = chainClient.SendRawTransaction(atx.Tx, w.AllowHighFees) - return err + return n.PublishTransaction(context.TODO(), atx.Tx) }) if err != nil { return nil, err @@ -517,7 +533,7 @@ func (w *Wallet) txToMultisigInternal(dbtx walletdb.ReadWriteTx, account uint32, return nil, nil, nil, err } - chainClient, err := w.requireChainClient() + n, err := w.NetworkBackend() if err != nil { return txToMultisigError(err) } @@ -627,7 +643,12 @@ func (w *Wallet) txToMultisigInternal(dbtx walletdb.ReadWriteTx, account uint32, return txToMultisigError(err) } - _, err = chainClient.SendRawTransaction(msgtx, w.AllowHighFees) + err = w.checkHighFees(totalInput, msgtx) + if err != nil { + return txToMultisigError(err) + } + + err = n.PublishTransaction(context.TODO(), msgtx) if err != nil { return txToMultisigError(err) } @@ -636,7 +657,7 @@ func (w *Wallet) txToMultisigInternal(dbtx walletdb.ReadWriteTx, account uint32, // script hash address. utilAddrs := make([]dcrutil.Address, 1) utilAddrs[0] = scAddr - err = chainClient.LoadTxFilter(false, []dcrutil.Address{scAddr}, nil) + err = n.LoadTxFilter(context.TODO(), false, []dcrutil.Address{scAddr}, nil) if err != nil { return txToMultisigError(err) } @@ -699,7 +720,7 @@ func (w *Wallet) compressWalletInternal(dbtx walletdb.ReadWriteTx, maxNumIns int addrmgrNs := dbtx.ReadWriteBucket(waddrmgrNamespaceKey) txmgrNs := dbtx.ReadWriteBucket(wtxmgrNamespaceKey) - chainClient, err := w.requireChainClient() + n, err := w.NetworkBackend() if err != nil { return nil, err } @@ -785,7 +806,12 @@ func (w *Wallet) compressWalletInternal(dbtx walletdb.ReadWriteTx, maxNumIns int return nil, err } - txSha, err := chainClient.SendRawTransaction(msgtx, w.AllowHighFees) + err = w.checkHighFees(totalAdded, msgtx) + if err != nil { + return nil, err + } + + err = n.PublishTransaction(context.TODO(), msgtx) if err != nil { return nil, err } @@ -800,9 +826,10 @@ func (w *Wallet) compressWalletInternal(dbtx walletdb.ReadWriteTx, maxNumIns int return nil, err } - log.Infof("Successfully consolidated funds in transaction %v", txSha) + txHash := msgtx.TxHash() + log.Infof("Successfully consolidated funds in transaction %v", &txHash) - return txSha, nil + return &txHash, nil } // makeTicket creates a ticket from a split transaction output. It can optionally @@ -921,7 +948,7 @@ func makeTicket(params *chaincfg.Params, inputPool *extendedOutPoint, // greater than or equal to 0, tickets that cost more than that limit will // return an error that not enough funds are available. func (w *Wallet) purchaseTickets(req purchaseTicketRequest) ([]*chainhash.Hash, error) { - chainClient, err := w.requireChainClient() + n, err := w.NetworkBackend() if err != nil { return nil, err } @@ -973,12 +1000,8 @@ func (w *Wallet) purchaseTickets(req purchaseTicketRequest) ([]*chainhash.Hash, // address this better and prevent address burning. account := req.account - // Get the current ticket price from the daemon. - ticketPricesF64, err := w.ChainClient().GetStakeDifficulty() - if err != nil { - return nil, err - } - ticketPrice, err := dcrutil.NewAmount(ticketPricesF64.NextStakeDifficulty) + // Get the current ticket price. + ticketPrice, err := n.StakeDifficulty(context.TODO()) if err != nil { return nil, err } @@ -1111,7 +1134,7 @@ func (w *Wallet) purchaseTickets(req purchaseTicketRequest) ([]*chainhash.Hash, txFeeIncrement = w.RelayFee() } splitTx, err := w.txToOutputsInternal(splitOuts, account, req.minConf, - chainClient, false, txFeeIncrement) + n, false, txFeeIncrement) if err != nil { return nil, fmt.Errorf("failed to send split transaction: %v", err) } @@ -1243,6 +1266,11 @@ func (w *Wallet) purchaseTickets(req purchaseTicketRequest) ([]*chainhash.Hash, return ticketHashes, err } + err = w.checkHighFees(dcrutil.Amount(eop.amt), ticket) + if err != nil { + return nil, err + } + rec, err := udb.NewTxRecordFromMsgTx(ticket, time.Now()) if err != nil { return ticketHashes, err @@ -1250,19 +1278,18 @@ func (w *Wallet) purchaseTickets(req purchaseTicketRequest) ([]*chainhash.Hash, // Open a DB update to insert and publish the transaction. If // publishing fails, the update is rolled back. - var ticketHash *chainhash.Hash err = walletdb.Update(w.db, func(dbtx walletdb.ReadWriteTx) error { err = w.processTransactionRecord(dbtx, rec, nil, nil) if err != nil { return err } - ticketHash, err = chainClient.SendRawTransaction(ticket, w.AllowHighFees) - return err + return n.PublishTransaction(context.TODO(), ticket) }) if err != nil { return ticketHashes, err } - ticketHashes = append(ticketHashes, ticketHash) + ticketHash := ticket.TxHash() + ticketHashes = append(ticketHashes, &ticketHash) log.Infof("Successfully sent SStx purchase transaction %v", ticketHash) } diff --git a/wallet/network.go b/wallet/network.go new file mode 100644 index 000000000..c1d5ea18e --- /dev/null +++ b/wallet/network.go @@ -0,0 +1,55 @@ +// Copyright (c) 2017 The Decred developers +// Use of this source code is governed by an ISC +// license that can be found in the LICENSE file. + +package wallet + +import ( + "context" + + "github.com/decred/dcrd/chaincfg/chainhash" + "github.com/decred/dcrd/dcrutil" + "github.com/decred/dcrd/wire" + "github.com/decred/dcrwallet/apperrors" + "github.com/jrick/bitset" +) + +// NetworkBackend provides wallets with Decred network functionality. Some +// wallet operations require the wallet to be associated with a network backend +// to complete. +type NetworkBackend interface { + // Should be no issue for spv + GetHeaders(ctx context.Context, blockLocators []chainhash.Hash, hashStop *chainhash.Hash) ([][]byte, error) + LoadTxFilter(ctx context.Context, reload bool, addrs []dcrutil.Address, outpoints []wire.OutPoint) error + PublishTransaction(ctx context.Context, tx *wire.MsgTx) error + + // Tricky but not impossible for spv + AddressesUsed(ctx context.Context, addrs []dcrutil.Address) (bitset.Bytes, error) + Rescan(ctx context.Context, blocks []chainhash.Hash) ([]*RescannedBlock, error) + + // TODO: these should be known directly by the wallet. + StakeDifficulty(ctx context.Context) (dcrutil.Amount, error) + + // TODO: only used to work around a hack for broken getheaders json-rpc + GetBlockHash(ctx context.Context, height int32) (*chainhash.Hash, error) +} + +// NetworkBackend returns the currently associated network backend of the +// wallet, or an error if the no backend is currently set. +func (w *Wallet) NetworkBackend() (NetworkBackend, error) { + w.networkBackendMu.Lock() + n := w.networkBackend + w.networkBackendMu.Unlock() + if n == nil { + return nil, apperrors.New(apperrors.ErrDisconnected, "no network backend set") + } + return n, nil +} + +// SetNetworkBackend sets the network backend used by various functions of the +// wallet. +func (w *Wallet) SetNetworkBackend(n NetworkBackend) { + w.networkBackendMu.Lock() + w.networkBackend = n + w.networkBackendMu.Unlock() +} diff --git a/wallet/rescan.go b/wallet/rescan.go index e1d13f3dc..200a8e804 100644 --- a/wallet/rescan.go +++ b/wallet/rescan.go @@ -6,16 +6,22 @@ package wallet import ( - "encoding/hex" + "context" "github.com/decred/dcrd/chaincfg/chainhash" - dcrrpcclient "github.com/decred/dcrd/rpcclient" "github.com/decred/dcrwallet/wallet/udb" "github.com/decred/dcrwallet/walletdb" ) const maxBlocksPerRescan = 2000 +// RescannedBlock models the relevant data returned during a rescan from a +// single block. +type RescannedBlock struct { + BlockHash chainhash.Hash + Transactions [][]byte +} + // TODO: track whether a rescan is already in progress, and cancel either it or // this new rescan, keeping the one that still has the most blocks to scan. @@ -23,16 +29,16 @@ const maxBlocksPerRescan = 2000 // startHash and height up through the recorded main chain tip block. The // progress channel, if non-nil, is sent non-error progress notifications with // the heights the rescan has completed through, starting with the start height. -func (w *Wallet) rescan(chainClient *dcrrpcclient.Client, startHash *chainhash.Hash, height int32, - p chan<- RescanProgress, cancel <-chan struct{}) error { +func (w *Wallet) rescan(ctx context.Context, n NetworkBackend, + startHash *chainhash.Hash, height int32, p chan<- RescanProgress) error { blockHashStorage := make([]chainhash.Hash, maxBlocksPerRescan) rescanFrom := *startHash inclusive := true for { select { - case <-cancel: - return nil + case <-ctx.Done(): + return ctx.Err() default: } @@ -54,24 +60,20 @@ func (w *Wallet) rescan(chainClient *dcrrpcclient.Client, startHash *chainhash.H scanningThrough := height + int32(len(rescanBlocks)) - 1 log.Infof("Rescanning blocks %v-%v...", height, scanningThrough) - rescanResults, err := chainClient.Rescan(rescanBlocks) + rescanResults, err := n.Rescan(ctx, rescanBlocks) if err != nil { return err } var rawBlockHeader udb.RawBlockHeader err = walletdb.Update(w.db, func(dbtx walletdb.ReadWriteTx) error { txmgrNs := dbtx.ReadWriteBucket(wtxmgrNamespaceKey) - for _, r := range rescanResults.DiscoveredData { - blockHash, err := chainhash.NewHashFromStr(r.Hash) - if err != nil { - return err - } - blockMeta, err := w.TxStore.GetBlockMetaForHash(txmgrNs, blockHash) + for _, r := range rescanResults { + blockMeta, err := w.TxStore.GetBlockMetaForHash(txmgrNs, &r.BlockHash) if err != nil { return err } serHeader, err := w.TxStore.GetSerializedBlockHeader(txmgrNs, - blockHash) + &r.BlockHash) if err != nil { return err } @@ -80,12 +82,8 @@ func (w *Wallet) rescan(chainClient *dcrrpcclient.Client, startHash *chainhash.H return err } - for _, hexTx := range r.Transactions { - serTx, err := hex.DecodeString(hexTx) - if err != nil { - return err - } - err = w.processSerializedTransaction(dbtx, serTx, + for _, tx := range r.Transactions { + err = w.processSerializedTransaction(dbtx, tx, &rawBlockHeader, &blockMeta) if err != nil { return err @@ -107,79 +105,40 @@ func (w *Wallet) rescan(chainClient *dcrrpcclient.Client, startHash *chainhash.H } // Rescan starts a rescan of the wallet for all blocks on the main chain -// beginning at startHash. -// -// An error channel is returned for consumers of this API, but it is not -// required to be read. If the error can not be immediately written to the -// returned channel, the error will be logged and the channel will be closed. -func (w *Wallet) Rescan(chainClient *dcrrpcclient.Client, startHash *chainhash.Hash) <-chan error { - errc := make(chan error) - - go func() (err error) { - defer func() { - select { - case errc <- err: - default: - if err != nil { - log.Errorf("Rescan failed: %v", err) - } - close(errc) - } - }() - - var startHeight int32 - err = walletdb.View(w.db, func(tx walletdb.ReadTx) error { - txmgrNs := tx.ReadBucket(wtxmgrNamespaceKey) - header, err := w.TxStore.GetSerializedBlockHeader(txmgrNs, startHash) - if err != nil { - return err - } - startHeight = udb.ExtractBlockHeaderHeight(header) - return nil - }) +// beginning at startHash. This function blocks until the rescan completes. +func (w *Wallet) Rescan(ctx context.Context, n NetworkBackend, startHash *chainhash.Hash) error { + var startHeight int32 + err := walletdb.View(w.db, func(tx walletdb.ReadTx) error { + txmgrNs := tx.ReadBucket(wtxmgrNamespaceKey) + header, err := w.TxStore.GetSerializedBlockHeader(txmgrNs, startHash) if err != nil { return err } + startHeight = udb.ExtractBlockHeaderHeight(header) + return nil + }) + if err != nil { + return err + } - return w.rescan(chainClient, startHash, startHeight, nil, nil) - }() - - return errc + return w.rescan(ctx, n, startHash, startHeight, nil) } // RescanFromHeight is an alternative to Rescan that takes a block height // instead of a hash. See Rescan for more details. -func (w *Wallet) RescanFromHeight(chainClient *dcrrpcclient.Client, startHeight int32) <-chan error { - errc := make(chan error) - - go func() (err error) { - defer func() { - select { - case errc <- err: - default: - if err != nil { - log.Errorf("Rescan failed: %v", err) - } - close(errc) - } - }() - - var startHash chainhash.Hash - err = walletdb.View(w.db, func(tx walletdb.ReadTx) error { - txmgrNs := tx.ReadBucket(wtxmgrNamespaceKey) - var err error - startHash, err = w.TxStore.GetMainChainBlockHashForHeight( - txmgrNs, startHeight) - return err - }) - if err != nil { - return err - } - - return w.rescan(chainClient, &startHash, startHeight, nil, nil) - }() - - return errc +func (w *Wallet) RescanFromHeight(ctx context.Context, n NetworkBackend, startHeight int32) error { + var startHash chainhash.Hash + err := walletdb.View(w.db, func(tx walletdb.ReadTx) error { + txmgrNs := tx.ReadBucket(wtxmgrNamespaceKey) + var err error + startHash, err = w.TxStore.GetMainChainBlockHashForHeight( + txmgrNs, startHeight) + return err + }) + if err != nil { + return err + } + return w.rescan(ctx, n, &startHash, startHeight, nil) } // RescanProgress records the height the rescan has completed through and any @@ -193,7 +152,9 @@ type RescanProgress struct { // the main chain starting at startHeight. Progress notifications and any // errors are sent to the channel p. This function blocks until the rescan // completes or ends in an error. p is closed before returning. -func (w *Wallet) RescanProgressFromHeight(chainClient *dcrrpcclient.Client, startHeight int32, p chan<- RescanProgress, cancel <-chan struct{}) { +func (w *Wallet) RescanProgressFromHeight(ctx context.Context, n NetworkBackend, + startHeight int32, p chan<- RescanProgress) { + defer close(p) var startHash chainhash.Hash @@ -209,7 +170,7 @@ func (w *Wallet) RescanProgressFromHeight(chainClient *dcrrpcclient.Client, star return } - err = w.rescan(chainClient, &startHash, startHeight, p, cancel) + err = w.rescan(ctx, n, &startHash, startHeight, p) if err != nil { p <- RescanProgress{Err: err} } diff --git a/wallet/sync.go b/wallet/sync.go index 0211bb9af..9bd052b3f 100644 --- a/wallet/sync.go +++ b/wallet/sync.go @@ -5,19 +5,17 @@ package wallet import ( - "encoding/hex" + "context" "fmt" "sync" "github.com/decred/dcrd/hdkeychain" - dcrrpcclient "github.com/decred/dcrd/rpcclient" "github.com/decred/dcrwallet/wallet/udb" "github.com/decred/dcrwallet/walletdb" - "github.com/jrick/bitset" "golang.org/x/sync/errgroup" ) -func (w *Wallet) findLastUsedAccount(client *dcrrpcclient.Client, coinTypeXpriv *hdkeychain.ExtendedKey) (uint32, error) { +func (w *Wallet) findLastUsedAccount(n NetworkBackend, coinTypeXpriv *hdkeychain.ExtendedKey) (uint32, error) { const scanLen = 100 var ( lastUsed uint32 @@ -50,7 +48,7 @@ Bsearch: } wg.Add(1) go func() { - used, err := w.accountUsed(client, xpub) + used, err := w.accountUsed(n, xpub) xpriv.Zero() results[i] = result{used, account, err} wg.Done() @@ -75,7 +73,7 @@ Bsearch: return lastUsed, nil } -func (w *Wallet) accountUsed(client *dcrrpcclient.Client, xpub *hdkeychain.ExtendedKey) (bool, error) { +func (w *Wallet) accountUsed(n NetworkBackend, xpub *hdkeychain.ExtendedKey) (bool, error) { extKey, intKey, err := deriveBranches(xpub) if err != nil { return false, err @@ -88,8 +86,8 @@ func (w *Wallet) accountUsed(client *dcrrpcclient.Client, xpub *hdkeychain.Exten merge := func(used bool, err error) { results <- result{used, err} } - go func() { merge(w.branchUsed(client, extKey)) }() - go func() { merge(w.branchUsed(client, intKey)) }() + go func() { merge(w.branchUsed(n, extKey)) }() + go func() { merge(w.branchUsed(n, intKey)) }() for i := 0; i < 2; i++ { r := <-results if r.err != nil { @@ -102,17 +100,17 @@ func (w *Wallet) accountUsed(client *dcrrpcclient.Client, xpub *hdkeychain.Exten return false, nil } -func (w *Wallet) branchUsed(client *dcrrpcclient.Client, branchXpub *hdkeychain.ExtendedKey) (bool, error) { +func (w *Wallet) branchUsed(n NetworkBackend, branchXpub *hdkeychain.ExtendedKey) (bool, error) { addrs, err := deriveChildAddresses(branchXpub, 0, uint32(w.gapLimit), w.chainParams) if err != nil { return false, err } - existsBitsHex, err := client.ExistsAddresses(addrs) + bits, err := n.AddressesUsed(context.TODO(), addrs) if err != nil { return false, err } - for _, r := range existsBitsHex { - if r != '0' { + for _, b := range bits { + if b != 0 { return true, nil } } @@ -122,7 +120,7 @@ func (w *Wallet) branchUsed(client *dcrrpcclient.Client, branchXpub *hdkeychain. // findLastUsedAddress returns the child index of the last used child address // derived from a branch key. If no addresses are found, ^uint32(0) is // returned. -func (w *Wallet) findLastUsedAddress(client *dcrrpcclient.Client, xpub *hdkeychain.ExtendedKey) (uint32, error) { +func (w *Wallet) findLastUsedAddress(n NetworkBackend, xpub *hdkeychain.ExtendedKey) (uint32, error) { var ( lastUsed = ^uint32(0) scanLen = uint32(w.gapLimit) @@ -136,16 +134,12 @@ Bsearch: if err != nil { return 0, err } - existsBitsHex, err := client.ExistsAddresses(addrs) - if err != nil { - return 0, err - } - existsBits, err := hex.DecodeString(existsBitsHex) + existsBits, err := n.AddressesUsed(context.TODO(), addrs) if err != nil { return 0, err } for i := len(addrs) - 1; i >= 0; i-- { - if bitset.Bytes(existsBits).Get(i) { + if existsBits.Get(i) { lastUsed = mid*scanLen + uint32(i) lo = mid + 1 continue Bsearch @@ -166,7 +160,7 @@ Bsearch: // account extended pubkeys. // // A transaction filter (re)load and rescan should be performed after discovery. -func (w *Wallet) DiscoverActiveAddresses(chainClient *dcrrpcclient.Client, discoverAccts bool) error { +func (w *Wallet) DiscoverActiveAddresses(n NetworkBackend, discoverAccts bool) error { // Start by rescanning the accounts and determining what the // current account index is. This scan should only ever be // performed if we're restoring our wallet from seed. @@ -186,7 +180,7 @@ func (w *Wallet) DiscoverActiveAddresses(chainClient *dcrrpcclient.Client, disco if err != nil { return err } - lastUsed, err := w.findLastUsedAccount(chainClient, coinTypePrivKey) + lastUsed, err := w.findLastUsedAccount(n, coinTypePrivKey) if err != nil { return err } @@ -266,7 +260,7 @@ func (w *Wallet) DiscoverActiveAddresses(chainClient *dcrrpcclient.Client, disco return err } - lastUsed, err := w.findLastUsedAddress(chainClient, branchXpub) + lastUsed, err := w.findLastUsedAddress(n, branchXpub) if err != nil { return err } diff --git a/wallet/txrules/rules.go b/wallet/txrules/rules.go index 1cce9d33c..d7ae8e16e 100644 --- a/wallet/txrules/rules.go +++ b/wallet/txrules/rules.go @@ -11,6 +11,7 @@ import ( "github.com/decred/dcrd/dcrutil" "github.com/decred/dcrd/txscript" "github.com/decred/dcrd/wire" + h "github.com/decred/dcrwallet/internal/helpers" ) // DefaultRelayFeePerKb is the default minimum relay fee policy for a mempool. @@ -89,3 +90,17 @@ func FeeForSerializeSize(relayFeePerKb dcrutil.Amount, txSerializeSize int) dcru return fee } + +// PaysHighFees checks whether the signed transaction pays insanely high fees. +// Transactons are defined to have a high fee if they have pay a fee rate that +// is 1000 time higher than the default fee. +func PaysHighFees(totalInput dcrutil.Amount, tx *wire.MsgTx) bool { + fee := totalInput - h.SumOutputValues(tx.TxOut) + if fee <= 0 { + // Impossible to determine + return false + } + + maxFee := FeeForSerializeSize(1000*DefaultRelayFeePerKb, tx.SerializeSize()) + return fee > maxFee +} diff --git a/wallet/udb/txmined.go b/wallet/udb/txmined.go index 1bb0d6dc5..83628c1ca 100644 --- a/wallet/udb/txmined.go +++ b/wallet/udb/txmined.go @@ -3684,3 +3684,39 @@ func (s *Store) storedTxScripts(ns walletdb.ReadBucket) ([][]byte, error) { } return scripts, err } + +// TotalInput calculates the input value referenced by all transaction inputs. +// If this is not calculable, this returns 0. +func (s *Store) TotalInput(dbtx walletdb.ReadTx, tx *wire.MsgTx) (dcrutil.Amount, error) { + ns := dbtx.ReadBucket(wtxmgrBucketKey) + + var total dcrutil.Amount + for _, in := range tx.TxIn { + var tx wire.MsgTx + if v := existsRawUnmined(ns, in.PreviousOutPoint.Hash[:]); v != nil { + err := tx.Deserialize(bytes.NewReader(extractRawUnminedTx(v))) + if err != nil { + desc := fmt.Sprintf("failed to deserialize unmined tx %v", + &in.PreviousOutPoint.Hash) + return 0, apperrors.Wrap(err, apperrors.ErrData, desc) + } + } else if _, v := latestTxRecord(ns, in.PreviousOutPoint.Hash[:]); v != nil { + err := readRawTxRecordMsgTx(&in.PreviousOutPoint.Hash, v, &tx) + if err != nil { + return 0, err + } + } else { + return 0, nil + } + + if in.PreviousOutPoint.Index >= uint32(len(tx.TxOut)) { + desc := fmt.Sprintf("previous output index %d does not exist in "+ + "transaction %v", in.PreviousOutPoint.Index, &in.PreviousOutPoint.Hash) + return 0, apperrors.New(apperrors.ErrInput, desc) + } + + total += dcrutil.Amount(tx.TxOut[in.PreviousOutPoint.Index].Value) + } + + return total, nil +} diff --git a/wallet/wallet.go b/wallet/wallet.go index 2b7343879..5bd09090e 100644 --- a/wallet/wallet.go +++ b/wallet/wallet.go @@ -7,6 +7,7 @@ package wallet import ( "bytes" + "context" "encoding/binary" "encoding/hex" "errors" @@ -30,7 +31,6 @@ import ( "github.com/decred/dcrd/txscript" "github.com/decred/dcrd/wire" "github.com/decred/dcrwallet/apperrors" - "github.com/decred/dcrwallet/chain" "github.com/decred/dcrwallet/wallet/txauthor" "github.com/decred/dcrwallet/wallet/txrules" "github.com/decred/dcrwallet/wallet/udb" @@ -96,8 +96,8 @@ type Wallet struct { initiallyUnlocked bool gapLimit int - chainClient *chain.RPCClient - chainClientLock sync.Mutex + networkBackend NetworkBackend + networkBackendMu sync.Mutex lockedOutpoints map[wire.OutPoint]struct{} @@ -239,24 +239,13 @@ func newWallet(votingEnabled bool, addressReuse bool, ticketAddress dcrutil.Addr return w } -// StakeDifficulty is used to get the current stake difficulty from the daemon. +// StakeDifficulty is used to get the next block's stake difficulty. func (w *Wallet) StakeDifficulty() (dcrutil.Amount, error) { - chainClient, err := w.requireChainClient() + n, err := w.NetworkBackend() if err != nil { return 0, err } - - sdResp, err := chainClient.GetStakeDifficulty() - if err != nil { - return 0, err - } - - sd, err := dcrutil.NewAmount(sdResp.NextStakeDifficulty) - if err != nil { - return 0, err - } - - return sd, nil + return n.StakeDifficulty(context.TODO()) } // BalanceToMaintain is used to get the current balancetomaintain for the wallet. @@ -504,89 +493,6 @@ func (w *Wallet) Start() { go w.walletLocker() } -// SynchronizeRPC associates the wallet with the consensus RPC client, -// synchronizes the wallet with the latest changes to the blockchain, and -// continuously updates the wallet through RPC notifications. -// -// This method is unstable and will be removed when all syncing logic is moved -// outside of the wallet package. -func (w *Wallet) SynchronizeRPC(chainClient *chain.RPCClient) { - w.quitMu.Lock() - select { - case <-w.quit: - w.quitMu.Unlock() - return - default: - } - w.quitMu.Unlock() - - // TODO: Ignoring the new client when one is already set breaks callers - // who are replacing the client, perhaps after a disconnect. - w.chainClientLock.Lock() - if w.chainClient != nil { - w.chainClientLock.Unlock() - return - } - w.chainClient = chainClient - w.chainClientLock.Unlock() - - // TODO: It would be preferable to either run these goroutines - // separately from the wallet (use wallet mutator functions to - // make changes from the RPC client) and not have to stop and - // restart them each time the client disconnects and reconnets. - w.wg.Add(2) - go w.handleChainNotifications(chainClient) - go w.handleChainVotingNotifications(chainClient) - - // Request notifications for winning tickets. - err := chainClient.NotifyWinningTickets() - if err != nil { - log.Error("Unable to request transaction updates for "+ - "winning tickets. Error: ", err.Error()) - } - - // Request notifications for spent and missed tickets. - err = chainClient.NotifySpentAndMissedTickets() - if err != nil { - log.Error("Unable to request transaction updates for spent "+ - "and missed tickets. Error: ", err.Error()) - } - - if w.votingEnabled { - log.Infof("Wallet voting enabled") - log.Infof("Please ensure your wallet remains unlocked so it may vote") - } -} - -// requireChainClient marks that a wallet method can only be completed when the -// consensus RPC server is set. This function and all functions that call it -// are unstable and will need to be moved when the syncing code is moved out of -// the wallet. -func (w *Wallet) requireChainClient() (*dcrrpcclient.Client, error) { - w.chainClientLock.Lock() - chainClient := w.chainClient - w.chainClientLock.Unlock() - if chainClient == nil { - return nil, errors.New("blockchain RPC is inactive") - } - return chainClient.Client, nil -} - -// ChainClient returns the optional consensus RPC client associated with the -// wallet. -// -// This function is unstable and will be removed once sync logic is moved out of -// the wallet. -func (w *Wallet) ChainClient() *dcrrpcclient.Client { - w.chainClientLock.Lock() - chainClient := w.chainClient - w.chainClientLock.Unlock() - if chainClient == nil { - return nil - } - return chainClient.Client -} - // RelayFee returns the current minimum relay fee (per kB of serialized // transaction) used when constructing transactions. func (w *Wallet) RelayFee() dcrutil.Amount { @@ -638,49 +544,14 @@ func (w *Wallet) Stop() { case <-quit: default: close(quit) - w.chainClientLock.Lock() - if w.chainClient != nil { - w.chainClient.Stop() - w.chainClient = nil - } - w.chainClientLock.Unlock() - } -} - -// ShuttingDown returns whether the wallet is currently in the process of -// shutting down or not. -func (w *Wallet) ShuttingDown() bool { - select { - case <-w.quitChan(): - return true - default: - return false } } // WaitForShutdown blocks until all wallet goroutines have finished executing. func (w *Wallet) WaitForShutdown() { - w.chainClientLock.Lock() - if w.chainClient != nil { - w.chainClient.WaitForShutdown() - } - w.chainClientLock.Unlock() w.wg.Wait() } -// SynchronizingToNetwork returns whether the wallet is currently synchronizing -// with the Bitcoin network. -func (w *Wallet) SynchronizingToNetwork() bool { - // At the moment, RPC is the only synchronization method. In the - // future, when SPV is added, a separate check will also be needed, or - // SPV could always be enabled if RPC was not explicitly specified when - // creating the wallet. - w.chainClientLock.Lock() - syncing := w.chainClient != nil - w.chainClientLock.Unlock() - return syncing -} - // MainChainTip returns the hash and height of the tip-most block in the main // chain that the wallet is synchronized to. func (w *Wallet) MainChainTip() (hash chainhash.Hash, height int32) { @@ -699,7 +570,7 @@ func (w *Wallet) MainChainTip() (hash chainhash.Hash, height int32) { // loadActiveAddrs loads the consensus RPC server with active addresses for // transaction notifications. For logging purposes, it returns the total number // of addresses loaded. -func (w *Wallet) loadActiveAddrs(dbtx walletdb.ReadTx, chainClient *dcrrpcclient.Client) (uint64, error) { +func (w *Wallet) loadActiveAddrs(dbtx walletdb.ReadTx, nb NetworkBackend) (uint64, error) { // loadBranchAddrs loads addresses for the branch with the child range [0,n]. loadBranchAddrs := func(branchKey *hdkeychain.ExtendedKey, n uint32, errs chan<- error) { const step = 256 @@ -719,7 +590,7 @@ func (w *Wallet) loadActiveAddrs(dbtx walletdb.ReadTx, chainClient *dcrrpcclient } addrs = append(addrs, addr) } - return chainClient.LoadTxFilter(false, addrs, nil) + return nb.LoadTxFilter(context.TODO(), false, addrs, nil) }) } errs <- g.Wait() @@ -772,7 +643,7 @@ func (w *Wallet) loadActiveAddrs(dbtx walletdb.ReadTx, chainClient *dcrrpcclient return } importedAddrCount = uint64(len(addrs)) - errs <- chainClient.LoadTxFilter(false, addrs, nil) + errs <- nb.LoadTxFilter(context.TODO(), false, addrs, nil) }() for i := 0; i < cap(errs); i++ { err := <-errs @@ -784,16 +655,15 @@ func (w *Wallet) loadActiveAddrs(dbtx walletdb.ReadTx, chainClient *dcrrpcclient return bip0044AddrCount + importedAddrCount, nil } -// LoadActiveDataFilters loads the consensus RPC server's websocket client -// transaction filter with all active addresses and unspent outpoints for this -// wallet. -func (w *Wallet) LoadActiveDataFilters(chainClient *dcrrpcclient.Client) error { +// LoadActiveDataFilters loads filters for all active addresses and unspent +// outpoints for this wallet. +func (w *Wallet) LoadActiveDataFilters(n NetworkBackend) error { log.Infof("Loading active addresses and unspent outputs...") var addrCount, utxoCount uint64 err := walletdb.View(w.db, func(dbtx walletdb.ReadTx) error { var err error - addrCount, err = w.loadActiveAddrs(dbtx, chainClient) + addrCount, err = w.loadActiveAddrs(dbtx, n) if err != nil { return err } @@ -804,7 +674,7 @@ func (w *Wallet) LoadActiveDataFilters(chainClient *dcrrpcclient.Client) error { return err } utxoCount = uint64(len(unspent)) - err = chainClient.LoadTxFilter(false, nil, unspent) + err = n.LoadTxFilter(context.TODO(), false, nil, unspent) return err }) if err != nil { @@ -818,19 +688,13 @@ func (w *Wallet) LoadActiveDataFilters(chainClient *dcrrpcclient.Client) error { // createHeaderData creates the header data to process from hex-encoded // serialized block headers. -func createHeaderData(headers []string) ([]udb.BlockHeaderData, error) { +func createHeaderData(headers [][]byte) ([]udb.BlockHeaderData, error) { data := make([]udb.BlockHeaderData, len(headers)) - hexbuf := make([]byte, len(udb.RawBlockHeader{})*2) var decodedHeader wire.BlockHeader for i, header := range headers { var headerData udb.BlockHeaderData - copy(hexbuf, header) - _, err := hex.Decode(headerData.SerializedHeader[:], hexbuf) - if err != nil { - return nil, err - } - r := bytes.NewReader(headerData.SerializedHeader[:]) - err = decodedHeader.Deserialize(r) + copy(headerData.SerializedHeader[:], header) + err := decodedHeader.Deserialize(bytes.NewReader(header)) if err != nil { return nil, err } @@ -840,7 +704,7 @@ func createHeaderData(headers []string) ([]udb.BlockHeaderData, error) { return data, nil } -func (w *Wallet) fetchHeaders(chainClient *dcrrpcclient.Client) (int, error) { +func (w *Wallet) fetchHeaders(n NetworkBackend) (int, error) { fetchedHeaders := 0 var blockLocators []chainhash.Hash @@ -856,16 +720,16 @@ func (w *Wallet) fetchHeaders(chainClient *dcrrpcclient.Client) (int, error) { // Fetch and process headers until no more are returned. hashStop := chainhash.Hash{} for { - response, err := chainClient.GetHeaders(blockLocators, &hashStop) + headers, err := n.GetHeaders(context.TODO(), blockLocators, &hashStop) if err != nil { return 0, err } - if len(response.Headers) == 0 { + if len(headers) == 0 { return fetchedHeaders, nil } - headerData, err := createHeaderData(response.Headers) + headerData, err := createHeaderData(headers) if err != nil { return 0, err } @@ -885,7 +749,7 @@ func (w *Wallet) fetchHeaders(chainClient *dcrrpcclient.Client) (int, error) { return 0, err } - fetchedHeaders += len(response.Headers) + fetchedHeaders += len(headers) } } @@ -894,7 +758,7 @@ func (w *Wallet) fetchHeaders(chainClient *dcrrpcclient.Client) (int, error) { // returned, along with the hash of the first previously-unseen block hash now // in the main chain. This is the block a rescan should begin at (inclusive), // and is only relevant when the number of fetched headers is not zero. -func (w *Wallet) FetchHeaders(chainClient *dcrrpcclient.Client) (count int, rescanFrom chainhash.Hash, rescanFromHeight int32, +func (w *Wallet) FetchHeaders(n NetworkBackend) (count int, rescanFrom chainhash.Hash, rescanFromHeight int32, mainChainTipBlockHash chainhash.Hash, mainChainTipBlockHeight int32, err error) { // Unfortunately, getheaders is broken and needs a workaround when wallet's @@ -917,7 +781,7 @@ func (w *Wallet) FetchHeaders(chainClient *dcrrpcclient.Client) (count int, resc hash, height := commonAncestor, commonAncestorHeight for height != 0 { - mainChainHash, err := chainClient.GetBlockHash(int64(height)) + mainChainHash, err := n.GetBlockHash(context.TODO(), height) if err == nil && hash == *mainChainHash { // found it break @@ -945,7 +809,7 @@ func (w *Wallet) FetchHeaders(chainClient *dcrrpcclient.Client) (count int, resc } log.Infof("Fetching headers") - fetchedHeaderCount, err := w.fetchHeaders(chainClient) + fetchedHeaderCount, err := w.fetchHeaders(n) if err != nil { return } @@ -998,65 +862,6 @@ func (w *Wallet) FetchHeaders(chainClient *dcrrpcclient.Client) (count int, resc mainChainTipBlockHeight, nil } -// syncWithChain brings the wallet up to date with the current chain server -// connection. It creates a rescan request and blocks until the rescan has -// finished. -func (w *Wallet) syncWithChain(chainClient *dcrrpcclient.Client) error { - // Request notifications for connected and disconnected blocks. - err := chainClient.NotifyBlocks() - if err != nil { - return err - } - - // Discover any addresses for this wallet that have not yet been created. - err = w.DiscoverActiveAddresses(chainClient, w.initiallyUnlocked) - if err != nil { - return err - } - - // Load transaction filters with all active addresses and watched outpoints. - err = w.LoadActiveDataFilters(chainClient) - if err != nil { - return err - } - - // Fetch headers for unseen blocks in the main chain, determine whether a - // rescan is necessary, and when to begin it. - fetchedHeaderCount, rescanStart, _, _, _, err := w.FetchHeaders(chainClient) - if err != nil { - return err - } - - // Rescan when necessary. - if fetchedHeaderCount != 0 { - err = <-w.Rescan(chainClient, &rescanStart) - if err != nil { - return err - } - } - - w.resendUnminedTxs(chainClient) - - // Send winning and missed ticket notifications out so that the wallet - // can immediately vote and redeem any tickets it may have missed on - // startup. - // TODO A proper pass through for dcrrpcclient for these cmds. - if w.initiallyUnlocked { - _, err = chainClient.RawRequest("rebroadcastwinners", nil) - if err != nil { - return err - } - _, err = chainClient.RawRequest("rebroadcastmissed", nil) - if err != nil { - return err - } - } - - log.Infof("Blockchain sync completed, wallet ready for general usage.") - - return nil -} - type ( consolidateRequest struct { inputs int @@ -1716,27 +1521,6 @@ func VerifyMessage(msg string, addr dcrutil.Address, sig []byte) (bool, error) { return recoveredAddr.EncodeAddress() == addr.EncodeAddress(), nil } -// existsAddressOnChain checks the chain on daemon to see if the given address -// has been used before on the main chain. -func (w *Wallet) existsAddressOnChain(address dcrutil.Address) (bool, error) { - chainClient, err := w.requireChainClient() - if err != nil { - return false, err - } - exists, err := chainClient.ExistsAddress(address) - if err != nil { - return false, err - } - - return exists, nil -} - -// ExistsAddressOnChain is the exported version of existsAddressOnChain that is -// safe for concurrent access. -func (w *Wallet) ExistsAddressOnChain(address dcrutil.Address) (bool, error) { - return w.existsAddressOnChain(address) -} - // HaveAddress returns whether the wallet is the owner of the address a. func (w *Wallet) HaveAddress(a dcrutil.Address) (bool, error) { err := walletdb.View(w.db, func(tx walletdb.ReadTx) error { @@ -1913,8 +1697,7 @@ func (w *Wallet) NextAccount(name string) (uint32, error) { } w.addressBuffersMu.Unlock() - client := w.ChainClient() - if client != nil { + if n, err := w.NetworkBackend(); err == nil { errs := make(chan error, 2) for _, branchKey := range []*hdkeychain.ExtendedKey{extKey, intKey} { branchKey := branchKey @@ -1925,7 +1708,7 @@ func (w *Wallet) NextAccount(name string) (uint32, error) { errs <- err return } - errs <- client.LoadTxFilter(false, addrs, nil) + errs <- n.LoadTxFilter(context.TODO(), false, addrs, nil) }() } for i := 0; i < cap(errs); i++ { @@ -2815,15 +2598,10 @@ func (w *Wallet) DumpWIFPrivateKey(addr dcrutil.Address) (string, error) { // ImportPrivateKey imports a private key to the wallet and writes the new // wallet to disk. func (w *Wallet) ImportPrivateKey(wif *dcrutil.WIF) (string, error) { - chainClient, err := w.requireChainClient() - if err != nil { - return "", err - } - // Attempt to import private key into wallet. var addr dcrutil.Address var props *udb.AccountProperties - err = walletdb.Update(w.db, func(tx walletdb.ReadWriteTx) error { + err := walletdb.Update(w.db, func(tx walletdb.ReadWriteTx) error { addrmgrNs := tx.ReadWriteBucket(waddrmgrNamespaceKey) maddr, err := w.Manager.ImportPrivateKey(addrmgrNs, wif) if err == nil { @@ -2837,10 +2615,12 @@ func (w *Wallet) ImportPrivateKey(wif *dcrutil.WIF) (string, error) { return "", err } - err = chainClient.LoadTxFilter(false, []dcrutil.Address{addr}, nil) - if err != nil { - return "", fmt.Errorf("Failed to subscribe for address ntfns for "+ - "address %s: %s", addr.EncodeAddress(), err) + if n, err := w.NetworkBackend(); err == nil { + err := n.LoadTxFilter(context.TODO(), false, []dcrutil.Address{addr}, nil) + if err != nil { + return "", fmt.Errorf("Failed to subscribe for address ntfns for "+ + "address %v: %s", addr, err) + } } addrStr := addr.EncodeAddress() @@ -2856,11 +2636,6 @@ func (w *Wallet) ImportPrivateKey(wif *dcrutil.WIF) (string, error) { // user to specify whether or not they want the redeemscript to be rescanned, // and how far back they wish to rescan. func (w *Wallet) ImportScript(rs []byte) error { - chainClient, err := w.requireChainClient() - if err != nil { - return err - } - return walletdb.Update(w.db, func(tx walletdb.ReadWriteTx) error { addrmgrNs := tx.ReadWriteBucket(waddrmgrNamespaceKey) txmgrNs := tx.ReadWriteBucket(wtxmgrNamespaceKey) @@ -2870,7 +2645,6 @@ func (w *Wallet) ImportScript(rs []byte) error { return err } - // Get current block's height and hash. mscriptaddr, err := w.Manager.ImportScript(addrmgrNs, rs) if err != nil { switch { @@ -2885,18 +2659,17 @@ func (w *Wallet) ImportScript(rs []byte) error { return err } } - err = chainClient.LoadTxFilter(false, - []dcrutil.Address{mscriptaddr.Address()}, nil) - if err != nil { - return fmt.Errorf("Failed to subscribe for address ntfns for "+ - "address %s: %s", mscriptaddr.Address().EncodeAddress(), - err.Error()) - } + addr := mscriptaddr.Address() - log.Infof("Redeem script hash %x (address %v) successfully added.", - mscriptaddr.Address().ScriptAddress(), - mscriptaddr.Address().EncodeAddress()) + if n, err := w.NetworkBackend(); err == nil { + err := n.LoadTxFilter(context.TODO(), false, []dcrutil.Address{addr}, nil) + if err != nil { + return fmt.Errorf("failed to subscribe for address ntfns for "+ + "address %v: %v", addr, err) + } + } + log.Infof("Imported script with P2SH address %v", addr) return nil }) } @@ -3188,10 +2961,10 @@ func (w *Wallet) LockedOutpoints() []dcrjson.TransactionInput { return locked } -// resendUnminedTxs iterates through all transactions that spend from wallet -// credits that are not known to have been mined into a block, and attempts -// to send each to the chain server for relay. -func (w *Wallet) resendUnminedTxs(chainClient *dcrrpcclient.Client) { +// UnminedTransactions returns all unmined transactions from the wallet. +// Transactions are sorted in dependency order making it suitable to range them +// in order to broadcast at wallet startup. +func (w *Wallet) UnminedTransactions() ([]*wire.MsgTx, error) { var txs []*wire.MsgTx err := walletdb.View(w.db, func(tx walletdb.ReadTx) error { txmgrNs := tx.ReadBucket(wtxmgrNamespaceKey) @@ -3199,22 +2972,7 @@ func (w *Wallet) resendUnminedTxs(chainClient *dcrrpcclient.Client) { txs, err = w.TxStore.UnminedTxs(txmgrNs) return err }) - if err != nil { - log.Errorf("Cannot load unmined transactions for resending: %v", err) - return - } - - for _, tx := range txs { - resp, err := chainClient.SendRawTransaction(tx, w.AllowHighFees) - if err != nil { - // TODO(jrick): Check error for if this tx is a double spend, - // remove it if so. - log.Tracef("Could not resend transaction %v: %v", - tx.TxHash(), err) - continue - } - log.Tracef("Resent unmined transaction %v", resp) - } + return txs, err } // SortedActivePaymentAddresses returns a slice of all active payment @@ -3675,10 +3433,24 @@ func (w *Wallet) isRelevantTx(dbtx walletdb.ReadTx, tx *wire.MsgTx) bool { // PublishTransaction saves (if relevant) and sends the transaction to the // consensus RPC server so it can be propigated to other nodes and eventually // mined. If the send fails, the transaction is not added to the wallet. -func (w *Wallet) PublishTransaction(tx *wire.MsgTx, serializedTx []byte, client *dcrrpcclient.Client) (*chainhash.Hash, error) { +func (w *Wallet) PublishTransaction(tx *wire.MsgTx, serializedTx []byte, n NetworkBackend) (*chainhash.Hash, error) { var relevant bool err := walletdb.View(w.db, func(dbtx walletdb.ReadTx) error { relevant = w.isRelevantTx(dbtx, tx) + + // Prevent high fee transactions from being published, if disabled and + // the fee can be calculated. + if relevant && !w.AllowHighFees { + totalInput, err := w.TxStore.TotalInput(dbtx, tx) + if err != nil { + return err + } + err = w.checkHighFees(totalInput, tx) + if err != nil { + return err + } + } + return nil }) if err != nil { @@ -3686,7 +3458,12 @@ func (w *Wallet) PublishTransaction(tx *wire.MsgTx, serializedTx []byte, client } if !relevant { - return client.SendRawTransaction(tx, w.AllowHighFees) + err := n.PublishTransaction(context.TODO(), tx) + if err != nil { + return nil, err + } + txHash := tx.TxHash() + return &txHash, nil } var txHash *chainhash.Hash @@ -3695,8 +3472,13 @@ func (w *Wallet) PublishTransaction(tx *wire.MsgTx, serializedTx []byte, client if err != nil { return err } - txHash, err = client.SendRawTransaction(tx, w.AllowHighFees) - return err + err = n.PublishTransaction(context.TODO(), tx) + if err != nil { + return err + } + h := tx.TxHash() + txHash = &h + return nil }) return txHash, err }