Skip to content

Commit

Permalink
Use Body::into_readable_stream
Browse files Browse the repository at this point in the history
  • Loading branch information
kflansburg committed Jan 23, 2024
1 parent fb27810 commit d862599
Show file tree
Hide file tree
Showing 5 changed files with 77 additions and 67 deletions.
2 changes: 1 addition & 1 deletion worker-macros/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ worker-sys = { path = "../worker-sys", version = "0.0.9" }
syn = "2.0.17"
proc-macro2 = "1.0.60"
quote = "1.0.28"
wasm-bindgen = "=0.2.86"
wasm-bindgen = {workspace=true}
wasm-bindgen-futures = "0.4.36"
wasm-bindgen-macro-support = "0.2.86"

Expand Down
1 change: 1 addition & 0 deletions worker/src/body.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ mod wasm;

pub use body::Body;
pub(crate) use body::BodyInner;
pub(crate) use body::BoxBodyReader;
pub use http_body::Body as HttpBody;
pub use to_bytes::to_bytes;

Expand Down
103 changes: 70 additions & 33 deletions worker/src/body/body.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,28 +4,16 @@ use std::{
};

use bytes::Bytes;
use futures_util::Stream;
use futures_util::{AsyncRead, Stream};
use http::HeaderMap;
use js_sys::{ArrayBuffer, Promise, Uint8Array};
use serde::de::DeserializeOwned;
use wasm_bindgen::{prelude::wasm_bindgen, JsCast, JsValue};

use crate::console_log;
use crate::{
body::{wasm::WasmStreamBody, HttpBody},
futures::SendJsFuture,
Error,
};

// FIXME(zeb): this is a really disgusting hack that has to clone the bytes inside of the stream
// because the array buffer backing them gets detached.
#[wasm_bindgen(module = "/js/hacks.js")]
extern "C" {
#[wasm_bindgen(js_name = "collectBytes", catch)]
fn collect_bytes(
stream: &JsValue,
) -> Result<Promise, JsValue>;
}

type BoxBody = http_body::combinators::UnsyncBoxBody<Bytes, Error>;

fn try_downcast<T, K>(k: K) -> Result<T, K>
Expand Down Expand Up @@ -116,25 +104,6 @@ impl Body {
// performance as there's no polling overhead.
match self.0 {
BodyInner::Regular(body) => super::to_bytes(body).await,
/*
Working but clones all body
BodyInner::Request(req) if req.body().is_some() => {
// let body = req.body().unwrap();
let promise = collect_bytes(&req.unchecked_into())?;
let bytes: ArrayBuffer = SendJsFuture::from(promise).await?.into();
let bytes = Uint8Array::new(&bytes);
Ok(bytes.to_vec().into())
}
BodyInner::Response(res) if res.body().is_some() => {
let promise = collect_bytes(&res.unchecked_into())?;
let bytes: ArrayBuffer = SendJsFuture::from(promise).await?.into();
let bytes = Uint8Array::new(&bytes);
Ok(bytes.to_vec().into())
}
*/
// Failing
BodyInner::Request(req) => Ok(array_buffer_to_bytes(req.array_buffer()).await),
BodyInner::Response(res) => Ok(array_buffer_to_bytes(res.array_buffer()).await),
_ => Ok(Bytes::new()),
Expand Down Expand Up @@ -197,6 +166,10 @@ impl Body {
&self.0
}

pub(crate) fn into_inner(self) -> BodyInner {
self.0
}

/// Turns the body into a regular streaming body, if it's not already, and returns the underlying body.
fn as_inner_box_body(&mut self) -> Option<&mut BoxBody> {
match &self.0 {
Expand All @@ -211,6 +184,20 @@ impl Body {
_ => unreachable!(),
}
}

pub(crate) fn into_readable_stream(self) -> Option<web_sys::ReadableStream> {
match self.into_inner() {
crate::body::BodyInner::Request(req) => req.body(),
crate::body::BodyInner::Response(res) => res.body(),
crate::body::BodyInner::Regular(s) => {
Some(
wasm_streams::ReadableStream::from_async_read(crate::body::BoxBodyReader::new(s), 1024).into_raw()
)
}
crate::body::BodyInner::None => None,
_ => panic!("unexpected body inner"),
}
}
}

impl Default for Body {
Expand Down Expand Up @@ -313,6 +300,56 @@ impl HttpBody for Body {
}
}

pub struct BoxBodyReader {
inner: BoxBody,
store: Vec<u8>
}

impl BoxBodyReader {
pub fn new(inner: BoxBody) -> Self {
BoxBodyReader {
inner,
store: Vec::new()
}
}
}

impl AsyncRead for BoxBodyReader {
// Required method
fn poll_read(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut [u8]
) -> Poll<Result<usize, std::io::Error>> {
if self.store.len() > 0 {
let size = self.store.len().min(buf.len());
buf[..size].clone_from_slice(&self.store[..size]);
self.store = self.store.split_off(size);
Poll::Ready(Ok(size))
} else {
match Pin::new(&mut self.inner).poll_data(cx) {
Poll::Pending => Poll::Pending,
Poll::Ready(opt) => match opt {
Some(result) => match result {
Ok(data) => {
use bytes::Buf;
self.store.extend_from_slice(data.chunk());
let size = self.store.len().min(buf.len());
buf[..size].clone_from_slice(&self.store[..size]);
self.store = self.store.split_off(size);
Poll::Ready(Ok(size))
},
Err(e) => {
Poll::Ready(Err(std::io::Error::new(std::io::ErrorKind::Other, e)))
}
},
None => Poll::Ready(Ok(0)) // Not sure about this
}
}
}
}
}

impl Stream for Body {
type Item = Result<Bytes, Error>;

Expand Down
18 changes: 2 additions & 16 deletions worker/src/http/request.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,5 @@
//! Functions for translating requests to and from JS
use bytes::Buf;
use futures_util::StreamExt;
use js_sys::Uint8Array;
use wasm_bindgen::JsCast;
use worker_sys::console_log;
use worker_sys::ext::{HeadersExt, RequestExt};
Expand Down Expand Up @@ -131,19 +128,8 @@ pub fn into_wasm(mut req: http::Request<Body>) -> web_sys::Request {
let _ = r;
}

let body = req.into_body();
let body = if body.is_none() {
None
} else {
let stream = wasm_streams::ReadableStream::from_stream(body.map(|chunk| {
chunk
.map(|buf| js_sys::Uint8Array::from(buf.chunk()).into())
.map_err(|_| wasm_bindgen::JsValue::NULL)
}));

Some(stream.into_raw().unchecked_into())
};
init.body(body.as_ref());
let s = req.into_body().into_readable_stream();
init.body(s.map(|s| s.into()).as_ref());

web_sys::Request::new_with_str_and_init(&uri, &init).unwrap()
}
20 changes: 3 additions & 17 deletions worker/src/http/response.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,10 @@
//! Functions for translating responses to and from JS
use bytes::Buf;
use futures_util::StreamExt;
use wasm_bindgen::JsCast;
use worker_sys::ext::{HeadersExt, ResponseExt, ResponseInitExt};

use crate::WebSocket;

use crate::body::Body;
use crate::WebSocket;

/// Create a [`http::Response`] from a [`web_sys::Response`].
///
Expand Down Expand Up @@ -87,18 +84,7 @@ pub fn into_wasm(mut res: http::Response<Body>) -> web_sys::Response {
init.websocket(ws.as_ref());
}

let body = res.into_body();
let body = if body.is_none() {
None
} else {
let stream = wasm_streams::ReadableStream::from_stream(body.map(|chunk| {
chunk
.map(|buf| js_sys::Uint8Array::from(buf.chunk()).into())
.map_err(|_| wasm_bindgen::JsValue::NULL)
}));

Some(stream.into_raw().unchecked_into())
};
let s = res.into_body().into_readable_stream();

web_sys::Response::new_with_opt_readable_stream_and_init(body.as_ref(), &init).unwrap()
web_sys::Response::new_with_opt_readable_stream_and_init(s.as_ref(), &init).unwrap()
}

0 comments on commit d862599

Please sign in to comment.