Skip to content

Commit

Permalink
Merge pull request #809 from ipfs/revert-peer-exclude-cancel
Browse files Browse the repository at this point in the history
Revert peer exclude cancel
gammazero authored Jan 23, 2025

Verified

This commit was created on GitHub.com and signed with GitHub’s verified signature.
2 parents 13fd890 + 6f65ed7 commit 3e3de8f
Showing 10 changed files with 24 additions and 75 deletions.
1 change: 0 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -39,7 +39,6 @@ The following emojis are used to highlight certain changes:

### Changed

- `bitswap/client`: Do not send CANCEL to peer that block was received from, as this is redundant. [#784](https://github.com/ipfs/boxo/pull/784)
- `gateway`: The default DNSLink resolver for `.eth` TLD changed to `https://dns.eth.limo/dns-query` [#781](https://github.com/ipfs/boxo/pull/781)
- `gateway`: The default DNSLink resolver for `.crypto` TLD changed to `https://resolver.unstoppable.io/dns-query` [#782](https://github.com/ipfs/boxo/pull/782)
- upgrade to `go-libp2p-kad-dht` [v0.28.2](https://github.com/libp2p/go-libp2p-kad-dht/releases/tag/v0.28.2)
4 changes: 2 additions & 2 deletions bitswap/client/internal/peermanager/peermanager.go
Original file line number Diff line number Diff line change
@@ -156,12 +156,12 @@ func (pm *PeerManager) SendWants(ctx context.Context, p peer.ID, wantBlocks []ci

// SendCancels sends cancels for the given keys to all peers who had previously
// received a want for those keys.
func (pm *PeerManager) SendCancels(ctx context.Context, cancelKs []cid.Cid, excludePeer peer.ID) {
func (pm *PeerManager) SendCancels(ctx context.Context, cancelKs []cid.Cid) {
pm.pqLk.Lock()
defer pm.pqLk.Unlock()

// Send a CANCEL to each peer that has been sent a want-block or want-have
pm.pwm.sendCancels(cancelKs, excludePeer)
pm.pwm.sendCancels(cancelKs)
}

// CurrentWants returns the list of pending wants (both want-haves and want-blocks).
49 changes: 3 additions & 46 deletions bitswap/client/internal/peermanager/peermanager_test.go
Original file line number Diff line number Diff line change
@@ -239,7 +239,7 @@ func TestSendCancels(t *testing.T) {
collectMessages(msgs, 2*time.Millisecond)

// Send cancels for 1 want-block and 1 want-have
peerManager.SendCancels(ctx, []cid.Cid{cids[0], cids[2]}, "")
peerManager.SendCancels(ctx, []cid.Cid{cids[0], cids[2]})
collected := collectMessages(msgs, 2*time.Millisecond)

if _, ok := collected[peer2]; ok {
@@ -250,7 +250,7 @@ func TestSendCancels(t *testing.T) {
}

// Send cancels for all cids
peerManager.SendCancels(ctx, cids, "")
peerManager.SendCancels(ctx, cids)
collected = collectMessages(msgs, 2*time.Millisecond)

if _, ok := collected[peer2]; ok {
@@ -261,49 +261,6 @@ func TestSendCancels(t *testing.T) {
}
}

func TestSendCancelsExclude(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
msgs := make(chan msg, 16)
peerQueueFactory := makePeerQueueFactory(msgs)
tp := random.Peers(3)
self, peer1, peer2 := tp[0], tp[1], tp[2]
peerManager := New(ctx, peerQueueFactory, self)
cids := random.Cids(4)

// Connect to peer1 and peer2
peerManager.Connected(peer1)
peerManager.Connected(peer2)

// Send 2 want-blocks and 1 want-have to peer1
peerManager.SendWants(ctx, peer1, []cid.Cid{cids[0], cids[1]}, []cid.Cid{cids[2]})

// Clear messages
collectMessages(msgs, 2*time.Millisecond)

// Send cancels for 1 want-block and 1 want-have, excluding peer1.
peerManager.SendCancels(ctx, []cid.Cid{cids[0], cids[2]}, peer1)
collected := collectMessages(msgs, 2*time.Millisecond)

if _, ok := collected[peer2]; ok {
t.Fatal("Expected no cancels to be sent to peer that was not sent messages")
}
if len(collected[peer1].cancels) != 0 {
t.Fatal("Expected no cancels to be sent to excluded peer")
}

// Send cancels for all cids. Expect cancels for the 1 remaining sid that
// was not previously canceled.
peerManager.SendCancels(ctx, cids, "")
collected = collectMessages(msgs, 2*time.Millisecond)
if _, ok := collected[peer2]; ok {
t.Fatal("Expected no cancels to be sent to peer that was not sent messages")
}
if len(collected[peer1].cancels) != 1 {
t.Fatalf("Expected cancel to be sent for 1 want-blocks, got %d", len(collected[peer1].cancels))
}
}

func (s *sess) ID() uint64 {
return s.id
}
@@ -419,7 +376,7 @@ func BenchmarkPeerManager(b *testing.B) {
limit := len(wanted) / 10
cancel := wanted[:limit]
wanted = wanted[limit:]
peerManager.SendCancels(ctx, cancel, "")
peerManager.SendCancels(ctx, cancel)
}
}
}
19 changes: 6 additions & 13 deletions bitswap/client/internal/peermanager/peerwantmanager.go
Original file line number Diff line number Diff line change
@@ -233,7 +233,7 @@ func (pwm *peerWantManager) sendWants(p peer.ID, wantBlocks []cid.Cid, wantHaves

// sendCancels sends a cancel to each peer to which a corresponding want was
// sent
func (pwm *peerWantManager) sendCancels(cancelKs []cid.Cid, excludePeer peer.ID) {
func (pwm *peerWantManager) sendCancels(cancelKs []cid.Cid) {
if len(cancelKs) == 0 {
return
}
@@ -257,15 +257,8 @@ func (pwm *peerWantManager) sendCancels(cancelKs []cid.Cid, excludePeer peer.ID)

// Send cancels to a particular peer
send := func(p peer.ID, pws *peerWant) {
noSend := p == excludePeer

var toCancel []cid.Cid

// If peer is not excluded, then send broadcast cancels to this peer.
if !noSend {
// Start from the broadcast cancels
toCancel = broadcastCancels
}
// Start from the broadcast cancels
toCancel := broadcastCancels

// For each key to be cancelled
for _, c := range cancelKs {
@@ -278,9 +271,9 @@ func (pwm *peerWantManager) sendCancels(cancelKs []cid.Cid, excludePeer peer.ID)
pws.wantBlocks.Remove(c)
pws.wantHaves.Remove(c)

// If peer is not excluded and this a broadcast want is not already
// added it to the peer cancels, then add the cancel.
if !noSend && !pwm.broadcastWants.Has(c) {
// If it's a broadcast want, we've already added it to
// the peer cancels.
if !pwm.broadcastWants.Has(c) {
toCancel = append(toCancel, c)
}
}
10 changes: 5 additions & 5 deletions bitswap/client/internal/peermanager/peerwantmanager_test.go
Original file line number Diff line number Diff line change
@@ -245,7 +245,7 @@ func TestPWMSendCancels(t *testing.T) {

// Cancel 1 want-block and 1 want-have that were sent to p0
clearSent(peerQueues)
pwm.sendCancels([]cid.Cid{wb1[0], wh1[0]}, "")
pwm.sendCancels([]cid.Cid{wb1[0], wh1[0]})
// Should cancel the want-block and want-have
require.Empty(t, pq1.cancels, "Expected no cancels sent to p1")
require.ElementsMatch(t, pq0.cancels, []cid.Cid{wb1[0], wh1[0]}, "Expected 2 cids to be cancelled")
@@ -255,7 +255,7 @@ func TestPWMSendCancels(t *testing.T) {
// Cancel everything
clearSent(peerQueues)
allCids := append(allwb, allwh...)
pwm.sendCancels(allCids, "")
pwm.sendCancels(allCids)
// Should cancel the remaining want-blocks and want-haves for p0
require.ElementsMatch(t, pq0.cancels, []cid.Cid{wb1[1], wh1[1]}, "Expected un-cancelled cids to be cancelled")

@@ -312,7 +312,7 @@ func TestStats(t *testing.T) {
// Cancel 1 want-block that was sent to p0
// and 1 want-block that was not sent
cids5 := random.Cids(1)
pwm.sendCancels(append(cids5, cids[0]), "")
pwm.sendCancels(append(cids5, cids[0]))

require.Equal(t, 7, g.count, "Expected 7 wants")
require.Equal(t, 3, wbg.count, "Expected 3 want-blocks")
@@ -332,7 +332,7 @@ func TestStats(t *testing.T) {
require.Zero(t, wbg.count, "Expected 0 want-blocks")

// Cancel one remaining broadcast want-have
pwm.sendCancels(cids2[:1], "")
pwm.sendCancels(cids2[:1])
require.Equal(t, 2, g.count, "Expected 2 wants")
require.Zero(t, wbg.count, "Expected 0 want-blocks")
}
@@ -362,7 +362,7 @@ func TestStatsOverlappingWantBlockWantHave(t *testing.T) {
require.Equal(t, 4, wbg.count, "Expected 4 want-blocks")

// Cancel 1 of each group of cids
pwm.sendCancels([]cid.Cid{cids[0], cids2[0]}, "")
pwm.sendCancels([]cid.Cid{cids[0], cids2[0]})

require.Equal(t, 2, g.count, "Expected 2 wants")
require.Equal(t, 2, wbg.count, "Expected 2 want-blocks")
2 changes: 1 addition & 1 deletion bitswap/client/internal/session/session.go
Original file line number Diff line number Diff line change
@@ -44,7 +44,7 @@ type PeerManager interface {
// session discovery)
BroadcastWantHaves(context.Context, []cid.Cid)
// SendCancels tells the PeerManager to send cancels to all peers
SendCancels(context.Context, []cid.Cid, peer.ID)
SendCancels(context.Context, []cid.Cid)
}

// SessionManager manages all the sessions
2 changes: 1 addition & 1 deletion bitswap/client/internal/session/session_test.go
Original file line number Diff line number Diff line change
@@ -151,7 +151,7 @@ func (pm *fakePeerManager) BroadcastWantHaves(ctx context.Context, cids []cid.Ci
case <-ctx.Done():
}
}
func (pm *fakePeerManager) SendCancels(ctx context.Context, cancels []cid.Cid, excludePeer peer.ID) {}
func (pm *fakePeerManager) SendCancels(ctx context.Context, cancels []cid.Cid) {}

func TestSessionGetBlocks(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
6 changes: 3 additions & 3 deletions bitswap/client/internal/session/sessionwantsender_test.go
Original file line number Diff line number Diff line change
@@ -78,9 +78,9 @@ func (pm *mockPeerManager) has(p peer.ID, sid uint64) bool {
return false
}

func (*mockPeerManager) UnregisterSession(uint64) {}
func (*mockPeerManager) BroadcastWantHaves(context.Context, []cid.Cid) {}
func (*mockPeerManager) SendCancels(context.Context, []cid.Cid, peer.ID) {}
func (*mockPeerManager) UnregisterSession(uint64) {}
func (*mockPeerManager) BroadcastWantHaves(context.Context, []cid.Cid) {}
func (*mockPeerManager) SendCancels(context.Context, []cid.Cid) {}

func (pm *mockPeerManager) SendWants(ctx context.Context, p peer.ID, wantBlocks []cid.Cid, wantHaves []cid.Cid) bool {
pm.lk.Lock()
4 changes: 2 additions & 2 deletions bitswap/client/internal/sessionmanager/sessionmanager.go
Original file line number Diff line number Diff line change
@@ -173,7 +173,7 @@ func (sm *SessionManager) ReceiveFrom(ctx context.Context, p peer.ID, blks []cid
}

// Send CANCEL to all peers with want-have / want-block
sm.peerManager.SendCancels(ctx, blks, p)
sm.peerManager.SendCancels(ctx, blks)
}

// CancelSessionWants is called when a session cancels wants because a call to
@@ -193,5 +193,5 @@ func (sm *SessionManager) cancelWants(wants []cid.Cid) {
// Send CANCEL to all peers for blocks that no session is interested in
// anymore.
// Note: use bitswap context because session context may already be Done.
sm.peerManager.SendCancels(sm.ctx, wants, "")
sm.peerManager.SendCancels(sm.ctx, wants)
}
Original file line number Diff line number Diff line change
@@ -70,7 +70,7 @@ func (*fakePeerManager) RegisterSession(peer.ID, bspm.Session)
func (*fakePeerManager) UnregisterSession(uint64) {}
func (*fakePeerManager) SendWants(context.Context, peer.ID, []cid.Cid, []cid.Cid) bool { return true }
func (*fakePeerManager) BroadcastWantHaves(context.Context, []cid.Cid) {}
func (fpm *fakePeerManager) SendCancels(ctx context.Context, cancels []cid.Cid, excludePeer peer.ID) {
func (fpm *fakePeerManager) SendCancels(ctx context.Context, cancels []cid.Cid) {
fpm.lk.Lock()
defer fpm.lk.Unlock()
fpm.cancels = append(fpm.cancels, cancels...)

0 comments on commit 3e3de8f

Please sign in to comment.