Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Handle implicit resets at the right time #833

Merged
merged 1 commit into from
Jan 24, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/proto/streams/counts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,7 @@ impl Counts {
}
}

if stream.is_counted {
if !stream.state.is_scheduled_reset() && stream.is_counted {
tracing::trace!("dec_num_streams; stream={:?}", stream.id);
// Decrement the number of active streams.
self.dec_num_streams(&mut stream);
Expand Down
5 changes: 4 additions & 1 deletion src/proto/streams/prioritize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -685,8 +685,11 @@ impl Prioritize {
}

pub fn clear_pending_send(&mut self, store: &mut Store, counts: &mut Counts) {
while let Some(stream) = self.pending_send.pop(store) {
while let Some(mut stream) = self.pending_send.pop(store) {
let is_pending_reset = stream.is_pending_reset_expiration();
if let Some(reason) = stream.state.get_scheduled_reset() {
stream.set_reset(reason, Initiator::Library);
}
counts.transition_after(stream, is_pending_reset);
}
}
Expand Down
137 changes: 137 additions & 0 deletions tests/h2-tests/tests/stream_states.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1218,3 +1218,140 @@ async fn reset_new_stream_before_send() {

join(srv, client).await;
}

#[tokio::test]
async fn explicit_reset_with_max_concurrent_stream() {
h2_support::trace_init!();

let (io, mut srv) = mock::new();

let mock = async move {
let settings = srv
.assert_client_handshake_with_settings(frames::settings().max_concurrent_streams(1))
.await;
assert_default_settings!(settings);

srv.recv_frame(frames::headers(1).request("POST", "https://www.example.com/"))
.await;
srv.send_frame(frames::headers(1).response(200)).await;

srv.recv_frame(frames::reset(1).cancel()).await;

srv.recv_frame(
frames::headers(3)
.request("POST", "https://www.example.com/")
.eos(),
)
.await;
srv.send_frame(frames::headers(3).response(200)).await;
};

let h2 = async move {
let (mut client, mut h2) = client::handshake(io).await.unwrap();

{
let request = Request::builder()
.method(Method::POST)
.uri("https://www.example.com/")
.body(())
.unwrap();

let (resp, mut stream) = client.send_request(request, false).unwrap();

{
let resp = h2.drive(resp).await.unwrap();
assert_eq!(resp.status(), StatusCode::OK);
}

stream.send_reset(Reason::CANCEL);
};

{
let request = Request::builder()
.method(Method::POST)
.uri("https://www.example.com/")
.body(())
.unwrap();

let (resp, _) = client.send_request(request, true).unwrap();

{
let resp = h2.drive(resp).await.unwrap();
assert_eq!(resp.status(), StatusCode::OK);
}
};

h2.await.unwrap();
};

join(mock, h2).await;
}

#[tokio::test]
async fn implicit_cancel_with_max_concurrent_stream() {
h2_support::trace_init!();

let (io, mut srv) = mock::new();

let mock = async move {
let settings = srv
.assert_client_handshake_with_settings(frames::settings().max_concurrent_streams(1))
.await;
assert_default_settings!(settings);

srv.recv_frame(frames::headers(1).request("POST", "https://www.example.com/"))
.await;
srv.send_frame(frames::headers(1).response(200)).await;

srv.recv_frame(frames::reset(1).cancel()).await;

srv.recv_frame(
frames::headers(3)
.request("POST", "https://www.example.com/")
.eos(),
)
.await;
srv.send_frame(frames::headers(3).response(200)).await;
};

let h2 = async move {
let (mut client, mut h2) = client::handshake(io).await.unwrap();

{
let request = Request::builder()
.method(Method::POST)
.uri("https://www.example.com/")
.body(())
.unwrap();

let (resp, stream) = client.send_request(request, false).unwrap();

{
let resp = h2.drive(resp).await.unwrap();
assert_eq!(resp.status(), StatusCode::OK);
}

// This implicitly resets the stream with CANCEL.
drop(stream);
};

{
let request = Request::builder()
.method(Method::POST)
.uri("https://www.example.com/")
.body(())
.unwrap();

let (resp, _) = client.send_request(request, true).unwrap();

{
let resp = h2.drive(resp).await.unwrap();
assert_eq!(resp.status(), StatusCode::OK);
}
};

h2.await.unwrap();
};

join(mock, h2).await;
}
Loading