From d862599a38414a9dcaf03571413802e1099a333b Mon Sep 17 00:00:00 2001 From: Kevin Flansburg Date: Tue, 23 Jan 2024 23:53:01 +0000 Subject: [PATCH] Use Body::into_readable_stream --- worker-macros/Cargo.toml | 2 +- worker/src/body.rs | 1 + worker/src/body/body.rs | 103 ++++++++++++++++++++++++------------ worker/src/http/request.rs | 18 +------ worker/src/http/response.rs | 20 ++----- 5 files changed, 77 insertions(+), 67 deletions(-) diff --git a/worker-macros/Cargo.toml b/worker-macros/Cargo.toml index f0aeca2d..beb9e99f 100644 --- a/worker-macros/Cargo.toml +++ b/worker-macros/Cargo.toml @@ -17,7 +17,7 @@ worker-sys = { path = "../worker-sys", version = "0.0.9" } syn = "2.0.17" proc-macro2 = "1.0.60" quote = "1.0.28" -wasm-bindgen = "=0.2.86" +wasm-bindgen = {workspace=true} wasm-bindgen-futures = "0.4.36" wasm-bindgen-macro-support = "0.2.86" diff --git a/worker/src/body.rs b/worker/src/body.rs index 7b78db84..ace962eb 100644 --- a/worker/src/body.rs +++ b/worker/src/body.rs @@ -7,6 +7,7 @@ mod wasm; pub use body::Body; pub(crate) use body::BodyInner; +pub(crate) use body::BoxBodyReader; pub use http_body::Body as HttpBody; pub use to_bytes::to_bytes; diff --git a/worker/src/body/body.rs b/worker/src/body/body.rs index c74ed63e..fc675974 100644 --- a/worker/src/body/body.rs +++ b/worker/src/body/body.rs @@ -4,28 +4,16 @@ use std::{ }; use bytes::Bytes; -use futures_util::Stream; +use futures_util::{AsyncRead, Stream}; use http::HeaderMap; -use js_sys::{ArrayBuffer, Promise, Uint8Array}; use serde::de::DeserializeOwned; -use wasm_bindgen::{prelude::wasm_bindgen, JsCast, JsValue}; - +use crate::console_log; use crate::{ body::{wasm::WasmStreamBody, HttpBody}, futures::SendJsFuture, Error, }; -// FIXME(zeb): this is a really disgusting hack that has to clone the bytes inside of the stream -// because the array buffer backing them gets detached. -#[wasm_bindgen(module = "/js/hacks.js")] -extern "C" { - #[wasm_bindgen(js_name = "collectBytes", catch)] - fn collect_bytes( - stream: &JsValue, - ) -> Result; -} - type BoxBody = http_body::combinators::UnsyncBoxBody; fn try_downcast(k: K) -> Result @@ -116,25 +104,6 @@ impl Body { // performance as there's no polling overhead. match self.0 { BodyInner::Regular(body) => super::to_bytes(body).await, - /* - Working but clones all body - BodyInner::Request(req) if req.body().is_some() => { - // let body = req.body().unwrap(); - let promise = collect_bytes(&req.unchecked_into())?; - let bytes: ArrayBuffer = SendJsFuture::from(promise).await?.into(); - let bytes = Uint8Array::new(&bytes); - - Ok(bytes.to_vec().into()) - } - BodyInner::Response(res) if res.body().is_some() => { - let promise = collect_bytes(&res.unchecked_into())?; - let bytes: ArrayBuffer = SendJsFuture::from(promise).await?.into(); - let bytes = Uint8Array::new(&bytes); - - Ok(bytes.to_vec().into()) - } - */ - // Failing BodyInner::Request(req) => Ok(array_buffer_to_bytes(req.array_buffer()).await), BodyInner::Response(res) => Ok(array_buffer_to_bytes(res.array_buffer()).await), _ => Ok(Bytes::new()), @@ -197,6 +166,10 @@ impl Body { &self.0 } + pub(crate) fn into_inner(self) -> BodyInner { + self.0 + } + /// Turns the body into a regular streaming body, if it's not already, and returns the underlying body. fn as_inner_box_body(&mut self) -> Option<&mut BoxBody> { match &self.0 { @@ -211,6 +184,20 @@ impl Body { _ => unreachable!(), } } + + pub(crate) fn into_readable_stream(self) -> Option { + match self.into_inner() { + crate::body::BodyInner::Request(req) => req.body(), + crate::body::BodyInner::Response(res) => res.body(), + crate::body::BodyInner::Regular(s) => { + Some( + wasm_streams::ReadableStream::from_async_read(crate::body::BoxBodyReader::new(s), 1024).into_raw() + ) + } + crate::body::BodyInner::None => None, + _ => panic!("unexpected body inner"), + } + } } impl Default for Body { @@ -313,6 +300,56 @@ impl HttpBody for Body { } } +pub struct BoxBodyReader { + inner: BoxBody, + store: Vec +} + +impl BoxBodyReader { + pub fn new(inner: BoxBody) -> Self { + BoxBodyReader { + inner, + store: Vec::new() + } + } +} + +impl AsyncRead for BoxBodyReader { + // Required method + fn poll_read( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &mut [u8] + ) -> Poll> { + if self.store.len() > 0 { + let size = self.store.len().min(buf.len()); + buf[..size].clone_from_slice(&self.store[..size]); + self.store = self.store.split_off(size); + Poll::Ready(Ok(size)) + } else { + match Pin::new(&mut self.inner).poll_data(cx) { + Poll::Pending => Poll::Pending, + Poll::Ready(opt) => match opt { + Some(result) => match result { + Ok(data) => { + use bytes::Buf; + self.store.extend_from_slice(data.chunk()); + let size = self.store.len().min(buf.len()); + buf[..size].clone_from_slice(&self.store[..size]); + self.store = self.store.split_off(size); + Poll::Ready(Ok(size)) + }, + Err(e) => { + Poll::Ready(Err(std::io::Error::new(std::io::ErrorKind::Other, e))) + } + }, + None => Poll::Ready(Ok(0)) // Not sure about this + } + } + } + } +} + impl Stream for Body { type Item = Result; diff --git a/worker/src/http/request.rs b/worker/src/http/request.rs index fefab465..0552f174 100644 --- a/worker/src/http/request.rs +++ b/worker/src/http/request.rs @@ -1,8 +1,5 @@ //! Functions for translating requests to and from JS -use bytes::Buf; -use futures_util::StreamExt; -use js_sys::Uint8Array; use wasm_bindgen::JsCast; use worker_sys::console_log; use worker_sys::ext::{HeadersExt, RequestExt}; @@ -131,19 +128,8 @@ pub fn into_wasm(mut req: http::Request) -> web_sys::Request { let _ = r; } - let body = req.into_body(); - let body = if body.is_none() { - None - } else { - let stream = wasm_streams::ReadableStream::from_stream(body.map(|chunk| { - chunk - .map(|buf| js_sys::Uint8Array::from(buf.chunk()).into()) - .map_err(|_| wasm_bindgen::JsValue::NULL) - })); - - Some(stream.into_raw().unchecked_into()) - }; - init.body(body.as_ref()); + let s = req.into_body().into_readable_stream(); + init.body(s.map(|s| s.into()).as_ref()); web_sys::Request::new_with_str_and_init(&uri, &init).unwrap() } diff --git a/worker/src/http/response.rs b/worker/src/http/response.rs index 81497a8b..802ceca0 100644 --- a/worker/src/http/response.rs +++ b/worker/src/http/response.rs @@ -1,13 +1,10 @@ //! Functions for translating responses to and from JS -use bytes::Buf; -use futures_util::StreamExt; use wasm_bindgen::JsCast; use worker_sys::ext::{HeadersExt, ResponseExt, ResponseInitExt}; -use crate::WebSocket; - use crate::body::Body; +use crate::WebSocket; /// Create a [`http::Response`] from a [`web_sys::Response`]. /// @@ -87,18 +84,7 @@ pub fn into_wasm(mut res: http::Response) -> web_sys::Response { init.websocket(ws.as_ref()); } - let body = res.into_body(); - let body = if body.is_none() { - None - } else { - let stream = wasm_streams::ReadableStream::from_stream(body.map(|chunk| { - chunk - .map(|buf| js_sys::Uint8Array::from(buf.chunk()).into()) - .map_err(|_| wasm_bindgen::JsValue::NULL) - })); - - Some(stream.into_raw().unchecked_into()) - }; + let s = res.into_body().into_readable_stream(); - web_sys::Response::new_with_opt_readable_stream_and_init(body.as_ref(), &init).unwrap() + web_sys::Response::new_with_opt_readable_stream_and_init(s.as_ref(), &init).unwrap() }