Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix issues with Request.clone in http types #440

Merged
merged 7 commits into from
Mar 11, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 31 additions & 31 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,3 +30,6 @@ lto = true
[profile.release.package."*"]
codegen-units = 1
opt-level = "z"

[workspace.dependencies]
wasm-bindgen = "=0.2.87"
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it will be easier to set this at the workspace level.

2 changes: 1 addition & 1 deletion worker-macros/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ worker-sys = { path = "../worker-sys", version = "0.0.9" }
syn = "2.0.17"
proc-macro2 = "1.0.60"
quote = "1.0.28"
wasm-bindgen = "=0.2.86"
wasm-bindgen = {workspace=true}
wasm-bindgen-futures = "0.4.36"
wasm-bindgen-macro-support = "0.2.86"

Expand Down
6 changes: 5 additions & 1 deletion worker-sandbox/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,11 @@ use std::{
use router_service::unsync::Router;
use serde::{Deserialize, Serialize};
use tower::Service;
use worker::{body::Body, http::{Response, HttpClone, RequestRedirect}, *};
use worker::{
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ran cargo fmt

body::Body,
http::{HttpClone, RequestRedirect, Response},
*,
};

mod alarm;
mod counter;
Expand Down
2 changes: 1 addition & 1 deletion worker-sys/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ description = "Low-level extern definitions / FFI bindings to the Cloudflare Wor
[dependencies]
cfg-if = "1.0.0"
js-sys = "0.3.63"
wasm-bindgen = "=0.2.86"
wasm-bindgen = {workspace=true}

[dependencies.web-sys]
version = "0.3.63"
Expand Down
4 changes: 2 additions & 2 deletions worker/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,10 @@ serde = { version = "1.0.164", features = ["derive"] }
serde_json = "1.0.96"
tokio = { version = "1.28", default-features = false }
url = "2.4.0"
wasm-bindgen = "=0.2.86"
wasm-bindgen = {workspace=true}
wasm-bindgen-futures = "0.4.36"
serde-wasm-bindgen = "0.5.0"
wasm-streams = "0.3.0"
wasm-streams = "0.4.0"
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This had to be bumped due to incompatibility in ReadableStream types in the prior version (now re-exports web_sys types).

worker-kv = "0.6.0"
worker-macros = { path = "../worker-macros", version = "0.0.9" }
worker-sys = { path = "../worker-sys", version = "0.0.9" }
Expand Down
1 change: 1 addition & 0 deletions worker/src/body.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ mod wasm;

pub use body::Body;
pub(crate) use body::BodyInner;
pub(crate) use body::BoxBodyReader;
pub use http_body::Body as HttpBody;
pub use to_bytes::to_bytes;

Expand Down
111 changes: 75 additions & 36 deletions worker/src/body/body.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,28 +3,16 @@
task::{Context, Poll},
};

use bytes::Bytes;
use futures_util::Stream;
use http::HeaderMap;
use js_sys::{ArrayBuffer, Promise, Uint8Array};
use serde::de::DeserializeOwned;
use wasm_bindgen::{prelude::wasm_bindgen, JsCast, JsValue};

use crate::console_log;

Check warning on line 6 in worker/src/body/body.rs

View workflow job for this annotation

GitHub Actions / Test

unused import: `crate::console_log`

Check warning on line 6 in worker/src/body/body.rs

View workflow job for this annotation

GitHub Actions / Test

unused import: `crate::console_log`
use crate::{
body::{wasm::WasmStreamBody, HttpBody},
futures::SendJsFuture,
Error,
};

// FIXME(zeb): this is a really disgusting hack that has to clone the bytes inside of the stream
// because the array buffer backing them gets detached.
#[wasm_bindgen(module = "/js/hacks.js")]
extern "C" {
#[wasm_bindgen(js_name = "collectBytes", catch)]
fn collect_bytes(
stream: &JsValue,
) -> Result<Promise, JsValue>;
}
use bytes::Bytes;
use futures_util::{AsyncRead, Stream};
use http::HeaderMap;
use serde::de::DeserializeOwned;

type BoxBody = http_body::combinators::UnsyncBoxBody<Bytes, Error>;

Expand Down Expand Up @@ -116,25 +104,6 @@
// performance as there's no polling overhead.
match self.0 {
BodyInner::Regular(body) => super::to_bytes(body).await,
/*
Working but clones all body
BodyInner::Request(req) if req.body().is_some() => {
// let body = req.body().unwrap();
let promise = collect_bytes(&req.unchecked_into())?;
let bytes: ArrayBuffer = SendJsFuture::from(promise).await?.into();
let bytes = Uint8Array::new(&bytes);

Ok(bytes.to_vec().into())
}
BodyInner::Response(res) if res.body().is_some() => {
let promise = collect_bytes(&res.unchecked_into())?;
let bytes: ArrayBuffer = SendJsFuture::from(promise).await?.into();
let bytes = Uint8Array::new(&bytes);

Ok(bytes.to_vec().into())
}
*/
// Failing
BodyInner::Request(req) => Ok(array_buffer_to_bytes(req.array_buffer()).await),
BodyInner::Response(res) => Ok(array_buffer_to_bytes(res.array_buffer()).await),
_ => Ok(Bytes::new()),
Expand Down Expand Up @@ -184,7 +153,7 @@
.and_then(|buf| serde_json::from_slice(&buf).map_err(Error::SerdeJsonError))
}

pub(crate) fn is_none(&self) -> bool {

Check warning on line 156 in worker/src/body/body.rs

View workflow job for this annotation

GitHub Actions / Test

method `is_none` is never used

Check warning on line 156 in worker/src/body/body.rs

View workflow job for this annotation

GitHub Actions / Test

method `is_none` is never used
match &self.0 {
BodyInner::None => true,
BodyInner::Regular(_) => false,
Expand All @@ -197,6 +166,10 @@
&self.0
}

pub(crate) fn into_inner(self) -> BodyInner {
self.0
}

/// Turns the body into a regular streaming body, if it's not already, and returns the underlying body.
fn as_inner_box_body(&mut self) -> Option<&mut BoxBody> {
match &self.0 {
Expand All @@ -211,6 +184,22 @@
_ => unreachable!(),
}
}

pub(crate) fn into_readable_stream(self) -> Option<web_sys::ReadableStream> {
match self.into_inner() {
crate::body::BodyInner::Request(req) => req.body(),
crate::body::BodyInner::Response(res) => res.body(),
crate::body::BodyInner::Regular(s) => Some(
wasm_streams::ReadableStream::from_async_read(
crate::body::BoxBodyReader::new(s),
1024,
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Idk what to set this default buffer size to

)
.into_raw(),
),
crate::body::BodyInner::None => None,
_ => panic!("unexpected body inner"),

Check warning on line 200 in worker/src/body/body.rs

View workflow job for this annotation

GitHub Actions / Test

unreachable pattern

Check warning on line 200 in worker/src/body/body.rs

View workflow job for this annotation

GitHub Actions / Test

unreachable pattern
}
}
}

impl Default for Body {
Expand Down Expand Up @@ -313,6 +302,56 @@
}
}

pub struct BoxBodyReader {
inner: BoxBody,
store: Vec<u8>,
}

impl BoxBodyReader {
pub fn new(inner: BoxBody) -> Self {
BoxBodyReader {
inner,
store: Vec::new(),
}
}
}

impl AsyncRead for BoxBodyReader {
// Required method
fn poll_read(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut [u8],
) -> Poll<Result<usize, std::io::Error>> {
if self.store.len() > 0 {
let size = self.store.len().min(buf.len());
buf[..size].clone_from_slice(&self.store[..size]);
self.store = self.store.split_off(size);
Poll::Ready(Ok(size))
} else {
match Pin::new(&mut self.inner).poll_data(cx) {
Poll::Pending => Poll::Pending,
Poll::Ready(opt) => match opt {
Some(result) => match result {
Ok(data) => {
use bytes::Buf;
self.store.extend_from_slice(data.chunk());
let size = self.store.len().min(buf.len());
buf[..size].clone_from_slice(&self.store[..size]);
self.store = self.store.split_off(size);
Poll::Ready(Ok(size))
}
Err(e) => {
Poll::Ready(Err(std::io::Error::new(std::io::ErrorKind::Other, e)))
}
},
None => Poll::Ready(Ok(0)), // Not sure about this
},
}
}
}
}

impl Stream for Body {
type Item = Result<Bytes, Error>;

Expand Down
Loading
Loading