Skip to content

Commit 02c1783

Browse files
committed
review: handler, request info on each new substream,
instead of caching the behaviour info.
1 parent a84178f commit 02c1783

File tree

2 files changed

+68
-74
lines changed

2 files changed

+68
-74
lines changed

protocols/identify/src/behaviour.rs

+19-6
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ pub struct Behaviour {
4949
/// For each peer we're connected to, the observed address to send back to it.
5050
connected: HashMap<PeerId, HashMap<ConnectionId, Multiaddr>>,
5151
/// Information requests from the handlers to be fullfiled.
52-
requests: VecDeque<PeerId>,
52+
requests: VecDeque<Request>,
5353
/// Pending events to be emitted when polled.
5454
events: VecDeque<NetworkBehaviourAction<Event, Proto>>,
5555
/// Peers to which an active push with current information about
@@ -59,6 +59,12 @@ pub struct Behaviour {
5959
discovered_peers: PeerCache,
6060
}
6161

62+
/// A `Handler` request for `BehaviourInfo`.
63+
struct Request {
64+
peer_id: PeerId,
65+
connection_id: ConnectionId,
66+
}
67+
6268
/// Configuration for the [`identify::Behaviour`](Behaviour).
6369
#[non_exhaustive]
6470
#[derive(Debug, Clone)]
@@ -234,7 +240,7 @@ impl NetworkBehaviour for Behaviour {
234240
fn on_connection_handler_event(
235241
&mut self,
236242
peer_id: PeerId,
237-
_connection: ConnectionId,
243+
connection_id: ConnectionId,
238244
event: <<Self::ConnectionHandler as IntoConnectionHandler>::Handler as ConnectionHandler>::OutEvent,
239245
) {
240246
match event {
@@ -272,7 +278,10 @@ impl NetworkBehaviour for Behaviour {
272278
}));
273279
}
274280
handler::Event::Identify => {
275-
self.requests.push_back(peer_id);
281+
self.requests.push_back(Request {
282+
peer_id,
283+
connection_id,
284+
});
276285
}
277286
handler::Event::IdentificationError(error) => {
278287
self.events
@@ -328,14 +337,18 @@ impl NetworkBehaviour for Behaviour {
328337
}
329338

330339
// Check for information requests from the handlers.
331-
if let Some(peer) = self.requests.pop_front() {
340+
if let Some(Request {
341+
peer_id,
342+
connection_id,
343+
}) = self.requests.pop_front()
344+
{
332345
let info = BehaviourInfo {
333346
listen_addrs: listen_addrs(params),
334347
protocols: supported_protocols(params),
335348
};
336349
return Poll::Ready(NetworkBehaviourAction::NotifyHandler {
337-
peer_id: peer,
338-
handler: NotifyHandler::Any,
350+
peer_id,
351+
handler: NotifyHandler::One(connection_id),
339352
event: InEvent::Identify(info),
340353
});
341354
}

protocols/identify/src/handler.rs

+49-68
Original file line numberDiff line numberDiff line change
@@ -90,15 +90,10 @@ impl IntoConnectionHandler for Proto {
9090
}
9191
}
9292

93-
/// A pending reply to an inbound identification request.
94-
enum Pending {
95-
/// The reply is queued for sending.
96-
Queued(ReplySubstream<NegotiatedSubstream>),
97-
/// The reply is being sent.
98-
Sending {
99-
peer: PeerId,
100-
io: Pin<Box<dyn Future<Output = Result<(), UpgradeError>> + Send>>,
101-
},
93+
/// A reply to an inbound identification request.
94+
struct Sending {
95+
peer: PeerId,
96+
io: Pin<Box<dyn Future<Output = Result<(), UpgradeError>> + Send>>,
10297
}
10398

10499
/// Protocol handler for sending and receiving identification requests.
@@ -119,8 +114,11 @@ pub struct Handler {
119114
>; 4],
120115
>,
121116

122-
/// Pending replies to send.
123-
pending_replies: VecDeque<Pending>,
117+
/// Streams awaiting `BehaviourInfo` to then send identify requests.
118+
reply_streams: VecDeque<ReplySubstream<NegotiatedSubstream>>,
119+
120+
/// Pending identification replies, awaiting being sent.
121+
pending_replies: VecDeque<Sending>,
124122

125123
/// Future that fires when we need to identify the node again.
126124
trigger_next_identify: Delay,
@@ -144,9 +142,6 @@ pub struct Handler {
144142

145143
/// Address observed by or for the remote.
146144
observed_addr: Multiaddr,
147-
148-
/// Information provided by the `Behaviour` upon requesting.
149-
behaviour_info: Option<BehaviourInfo>,
150145
}
151146

152147
/// Information provided by the `Behaviour` upon requesting.
@@ -199,6 +194,7 @@ impl Handler {
199194
remote_peer_id,
200195
inbound_identify_push: Default::default(),
201196
events: SmallVec::new(),
197+
reply_streams: VecDeque::new(),
202198
pending_replies: VecDeque::new(),
203199
trigger_next_identify: Delay::new(initial_delay),
204200
keep_alive: KeepAlive::Yes,
@@ -207,7 +203,6 @@ impl Handler {
207203
protocol_version,
208204
agent_version,
209205
observed_addr,
210-
behaviour_info: None,
211206
}
212207
}
213208

@@ -222,20 +217,16 @@ impl Handler {
222217
) {
223218
match output {
224219
EitherOutput::First(substream) => {
225-
// If we already have `BehaviourInfo` we can proceed responding to the Identify request,
226-
// if not, we request it .
227-
if self.behaviour_info.is_none() {
228-
self.events
229-
.push(ConnectionHandlerEvent::Custom(Event::Identify));
230-
}
231-
if !self.pending_replies.is_empty() {
220+
self.events
221+
.push(ConnectionHandlerEvent::Custom(Event::Identify));
222+
if !self.reply_streams.is_empty() {
232223
warn!(
233224
"New inbound identify request from {} while a previous one \
234225
is still pending. Queueing the new one.",
235226
self.remote_peer_id,
236227
);
237228
}
238-
self.pending_replies.push_back(Pending::Queued(substream));
229+
self.reply_streams.push_back(substream);
239230
}
240231
EitherOutput::Second(fut) => {
241232
if self.inbound_identify_push.replace(fut).is_some() {
@@ -319,8 +310,24 @@ impl ConnectionHandler for Handler {
319310
),
320311
});
321312
}
322-
InEvent::Identify(info) => {
323-
self.behaviour_info = Some(info);
313+
InEvent::Identify(behaviour_info) => {
314+
let info = Info {
315+
public_key: self.public_key.clone(),
316+
protocol_version: self.protocol_version.clone(),
317+
agent_version: self.agent_version.clone(),
318+
listen_addrs: behaviour_info.listen_addrs.clone(),
319+
protocols: behaviour_info.protocols.clone(),
320+
observed_addr: self.observed_addr.clone(),
321+
};
322+
let substream = self
323+
.reply_streams
324+
.pop_front()
325+
.expect("A BehaviourInfo reply should have a matching substream.");
326+
let io = Box::pin(substream.send(info));
327+
self.pending_replies.push_back(Sending {
328+
peer: self.remote_peer_id,
329+
io,
330+
});
324331
}
325332
}
326333
}
@@ -364,49 +371,23 @@ impl ConnectionHandler for Handler {
364371
}
365372

366373
// Check for pending replies to send.
367-
if let Some(ref info) = self.behaviour_info {
368-
if let Some(mut pending) = self.pending_replies.pop_front() {
369-
loop {
370-
match pending {
371-
Pending::Queued(io) => {
372-
let info = Info {
373-
public_key: self.public_key.clone(),
374-
protocol_version: self.protocol_version.clone(),
375-
agent_version: self.agent_version.clone(),
376-
listen_addrs: info.listen_addrs.clone(),
377-
protocols: info.protocols.clone(),
378-
observed_addr: self.observed_addr.clone(),
379-
};
380-
let io = Box::pin(io.send(info.clone()));
381-
pending = Pending::Sending {
382-
peer: self.remote_peer_id,
383-
io,
384-
};
385-
}
386-
Pending::Sending { peer, mut io } => {
387-
match Future::poll(Pin::new(&mut io), cx) {
388-
Poll::Pending => {
389-
self.pending_replies
390-
.push_front(Pending::Sending { peer, io });
391-
return Poll::Pending;
392-
}
393-
Poll::Ready(Ok(())) => {
394-
return Poll::Ready(ConnectionHandlerEvent::Custom(
395-
Event::Identification(peer),
396-
));
397-
}
398-
Poll::Ready(Err(err)) => {
399-
return Poll::Ready(ConnectionHandlerEvent::Custom(
400-
Event::IdentificationError(
401-
ConnectionHandlerUpgrErr::Upgrade(
402-
libp2p_core::upgrade::UpgradeError::Apply(err),
403-
),
404-
),
405-
))
406-
}
407-
}
408-
}
409-
}
374+
if let Some(mut sending) = self.pending_replies.pop_front() {
375+
match Future::poll(Pin::new(&mut sending.io), cx) {
376+
Poll::Pending => {
377+
self.pending_replies.push_front(sending);
378+
return Poll::Pending;
379+
}
380+
Poll::Ready(Ok(())) => {
381+
return Poll::Ready(ConnectionHandlerEvent::Custom(Event::Identification(
382+
sending.peer,
383+
)));
384+
}
385+
Poll::Ready(Err(err)) => {
386+
return Poll::Ready(ConnectionHandlerEvent::Custom(Event::IdentificationError(
387+
ConnectionHandlerUpgrErr::Upgrade(
388+
libp2p_core::upgrade::UpgradeError::Apply(err),
389+
),
390+
)))
410391
}
411392
}
412393
}

0 commit comments

Comments
 (0)