Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

multi: Add batched cfilter fetching messages #3211

Merged
merged 4 commits into from
May 13, 2024
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
multi: Respond to getcfsv2 message.
This adds the appropriate processing to the peer and server structs to
respond to the recently introduced getcfsv2 message.  It also bumps the
peer and server max supported protocol versions to version 10
(BatchedCFiltersV2Version).

This message queries the chain for a batch of committed filters spanning
a set of sequential blocks and will be used by SPV clients to fetch
committed filters during their initial sync process.
matheusd committed May 13, 2024
commit 5a28304e53f8618e6479b3f337d2e5e44116acff
2 changes: 2 additions & 0 deletions peer/go.mod
Original file line number Diff line number Diff line change
@@ -12,6 +12,8 @@ require (
github.com/decred/slog v1.2.0
)

replace github.com/decred/dcrd/wire => ../wire

require (
github.com/agl/ed25519 v0.0.0-20170116200512-5312a6153412 // indirect
github.com/dchest/siphash v1.2.3 // indirect
19 changes: 18 additions & 1 deletion peer/peer.go
Original file line number Diff line number Diff line change
@@ -28,7 +28,7 @@ import (

const (
// MaxProtocolVersion is the max protocol version the peer supports.
MaxProtocolVersion = wire.RemoveRejectVersion
MaxProtocolVersion = wire.BatchedCFiltersV2Version

// outputBufferSize is the number of elements the output channels use.
outputBufferSize = 5000
@@ -129,6 +129,9 @@ type MessageListeners struct {
// OnCFilterV2 is invoked when a peer receives a cfilterv2 wire message.
OnCFilterV2 func(p *Peer, msg *wire.MsgCFilterV2)

// OnCFiltersV2 is invoked when a peer receives a cfiltersv2 wire message.
OnCFiltersV2 func(p *Peer, msg *wire.MsgCFiltersV2)

// OnCFHeaders is invoked when a peer receives a cfheaders wire
// message.
OnCFHeaders func(p *Peer, msg *wire.MsgCFHeaders)
@@ -163,6 +166,10 @@ type MessageListeners struct {
// message.
OnGetCFilterV2 func(p *Peer, msg *wire.MsgGetCFilterV2)

// OnGetCFiltersV2 is invoked when a peer receives a getcfsv2 wire
// message.
OnGetCFiltersV2 func(p *Peer, msg *wire.MsgGetCFsV2)

// OnGetCFHeaders is invoked when a peer receives a getcfheaders
// wire message.
OnGetCFHeaders func(p *Peer, msg *wire.MsgGetCFHeaders)
@@ -1423,6 +1430,16 @@ out:
p.cfg.Listeners.OnCFilterV2(p, msg)
}

case *wire.MsgGetCFsV2:
if p.cfg.Listeners.OnGetCFiltersV2 != nil {
p.cfg.Listeners.OnGetCFiltersV2(p, msg)
}

case *wire.MsgCFiltersV2:
if p.cfg.Listeners.OnCFiltersV2 != nil {
p.cfg.Listeners.OnCFiltersV2(p, msg)
}

case *wire.MsgGetInitState:
if p.cfg.Listeners.OnGetInitState != nil {
p.cfg.Listeners.OnGetInitState(p, msg)
14 changes: 14 additions & 0 deletions peer/peer_test.go
Original file line number Diff line number Diff line change
@@ -411,6 +411,12 @@ func TestPeerListeners(t *testing.T) {
OnInitState: func(p *Peer, msg *wire.MsgInitState) {
ok <- msg
},
OnGetCFiltersV2: func(p *Peer, msg *wire.MsgGetCFsV2) {
ok <- msg
},
OnCFiltersV2: func(p *Peer, msg *wire.MsgCFiltersV2) {
ok <- msg
},
},
UserAgentName: "peer",
UserAgentVersion: "1.0",
@@ -565,6 +571,14 @@ func TestPeerListeners(t *testing.T) {
"OnInitState",
wire.NewMsgInitState(),
},
{
"OnGetCFiltersV2",
wire.NewMsgGetCFsV2(&chainhash.Hash{}, &chainhash.Hash{}),
},
{
"OnCFiltersV2",
wire.NewMsgCFiltersV2([]wire.MsgCFilterV2{}),
},
}
t.Logf("Running %d tests", len(tests))
for _, test := range tests {
13 changes: 12 additions & 1 deletion server.go
Original file line number Diff line number Diff line change
@@ -74,7 +74,7 @@ const (
connectionRetryInterval = time.Second * 5

// maxProtocolVersion is the max protocol version the server supports.
maxProtocolVersion = wire.RemoveRejectVersion
maxProtocolVersion = wire.BatchedCFiltersV2Version

// These fields are used to track known addresses on a per-peer basis.
//
@@ -1522,6 +1522,16 @@ func (sp *serverPeer) OnGetCFilterV2(_ *peer.Peer, msg *wire.MsgGetCFilterV2) {
sp.QueueMessage(filterMsg, nil)
}

// OnGetCFiltersV2 is invoked when a peer receives a getcfsv2 wire message.
func (sp *serverPeer) OnGetCFiltersV2(_ *peer.Peer, msg *wire.MsgGetCFsV2) {
filtersMsg, err := sp.server.chain.LocateCFiltersV2(&msg.StartHash, &msg.EndHash)
if err != nil {
return
}

sp.QueueMessage(filtersMsg, nil)
}

// OnGetCFHeaders is invoked when a peer receives a getcfheader wire message.
func (sp *serverPeer) OnGetCFHeaders(_ *peer.Peer, msg *wire.MsgGetCFHeaders) {
// Disconnect and/or ban depending on the node cf services flag and
@@ -2308,6 +2318,7 @@ func newPeerConfig(sp *serverPeer) *peer.Config {
OnGetHeaders: sp.OnGetHeaders,
OnGetCFilter: sp.OnGetCFilter,
OnGetCFilterV2: sp.OnGetCFilterV2,
OnGetCFiltersV2: sp.OnGetCFiltersV2,
OnGetCFHeaders: sp.OnGetCFHeaders,
OnGetCFTypes: sp.OnGetCFTypes,
OnGetAddr: sp.OnGetAddr,