Skip to content

Commit

Permalink
multistream: Do not wait for negotiation in poll_close (#319)
Browse files Browse the repository at this point in the history
This PR backports https://github.com/libp2p/rust-libp2p/pull/4019/files,
effectively ensuring that our `poll_close` flushes any data immediately
without waiting for negotiation to complete.

Closes an outdated PR: #62

cc @paritytech/networking

---------

Signed-off-by: Alexandru Vasile <[email protected]>
  • Loading branch information
lexnv authored Jan 29, 2025
1 parent 2412f3d commit 78d934f
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 7 deletions.
4 changes: 1 addition & 3 deletions src/multistream_select/dialer_select.rs
Original file line number Diff line number Diff line change
Expand Up @@ -548,9 +548,7 @@ mod tests {
io.close().await.unwrap();
});

// TODO: Once https://github.com/paritytech/litep2p/pull/62 is merged, this
// should be changed to `is_ok`.
assert!(tokio::time::timeout(Duration::from_secs(10), client).await.is_err());
assert!(tokio::time::timeout(Duration::from_secs(10), client).await.is_ok());
}

#[tokio::test]
Expand Down
15 changes: 11 additions & 4 deletions src/multistream_select/negotiated.rs
Original file line number Diff line number Diff line change
Expand Up @@ -323,15 +323,22 @@ where
}

fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
// Ensure all data has been flushed and expected negotiation messages
// have been received.
ready!(self.as_mut().poll(cx).map_err(Into::<io::Error>::into)?);
// Ensure all data has been flushed, including optimistic multistream-select messages.
ready!(self.as_mut().poll_flush(cx).map_err(Into::<io::Error>::into)?);

// Continue with the shutdown of the underlying I/O stream.
match self.project().state.project() {
StateProj::Completed { io, .. } => io.poll_close(cx),
StateProj::Expecting { io, .. } => io.poll_close(cx),
StateProj::Expecting { io, .. } => {
let close_poll = io.poll_close(cx);
if let Poll::Ready(Ok(())) = close_poll {
tracing::debug!(
target: LOG_TARGET,
"Stream closed. Confirmation from remote for optimstic protocol negotiation still pending."
);
}
close_poll
}
StateProj::Invalid => panic!("Negotiated: Invalid state"),
}
}
Expand Down

0 comments on commit 78d934f

Please sign in to comment.