-
Notifications
You must be signed in to change notification settings - Fork 300
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Fix issues with Request.clone in http types #440
Changes from 3 commits
fb27810
d862599
e7caadc
a655664
a480f94
709caa0
e958265
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -10,7 +10,11 @@ use std::{ | |
use router_service::unsync::Router; | ||
use serde::{Deserialize, Serialize}; | ||
use tower::Service; | ||
use worker::{body::Body, http::{Response, HttpClone, RequestRedirect}, *}; | ||
use worker::{ | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ran cargo fmt |
||
body::Body, | ||
http::{HttpClone, RequestRedirect, Response}, | ||
*, | ||
}; | ||
|
||
mod alarm; | ||
mod counter; | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -25,10 +25,10 @@ serde = { version = "1.0.164", features = ["derive"] } | |
serde_json = "1.0.96" | ||
tokio = { version = "1.28", default-features = false } | ||
url = "2.4.0" | ||
wasm-bindgen = "=0.2.86" | ||
wasm-bindgen = {workspace=true} | ||
wasm-bindgen-futures = "0.4.36" | ||
serde-wasm-bindgen = "0.5.0" | ||
wasm-streams = "0.3.0" | ||
wasm-streams = "0.4.0" | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This had to be bumped due to incompatibility in ReadableStream types in the prior version (now re-exports web_sys types). |
||
worker-kv = "0.6.0" | ||
worker-macros = { path = "../worker-macros", version = "0.0.9" } | ||
worker-sys = { path = "../worker-sys", version = "0.0.9" } | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -3,28 +3,16 @@ | |
task::{Context, Poll}, | ||
}; | ||
|
||
use bytes::Bytes; | ||
use futures_util::Stream; | ||
use http::HeaderMap; | ||
use js_sys::{ArrayBuffer, Promise, Uint8Array}; | ||
use serde::de::DeserializeOwned; | ||
use wasm_bindgen::{prelude::wasm_bindgen, JsCast, JsValue}; | ||
|
||
use crate::console_log; | ||
Check warning on line 6 in worker/src/body/body.rs GitHub Actions / Test
|
||
use crate::{ | ||
body::{wasm::WasmStreamBody, HttpBody}, | ||
futures::SendJsFuture, | ||
Error, | ||
}; | ||
|
||
// FIXME(zeb): this is a really disgusting hack that has to clone the bytes inside of the stream | ||
// because the array buffer backing them gets detached. | ||
#[wasm_bindgen(module = "/js/hacks.js")] | ||
extern "C" { | ||
#[wasm_bindgen(js_name = "collectBytes", catch)] | ||
fn collect_bytes( | ||
stream: &JsValue, | ||
) -> Result<Promise, JsValue>; | ||
} | ||
use bytes::Bytes; | ||
use futures_util::{AsyncRead, Stream}; | ||
use http::HeaderMap; | ||
use serde::de::DeserializeOwned; | ||
|
||
type BoxBody = http_body::combinators::UnsyncBoxBody<Bytes, Error>; | ||
|
||
|
@@ -116,25 +104,6 @@ | |
// performance as there's no polling overhead. | ||
match self.0 { | ||
BodyInner::Regular(body) => super::to_bytes(body).await, | ||
/* | ||
Working but clones all body | ||
BodyInner::Request(req) if req.body().is_some() => { | ||
// let body = req.body().unwrap(); | ||
let promise = collect_bytes(&req.unchecked_into())?; | ||
let bytes: ArrayBuffer = SendJsFuture::from(promise).await?.into(); | ||
let bytes = Uint8Array::new(&bytes); | ||
|
||
Ok(bytes.to_vec().into()) | ||
} | ||
BodyInner::Response(res) if res.body().is_some() => { | ||
let promise = collect_bytes(&res.unchecked_into())?; | ||
let bytes: ArrayBuffer = SendJsFuture::from(promise).await?.into(); | ||
let bytes = Uint8Array::new(&bytes); | ||
|
||
Ok(bytes.to_vec().into()) | ||
} | ||
*/ | ||
// Failing | ||
BodyInner::Request(req) => Ok(array_buffer_to_bytes(req.array_buffer()).await), | ||
BodyInner::Response(res) => Ok(array_buffer_to_bytes(res.array_buffer()).await), | ||
_ => Ok(Bytes::new()), | ||
|
@@ -184,7 +153,7 @@ | |
.and_then(|buf| serde_json::from_slice(&buf).map_err(Error::SerdeJsonError)) | ||
} | ||
|
||
pub(crate) fn is_none(&self) -> bool { | ||
Check warning on line 156 in worker/src/body/body.rs GitHub Actions / Test
|
||
match &self.0 { | ||
BodyInner::None => true, | ||
BodyInner::Regular(_) => false, | ||
|
@@ -197,6 +166,10 @@ | |
&self.0 | ||
} | ||
|
||
pub(crate) fn into_inner(self) -> BodyInner { | ||
self.0 | ||
} | ||
|
||
/// Turns the body into a regular streaming body, if it's not already, and returns the underlying body. | ||
fn as_inner_box_body(&mut self) -> Option<&mut BoxBody> { | ||
match &self.0 { | ||
|
@@ -211,6 +184,22 @@ | |
_ => unreachable!(), | ||
} | ||
} | ||
|
||
pub(crate) fn into_readable_stream(self) -> Option<web_sys::ReadableStream> { | ||
match self.into_inner() { | ||
crate::body::BodyInner::Request(req) => req.body(), | ||
crate::body::BodyInner::Response(res) => res.body(), | ||
crate::body::BodyInner::Regular(s) => Some( | ||
wasm_streams::ReadableStream::from_async_read( | ||
crate::body::BoxBodyReader::new(s), | ||
1024, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Idk what to set this default buffer size to |
||
) | ||
.into_raw(), | ||
), | ||
crate::body::BodyInner::None => None, | ||
_ => panic!("unexpected body inner"), | ||
Check warning on line 200 in worker/src/body/body.rs GitHub Actions / Test
|
||
} | ||
} | ||
} | ||
|
||
impl Default for Body { | ||
|
@@ -313,6 +302,56 @@ | |
} | ||
} | ||
|
||
pub struct BoxBodyReader { | ||
inner: BoxBody, | ||
store: Vec<u8>, | ||
} | ||
|
||
impl BoxBodyReader { | ||
pub fn new(inner: BoxBody) -> Self { | ||
BoxBodyReader { | ||
inner, | ||
store: Vec::new(), | ||
} | ||
} | ||
} | ||
|
||
impl AsyncRead for BoxBodyReader { | ||
// Required method | ||
fn poll_read( | ||
mut self: Pin<&mut Self>, | ||
cx: &mut Context<'_>, | ||
buf: &mut [u8], | ||
) -> Poll<Result<usize, std::io::Error>> { | ||
if self.store.len() > 0 { | ||
let size = self.store.len().min(buf.len()); | ||
buf[..size].clone_from_slice(&self.store[..size]); | ||
self.store = self.store.split_off(size); | ||
Poll::Ready(Ok(size)) | ||
} else { | ||
match Pin::new(&mut self.inner).poll_data(cx) { | ||
Poll::Pending => Poll::Pending, | ||
Poll::Ready(opt) => match opt { | ||
Some(result) => match result { | ||
Ok(data) => { | ||
use bytes::Buf; | ||
self.store.extend_from_slice(data.chunk()); | ||
let size = self.store.len().min(buf.len()); | ||
buf[..size].clone_from_slice(&self.store[..size]); | ||
self.store = self.store.split_off(size); | ||
Poll::Ready(Ok(size)) | ||
} | ||
Err(e) => { | ||
Poll::Ready(Err(std::io::Error::new(std::io::ErrorKind::Other, e))) | ||
} | ||
}, | ||
None => Poll::Ready(Ok(0)), // Not sure about this | ||
}, | ||
} | ||
} | ||
} | ||
} | ||
|
||
impl Stream for Body { | ||
type Item = Result<Bytes, Error>; | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it will be easier to set this at the workspace level.