Skip to content

Commit

Permalink
Better error handling (#37)
Browse files Browse the repository at this point in the history
  • Loading branch information
fafhrd91 authored Oct 16, 2024
1 parent 0c4c74d commit 6592063
Show file tree
Hide file tree
Showing 11 changed files with 179 additions and 66 deletions.
5 changes: 4 additions & 1 deletion .github/workflows/CI.yml
Original file line number Diff line number Diff line change
Expand Up @@ -69,15 +69,17 @@ jobs:
if: matrix.rust == 'nightly' || matrix.rust == 'beta'
with:
command: build
args: --features=ntex-net/tokio

- name: Run lib tests and doc tests
uses: actions-rs/cargo@v1
if: matrix.rust == 'nightly' || matrix.rust == 'beta'
with:
command: test
args: --features=ntex-net/tokio

- name: Generate code coverage
run: cargo llvm-cov --no-report
run: cargo llvm-cov --no-report --features=ntex-net/tokio
if: matrix.rust == 'stable'

#- name: Run integration tests
Expand Down Expand Up @@ -130,3 +132,4 @@ jobs:
uses: actions-rs/cargo@v1
with:
command: check
args: --features=ntex-net/tokio
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
# Changes

## [1.2.0] - 2024-10-16

* Better error handling

## [1.1.0] - 2024-08-12

* Server graceful shutdown support
Expand Down
5 changes: 2 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "ntex-h2"
version = "1.1.0"
version = "1.2.0"
license = "MIT OR Apache-2.0"
authors = ["Nikolay Kim <[email protected]>"]
description = "An HTTP/2 client and server"
Expand Down Expand Up @@ -46,9 +46,8 @@ walkdir = "2.3.2"
serde = "1.0"
serde_json = "1.0"

ntex = { version = "2", features = ["openssl", "tokio"] }
ntex = { version = "2", features = ["openssl"] }
ntex-tls = { version = "2", features = ["openssl"] }
ntex-net = { version = "2", features = ["tokio"] }
openssl = "0.10"

# Examples
Expand Down
13 changes: 5 additions & 8 deletions src/client/connector.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::{cell::Cell, marker::PhantomData, ops};
use std::{marker::PhantomData, ops};

use ntex_bytes::{ByteString, PoolId, PoolRef};
use ntex_bytes::{ByteString, PoolId};
use ntex_http::uri::Scheme;
use ntex_io::IoBoxed;
use ntex_net::connect::{self as connect, Address, Connect, Connector as DefaultConnector};
Expand All @@ -15,7 +15,6 @@ pub struct Connector<A: Address, T> {
connector: Pipeline<T>,
config: Config,
scheme: Scheme,
pub(super) pool: Cell<PoolRef>,

_t: PhantomData<A>,
}
Expand All @@ -35,7 +34,6 @@ where
connector: Pipeline::new(connector.into_service()),
config: Config::client(),
scheme: Scheme::HTTP,
pool: Cell::new(PoolId::P5.pool_ref()),
_t: PhantomData,
}
}
Expand All @@ -51,7 +49,6 @@ where
connector: DefaultConnector::default().into(),
config: Config::client(),
scheme: Scheme::HTTP,
pool: Cell::new(PoolId::P5.pool_ref()),
_t: PhantomData,
}
}
Expand Down Expand Up @@ -82,12 +79,13 @@ where
self
}

#[doc(hidden)]
#[deprecated]
/// Set memory pool.
///
/// Use specified memory pool for memory allocations. By default P5
/// memory pool is used.
pub fn memory_pool(&mut self, id: PoolId) -> &mut Self {
self.pool.set(id.pool_ref());
pub fn memory_pool(&mut self, _: PoolId) -> &mut Self {
self
}

Expand All @@ -102,7 +100,6 @@ where
connector: connector.into_service().into(),
config: self.config.clone(),
scheme: self.scheme.clone(),
pool: self.pool.clone(),
_t: PhantomData,
}
}
Expand Down
6 changes: 6 additions & 0 deletions src/client/simple.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,12 @@ impl SimpleClient {
}))
}

#[inline]
/// Get io tag
pub fn tag(&self) -> &'static str {
self.0.con.tag()
}

#[inline]
/// Send request to the peer
pub async fn send(
Expand Down
39 changes: 26 additions & 13 deletions src/client/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,7 @@ use std::{cell::RefCell, collections::VecDeque, fmt, future::poll_fn, pin::Pin,
use ntex_bytes::Bytes;
use ntex_http::HeaderMap;
use ntex_service::{Service, ServiceCtx};
use ntex_util::future::Either;
use ntex_util::{task::LocalWaker, HashMap, Stream as FutStream};
use ntex_util::{future::Either, task::LocalWaker, HashMap, Stream as FutStream};

use crate::error::OperationError;
use crate::frame::{Reason, StreamId, WindowSize};
Expand Down Expand Up @@ -75,21 +74,30 @@ impl Drop for SendStream {

impl SendStream {
#[inline]
/// Get stream id
pub fn id(&self) -> StreamId {
self.0.id()
}

#[inline]
/// Get io tag
pub fn tag(&self) -> &'static str {
self.0.tag()
}

#[inline]
pub fn stream(&self) -> &StreamRef {
&self.0
}

#[inline]
/// Get available capacity
pub fn available_send_capacity(&self) -> WindowSize {
self.0.available_send_capacity()
}

#[inline]
/// Wait for available capacity
pub async fn send_capacity(&self) -> Result<WindowSize, OperationError> {
self.0.send_capacity().await
}
Expand All @@ -101,6 +109,7 @@ impl SendStream {
}

#[inline]
/// Send trailers
pub fn send_trailers(&self, map: HeaderMap) {
self.0.send_trailers(map)
}
Expand Down Expand Up @@ -130,10 +139,17 @@ pub struct RecvStream(StreamRef, InflightStorage);

impl RecvStream {
#[inline]
/// Get stream id
pub fn id(&self) -> StreamId {
self.0.id()
}

#[inline]
/// Get io tag
pub fn tag(&self) -> &'static str {
self.0.tag()
}

#[inline]
pub fn stream(&self) -> &StreamRef {
&self.0
Expand All @@ -148,23 +164,17 @@ impl RecvStream {
/// the current task for wakeup if the value is not yet available,
/// and returning None if the stream is exhausted.
pub fn poll_recv(&self, cx: &mut Context<'_>) -> Poll<Option<Message>> {
let mut inner = self.1 .0.inflight.borrow_mut();
if let Some(inflight) = inner.get_mut(&self.0.id()) {
if let Some(inflight) = self.1 .0.inflight.borrow_mut().get_mut(&self.0.id()) {
if let Some(msg) = inflight.pop() {
let to_remove = match msg.kind() {
MessageKind::Headers { eof, .. } => *eof,
MessageKind::Eof(..) | MessageKind::Disconnect(..) => true,
_ => false,
};
if to_remove {
inner.remove(&self.0.id());
}
Poll::Ready(Some(msg))
} else if self.0.recv_state().is_closed() {
Poll::Ready(None)
} else {
inflight.waker.register(cx.waker());
Poll::Pending
}
} else {
log::warn!("Stream does not exists, {:?}", self.0.id());
Poll::Ready(None)
}
}
Expand Down Expand Up @@ -207,11 +217,14 @@ impl Service<Message> for HandleService {
MessageKind::Eof(..) | MessageKind::Disconnect(..) => true,
_ => false,
};
inflight.push(msg);

if eof {
self.0.notify(id);
log::debug!("Stream {:?} is closed, notify", id);
}
inflight.push(msg);
} else {
log::error!("Received message for unknown stream, {:?}", msg);
}
Ok(())
}
Expand Down
4 changes: 2 additions & 2 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ impl Config {
let dispatcher_config = DispatcherConfig::default();
dispatcher_config
.set_keepalive_timeout(Seconds(0))
.set_disconnect_timeout(Seconds(3))
.set_disconnect_timeout(Seconds(1))
.set_frame_read_rate(Seconds(1), Seconds::ZERO, 256);

Config(Rc::new(ConfigInner {
Expand Down Expand Up @@ -308,7 +308,7 @@ impl Config {
///
/// To disable timeout set value to 0.
///
/// By default disconnect timeout is set to 3 seconds.
/// By default disconnect timeout is set to 1 seconds.
pub fn disconnect_timeout(&self, val: Seconds) -> &Self {
self.0.dispatcher_config.set_disconnect_timeout(val);
self
Expand Down
43 changes: 33 additions & 10 deletions src/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,10 @@ impl Connection {
&self.0.io
}

pub(crate) fn tag(&self) -> &'static str {
self.0.io.tag()
}

pub(crate) fn codec(&self) -> &Codec {
&self.0.codec
}
Expand Down Expand Up @@ -263,10 +267,13 @@ impl Connection {

pub(crate) fn disconnect_when_ready(&self) {
if self.0.streams.borrow().is_empty() {
log::debug!("All streams are closed, disconnecting");
log::trace!("{}: All streams are closed, disconnecting", self.tag());
self.0.io.close();
} else {
log::debug!("Not all streams are closed, set disconnect flag");
log::trace!(
"{}: Not all streams are closed, set disconnect flag",
self.tag()
);
self.set_flags(ConnectionFlags::DISCONNECT_WHEN_READY);
}
}
Expand All @@ -282,7 +289,10 @@ impl Connection {
self.check_error()?;

if !self.can_create_new_stream() {
log::warn!("Cannot create new stream, waiting for available streams");
log::warn!(
"{}: Cannot create new stream, waiting for available streams",
self.tag()
);
self.ready().await?
}

Expand Down Expand Up @@ -326,7 +336,12 @@ impl Connection {
let empty = {
let mut streams = self.0.streams.borrow_mut();
if let Some(stream) = streams.remove(&id) {
log::trace!("Dropping stream {:?} remote: {:?}", id, stream.is_remote());
log::trace!(
"{}: Dropping stream {:?} remote: {:?}",
self.tag(),
id,
stream.is_remote()
);
if stream.is_remote() {
self.0
.active_remote_streams
Expand All @@ -352,7 +367,7 @@ impl Connection {

// Close connection
if empty && flags.contains(ConnectionFlags::DISCONNECT_WHEN_READY) {
log::debug!("All streams are closed, disconnecting");
log::trace!("{}: All streams are closed, disconnecting", self.tag());
self.0.io.close();
return;
}
Expand All @@ -379,6 +394,10 @@ impl Connection {
}

impl RecvHalfConnection {
pub(crate) fn tag(&self) -> &'static str {
self.0.io.tag()
}

fn query(&self, id: StreamId) -> Option<StreamRef> {
self.0.streams.borrow_mut().get(&id).cloned()
}
Expand Down Expand Up @@ -614,7 +633,7 @@ impl RecvHalfConnection {
&self,
frm: frame::WindowUpdate,
) -> Result<(), Either<ConnectionError, StreamErrorInner>> {
log::trace!("processing incoming {:#?}", frm);
log::trace!("{}: processing incoming {:#?}", self.tag(), frm);

if frm.stream_id().is_zero() {
if frm.size_increment() == 0 {
Expand Down Expand Up @@ -658,7 +677,7 @@ impl RecvHalfConnection {
&self,
frm: frame::Reset,
) -> Result<(), Either<ConnectionError, StreamErrorInner>> {
log::trace!("processing incoming {:#?}", frm);
log::trace!("{}: processing incoming {:#?}", self.tag(), frm);

if frm.stream_id().is_zero() {
Err(Either::Left(ConnectionError::UnknownStream("RST_STREAM")))
Expand Down Expand Up @@ -688,7 +707,8 @@ impl RecvHalfConnection {
data: &Bytes,
) -> HashMap<StreamId, StreamRef> {
log::trace!(
"processing go away with reason: {:?}, data: {:?}",
"{}: processing go away with reason: {:?}, data: {:?}",
self.tag(),
reason,
data.slice(..std::cmp::min(data.len(), 20))
);
Expand Down Expand Up @@ -801,7 +821,7 @@ async fn delay_drop_task(state: Connection) {
loop {
if let Some(item) = queue.front() {
if item.1 <= now {
log::trace!("dropping {:?} after delay", item.0);
log::trace!("{}: dropping {:?} after delay", state.tag(), item.0);
ids.remove(&item.0);
queue.pop_front();
} else {
Expand All @@ -825,7 +845,10 @@ async fn ping(st: Connection, timeout: time::Seconds, io: IoRef) {
st.set_flags(ConnectionFlags::RECV_PONG);
loop {
if st.is_closed() {
log::debug!("http client connection is closed, stopping keep-alive task");
log::trace!(
"{}: http client connection is closed, stopping keep-alive task",
st.tag()
);
break;
}
sleep(keepalive).await;
Expand Down
Loading

0 comments on commit 6592063

Please sign in to comment.