Skip to content

Commit

Permalink
Handle requests future drop
Browse files Browse the repository at this point in the history
  • Loading branch information
fafhrd91 committed Jan 13, 2023
1 parent dcd060b commit 45a4928
Show file tree
Hide file tree
Showing 3 changed files with 21 additions and 5 deletions.
4 changes: 4 additions & 0 deletions ntex-grpc/CHANGES.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
# Changes

## [0.3.3] - 2023-01-13

* Handle request's future drop

## [0.3.2] - 2023-01-10

* Handle default values in HashMap
Expand Down
4 changes: 2 additions & 2 deletions ntex-grpc/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "ntex-grpc"
version = "0.3.2"
version = "0.3.3"
license = "MIT"
authors = ["Nikolay Kim <[email protected]>"]
description = "GRPC Client/Server framework"
Expand Down Expand Up @@ -31,8 +31,8 @@ thiserror = "1.0"
pin-project-lite = "0.2"

[dev-dependencies]
env_logger = { version = "0.10", default-features = false }
openssl = "0.10"
ntex = { version = "0.6.0", features = ["openssl", "tokio"] }
ntex-tls = { version = "0.2.0", features = ["openssl"] }
ntex-connect = { version = "0.2.0", features = ["openssl", "tokio"] }
env_logger = { version = "0.10", default-features = false }
18 changes: 15 additions & 3 deletions ntex-grpc/src/client/transport.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::{cell::RefCell, convert::TryFrom, str::FromStr};

use ntex_bytes::{Buf, BufMut, Bytes, BytesMut};
use ntex_h2::{self as h2, client, frame::StreamId, Stream};
use ntex_h2::{self as h2, client, frame::Reason, frame::StreamId, Stream, StreamRef};
use ntex_http::{header, HeaderMap, Method, StatusCode};
use ntex_util::{channel::oneshot, future::BoxFuture, HashMap};

Expand Down Expand Up @@ -67,9 +67,10 @@ impl<T: MethodDef> Transport<T> for Client {
data: Data::Empty,
},
);
let hnd = StreamHnd(&s_ref, &self.0);
s_ref.send_payload(buf.freeze(), true).await?;

match rx.await {
let result = match rx.await {
Ok(Ok((status, mut data, headers, trailers))) => {
match status {
Some(st) => {
Expand All @@ -92,11 +93,22 @@ impl<T: MethodDef> Transport<T> for Client {
}
Ok(Err(err)) => Err(err),
Err(_) => Err(ServiceError::Canceled),
}
};
drop(hnd);
result
})
}
}

struct StreamHnd<'a>(&'a StreamRef, &'a Inner);

impl<'a> Drop for StreamHnd<'a> {
fn drop(&mut self) {
self.0.reset(Reason::CANCEL);
self.1.inflight.borrow_mut().remove(&self.0.id());
}
}

impl Inner {
pub(super) fn handle_message(&self, mut msg: h2::Message) -> Result<(), ()> {
let id = msg.id();
Expand Down

0 comments on commit 45a4928

Please sign in to comment.