diff --git a/Cargo.toml b/Cargo.toml index 897e751449319..62cf40e6870bb 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -234,6 +234,9 @@ bevy_dev_tools = ["bevy_internal/bevy_dev_tools"] # Enable the Bevy Remote Protocol bevy_remote = ["bevy_internal/bevy_remote"] +# Enable WebSocket support for Bevy Remote Protocol +remote_websocket = ["bevy_internal/remote_websocket"] + # Enable passthrough loading for SPIR-V shaders (Only supported on Vulkan, shader capabilities and extensions must agree with the platform implementation) spirv_shader_passthrough = ["bevy_internal/spirv_shader_passthrough"] diff --git a/crates/bevy_internal/Cargo.toml b/crates/bevy_internal/Cargo.toml index 9e6cdebacccfb..2ee3e903c04bf 100644 --- a/crates/bevy_internal/Cargo.toml +++ b/crates/bevy_internal/Cargo.toml @@ -217,6 +217,9 @@ bevy_dev_tools = ["dep:bevy_dev_tools"] # Enable support for the Bevy Remote Protocol bevy_remote = ["dep:bevy_remote"] +# Enable WebSocket support for Bevy Remote Protocol +remote_websocket = ["bevy_remote/remote_websocket"] + # Provides picking functionality bevy_picking = [ "dep:bevy_picking", diff --git a/crates/bevy_remote/Cargo.toml b/crates/bevy_remote/Cargo.toml index 4a12f7742c997..0304164fa6e05 100644 --- a/crates/bevy_remote/Cargo.toml +++ b/crates/bevy_remote/Cargo.toml @@ -10,7 +10,13 @@ keywords = ["bevy"] [features] default = ["http"] -http = ["dep:async-io", "dep:smol-hyper"] +http = ["dep:hyper", "dep:http-body-util", "dep:async-io", "dep:smol-hyper"] +remote_websocket = [ + "http", + "dep:tungstenite", + "dep:hyper-tungstenite", + "dep:futures-util", +] [dependencies] # bevy @@ -26,12 +32,19 @@ bevy_utils = { path = "../bevy_utils", version = "0.15.0-dev" } # other anyhow = "1" -hyper = { version = "1", features = ["server", "http1"] } serde = { version = "1", features = ["derive"] } serde_json = { version = "1" } -http-body-util = "0.1" async-channel = "2" +# http +hyper = { version = "1", features = ["server", "http1"], optional = true } +http-body-util = { version = "0.1", optional = true } + +# websocket +tungstenite = { version = "0.24", optional = true } +hyper-tungstenite = { version = "0.15", optional = true } +futures-util = { version = "0.3", default-features = false, optional = true } + # dependencies that will not compile on wasm [target.'cfg(not(target_family = "wasm"))'.dependencies] async-io = { version = "2", optional = true } diff --git a/crates/bevy_remote/src/http.rs b/crates/bevy_remote/src/http.rs index 321f6aa6bf497..f7436763a969e 100644 --- a/crates/bevy_remote/src/http.rs +++ b/crates/bevy_remote/src/http.rs @@ -246,30 +246,153 @@ async fn handle_client( request_sender: Sender, headers: Headers, ) -> AnyhowResult<()> { - http1::Builder::new() + let builder = http1::Builder::new() .timer(SmolTimer::new()) .serve_connection( FuturesIo::new(client), - service::service_fn(|request| { - process_request_batch(request, &request_sender, &headers) - }), - ) - .await?; + service::service_fn(|request| process_http_request(request, &request_sender, &headers)), + ); + + #[cfg(feature = "remote_websocket")] + let builder = builder.with_upgrades(); + + builder.await?; Ok(()) } -/// A helper function for the Bevy Remote Protocol server that handles a batch -/// of requests coming from a client. -async fn process_request_batch( - request: Request, +async fn process_http_request( + #[allow(unused_mut)] mut request: Request, request_sender: &Sender, headers: &Headers, ) -> AnyhowResult> { + #[cfg(feature = "remote_websocket")] + if hyper_tungstenite::is_upgrade_request(&request) { + let (response, websocket) = hyper_tungstenite::upgrade(&mut request, None)?; + + IoTaskPool::get() + .spawn(handle_websocket_connection( + websocket, + request_sender.clone(), + )) + .detach(); + + let (header, body) = response.into_parts(); + + return Ok(Response::from_parts(header, BrpHttpBody::Complete(body))); + } + let batch_bytes = request.into_body().collect().await?.to_bytes(); - let batch: Result = serde_json::from_slice(&batch_bytes); + let batch = serde_json::from_slice::(&batch_bytes); - let result = match batch { + let mut response = match process_request_batch(batch, request_sender).await? { + BrpHttpResponse::Complete(serialized) => { + let mut response = Response::new(BrpHttpBody::Complete(Full::new(Bytes::from( + serialized.as_bytes().to_owned(), + )))); + response.headers_mut().insert( + hyper::header::CONTENT_TYPE, + HeaderValue::from_static("application/json"), + ); + response + } + BrpHttpResponse::Stream(stream) => { + let mut response = Response::new(BrpHttpBody::Stream(stream)); + response.headers_mut().insert( + hyper::header::CONTENT_TYPE, + HeaderValue::from_static("text/event-stream"), + ); + response + } + }; + + for (key, value) in &headers.headers { + response.headers_mut().insert(key, value.clone()); + } + + Ok(response) +} + +#[cfg(feature = "remote_websocket")] +async fn handle_websocket_connection( + ws: hyper_tungstenite::HyperWebsocket, + request_sender: Sender, +) -> AnyhowResult<()> { + use futures_util::{stream::SplitSink, SinkExt, StreamExt}; + use hyper_tungstenite::HyperWebsocketStream; + use tungstenite::Message; + + async fn forward_responses( + mut response_receiver: Pin>>, + mut write_stream: SplitSink, + ) -> AnyhowResult<()> { + while let Some(response) = StreamExt::next(&mut response_receiver).await { + write_stream.send(response).await?; + } + Ok(()) + } + + async fn process_ws_request( + request: String, + request_sender: Sender, + response_sender: Sender, + ) -> AnyhowResult<()> { + let batch = serde_json::from_str::(&request); + + match process_request_batch(batch, &request_sender).await? { + BrpHttpResponse::Complete(serialized) => { + response_sender.send(Message::text(serialized)).await?; + } + BrpHttpResponse::Stream(mut stream) => { + while let Some(result) = StreamExt::next(&mut stream.rx).await { + let response = BrpResponse::new(stream.id.clone(), result); + let serialized = serde_json::to_string(&response).unwrap(); + response_sender.send(Message::text(serialized)).await?; + } + } + }; + + Ok(()) + } + + let ws = ws.await?; + + let (write_stream, mut read_stream) = ws.split(); + + let (response_sender, response_receiver) = async_channel::bounded(32); + + // Send any queued outgoing responses in a background task + IoTaskPool::get() + .spawn(forward_responses(Box::pin(response_receiver), write_stream)) + .detach(); + + // Read and process incoming requests + while let Some(message) = StreamExt::next(&mut read_stream).await { + match message { + Ok(Message::Text(request)) => { + IoTaskPool::get() + .spawn(process_ws_request( + request, + request_sender.clone(), + response_sender.clone(), + )) + .detach(); + } + Ok(Message::Close(_)) | Err(_) => return Ok(()), + _ => {} + } + } + + Ok(()) +} + +/// A helper function for the Bevy Remote Protocol server that handles a batch +/// of requests coming from a client. +async fn process_request_batch( + batch_result: Result, + request_sender: &Sender, +) -> AnyhowResult> { + let response = match batch_result { Ok(BrpBatch::Single(request)) => { let response = process_single_request(request, request_sender).await?; match response { @@ -315,29 +438,6 @@ async fn process_request_batch( } }; - let mut response = match result { - BrpHttpResponse::Complete(serialized) => { - let mut response = Response::new(BrpHttpBody::Complete(Full::new(Bytes::from( - serialized.as_bytes().to_owned(), - )))); - response.headers_mut().insert( - hyper::header::CONTENT_TYPE, - HeaderValue::from_static("application/json"), - ); - response - } - BrpHttpResponse::Stream(stream) => { - let mut response = Response::new(BrpHttpBody::Stream(stream)); - response.headers_mut().insert( - hyper::header::CONTENT_TYPE, - HeaderValue::from_static("text/event-stream"), - ); - response - } - }; - for (key, value) in &headers.headers { - response.headers_mut().insert(key, value.clone()); - } Ok(response) } diff --git a/docs/cargo_features.md b/docs/cargo_features.md index e9ad2e10c10ca..6606ecc4ea660 100644 --- a/docs/cargo_features.md +++ b/docs/cargo_features.md @@ -89,6 +89,7 @@ The default feature set enables most of the expected features of a game engine, |pnm|PNM image format support, includes pam, pbm, pgm and ppm| |qoi|QOI image format support| |reflect_functions|Enable function reflection| +|remote_websocket|Enable WebSocket support for Bevy Remote Protocol| |serialize|Enable serialization support through serde| |shader_format_glsl|Enable support for shaders in GLSL| |shader_format_spirv|Enable support for shaders in SPIR-V|