Skip to content

Commit

Permalink
Add Client::client() method, returns available client from the pool (#47
Browse files Browse the repository at this point in the history
)
  • Loading branch information
fafhrd91 authored Jan 31, 2025
1 parent 114afac commit 2098be0
Show file tree
Hide file tree
Showing 4 changed files with 44 additions and 34 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
# Changes

## [1.8.0] - 2025-01-31

* Add Client::client() method, returns available client from the pool

## [1.7.0] - 2025-01-30

* Add disconnect on drop request for client
Expand Down
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "ntex-h2"
version = "1.7.0"
version = "1.8.0"
license = "MIT OR Apache-2.0"
authors = ["Nikolay Kim <[email protected]>"]
description = "An HTTP/2 client and server"
Expand Down
69 changes: 37 additions & 32 deletions src/client/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,6 @@ impl Client {
ClientBuilder::with_default(addr)
}

#[inline]
/// Send request to the peer
pub async fn send(
&self,
Expand All @@ -65,41 +64,46 @@ impl Client {
headers: HeaderMap,
eof: bool,
) -> Result<(SendStream, RecvStream), ClientError> {
self.client()
.await?
.send(method, path, headers, eof)
.await
.map_err(From::from)
}

/// Get client from the pool
pub async fn client(&self) -> Result<SimpleClient, ClientError> {
loop {
let (client, num) = self.get_client();

if let Some(client) = client {
return client
.send(method, path, headers, eof)
.await
.map_err(From::from);
return Ok(client);
} else {
self.connect(num).await?;
}
}
}

// can create new connection
if !self.inner.connecting.get()
&& (num < self.inner.maxconn
|| (self.inner.minconn > 0 && num < self.inner.minconn))
{
// create new connection
self.inner.connecting.set(true);

return self
.create_connection()
.await?
.send(method, path, headers, eof)
.await
.map_err(From::from);
} else {
log::debug!(
"New connection is being established {:?} or number of existing cons {} greater than allowed {}",
self.inner.connecting.get(), num, self.inner.maxconn);
async fn connect(&self, num: usize) -> Result<(), ClientError> {
// can create new connection
if !self.inner.connecting.get()
&& (num < self.inner.maxconn || (self.inner.minconn > 0 && num < self.inner.minconn))
{
// create new connection
self.inner.connecting.set(true);

// wait for available connection
let (tx, rx) = oneshot::channel();
self.waiters.borrow_mut().push_back(tx);
let _ = rx.await;
}
self.create_connection().await?;
} else {
log::debug!(
"New connection is being established {:?} or number of existing cons {} greater than allowed {}",
self.inner.connecting.get(), num, self.inner.maxconn);

// wait for available connection
let (tx, rx) = oneshot::channel();
self.waiters.borrow_mut().push_back(tx);
let _ = rx.await?;
}
Ok(())
}

fn get_client(&self) -> (Option<SimpleClient>, usize) {
Expand All @@ -113,9 +117,10 @@ impl Client {
} else if connections[idx].is_disconnecting() {
let con = connections.remove(idx);
let timeout = self.inner.disconnect_timeout;
ntex_util::spawn(async move {
let f = ntex_util::spawn(async move {
let _ = con.disconnect().disconnect_timeout(timeout).await;
});
drop(f);
} else {
idx += 1;
}
Expand Down Expand Up @@ -151,7 +156,7 @@ impl Client {
}
}

async fn create_connection(&self) -> Result<SimpleClient, ClientError> {
async fn create_connection(&self) -> Result<(), ClientError> {
let (tx, rx) = oneshot::channel();

let inner = self.inner.clone();
Expand All @@ -173,11 +178,11 @@ impl Client {
inner.authority.clone(),
storage,
);
inner.connections.borrow_mut().push(client.clone());
inner.connections.borrow_mut().push(client);
inner
.total_connections
.set(inner.total_connections.get() + 1);
Ok(client)
Ok(())
}
Ok(Err(err)) => Err(ClientError::from(err)),
Err(_) => Err(ClientError::HandshakeTimeout),
Expand Down
3 changes: 2 additions & 1 deletion src/dispatcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,11 +136,12 @@ where
if let Err(e) = self.inner.publish.poll(cx) {
let inner = self.inner.clone();
let con = self.connection.connection();
ntex_util::spawn(async move {
let f = ntex_util::spawn(async move {
if inner.control.call_nowait(Control::error(e)).await.is_ok() {
con.close();
}
});
drop(f);
}
self.inner.control.poll(cx).map_err(|_| ())
}
Expand Down

0 comments on commit 2098be0

Please sign in to comment.