Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

accelerated-dht: cleanup peer from message sender on disconnection #1009

Merged
merged 2 commits into from
Jan 9, 2025
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 37 additions & 5 deletions fullrt/dht.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,14 @@
"github.com/multiformats/go-multihash"

"github.com/libp2p/go-libp2p-routing-helpers/tracing"
"github.com/libp2p/go-libp2p/core/event"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/peerstore"
"github.com/libp2p/go-libp2p/core/protocol"
"github.com/libp2p/go-libp2p/core/routing"
"github.com/libp2p/go-libp2p/p2p/host/eventbus"
swarm "github.com/libp2p/go-libp2p/p2p/net/swarm"

"github.com/gogo/protobuf/proto"
Expand Down Expand Up @@ -98,6 +100,8 @@
bulkSendParallelism int

self peer.ID

peerConnectednessSubscriber event.Subscription
}

// NewFullRT creates a DHT client that tracks the full network. It takes a protocol prefix for the given network,
Expand Down Expand Up @@ -151,6 +155,11 @@
}
}

sub, err := h.EventBus().Subscribe(new(event.EvtPeerConnectednessChanged), eventbus.Name("fullrt-dht"))
if err != nil {
return nil, fmt.Errorf("peer connectedness subscription failed: %w", err)
}

Check warning on line 161 in fullrt/dht.go

View check run for this annotation

Codecov / codecov/patch

fullrt/dht.go#L158-L161

Added lines #L158 - L161 were not covered by tests

ctx, cancel := context.WithCancel(context.Background())

self := h.ID()
Expand Down Expand Up @@ -195,14 +204,14 @@

crawlerInterval: fullrtcfg.crawlInterval,

bulkSendParallelism: fullrtcfg.bulkSendParallelism,

self: self,
bulkSendParallelism: fullrtcfg.bulkSendParallelism,
self: self,
peerConnectednessSubscriber: sub,

Check warning on line 209 in fullrt/dht.go

View check run for this annotation

Codecov / codecov/patch

fullrt/dht.go#L207-L209

Added lines #L207 - L209 were not covered by tests
}

rt.wg.Add(1)
rt.wg.Add(2)

Check warning on line 212 in fullrt/dht.go

View check run for this annotation

Codecov / codecov/patch

fullrt/dht.go#L212

Added line #L212 was not covered by tests
go rt.runCrawler(ctx)

go rt.runSubscriber()

Check warning on line 214 in fullrt/dht.go

View check run for this annotation

Codecov / codecov/patch

fullrt/dht.go#L214

Added line #L214 was not covered by tests
return rt, nil
}

Expand All @@ -211,6 +220,29 @@
key kadkey.Key
}

func (dht *FullRT) runSubscriber() {
defer dht.wg.Done()
ms, ok := dht.messageSender.(dht_pb.MessageSenderWithDisconnect)
if !ok {
return
}
defer dht.peerConnectednessSubscriber.Close()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this be moved up so that dht.peerConnectednessSubscriber.Close() gets called even if the type assertion returns false?

for {
select {
case e := <-dht.peerConnectednessSubscriber.Out():
pc, ok := e.(event.EvtPeerConnectednessChanged)
if !ok {
logger.Errorf("invalid event message type: %T", e)
}
if pc.Connectedness != network.Connected {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
}
if pc.Connectedness != network.Connected {
} else if pc.Connectedness != network.Connected {

gammazero marked this conversation as resolved.
Show resolved Hide resolved
ms.OnDisconnect(dht.ctx, pc.Peer)
}
case <-dht.ctx.Done():
return

Check warning on line 241 in fullrt/dht.go

View check run for this annotation

Codecov / codecov/patch

fullrt/dht.go#L223-L241

Added lines #L223 - L241 were not covered by tests
}
}
}

func (dht *FullRT) TriggerRefresh(ctx context.Context) error {
select {
case <-ctx.Done():
Expand Down
Loading