Skip to content

Commit

Permalink
Add disconnect on request drop for client (#46)
Browse files Browse the repository at this point in the history
  • Loading branch information
fafhrd91 authored Jan 30, 2025
1 parent acb5734 commit 114afac
Show file tree
Hide file tree
Showing 8 changed files with 232 additions and 93 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
# Changes

## [1.7.0] - 2025-01-30

* Add disconnect on drop request for client

## [1.6.1] - 2025-01-14

* Expose client internal connection object
Expand Down
4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "ntex-h2"
version = "1.6.2"
version = "1.7.0"
license = "MIT OR Apache-2.0"
authors = ["Nikolay Kim <[email protected]>"]
description = "An HTTP/2 client and server"
Expand All @@ -20,7 +20,7 @@ default = []
unstable = []

[package.metadata.docs.rs]
features = ["ntex-net/tokio"]
features = []

[dependencies]
ntex-net = "2"
Expand Down
189 changes: 103 additions & 86 deletions src/client/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,48 +66,7 @@ impl Client {
eof: bool,
) -> Result<(SendStream, RecvStream), ClientError> {
loop {
let (client, num) = {
let mut connections = self.inner.connections.borrow_mut();

// cleanup connections
let mut idx = 0;
while idx < connections.len() {
if connections[idx].is_closed() {
connections.remove(idx);
} else {
idx += 1;
}
}
let num = connections.len();
if self.inner.minconn > 0 && num < self.inner.minconn {
// create new connection
(None, num)
} else {
// first search for connections with less than 50% capacity usage
let client = connections.iter().find(|item| {
let cap = item.max_streams().unwrap_or(self.inner.max_streams) >> 1;
item.active_streams() <= cap
});
if let Some(client) = client {
(Some(client.clone()), num)
} else {
// check existing connections
let available = connections.iter().filter(|item| item.is_ready()).count();
let client = if available > 0 {
let idx = WyRand::new().generate_range(0_usize..available);
connections
.iter()
.filter(|item| item.is_ready())
.nth(idx)
.cloned()
} else {
None
};

(client, num)
}
}
};
let (client, num) = self.get_client();

if let Some(client) = client {
return client
Expand All @@ -123,47 +82,10 @@ impl Client {
{
// create new connection
self.inner.connecting.set(true);
let (tx, rx) = oneshot::channel();
let inner = self.inner.clone();
let waiters = self.waiters.clone();
let _ = ntex_util::spawn(async move {
let res = match timeout_checked(inner.conn_timeout, (*inner.connector)()).await
{
Ok(Ok(io)) => {
// callbacks for end of stream
let waiters2 = waiters.clone();
let storage = InflightStorage::new(move |_| {
notify(&mut waiters2.borrow_mut());
});
// construct client
let client = SimpleClient::with_params(
io,
inner.config.clone(),
inner.scheme.clone(),
inner.authority.clone(),
storage,
);
inner.connections.borrow_mut().push(client.clone());
inner
.total_connections
.set(inner.total_connections.get() + 1);
Ok(client)
}
Ok(Err(err)) => Err(ClientError::from(err)),
Err(_) => Err(ClientError::HandshakeTimeout),
};
inner.connecting.set(false);
for waiter in waiters.borrow_mut().drain(..) {
let _ = waiter.send(());
}

if res.is_err() {
inner.connect_errors.set(inner.connect_errors.get() + 1);
}
let _ = tx.send(res);
});
return rx
.await??
return self
.create_connection()
.await?
.send(method, path, headers, eof)
.await
.map_err(From::from);
Expand All @@ -180,6 +102,100 @@ impl Client {
}
}

fn get_client(&self) -> (Option<SimpleClient>, usize) {
let mut connections = self.inner.connections.borrow_mut();

// cleanup connections
let mut idx = 0;
while idx < connections.len() {
if connections[idx].is_closed() {
connections.remove(idx);
} else if connections[idx].is_disconnecting() {
let con = connections.remove(idx);
let timeout = self.inner.disconnect_timeout;
ntex_util::spawn(async move {
let _ = con.disconnect().disconnect_timeout(timeout).await;
});
} else {
idx += 1;
}
}
let num = connections.len();
if self.inner.minconn > 0 && num < self.inner.minconn {
// create new connection
(None, num)
} else {
// first search for connections with less than 50% capacity usage
let client = connections.iter().find(|item| {
let cap = item.max_streams().unwrap_or(self.inner.max_streams) >> 1;
item.active_streams() <= cap
});
if let Some(client) = client {
(Some(client.clone()), num)
} else {
// check existing connections
let available = connections.iter().filter(|item| item.is_ready()).count();
let client = if available > 0 {
let idx = WyRand::new().generate_range(0_usize..available);
connections
.iter()
.filter(|item| item.is_ready())
.nth(idx)
.cloned()
} else {
None
};

(client, num)
}
}
}

async fn create_connection(&self) -> Result<SimpleClient, ClientError> {
let (tx, rx) = oneshot::channel();

let inner = self.inner.clone();
let waiters = self.waiters.clone();

let _ = ntex_util::spawn(async move {
let res = match timeout_checked(inner.conn_timeout, (*inner.connector)()).await {
Ok(Ok(io)) => {
// callbacks for end of stream
let waiters2 = waiters.clone();
let storage = InflightStorage::new(move |_| {
notify(&mut waiters2.borrow_mut());
});
// construct client
let client = SimpleClient::with_params(
io,
inner.config.clone(),
inner.scheme.clone(),
inner.authority.clone(),
storage,
);
inner.connections.borrow_mut().push(client.clone());
inner
.total_connections
.set(inner.total_connections.get() + 1);
Ok(client)
}
Ok(Err(err)) => Err(ClientError::from(err)),
Err(_) => Err(ClientError::HandshakeTimeout),
};
inner.connecting.set(false);
for waiter in waiters.borrow_mut().drain(..) {
let _ = waiter.send(());
}

if res.is_err() {
inner.connect_errors.set(inner.connect_errors.get() + 1);
}
let _ = tx.send(res);
});

rx.await?
}

#[inline]
/// Check if client is allowed to send new request
///
Expand Down Expand Up @@ -289,7 +305,7 @@ impl ClientBuilder {
connector,
conn_timeout: Millis(1_000),
conn_lifetime: Duration::from_secs(0),
disconnect_timeout: Millis(3_000),
disconnect_timeout: Millis(15_000),
max_streams: 100,
minconn: 1,
maxconn: 16,
Expand Down Expand Up @@ -365,14 +381,15 @@ impl ClientBuilder {
self
}

/// Set server connection disconnect timeout.
/// Set client connection disconnect timeout.
///
/// Defines a timeout for disconnect connection. If a disconnect procedure does not complete
/// Defines a timeout for disconnect connection. Disconnecting connection
/// involes closing all active streams. If a disconnect procedure does not complete
/// within this time, the socket get dropped.
///
/// To disable timeout set value to 0.
///
/// By default disconnect timeout is set to 3 seconds.
/// By default disconnect timeout is set to 15 seconds.
pub fn disconnect_timeout<T: Into<Millis>>(mut self, timeout: T) -> Self {
self.0.disconnect_timeout = timeout.into();
self
Expand Down
70 changes: 69 additions & 1 deletion src/client/simple.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
use std::{fmt, rc::Rc};
use std::{fmt, future::Future, pin::Pin, rc::Rc, task::Context, task::Poll};

use ntex_bytes::ByteString;
use ntex_http::{uri::Scheme, HeaderMap, Method};
use ntex_io::{Dispatcher as IoDispatcher, IoBoxed, IoRef, OnDisconnect};
use ntex_util::time::{Millis, Sleep};

use crate::connection::Connection;
use crate::default::DefaultControlService;
Expand Down Expand Up @@ -124,12 +125,26 @@ impl SimpleClient {
self.0.con.close()
}

#[inline]
/// Gracefully disconnect connection
///
/// Connection force closes if `ClientDisconnect` get dropped
pub fn disconnect(&self) -> ClientDisconnect {
ClientDisconnect::new(self.clone())
}

#[inline]
/// Check if connection is closed
pub fn is_closed(&self) -> bool {
self.0.con.is_closed()
}

#[inline]
/// Check if connection is disconnecting
pub fn is_disconnecting(&self) -> bool {
self.0.con.is_disconnecting()
}

#[inline]
/// Notify when connection get closed
pub fn on_disconnect(&self) -> OnDisconnect {
Expand Down Expand Up @@ -188,3 +203,56 @@ impl fmt::Debug for SimpleClient {
.finish()
}
}

#[derive(Debug)]
pub struct ClientDisconnect {
client: SimpleClient,
disconnect: OnDisconnect,
timeout: Option<Sleep>,
}

impl ClientDisconnect {
fn new(client: SimpleClient) -> Self {
log::debug!("Disconnecting client");

client.0.con.disconnect_when_ready();
ClientDisconnect {
disconnect: client.on_disconnect(),
timeout: None,
client,
}
}

pub fn disconnect_timeout<T>(mut self, timeout: T) -> Self
where
Millis: From<T>,
{
self.timeout = Some(Sleep::new(timeout.into()));
self
}
}

impl Drop for ClientDisconnect {
fn drop(&mut self) {
self.client.0.con.close();
}
}

impl Future for ClientDisconnect {
type Output = Result<(), OperationError>;

#[inline]
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let mut this = self.as_mut();

if Pin::new(&mut this.disconnect).poll(cx).is_ready() {
return Poll::Ready(this.client.0.con.check_error());
} else if let Some(ref mut sleep) = this.timeout {
if sleep.poll_elapsed(cx).is_ready() {
this.client.0.con.close();
return Poll::Ready(Err(OperationError::Disconnected));
}
}
Poll::Pending
}
}
20 changes: 20 additions & 0 deletions src/client/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,10 @@ impl Drop for SendStream {
fn drop(&mut self) {
if !self.0.send_state().is_closed() {
self.0.reset(Reason::CANCEL);

if self.0.is_disconnect_on_drop() {
self.0 .0.con.disconnect_when_ready();
}
}
}
}
Expand Down Expand Up @@ -120,6 +124,12 @@ impl SendStream {
self.0.reset(reason)
}

#[inline]
/// Disconnect connection on stream drop
pub fn disconnect_on_drop(&self) {
self.0.disconnect_on_drop()
}

#[inline]
/// Check for available send capacity
pub fn poll_send_capacity(&self, cx: &Context<'_>) -> Poll<Result<WindowSize, OperationError>> {
Expand Down Expand Up @@ -155,6 +165,12 @@ impl RecvStream {
&self.0
}

#[inline]
/// Disconnect connection on stream drop
pub fn disconnect_on_drop(&self) {
self.0.disconnect_on_drop()
}

/// Attempt to pull out the next value of http/2 stream
pub async fn recv(&self) -> Option<Message> {
poll_fn(|cx| self.poll_recv(cx)).await
Expand Down Expand Up @@ -184,6 +200,10 @@ impl Drop for RecvStream {
fn drop(&mut self) {
if !self.0.recv_state().is_closed() {
self.0.reset(Reason::CANCEL);

if self.0.is_disconnect_on_drop() {
self.0 .0.con.disconnect_when_ready();
}
}
self.1 .0.inflight.borrow_mut().remove(&self.0.id());
}
Expand Down
Loading

0 comments on commit 114afac

Please sign in to comment.