Skip to content

Commit

Permalink
multi: Refactor wallet.NetworkBackend.
Browse files Browse the repository at this point in the history
This unifies the wallet's Peer and NetworkBackend interfaces into a
single interface, given the SPV peer syncer now fulfills both roles.

In order to complete the refactor, the following changes are made:

- The funcs that were in Peer are moved to NetworkBackend.
- The missing funcs are added to chain.Syncer, forwading the calls to
  the dcrd client connection.
- chain.Syncer assigns itself as NetworkBackend to the wallet, instead
  of assigning the client connection.
- deployments package is modified to take a more limited interface
  (Querier) instead of the syncer var directly, to avoid circular pkg
  dependencies.
- The live ticket querier (in tickets.go) is modified to take a more
  limited interface instead of the syncer var directly, to avoid
  circular pkg dependencies.
- Call sites that type selected on *dcrd.RPC are modified to type select
  on *chain.Syncer.
- A Synced() function is added to the NetworkBackend interface so that
  the JSON-RPC server can more accurately query for synced status from
  the underlying syncer.

In the future, the dependency on concrete type checking could be further
reduced on other call sites by using the same technique that was
employed in this commit for the deployments package.
  • Loading branch information
matheusd committed Jan 31, 2024
1 parent 5a412fb commit 6cde2cc
Show file tree
Hide file tree
Showing 16 changed files with 342 additions and 209 deletions.
82 changes: 82 additions & 0 deletions chain/backend.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
// Copyright (c) 2023 The Decred developers
// Use of this source code is governed by an ISC
// license that can be found in the LICENSE file.

package chain

import (
"context"

"github.com/decred/dcrd/chaincfg/chainhash"
"github.com/decred/dcrd/dcrutil/v4"
"github.com/decred/dcrd/gcs/v4"
dcrdtypes "github.com/decred/dcrd/rpc/jsonrpc/types/v4"
"github.com/decred/dcrd/txscript/v4/stdaddr"
"github.com/decred/dcrd/wire"
"github.com/jrick/bitset"
)

// Blocks is part of the wallet.NetworkBackend interface.
func (s *Syncer) Blocks(ctx context.Context, blockHashes []*chainhash.Hash) ([]*wire.MsgBlock, error) {
return s.rpc.Blocks(ctx, blockHashes)
}

type filterProof = struct {
Filter *gcs.FilterV2
ProofIndex uint32
Proof []chainhash.Hash
}

// CFiltersV2 is part of the wallet.NetworkBackend interface.
func (s *Syncer) CFiltersV2(ctx context.Context, blockHashes []*chainhash.Hash) ([]filterProof, error) {
return s.rpc.CFiltersV2(ctx, blockHashes)
}

// PublishTransactions is part of the wallet.NetworkBackend interface.
func (s *Syncer) PublishTransactions(ctx context.Context, txs ...*wire.MsgTx) error {
return s.rpc.PublishTransactions(ctx, txs...)
}

// LoadTxFilter is part of the wallet.NetworkBackend interface.
func (s *Syncer) LoadTxFilter(ctx context.Context, reload bool, addrs []stdaddr.Address, outpoints []wire.OutPoint) error {
return s.rpc.LoadTxFilter(ctx, reload, addrs, outpoints)
}

// Rescan is part of the wallet.NetworkBackend interface.
func (s *Syncer) Rescan(ctx context.Context, blocks []chainhash.Hash, save func(block *chainhash.Hash, txs []*wire.MsgTx) error) error {
return s.rpc.Rescan(ctx, blocks, save)
}

// StakeDifficulty is part of the wallet.NetworkBackend interface.
func (s *Syncer) StakeDifficulty(ctx context.Context) (dcrutil.Amount, error) {
return s.rpc.StakeDifficulty(ctx)
}

// Deployments fulfills the DeploymentQuerier interface.
func (s *Syncer) Deployments(ctx context.Context) (map[string]dcrdtypes.AgendaInfo, error) {
info, err := s.rpc.GetBlockchainInfo(ctx)
if err != nil {
return nil, err
}
return info.Deployments, nil
}

// GetTxOut fulfills the LiveTicketQuerier interface.
func (s *Syncer) GetTxOut(ctx context.Context, txHash *chainhash.Hash, index uint32, tree int8, includeMempool bool) (*dcrdtypes.GetTxOutResult, error) {
return s.rpc.GetTxOut(ctx, txHash, index, tree, includeMempool)
}

// GetConfirmationHeight fulfills the LiveTicketQuerier interface.
func (s *Syncer) GetConfirmationHeight(ctx context.Context, txHash *chainhash.Hash) (int32, error) {
return s.rpc.GetConfirmationHeight(ctx, txHash)
}

// ExistsLiveTickets fulfills the LiveTicketQuerier interface.
func (s *Syncer) ExistsLiveTickets(ctx context.Context, tickets []*chainhash.Hash) (bitset.Bytes, error) {
return s.rpc.ExistsLiveTickets(ctx, tickets)
}

// UsedAddresses fulfills the usedAddressesQuerier interface.
func (s *Syncer) UsedAddresses(ctx context.Context, addrs []stdaddr.Address) (bitset.Bytes, error) {
return s.rpc.UsedAddresses(ctx, addrs)
}
59 changes: 50 additions & 9 deletions chain/sync.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright (c) 2017-2020 The Decred developers
// Copyright (c) 2017-2023 The Decred developers
// Use of this source code is governed by an ISC
// license that can be found in the LICENSE file.

Expand Down Expand Up @@ -30,7 +30,8 @@ var requiredAPIVersion = semver{Major: 8, Minor: 0, Patch: 0}
// Syncer implements wallet synchronization services by processing
// notifications from a dcrd JSON-RPC server.
type Syncer struct {
atomicWalletSynced atomic.Uint32 // CAS (synced=1) when wallet syncing complete
atomicWalletSynced atomic.Uint32 // CAS (synced=1) when wallet syncing complete
atomicTargetSyncHeight atomic.Int32

wallet *wallet.Wallet
opts *RPCOptions
Expand Down Expand Up @@ -94,6 +95,11 @@ func (s *Syncer) SetCallbacks(cb *Callbacks) {
s.cb = cb
}

// RPC returns the JSON-RPC client to the underlying dcrd node.
func (s *Syncer) RPC() *dcrd.RPC {
return s.rpc
}

// DisableDiscoverAccounts disables account discovery. This has an effect only
// if called before the main Run() executes the account discovery process.
func (s *Syncer) DisableDiscoverAccounts() {
Expand All @@ -102,6 +108,19 @@ func (s *Syncer) DisableDiscoverAccounts() {
s.mu.Unlock()
}

// Synced returns whether the syncer has completed syncing to the backend and
// the target height it is attempting to sync to.
func (s *Syncer) Synced(ctx context.Context) (bool, int32) {
synced := s.atomicWalletSynced.Load() == 1
var targetHeight int32
if !synced {
targetHeight = s.atomicTargetSyncHeight.Load()
} else {
_, targetHeight = s.wallet.MainChainTip(ctx)
}
return synced, targetHeight
}

// synced checks the atomic that controls wallet syncness and if previously
// unsynced, updates to synced and notifies the callback, if set.
func (s *Syncer) synced() {
Expand All @@ -111,6 +130,15 @@ func (s *Syncer) synced() {
}
}

// unsynced checks the atomic that controls wallet syncness and if previously
// synced, updates to unsynced and notifies the callback, if set.
func (s *Syncer) unsynced() {
swapped := s.atomicWalletSynced.CompareAndSwap(1, 0)
if swapped && s.cb != nil && s.cb.Synced != nil {
s.cb.Synced(false)
}
}

func (s *Syncer) fetchMissingCfiltersStart() {
if s.cb != nil && s.cb.FetchMissingCFiltersStarted != nil {
s.cb.FetchMissingCFiltersStarted()
Expand Down Expand Up @@ -185,12 +213,24 @@ func (s *Syncer) getHeaders(ctx context.Context) error {
return err
}

startedSynced := s.atomicWalletSynced.Load() == 1

cnet := s.wallet.ChainParams().Net
s.fetchHeadersStart()
for {
if err := ctx.Err(); err != nil {
return err
}

// If unsynced, update the target sync height.
if !startedSynced {
info, err := s.rpc.GetBlockchainInfo(ctx)
if err != nil {
return err
}
s.atomicTargetSyncHeight.Store(int32(info.Headers))
}

headers, err := s.rpc.Headers(ctx, locators, &hashStop)
if err != nil {
return err
Expand Down Expand Up @@ -312,15 +352,15 @@ func (s *Syncer) getHeaders(ctx context.Context) error {
discoverAccts := s.discoverAccts
s.mu.Unlock()
s.discoverAddressesStart()
err = s.wallet.DiscoverActiveAddresses(ctx, s.rpc, rescanPoint, discoverAccts, s.wallet.GapLimit())
err = s.wallet.DiscoverActiveAddresses(ctx, s, rescanPoint, discoverAccts, s.wallet.GapLimit())
if err != nil {
return err
}
s.discoverAddressesFinished()
s.mu.Lock()
s.discoverAccts = false
s.mu.Unlock()
err = s.wallet.LoadActiveDataFilters(ctx, s.rpc, true)
err = s.wallet.LoadActiveDataFilters(ctx, s, true)
if err != nil {
return err
}
Expand All @@ -331,7 +371,7 @@ func (s *Syncer) getHeaders(ctx context.Context) error {
return err
}
progress := make(chan wallet.RescanProgress, 1)
go s.wallet.RescanProgressFromHeight(ctx, s.rpc, int32(rescanBlock.Height), progress)
go s.wallet.RescanProgressFromHeight(ctx, s, int32(rescanBlock.Height), progress)

for p := range progress {
if p.Err != nil {
Expand All @@ -342,7 +382,7 @@ func (s *Syncer) getHeaders(ctx context.Context) error {
s.rescanFinished()

} else {
err = s.wallet.LoadActiveDataFilters(ctx, s.rpc, true)
err = s.wallet.LoadActiveDataFilters(ctx, s, true)
if err != nil {
return err
}
Expand Down Expand Up @@ -454,7 +494,7 @@ func (s *Syncer) Run(ctx context.Context) (err error) {
}

// Associate the RPC client with the wallet and remove the association on return.
s.wallet.SetNetworkBackend(s.rpc)
s.wallet.SetNetworkBackend(s)
defer s.wallet.SetNetworkBackend(nil)

tipHash, tipHeight := s.wallet.MainChainTip(ctx)
Expand Down Expand Up @@ -489,7 +529,7 @@ func (s *Syncer) Run(ctx context.Context) (err error) {
// Fetch any missing main chain compact filters.
s.fetchMissingCfiltersStart()
progress := make(chan wallet.MissingCFilterProgress, 1)
go s.wallet.FetchMissingCFiltersWithProgress(ctx, s.rpc, progress)
go s.wallet.FetchMissingCFiltersWithProgress(ctx, s, progress)
for p := range progress {
if p.Err != nil {
return p.Err
Expand All @@ -503,6 +543,7 @@ func (s *Syncer) Run(ctx context.Context) (err error) {
if err != nil {
return err
}
defer s.unsynced()

// Request notifications for connected and disconnected blocks.
err = s.rpc.Call(ctx, "notifyblocks", nil)
Expand All @@ -517,7 +558,7 @@ func (s *Syncer) Run(ctx context.Context) (err error) {
}

// Rebroadcast unmined transactions
err = s.wallet.PublishUnminedTransactions(ctx, s.rpc)
err = s.wallet.PublishUnminedTransactions(ctx, s)
if err != nil {
// Returning this error would end and (likely) restart sync in
// an endless loop. It's possible a transaction should be
Expand Down
29 changes: 16 additions & 13 deletions deployments/deployments.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"context"

"decred.org/dcrwallet/v4/errors"
"decred.org/dcrwallet/v4/rpc/client/dcrd"
"github.com/decred/dcrd/chaincfg/v3"
dcrdtypes "github.com/decred/dcrd/rpc/jsonrpc/types/v4"
"github.com/decred/dcrd/wire"
Expand Down Expand Up @@ -78,12 +77,20 @@ const (
activeStatus = dcrdtypes.AgendaInfoStatusActive
)

// Querier defines the interface for a chain backend that can (trustfully)
// query for agenda deployment information.
type Querier interface {
// Deployments should return information about existing agendas,
// including their deployment status.
Deployments(context.Context) (map[string]dcrdtypes.AgendaInfo, error)
}

// DCP0010Active returns whether the consensus rules for the next block with the
// current chain tip height requires the subsidy split as specified in DCP0010.
// DCP0010 is always active on simnet, and requires the RPC syncer to detect
// activation on mainnet and testnet3.
func DCP0010Active(ctx context.Context, height int32, params *chaincfg.Params,
syncer any) (bool, error) {
querier Querier) (bool, error) {

net := params.Net
rcai := int32(params.RuleChangeActivationInterval)
Expand All @@ -94,16 +101,14 @@ func DCP0010Active(ctx context.Context, height int32, params *chaincfg.Params,
if net != wire.MainNet && net != wire.TestNet3 {
return false, nil
}
rpc, ok := syncer.(*dcrd.RPC)
if !ok {
if querier == nil {
return false, errors.E(errors.Bug, "DCP0010 activation check requires RPC syncer")
}
var resp dcrdtypes.GetBlockChainInfoResult
err := rpc.Call(ctx, "getblockchaininfo", &resp)
deployments, err := querier.Deployments(ctx)
if err != nil {
return false, err
}
d, ok := resp.Deployments[chaincfg.VoteIDChangeSubsidySplit]
d, ok := deployments[chaincfg.VoteIDChangeSubsidySplit]
if !ok {
return false, nil
}
Expand All @@ -122,24 +127,22 @@ func DCP0010Active(ctx context.Context, height int32, params *chaincfg.Params,
// DCP0012. DCP0012 requires the RPC syncer to detect activation on mainnet,
// testnet3 and simnet.
func DCP0012Active(ctx context.Context, height int32, params *chaincfg.Params,
syncer any) (bool, error) {
querier Querier) (bool, error) {

net := params.Net
rcai := int32(params.RuleChangeActivationInterval)

if net != wire.MainNet && net != wire.TestNet3 && net != wire.SimNet {
return false, nil
}
rpc, ok := syncer.(*dcrd.RPC)
if !ok {
if querier == nil {
return false, errors.E(errors.Bug, "DCP0012 activation check requires RPC syncer")
}
var resp dcrdtypes.GetBlockChainInfoResult
err := rpc.Call(ctx, "getblockchaininfo", &resp)
deployments, err := querier.Deployments(ctx)
if err != nil {
return false, err
}
d, ok := resp.Deployments[chaincfg.VoteIDChangeSubsidySplitR2]
d, ok := deployments[chaincfg.VoteIDChangeSubsidySplitR2]
if !ok {
return false, nil
}
Expand Down
Loading

0 comments on commit 6cde2cc

Please sign in to comment.