From 45a49282eba557a3d7ff777838d527444e50d04d Mon Sep 17 00:00:00 2001 From: Nikolay Kim Date: Fri, 13 Jan 2023 08:03:58 +0100 Subject: [PATCH] Handle requests future drop --- ntex-grpc/CHANGES.md | 4 ++++ ntex-grpc/Cargo.toml | 4 ++-- ntex-grpc/src/client/transport.rs | 18 +++++++++++++++--- 3 files changed, 21 insertions(+), 5 deletions(-) diff --git a/ntex-grpc/CHANGES.md b/ntex-grpc/CHANGES.md index 70c4fe1..7b65378 100644 --- a/ntex-grpc/CHANGES.md +++ b/ntex-grpc/CHANGES.md @@ -1,5 +1,9 @@ # Changes +## [0.3.3] - 2023-01-13 + +* Handle request's future drop + ## [0.3.2] - 2023-01-10 * Handle default values in HashMap diff --git a/ntex-grpc/Cargo.toml b/ntex-grpc/Cargo.toml index 99e3d41..b5b9418 100644 --- a/ntex-grpc/Cargo.toml +++ b/ntex-grpc/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "ntex-grpc" -version = "0.3.2" +version = "0.3.3" license = "MIT" authors = ["Nikolay Kim "] description = "GRPC Client/Server framework" @@ -31,8 +31,8 @@ thiserror = "1.0" pin-project-lite = "0.2" [dev-dependencies] -env_logger = { version = "0.10", default-features = false } openssl = "0.10" ntex = { version = "0.6.0", features = ["openssl", "tokio"] } ntex-tls = { version = "0.2.0", features = ["openssl"] } ntex-connect = { version = "0.2.0", features = ["openssl", "tokio"] } +env_logger = { version = "0.10", default-features = false } diff --git a/ntex-grpc/src/client/transport.rs b/ntex-grpc/src/client/transport.rs index cdbe360..c6c5017 100644 --- a/ntex-grpc/src/client/transport.rs +++ b/ntex-grpc/src/client/transport.rs @@ -1,7 +1,7 @@ use std::{cell::RefCell, convert::TryFrom, str::FromStr}; use ntex_bytes::{Buf, BufMut, Bytes, BytesMut}; -use ntex_h2::{self as h2, client, frame::StreamId, Stream}; +use ntex_h2::{self as h2, client, frame::Reason, frame::StreamId, Stream, StreamRef}; use ntex_http::{header, HeaderMap, Method, StatusCode}; use ntex_util::{channel::oneshot, future::BoxFuture, HashMap}; @@ -67,9 +67,10 @@ impl Transport for Client { data: Data::Empty, }, ); + let hnd = StreamHnd(&s_ref, &self.0); s_ref.send_payload(buf.freeze(), true).await?; - match rx.await { + let result = match rx.await { Ok(Ok((status, mut data, headers, trailers))) => { match status { Some(st) => { @@ -92,11 +93,22 @@ impl Transport for Client { } Ok(Err(err)) => Err(err), Err(_) => Err(ServiceError::Canceled), - } + }; + drop(hnd); + result }) } } +struct StreamHnd<'a>(&'a StreamRef, &'a Inner); + +impl<'a> Drop for StreamHnd<'a> { + fn drop(&mut self) { + self.0.reset(Reason::CANCEL); + self.1.inflight.borrow_mut().remove(&self.0.id()); + } +} + impl Inner { pub(super) fn handle_message(&self, mut msg: h2::Message) -> Result<(), ()> { let id = msg.id();