Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add WebSocket support for RemoteHttpPlugin #16403

Open
wants to merge 11 commits into
base: main
Choose a base branch
from
3 changes: 3 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,9 @@ bevy_dev_tools = ["bevy_internal/bevy_dev_tools"]
# Enable the Bevy Remote Protocol
bevy_remote = ["bevy_internal/bevy_remote"]

# Enable WebSocket support for Bevy Remote Protocol
remote_websocket = ["bevy_internal/remote_websocket"]

# Enable passthrough loading for SPIR-V shaders (Only supported on Vulkan, shader capabilities and extensions must agree with the platform implementation)
spirv_shader_passthrough = ["bevy_internal/spirv_shader_passthrough"]

Expand Down
3 changes: 3 additions & 0 deletions crates/bevy_internal/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,9 @@ bevy_dev_tools = ["dep:bevy_dev_tools"]
# Enable support for the Bevy Remote Protocol
bevy_remote = ["dep:bevy_remote"]

# Enable WebSocket support for Bevy Remote Protocol
remote_websocket = ["bevy_remote/remote_websocket"]

# Provides picking functionality
bevy_picking = [
"dep:bevy_picking",
Expand Down
19 changes: 16 additions & 3 deletions crates/bevy_remote/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,13 @@ keywords = ["bevy"]

[features]
default = ["http"]
http = ["dep:async-io", "dep:smol-hyper"]
http = ["dep:hyper", "dep:http-body-util", "dep:async-io", "dep:smol-hyper"]
remote_websocket = [
"http",
"dep:tungstenite",
"dep:hyper-tungstenite",
"dep:futures-util",
]

[dependencies]
# bevy
Expand All @@ -26,12 +32,19 @@ bevy_utils = { path = "../bevy_utils", version = "0.15.0-dev" }

# other
anyhow = "1"
hyper = { version = "1", features = ["server", "http1"] }
serde = { version = "1", features = ["derive"] }
serde_json = { version = "1" }
http-body-util = "0.1"
async-channel = "2"

# http
hyper = { version = "1", features = ["server", "http1"], optional = true }
http-body-util = { version = "0.1", optional = true }

# websocket
tungstenite = { version = "0.24", optional = true }
hyper-tungstenite = { version = "0.15", optional = true }
futures-util = { version = "0.3", default-features = false, optional = true }

# dependencies that will not compile on wasm
[target.'cfg(not(target_family = "wasm"))'.dependencies]
async-io = { version = "2", optional = true }
Expand Down
170 changes: 135 additions & 35 deletions crates/bevy_remote/src/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -246,30 +246,153 @@ async fn handle_client(
request_sender: Sender<BrpMessage>,
headers: Headers,
) -> AnyhowResult<()> {
http1::Builder::new()
let builder = http1::Builder::new()
.timer(SmolTimer::new())
.serve_connection(
FuturesIo::new(client),
service::service_fn(|request| {
process_request_batch(request, &request_sender, &headers)
}),
)
.await?;
service::service_fn(|request| process_http_request(request, &request_sender, &headers)),
);

#[cfg(feature = "remote_websocket")]
let builder = builder.with_upgrades();

builder.await?;

Ok(())
}

/// A helper function for the Bevy Remote Protocol server that handles a batch
/// of requests coming from a client.
async fn process_request_batch(
request: Request<Incoming>,
async fn process_http_request(
#[allow(unused_mut)] mut request: Request<Incoming>,
request_sender: &Sender<BrpMessage>,
headers: &Headers,
) -> AnyhowResult<Response<BrpHttpBody>> {
#[cfg(feature = "remote_websocket")]
if hyper_tungstenite::is_upgrade_request(&request) {
let (response, websocket) = hyper_tungstenite::upgrade(&mut request, None)?;

IoTaskPool::get()
.spawn(handle_websocket_connection(
websocket,
request_sender.clone(),
))
.detach();

let (header, body) = response.into_parts();

return Ok(Response::from_parts(header, BrpHttpBody::Complete(body)));
}

let batch_bytes = request.into_body().collect().await?.to_bytes();
let batch: Result<BrpBatch, _> = serde_json::from_slice(&batch_bytes);
let batch = serde_json::from_slice::<BrpBatch>(&batch_bytes);

let result = match batch {
let mut response = match process_request_batch(batch, request_sender).await? {
BrpHttpResponse::Complete(serialized) => {
let mut response = Response::new(BrpHttpBody::Complete(Full::new(Bytes::from(
serialized.as_bytes().to_owned(),
))));
response.headers_mut().insert(
hyper::header::CONTENT_TYPE,
HeaderValue::from_static("application/json"),
);
response
}
BrpHttpResponse::Stream(stream) => {
let mut response = Response::new(BrpHttpBody::Stream(stream));
response.headers_mut().insert(
hyper::header::CONTENT_TYPE,
HeaderValue::from_static("text/event-stream"),
);
response
}
};

for (key, value) in &headers.headers {
response.headers_mut().insert(key, value.clone());
}

Ok(response)
}

#[cfg(feature = "remote_websocket")]
async fn handle_websocket_connection(
ws: hyper_tungstenite::HyperWebsocket,
request_sender: Sender<BrpMessage>,
) -> AnyhowResult<()> {
use futures_util::{stream::SplitSink, SinkExt, StreamExt};
use hyper_tungstenite::HyperWebsocketStream;
use tungstenite::Message;

async fn forward_responses(
mut response_receiver: Pin<Box<Receiver<Message>>>,
mut write_stream: SplitSink<HyperWebsocketStream, Message>,
) -> AnyhowResult<()> {
while let Some(response) = StreamExt::next(&mut response_receiver).await {
write_stream.send(response).await?;
}
Ok(())
}

async fn process_ws_request(
request: String,
request_sender: Sender<BrpMessage>,
response_sender: Sender<Message>,
) -> AnyhowResult<()> {
let batch = serde_json::from_str::<BrpBatch>(&request);

match process_request_batch(batch, &request_sender).await? {
BrpHttpResponse::Complete(serialized) => {
response_sender.send(Message::text(serialized)).await?;
}
BrpHttpResponse::Stream(mut stream) => {
while let Some(result) = StreamExt::next(&mut stream.rx).await {
let response = BrpResponse::new(stream.id.clone(), result);
let serialized = serde_json::to_string(&response).unwrap();
response_sender.send(Message::text(serialized)).await?;
}
}
};

Ok(())
}

let ws = ws.await?;

let (write_stream, mut read_stream) = ws.split();

let (response_sender, response_receiver) = async_channel::bounded(32);

// Send any queued outgoing responses in a background task
IoTaskPool::get()
.spawn(forward_responses(Box::pin(response_receiver), write_stream))
.detach();

// Read and process incoming requests
while let Some(message) = StreamExt::next(&mut read_stream).await {
match message {
Ok(Message::Text(request)) => {
IoTaskPool::get()
.spawn(process_ws_request(
request,
request_sender.clone(),
response_sender.clone(),
))
.detach();
}
Ok(Message::Close(_)) | Err(_) => return Ok(()),
_ => {}
}
}

Ok(())
}

/// A helper function for the Bevy Remote Protocol server that handles a batch
/// of requests coming from a client.
async fn process_request_batch(
batch_result: Result<BrpBatch, serde_json::Error>,
request_sender: &Sender<BrpMessage>,
) -> AnyhowResult<BrpHttpResponse<String, BrpStream>> {
let response = match batch_result {
Ok(BrpBatch::Single(request)) => {
let response = process_single_request(request, request_sender).await?;
match response {
Expand Down Expand Up @@ -315,29 +438,6 @@ async fn process_request_batch(
}
};

let mut response = match result {
BrpHttpResponse::Complete(serialized) => {
let mut response = Response::new(BrpHttpBody::Complete(Full::new(Bytes::from(
serialized.as_bytes().to_owned(),
))));
response.headers_mut().insert(
hyper::header::CONTENT_TYPE,
HeaderValue::from_static("application/json"),
);
response
}
BrpHttpResponse::Stream(stream) => {
let mut response = Response::new(BrpHttpBody::Stream(stream));
response.headers_mut().insert(
hyper::header::CONTENT_TYPE,
HeaderValue::from_static("text/event-stream"),
);
response
}
};
for (key, value) in &headers.headers {
response.headers_mut().insert(key, value.clone());
}
Ok(response)
}

Expand Down
1 change: 1 addition & 0 deletions docs/cargo_features.md
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ The default feature set enables most of the expected features of a game engine,
|pnm|PNM image format support, includes pam, pbm, pgm and ppm|
|qoi|QOI image format support|
|reflect_functions|Enable function reflection|
|remote_websocket|Enable WebSocket support for Bevy Remote Protocol|
|serialize|Enable serialization support through serde|
|shader_format_glsl|Enable support for shaders in GLSL|
|shader_format_spirv|Enable support for shaders in SPIR-V|
Expand Down
Loading