Skip to content

Commit

Permalink
Implement Hibernatable Web Sockets API (#436)
Browse files Browse the repository at this point in the history
* Rebase commit

* Pull Request feedback

* Fix problems with async_trait

* Improve error message around incorrect impl methods

* Improve error message around incorrect impl methods

* Add missing semi-colon

* Add missing async

* Add clippy exceptions

* Fix trait type

* Properly qualify worker_sys

* Change websockets to ref:

* Revert formatting changes to Cargo.toml

* Remove left-over code

* fix formatting

* clippy

---------

Co-authored-by: Kevin Flansburg <[email protected]>
  • Loading branch information
DylanRJohnston and kflansburg authored Feb 28, 2024
1 parent e06193a commit 1ddf6d7
Showing 5 changed files with 263 additions and 19 deletions.
166 changes: 148 additions & 18 deletions worker-macros/src/durable_object.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use proc_macro2::{Ident, TokenStream};
use quote::{quote, ToTokens};
use syn::{spanned::Spanned, Error, FnArg, ImplItem, Item, Type, TypePath};
use syn::{spanned::Spanned, Error, FnArg, ImplItem, Item, Type, TypePath, Visibility};

pub fn expand_macro(tokens: TokenStream) -> syn::Result<TokenStream> {
let item = syn::parse2::<Item>(tokens)?;
@@ -20,24 +20,37 @@ pub fn expand_macro(tokens: TokenStream) -> syn::Result<TokenStream> {
let struct_name = imp.self_ty;
let items = imp.items;
let mut tokenized = vec![];
let mut has_alarm = false;

#[derive(Default)]
struct OptionalMethods {
has_alarm: bool,
has_websocket_message: bool,
has_websocket_close: bool,
has_websocket_error: bool,
}

let mut optional_methods = OptionalMethods::default();

for item in items {
let impl_method = match item {
ImplItem::Fn(func) => func,
_ => return Err(Error::new_spanned(item, "Impl block must only contain methods"))
};

let span = impl_method.sig.ident.span();

let tokens = match impl_method.sig.ident.to_string().as_str() {
"new" => {
let mut method = impl_method.clone();
method.sig.ident = Ident::new("_new", method.sig.ident.span());
method.vis = Visibility::Inherited;


// modify the `state` argument so it is type ObjectState
let arg_tokens = method.sig.inputs.first_mut().expect("DurableObject `new` method must have 2 arguments: state and env").into_token_stream();
match syn::parse2::<FnArg>(arg_tokens)? {
FnArg::Typed(pat) => {
let path = syn::parse2::<TypePath>(quote!{worker_sys::DurableObjectState})?;
let path = syn::parse2::<TypePath>(quote!{worker::worker_sys::DurableObjectState})?;
let mut updated_pat = pat;
updated_pat.ty = Box::new(Type::Path(path));

@@ -57,17 +70,19 @@ pub fn expand_macro(tokens: TokenStream) -> syn::Result<TokenStream> {
prepended.extend(method.block.stmts);
method.block.stmts = prepended;

quote! {
Ok(quote! {
#pound[wasm_bindgen::prelude::wasm_bindgen(constructor)]
pub #method
}
})
},
"fetch" => {
let mut method = impl_method.clone();
method.sig.ident = Ident::new("_fetch_raw", method.sig.ident.span());
quote! {
method.vis = Visibility::Inherited;

Ok(quote! {
#pound[wasm_bindgen::prelude::wasm_bindgen(js_name = fetch)]
pub fn _fetch(&mut self, req: worker_sys::web_sys::Request) -> js_sys::Promise {
pub fn _fetch(&mut self, req: worker::worker_sys::web_sys::Request) -> worker::js_sys::Promise {
// SAFETY:
// On the surface, this is unsound because the Durable Object could be dropped
// while JavaScript still has possession of the future. However,
@@ -77,22 +92,24 @@ pub fn expand_macro(tokens: TokenStream) -> syn::Result<TokenStream> {
let static_self: &'static mut Self = unsafe {&mut *(self as *mut _)};

wasm_bindgen_futures::future_to_promise(async move {
static_self._fetch_raw(req.into()).await.map(worker_sys::web_sys::Response::from).map(wasm_bindgen::JsValue::from)
static_self._fetch_raw(req.into()).await.map(worker::worker_sys::web_sys::Response::from).map(wasm_bindgen::JsValue::from)
.map_err(wasm_bindgen::JsValue::from)
})
}

#method
}
})
},
"alarm" => {
has_alarm = true;
optional_methods.has_alarm = true;

let mut method = impl_method.clone();
method.sig.ident = Ident::new("_alarm_raw", method.sig.ident.span());
quote! {
method.vis = Visibility::Inherited;

Ok(quote! {
#pound[wasm_bindgen::prelude::wasm_bindgen(js_name = alarm)]
pub fn _alarm(&mut self) -> js_sys::Promise {
pub fn _alarm(&mut self) -> worker::js_sys::Promise {
// SAFETY:
// On the surface, this is unsound because the Durable Object could be dropped
// while JavaScript still has possession of the future. However,
@@ -102,24 +119,131 @@ pub fn expand_macro(tokens: TokenStream) -> syn::Result<TokenStream> {
let static_self: &'static mut Self = unsafe {&mut *(self as *mut _)};

wasm_bindgen_futures::future_to_promise(async move {
static_self._alarm_raw().await.map(worker_sys::web_sys::Response::from).map(wasm_bindgen::JsValue::from)
static_self._alarm_raw().await.map(worker::worker_sys::web_sys::Response::from).map(wasm_bindgen::JsValue::from)
.map_err(wasm_bindgen::JsValue::from)
})
}

#method
}
}
_ => panic!()
})
},
"websocket_message" => {
optional_methods.has_websocket_message = true;

let mut method = impl_method.clone();
method.sig.ident = Ident::new("_websocket_message_raw", method.sig.ident.span());
method.vis = Visibility::Inherited;

Ok(quote! {
#pound[wasm_bindgen::prelude::wasm_bindgen(js_name = webSocketMessage)]
pub fn _websocket_message(&mut self, ws: worker::worker_sys::web_sys::WebSocket, message: wasm_bindgen::JsValue) -> worker::js_sys::Promise {
let ws_message = if let Some(string_message) = message.as_string() {
worker::WebSocketIncomingMessage::String(string_message)
} else {
let v = worker::js_sys::Uint8Array::new(&message).to_vec();
worker::WebSocketIncomingMessage::Binary(v)
};

// SAFETY:
// On the surface, this is unsound because the Durable Object could be dropped
// while JavaScript still has possession of the future. However,
// we know something that Rust doesn't: that the Durable Object will never be destroyed
// while there is still a running promise inside of it, therefore we can let a reference
// to the durable object escape into a static-lifetime future.
let static_self: &'static mut Self = unsafe {&mut *(self as *mut _)};

wasm_bindgen_futures::future_to_promise(async move {
static_self._websocket_message_raw(ws.into(), ws_message).await.map(|_| wasm_bindgen::JsValue::NULL)
.map_err(wasm_bindgen::JsValue::from)
})
}

#method
})
},
"websocket_close" => {
optional_methods.has_websocket_close = true;

let mut method = impl_method.clone();
method.sig.ident = Ident::new("_websocket_close_raw", method.sig.ident.span());
method.vis = Visibility::Inherited;

Ok(quote! {
#pound[wasm_bindgen::prelude::wasm_bindgen(js_name = webSocketClose)]
pub fn _websocket_close(&mut self, ws: worker::worker_sys::web_sys::WebSocket, code: usize, reason: String, was_clean: bool) -> worker::js_sys::Promise {
// SAFETY:
// On the surface, this is unsound because the Durable Object could be dropped
// while JavaScript still has possession of the future. However,
// we know something that Rust doesn't: that the Durable Object will never be destroyed
// while there is still a running promise inside of it, therefore we can let a reference
// to the durable object escape into a static-lifetime future.
let static_self: &'static mut Self = unsafe {&mut *(self as *mut _)};

wasm_bindgen_futures::future_to_promise(async move {
static_self._websocket_close_raw(ws.into(), code, reason, was_clean).await.map(|_| wasm_bindgen::JsValue::NULL)
.map_err(wasm_bindgen::JsValue::from)
})
}

#method
})
},
"websocket_error" => {
optional_methods.has_websocket_error = true;

let mut method = impl_method.clone();
method.sig.ident = Ident::new("_websocket_error_raw", method.sig.ident.span());
method.vis = Visibility::Inherited;

Ok(quote! {
#pound[wasm_bindgen::prelude::wasm_bindgen(js_name = webSocketError)]
pub fn _websocket_error(&mut self, ws: worker::worker_sys::web_sys::WebSocket, error: wasm_bindgen::JsValue) -> worker::js_sys::Promise {
// SAFETY:
// On the surface, this is unsound because the Durable Object could be dropped
// while JavaScript still has possession of the future. However,
// we know something that Rust doesn't: that the Durable Object will never be destroyed
// while there is still a running promise inside of it, therefore we can let a reference
// to the durable object escape into a static-lifetime future.
let static_self: &'static mut Self = unsafe {&mut *(self as *mut _)};

wasm_bindgen_futures::future_to_promise(async move {
static_self._websocket_error_raw(ws.into(), error.into()).await.map(|_| wasm_bindgen::JsValue::NULL)
.map_err(wasm_bindgen::JsValue::from)
})
}

#method
})
},
ident => Err(Error::new(span, format!("Unsupported method `{}`, please move extra impl methods to a separate impl definition", ident)))
};
tokenized.push(tokens);
tokenized.push(tokens?);
}

let alarm_tokens = has_alarm.then(|| quote! {
let alarm_tokens = optional_methods.has_alarm.then(|| quote! {
async fn alarm(&mut self) -> ::worker::Result<worker::Response> {
self._alarm_raw().await
}
});

let websocket_message_tokens = optional_methods.has_websocket_message.then(|| quote! {
async fn websocket_message(&mut self, ws: ::worker::WebSocket, message: ::worker::WebSocketIncomingMessage) -> ::worker::Result<()> {
self._websocket_message_raw(ws, message).await
}
});

let websocket_close_tokens = optional_methods.has_websocket_close.then(|| quote! {
async fn websocket_close(&mut self, ws: ::worker::WebSocket, code: usize, reason: String, was_clean: bool) -> ::worker::Result<()> {
self._websocket_close_raw(ws, code, reason, was_clean).await
}
});

let websocket_error_tokens = optional_methods.has_websocket_error.then(|| quote! {
async fn websocket_error(&mut self, ws: ::worker::WebSocket, error: ::worker::Error) -> ::worker::Result<()> {
self._websocket_error_raw(ws, error).await
}
});

Ok(quote! {
#wasm_bindgen_attr
impl #struct_name {
@@ -137,6 +261,12 @@ pub fn expand_macro(tokens: TokenStream) -> syn::Result<TokenStream> {
}

#alarm_tokens

#websocket_message_tokens

#websocket_close_tokens

#websocket_error_tokens
}

trait __Need_Durable_Object_Trait_Impl_With_durable_object_Attribute { const MACROED: bool = true; }
20 changes: 20 additions & 0 deletions worker-sys/src/ext/websocket.rs
Original file line number Diff line number Diff line change
@@ -10,6 +10,12 @@ mod glue {

#[wasm_bindgen(method, catch)]
pub fn accept(this: &WebSocket) -> Result<(), JsValue>;

#[wasm_bindgen(method, catch, js_name = "serializeAttachment")]
pub fn serialize_attachment(this: &WebSocket, value: JsValue) -> Result<(), JsValue>;

#[wasm_bindgen(method, catch, js_name = "deserializeAttachment")]
pub fn deserialize_attachment(this: &WebSocket) -> Result<JsValue, JsValue>;
}
}

@@ -18,10 +24,24 @@ pub trait WebSocketExt {
///
/// [CF Documentation](https://developers.cloudflare.com/workers/runtime-apis/websockets#accept)
fn accept(&self) -> Result<(), JsValue>;

fn serialize_attachment(&self, value: JsValue) -> Result<(), JsValue>;

fn deserialize_attachment(&self) -> Result<JsValue, JsValue>;
}

impl WebSocketExt for web_sys::WebSocket {
fn accept(&self) -> Result<(), JsValue> {
self.unchecked_ref::<glue::WebSocket>().accept()
}

fn serialize_attachment(&self, value: JsValue) -> Result<(), JsValue> {
self.unchecked_ref::<glue::WebSocket>()
.serialize_attachment(value)
}

fn deserialize_attachment(&self) -> Result<JsValue, JsValue> {
self.unchecked_ref::<glue::WebSocket>()
.deserialize_attachment()
}
}
17 changes: 17 additions & 0 deletions worker-sys/src/types/durable_object/state.rs
Original file line number Diff line number Diff line change
@@ -15,4 +15,21 @@ extern "C" {

#[wasm_bindgen(method, js_name=waitUntil)]
pub fn wait_until(this: &DurableObjectState, promise: &js_sys::Promise);

#[wasm_bindgen(method, js_name=acceptWebSocket)]
pub fn accept_websocket(this: &DurableObjectState, ws: &web_sys::WebSocket);

#[wasm_bindgen(method, js_name=acceptWebSocket)]
pub fn accept_websocket_with_tags(
this: &DurableObjectState,
ws: &web_sys::WebSocket,
tags: Vec<JsValue>,
);

#[wasm_bindgen(method, js_name=getWebSockets)]
pub fn get_websockets(this: &DurableObjectState) -> Vec<web_sys::WebSocket>;

#[wasm_bindgen(method, js_name=getWebSockets)]
pub fn get_websockets_with_tag(this: &DurableObjectState, tag: &str)
-> Vec<web_sys::WebSocket>;
}
Loading

0 comments on commit 1ddf6d7

Please sign in to comment.