Skip to content

Commit

Permalink
Revert "do not send cancel message to peer that sent block"
Browse files Browse the repository at this point in the history
  • Loading branch information
gammazero committed Jan 22, 2025
1 parent ab6cdc0 commit 6f65ed7
Show file tree
Hide file tree
Showing 10 changed files with 19 additions and 64 deletions.
1 change: 0 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ The following emojis are used to highlight certain changes:

### Changed

- `bitswap/client`: Do not send CANCEL to peer that block was received from, as this is redundant. [#784](https://github.com/ipfs/boxo/pull/784)
- `gateway`: The default DNSLink resolver for `.eth` TLD changed to `https://dns.eth.limo/dns-query` [#781](https://github.com/ipfs/boxo/pull/781)
- `gateway`: The default DNSLink resolver for `.crypto` TLD changed to `https://resolver.unstoppable.io/dns-query` [#782](https://github.com/ipfs/boxo/pull/782)
- upgrade to `go-libp2p-kad-dht` [v0.28.2](https://github.com/libp2p/go-libp2p-kad-dht/releases/tag/v0.28.2)
Expand Down
4 changes: 2 additions & 2 deletions bitswap/client/internal/peermanager/peermanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,12 +156,12 @@ func (pm *PeerManager) SendWants(ctx context.Context, p peer.ID, wantBlocks []ci

// SendCancels sends cancels for the given keys to all peers who had previously
// received a want for those keys.
func (pm *PeerManager) SendCancels(ctx context.Context, cancelKs []cid.Cid, excludePeer peer.ID) {
func (pm *PeerManager) SendCancels(ctx context.Context, cancelKs []cid.Cid) {
pm.pqLk.Lock()
defer pm.pqLk.Unlock()

// Send a CANCEL to each peer that has been sent a want-block or want-have
pm.pwm.sendCancels(cancelKs, excludePeer)
pm.pwm.sendCancels(cancelKs)
}

// CurrentWants returns the list of pending wants (both want-haves and want-blocks).
Expand Down
49 changes: 3 additions & 46 deletions bitswap/client/internal/peermanager/peermanager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,7 @@ func TestSendCancels(t *testing.T) {
collectMessages(msgs, 2*time.Millisecond)

// Send cancels for 1 want-block and 1 want-have
peerManager.SendCancels(ctx, []cid.Cid{cids[0], cids[2]}, "")
peerManager.SendCancels(ctx, []cid.Cid{cids[0], cids[2]})
collected := collectMessages(msgs, 2*time.Millisecond)

if _, ok := collected[peer2]; ok {
Expand All @@ -250,7 +250,7 @@ func TestSendCancels(t *testing.T) {
}

// Send cancels for all cids
peerManager.SendCancels(ctx, cids, "")
peerManager.SendCancels(ctx, cids)
collected = collectMessages(msgs, 2*time.Millisecond)

if _, ok := collected[peer2]; ok {
Expand All @@ -261,49 +261,6 @@ func TestSendCancels(t *testing.T) {
}
}

func TestSendCancelsExclude(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
msgs := make(chan msg, 16)
peerQueueFactory := makePeerQueueFactory(msgs)
tp := random.Peers(3)
self, peer1, peer2 := tp[0], tp[1], tp[2]
peerManager := New(ctx, peerQueueFactory, self)
cids := random.Cids(4)

// Connect to peer1 and peer2
peerManager.Connected(peer1)
peerManager.Connected(peer2)

// Send 2 want-blocks and 1 want-have to peer1
peerManager.SendWants(ctx, peer1, []cid.Cid{cids[0], cids[1]}, []cid.Cid{cids[2]})

// Clear messages
collectMessages(msgs, 2*time.Millisecond)

// Send cancels for 1 want-block and 1 want-have
peerManager.SendCancels(ctx, []cid.Cid{cids[0], cids[2]}, peer1)
collected := collectMessages(msgs, 2*time.Millisecond)

if _, ok := collected[peer2]; ok {
t.Fatal("Expected no cancels to be sent to peer that was not sent messages")
}
if len(collected[peer1].cancels) != 0 {
t.Fatal("Expected no cancels to be sent to excluded peer")
}

// Send cancels for all cids
peerManager.SendCancels(ctx, cids, "")
collected = collectMessages(msgs, 2*time.Millisecond)

if _, ok := collected[peer2]; ok {
t.Fatal("Expected no cancels to be sent to peer that was not sent messages")
}
if len(collected[peer1].cancels) != 3 {
t.Fatal("Expected cancel to be sent for want-blocks")
}
}

func (s *sess) ID() uint64 {
return s.id
}
Expand Down Expand Up @@ -419,7 +376,7 @@ func BenchmarkPeerManager(b *testing.B) {
limit := len(wanted) / 10
cancel := wanted[:limit]
wanted = wanted[limit:]
peerManager.SendCancels(ctx, cancel, "")
peerManager.SendCancels(ctx, cancel)
}
}
}
3 changes: 1 addition & 2 deletions bitswap/client/internal/peermanager/peerwantmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,7 @@ func (pwm *peerWantManager) sendWants(p peer.ID, wantBlocks []cid.Cid, wantHaves

// sendCancels sends a cancel to each peer to which a corresponding want was
// sent
func (pwm *peerWantManager) sendCancels(cancelKs []cid.Cid, excludePeer peer.ID) {
func (pwm *peerWantManager) sendCancels(cancelKs []cid.Cid) {
if len(cancelKs) == 0 {
return
}
Expand Down Expand Up @@ -298,7 +298,6 @@ func (pwm *peerWantManager) sendCancels(cancelKs []cid.Cid, excludePeer peer.ID)
cancelPeers[p] = struct{}{}
}
}
delete(cancelPeers, excludePeer)
for p := range cancelPeers {
pws, ok := pwm.peerWants[p]
if !ok {
Expand Down
10 changes: 5 additions & 5 deletions bitswap/client/internal/peermanager/peerwantmanager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,7 @@ func TestPWMSendCancels(t *testing.T) {

// Cancel 1 want-block and 1 want-have that were sent to p0
clearSent(peerQueues)
pwm.sendCancels([]cid.Cid{wb1[0], wh1[0]}, "")
pwm.sendCancels([]cid.Cid{wb1[0], wh1[0]})
// Should cancel the want-block and want-have
require.Empty(t, pq1.cancels, "Expected no cancels sent to p1")
require.ElementsMatch(t, pq0.cancels, []cid.Cid{wb1[0], wh1[0]}, "Expected 2 cids to be cancelled")
Expand All @@ -255,7 +255,7 @@ func TestPWMSendCancels(t *testing.T) {
// Cancel everything
clearSent(peerQueues)
allCids := append(allwb, allwh...)
pwm.sendCancels(allCids, "")
pwm.sendCancels(allCids)
// Should cancel the remaining want-blocks and want-haves for p0
require.ElementsMatch(t, pq0.cancels, []cid.Cid{wb1[1], wh1[1]}, "Expected un-cancelled cids to be cancelled")

Expand Down Expand Up @@ -312,7 +312,7 @@ func TestStats(t *testing.T) {
// Cancel 1 want-block that was sent to p0
// and 1 want-block that was not sent
cids5 := random.Cids(1)
pwm.sendCancels(append(cids5, cids[0]), "")
pwm.sendCancels(append(cids5, cids[0]))

require.Equal(t, 7, g.count, "Expected 7 wants")
require.Equal(t, 3, wbg.count, "Expected 3 want-blocks")
Expand All @@ -332,7 +332,7 @@ func TestStats(t *testing.T) {
require.Zero(t, wbg.count, "Expected 0 want-blocks")

// Cancel one remaining broadcast want-have
pwm.sendCancels(cids2[:1], "")
pwm.sendCancels(cids2[:1])
require.Equal(t, 2, g.count, "Expected 2 wants")
require.Zero(t, wbg.count, "Expected 0 want-blocks")
}
Expand Down Expand Up @@ -362,7 +362,7 @@ func TestStatsOverlappingWantBlockWantHave(t *testing.T) {
require.Equal(t, 4, wbg.count, "Expected 4 want-blocks")

// Cancel 1 of each group of cids
pwm.sendCancels([]cid.Cid{cids[0], cids2[0]}, "")
pwm.sendCancels([]cid.Cid{cids[0], cids2[0]})

require.Equal(t, 2, g.count, "Expected 2 wants")
require.Equal(t, 2, wbg.count, "Expected 2 want-blocks")
Expand Down
2 changes: 1 addition & 1 deletion bitswap/client/internal/session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ type PeerManager interface {
// session discovery)
BroadcastWantHaves(context.Context, []cid.Cid)
// SendCancels tells the PeerManager to send cancels to all peers
SendCancels(context.Context, []cid.Cid, peer.ID)
SendCancels(context.Context, []cid.Cid)
}

// SessionManager manages all the sessions
Expand Down
2 changes: 1 addition & 1 deletion bitswap/client/internal/session/session_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ func (pm *fakePeerManager) BroadcastWantHaves(ctx context.Context, cids []cid.Ci
case <-ctx.Done():
}
}
func (pm *fakePeerManager) SendCancels(ctx context.Context, cancels []cid.Cid, excludePeer peer.ID) {}
func (pm *fakePeerManager) SendCancels(ctx context.Context, cancels []cid.Cid) {}

func TestSessionGetBlocks(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
Expand Down
6 changes: 3 additions & 3 deletions bitswap/client/internal/session/sessionwantsender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,9 +78,9 @@ func (pm *mockPeerManager) has(p peer.ID, sid uint64) bool {
return false
}

func (*mockPeerManager) UnregisterSession(uint64) {}
func (*mockPeerManager) BroadcastWantHaves(context.Context, []cid.Cid) {}
func (*mockPeerManager) SendCancels(context.Context, []cid.Cid, peer.ID) {}
func (*mockPeerManager) UnregisterSession(uint64) {}
func (*mockPeerManager) BroadcastWantHaves(context.Context, []cid.Cid) {}
func (*mockPeerManager) SendCancels(context.Context, []cid.Cid) {}

func (pm *mockPeerManager) SendWants(ctx context.Context, p peer.ID, wantBlocks []cid.Cid, wantHaves []cid.Cid) bool {
pm.lk.Lock()
Expand Down
4 changes: 2 additions & 2 deletions bitswap/client/internal/sessionmanager/sessionmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ func (sm *SessionManager) ReceiveFrom(ctx context.Context, p peer.ID, blks []cid
}

// Send CANCEL to all peers with want-have / want-block
sm.peerManager.SendCancels(ctx, blks, p)
sm.peerManager.SendCancels(ctx, blks)
}

// CancelSessionWants is called when a session cancels wants because a call to
Expand All @@ -193,5 +193,5 @@ func (sm *SessionManager) cancelWants(wants []cid.Cid) {
// Send CANCEL to all peers for blocks that no session is interested in
// anymore.
// Note: use bitswap context because session context may already be Done.
sm.peerManager.SendCancels(sm.ctx, wants, "")
sm.peerManager.SendCancels(sm.ctx, wants)
}
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ func (*fakePeerManager) RegisterSession(peer.ID, bspm.Session)
func (*fakePeerManager) UnregisterSession(uint64) {}
func (*fakePeerManager) SendWants(context.Context, peer.ID, []cid.Cid, []cid.Cid) bool { return true }
func (*fakePeerManager) BroadcastWantHaves(context.Context, []cid.Cid) {}
func (fpm *fakePeerManager) SendCancels(ctx context.Context, cancels []cid.Cid, excludePeer peer.ID) {
func (fpm *fakePeerManager) SendCancels(ctx context.Context, cancels []cid.Cid) {
fpm.lk.Lock()
defer fpm.lk.Unlock()
fpm.cancels = append(fpm.cancels, cancels...)
Expand Down

0 comments on commit 6f65ed7

Please sign in to comment.