Skip to content

Commit

Permalink
feat: upgrade to hyper 1.0
Browse files Browse the repository at this point in the history
  • Loading branch information
sigoden committed Dec 16, 2023
1 parent 5988442 commit 167c314
Show file tree
Hide file tree
Showing 11 changed files with 588 additions and 478 deletions.
362 changes: 266 additions & 96 deletions Cargo.lock

Large diffs are not rendered by default.

32 changes: 18 additions & 14 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,18 +16,17 @@ clap_complete = "4"
chrono = { version = "0.4", default-features = false, features = ["clock"] }
tokio = { version = "1", features = ["rt-multi-thread", "macros", "fs", "io-util", "signal"]}
tokio-util = { version = "0.7", features = ["io-util", "compat"] }
hyper = { version = "0.14", features = ["http1", "server", "tcp", "stream"] }
hyper = { version = "1.0", features = ["http1", "server"] }
percent-encoding = "2.3"
serde = { version = "1", features = ["derive"] }
serde_json = "1"
futures = "0.3"
futures-util = { version = "0.3", default-features = false, features = ["alloc"] }
async_zip = { version = "0.0.15", default-features = false, features = ["deflate", "bzip2", "xz", "chrono", "tokio"] }
headers = "0.3"
headers = "0.4"
mime_guess = "2.0"
if-addrs = "0.10.1"
rustls = { version = "0.21", default-features = false, features = ["tls12"], optional = true }
rustls-pemfile = { version = "1", optional = true }
tokio-rustls = { version = "0.24", optional = true }
if-addrs = "0.11"
rustls-pemfile = { version = "2.0", optional = true }
tokio-rustls = { version = "0.25", optional = true }
md5 = "0.7"
lazy_static = "1.4"
uuid = { version = "1.4", features = ["v4", "fast-rng"] }
Expand All @@ -42,16 +41,21 @@ alphanumeric-sort = "1.4"
content_inspector = "0.2"
anyhow = "1.0"
chardetng = "0.1"
glob = "0.3.1"
glob = "0.3"
indexmap = "2.0"
serde_yaml = "0.9.27"
sha-crypt = "0.5.0"
base64 = "0.21.5"
smart-default = "0.7.1"
serde_yaml = "0.9"
sha-crypt = "0.5"
base64 = "0.21"
smart-default = "0.7"
rustls-pki-types = "1.0"
hyper-util = { version = "0.1", features = ["server-auto", "tokio"] }
http-body-util = "0.1"
bytes = "1.5"
pin-project-lite = "0.2.13"

[features]
default = ["tls"]
tls = ["rustls", "rustls-pemfile", "tokio-rustls"]
tls = ["rustls-pemfile", "tokio-rustls"]

[dev-dependencies]
assert_cmd = "2"
Expand All @@ -61,7 +65,7 @@ port_check = "0.1"
rstest = "0.18"
regex = "1"
url = "2"
diqwest = { version = "1", features = ["blocking", "rustls-tls"], default-features = false }
diqwest = { version = "2.0", features = ["blocking"], default-features = false }
predicates = "3"

[profile.release]
Expand Down
105 changes: 105 additions & 0 deletions src/http_utils.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
use bytes::{Bytes, BytesMut};
use futures_util::Stream;
use http_body_util::{combinators::BoxBody, BodyExt, Full};
use hyper::body::{Body, Incoming};
use std::{
pin::Pin,
task::{Context, Poll},
};
use tokio::io::AsyncRead;
use tokio_util::io::poll_read_buf;

#[derive(Debug)]
pub struct IncomingStream {
inner: Incoming,
}

impl IncomingStream {
pub fn new(inner: Incoming) -> Self {
Self { inner }
}
}

impl Stream for IncomingStream {
type Item = Result<Bytes, anyhow::Error>;

#[inline]
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
loop {
match futures_util::ready!(Pin::new(&mut self.inner).poll_frame(cx)?) {
Some(frame) => match frame.into_data() {
Ok(data) => return Poll::Ready(Some(Ok(data))),
Err(_frame) => {}
},
None => return Poll::Ready(None),
}
}
}
}

pin_project_lite::pin_project! {
pub struct LengthLimitedStream<R> {
#[pin]
reader: Option<R>,
remaining: usize,
buf: BytesMut,
capacity: usize,
}
}

impl<R> LengthLimitedStream<R> {
pub fn new(reader: R, limit: usize) -> Self {
Self {
reader: Some(reader),
remaining: limit,
buf: BytesMut::new(),
capacity: 4096,
}
}
}

impl<R: AsyncRead> Stream for LengthLimitedStream<R> {
type Item = std::io::Result<Bytes>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let mut this = self.as_mut().project();

if *this.remaining == 0 {
self.project().reader.set(None);
return Poll::Ready(None);
}

let reader = match this.reader.as_pin_mut() {
Some(r) => r,
None => return Poll::Ready(None),
};

if this.buf.capacity() == 0 {
this.buf.reserve(*this.capacity);
}

match poll_read_buf(reader, cx, &mut this.buf) {
Poll::Pending => Poll::Pending,
Poll::Ready(Err(err)) => {
self.project().reader.set(None);
Poll::Ready(Some(Err(err)))
}
Poll::Ready(Ok(0)) => {
self.project().reader.set(None);
Poll::Ready(None)
}
Poll::Ready(Ok(_)) => {
let mut chunk = this.buf.split();
let chunk_size = (*this.remaining).min(chunk.len());
chunk.truncate(chunk_size);
*this.remaining -= chunk_size;
Poll::Ready(Some(Ok(chunk.freeze())))
}
}
}
}

pub fn body_full(content: impl Into<hyper::body::Bytes>) -> BoxBody<Bytes, anyhow::Error> {
Full::new(content.into())
.map_err(anyhow::Error::new)
.boxed()
}
Loading

0 comments on commit 167c314

Please sign in to comment.