spv: Refactor initial cfilter fetching to be done in smaller batches #2307

Merged 2 commits on Nov 28, 2023
119 changes: 100 additions & 19 deletions p2p/peering.go
@@ -1515,29 +1515,110 @@ type filterProof = struct {
// CFiltersV2 requests version 2 cfilters for all blocks described by
// blockHashes. This is currently implemented by making many separate
// getcfilter requests concurrently and waiting on every result.
//
// Note: returning a []func() is an ugly hack to prevent a cyclical dependency
// between the rpc package and the wallet package.
func (rp *RemotePeer) CFiltersV2(ctx context.Context, blockHashes []*chainhash.Hash) ([]filterProof, error) {
    // TODO: this is spammy and would be better implemented with a single
    // request/response.
    const opf = "remotepeer(%v).CFiltersV2(%v)"

    ctxSend, cancelSend := context.WithCancel(ctx)
    defer cancelSend()

    type request struct {
        t time.Time
        c chan *wire.MsgCFilterV2
    }

    // Send the requests on a separate goroutine, as fast as the network
    // accepts them.
    errChan := make(chan error, 1)
    requests := make(chan request, len(blockHashes))
    go func() {
        defer close(requests)
        for _, blockHash := range blockHashes {
            m := wire.NewMsgGetCFilterV2(blockHash)
            c := make(chan *wire.MsgCFilterV2, 1)
            if !rp.addRequestedCFilterV2(blockHash, c) {
                op := errors.Opf(opf, rp.raddr, blockHash)
                errChan <- errors.E(op, errors.Invalid, "cfilterv2 is already being requested from this peer for this block")
                return
            }
            now := time.Now()
            select {
            case rp.out <- &msgAck{m, nil}:
                requests <- request{t: now, c: c}
            case <-ctxSend.Done():
                return
            case <-rp.errc:
                return
            }
        }
    }()

    stalled := time.NewTimer(stallTimeout)

    // Helper func that stops the sending goroutine and removes all requests
    // made starting at index `start`.
    cleanup := func(start int, stopStalled bool) {
        cancelSend()
        for range requests { // Drain until it signals closed.
        }
        for i := start; i < len(blockHashes); i++ {
            rp.deleteRequestedCFilterV2(blockHashes[i])
        }
        if stopStalled && !stalled.Stop() {
            <-stalled.C
        }
    }

    // Receive the responses.
    filters := make([]filterProof, len(blockHashes))
    for i := range blockHashes {
        // Alternate between waiting for the next request to be sent
        // and waiting for its response by switching which of req.c
        // and q channels are not nil.
        var req request
        q := requests
        for req.c != nil || q != nil {
            select {
            case <-ctx.Done():
                cleanup(i, true)
                return nil, ctx.Err()
            case <-stalled.C:
                cleanup(i, false)
                op := errors.Opf(opf, rp.raddr, blockHashes[i])
                err := errors.E(op, errors.IO, "peer appears stalled")
                rp.Disconnect(err)
                return nil, err
            case <-rp.errc:
                cleanup(i, true)
                return nil, rp.err
            case err := <-errChan:
                cleanup(i, true)
                return nil, err
            case req = <-q:
                q = nil

                // Request was sent. Reset the stall timer to
                // be relative to the sending time.
                if !stalled.Stop() {
                    <-stalled.C
                }
                stalled.Reset(stallTimeout - time.Now().Sub(req.t))
            case m := <-req.c:
                var f *gcs.FilterV2
                var err error
                f, err = gcs.FromBytesV2(blockcf.B, blockcf.M, m.Data)
                if err != nil {
                    cleanup(i, true)
                    op := errors.Opf(opf, rp.raddr, blockHashes[i])
                    return nil, errors.E(op, err)
                }
                filters[i] = filterProof{
                    Filter:     f,
                    ProofIndex: m.ProofIndex,
                    Proof:      m.ProofHashes,
                }
                req = request{}
            }
        }
    }
    return filters, nil
}
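
The receive loop above leans on a classic Go idiom: a receive from a nil channel blocks forever, so nilling out req.c or q disables that select case until it is re-armed. Here is a minimal, self-contained sketch of the pattern; the sent/reply channels and the simulated peer goroutine are illustrative stand-ins, not wallet APIs:

package main

import (
    "fmt"
    "time"
)

func main() {
    sent := make(chan time.Time, 1)
    reply := make(chan string, 1)

    // Simulated peer: acknowledge the send, then deliver a response.
    go func() {
        sent <- time.Now()
        time.Sleep(10 * time.Millisecond)
        reply <- "cfilter data"
    }()

    var resp chan string // nil: the response case is disabled
    q := sent            // non-nil: the send case is enabled
    for q != nil || resp != nil {
        select {
        case t := <-q:
            // Request is on the wire; stop watching q, start watching reply.
            q = nil
            resp = reply
            fmt.Println("request sent at", t.Format(time.RFC3339Nano))
        case msg := <-resp:
            resp = nil // both channels are now nil, so the loop exits
            fmt.Println("got", msg)
        }
    }
}

Each pass through the outer loop therefore waits for exactly one send acknowledgement and then exactly one response before moving on to the next block hash.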
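The stall-timer handling (Stop, drain, Reset) follows the documented pattern for reusing a time.Timer on Go versions before 1.23, where the timer channel is buffered and a stale expiry can linger after Stop returns false. A small standalone sketch under that assumption; stallTimeout and sentAt are illustrative values:

package main

import (
    "fmt"
    "time"
)

func main() {
    const stallTimeout = 50 * time.Millisecond
    stalled := time.NewTimer(stallTimeout)

    sentAt := time.Now() // pretend a request was just sent

    // Rearm the timer relative to the send time. The drain branch is
    // required before Go 1.23: if the timer already fired, its channel
    // still holds the old expiry, which must be consumed before Reset.
    if !stalled.Stop() {
        <-stalled.C
    }
    stalled.Reset(stallTimeout - time.Since(sentAt))

    <-stalled.C
    fmt.Println("stall timeout elapsed")
}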
70 changes: 38 additions & 32 deletions spv/backend.go
@@ -99,49 +99,55 @@ func (s *Syncer) cfiltersV2FromNodes(ctx context.Context, rp *p2p.RemotePeer, no
    }

    cnet := s.wallet.ChainParams().Net

    // Split fetching into batches of a max size.
    const cfilterBatchSize = 100
    if len(nodes) > cfilterBatchSize {
        g, ctx := errgroup.WithContext(ctx)
        for len(nodes) > cfilterBatchSize {
            batch := nodes[:cfilterBatchSize]
            g.Go(func() error { return s.cfiltersV2FromNodes(ctx, rp, batch) })
            nodes = nodes[cfilterBatchSize:]
        }
        g.Go(func() error { return s.cfiltersV2FromNodes(ctx, rp, nodes) })
        return g.Wait()
    }

    nodeHashes := make([]*chainhash.Hash, len(nodes))
    for i := range nodes {
        nodeHashes[i] = nodes[i].Hash
    }

    // TODO: Fetch using getcfsv2 if peer supports batched cfilter fetching.

    filters, err := rp.CFiltersV2(ctx, nodeHashes)
    if err != nil {
        log.Tracef("Unable to fetch cfilter batch from %v: %v",
            rp, err)
        return err
    }

    for i := range nodes {
        err = validate.CFilterV2HeaderCommitment(cnet, nodes[i].Header,
            filters[i].Filter, filters[i].ProofIndex, filters[i].Proof)
        if err != nil {
            errMsg := fmt.Sprintf("CFilter for block %v (height %d) "+
                "received from %v failed validation: %v",
                nodes[i].Hash, nodes[i].Header.Height,
                rp, err)
            log.Warnf(errMsg)
            err := errors.E(errors.Protocol, errMsg)
            rp.Disconnect(err)
            return err
        }
    }

    s.sidechainMu.Lock()
    for i := range nodes {
        nodes[i].FilterV2 = filters[i].Filter
    }
    s.sidechainMu.Unlock()
    log.Tracef("Fetched %d new cfilter(s) ending at height %d from %v",
        len(nodes), nodes[len(nodes)-1].Header.Height, rp)
    return nil
}
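
The recursion above terminates because every call either handles at most cfilterBatchSize nodes directly or splits the slice into strictly smaller batches, each fetched on its own errgroup goroutine. A rough standalone sketch of the same divide-into-batches idiom, with hypothetical processAll/processBatch helpers standing in for cfiltersV2FromNodes and the actual fetch:

package main

import (
    "context"
    "fmt"

    "golang.org/x/sync/errgroup"
)

func processBatch(ctx context.Context, items []int) error {
    fmt.Println("processing", len(items), "items")
    return nil
}

func processAll(ctx context.Context, items []int) error {
    const batchSize = 100
    g, ctx := errgroup.WithContext(ctx)
    for len(items) > batchSize {
        batch := items[:batchSize] // capture the sub-slice before reslicing
        g.Go(func() error { return processBatch(ctx, batch) })
        items = items[batchSize:]
    }
    // Final, possibly short, batch.
    g.Go(func() error { return processBatch(ctx, items) })
    return g.Wait() // first error cancels ctx for the remaining batches
}

func main() {
    items := make([]int, 250) // processed as 100 + 100 + 50
    if err := processAll(context.Background(), items); err != nil {
        fmt.Println("error:", err)
    }
}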
4 changes: 4 additions & 0 deletions spv/sync.go
@@ -1358,6 +1358,10 @@ nextbatch:
                batch.rp, err)
            continue nextbatch
        }
        if len(missingCfilter) > 0 {
            log.Debugf("Fetched %d new cfilter(s) ending at height %d",
                len(missingCfilter), missingCfilter[len(missingCfilter)-1].Header.Height)
        }

        // Switch the best chain, now that all cfilters have been
        // fetched for it.