From c586aaee1798ad2d70e393417741e18c4692c572 Mon Sep 17 00:00:00 2001 From: Ryan Butler Date: Sat, 15 Jun 2024 18:32:59 -0400 Subject: [PATCH] physnet: actually connect to manager --- Cargo.lock | 6 +- Cargo.toml | 1 + apps/networked_physics_demo/client/Cargo.toml | 2 + .../client/src/netcode.rs | 103 ++++++++++++--- .../client/src/title_screen.rs | 119 ++++++++++++++---- crates/replicate/client/src/lib.rs | 4 +- 6 files changed, 191 insertions(+), 44 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 4431c01..e611a9b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -480,9 +480,9 @@ dependencies = [ [[package]] name = "async-compat" -version = "0.2.3" +version = "0.2.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f68a707c1feb095d8c07f8a65b9f506b117d30af431cab89374357de7c11461b" +checksum = "7bab94bde396a3f7b4962e396fdad640e241ed797d4d8d77fc8c237d14c58fc0" dependencies = [ "futures-core", "futures-io", @@ -6034,6 +6034,7 @@ dependencies = [ name = "physnet_client" version = "0.0.0" dependencies = [ + "async-compat", "bevy", "bevy-inspector-egui", "bevy_flycam", @@ -6045,6 +6046,7 @@ dependencies = [ "rand_xoshiro", "replicate-client", "serde_json", + "tokio", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index 46f787a..806890e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -31,6 +31,7 @@ edition = "2021" rust-version = "1.78.0" [workspace.dependencies] +async-compat = "0.2.4" base64 = "0.21.7" bevy = { version = "0.13", features = ["serialize"] } bevy-inspector-egui = "0.23.4" diff --git a/apps/networked_physics_demo/client/Cargo.toml b/apps/networked_physics_demo/client/Cargo.toml index 46ea764..e607b13 100644 --- a/apps/networked_physics_demo/client/Cargo.toml +++ b/apps/networked_physics_demo/client/Cargo.toml @@ -23,3 +23,5 @@ rand.workspace = true rand_xoshiro.workspace = true replicate-client.workspace = true serde_json.workspace = true +tokio.workspace = true +async-compat.workspace = true diff --git a/apps/networked_physics_demo/client/src/netcode.rs b/apps/networked_physics_demo/client/src/netcode.rs index c73ff0c..a27e8fe 100644 --- a/apps/networked_physics_demo/client/src/netcode.rs +++ b/apps/networked_physics_demo/client/src/netcode.rs @@ -5,47 +5,110 @@ use bevy::{ ecs::{ component::Component, entity::Entity, - event::{Event, EventReader}, + event::{Event, EventReader, EventWriter}, query::{Added, With, Without}, - schedule::NextState, - system::{Commands, Query, Res, ResMut, Resource}, + system::{CommandQueue, Commands, Query, Res, ResMut, Resource}, + world::World, }, - log::trace, - reflect::Reflect, + log::{error, trace}, + tasks::IoTaskPool, transform::components::{GlobalTransform, Transform}, }; +use color_eyre::eyre::{Result, WrapErr as _}; use replicate_client::common::data_model::{DataModel, Entity as DmEntity}; +use tokio::sync::mpsc; -use crate::GameModeState; +const BOUNDED_CHAN_COMMAND_QUEUE_SIZE: usize = 16; -#[derive(Debug)] -pub struct NetcodePlugin; +#[derive(Debug, Default)] +pub struct NetcodePlugin {} impl Plugin for NetcodePlugin { fn build(&self, app: &mut bevy::prelude::App) { - app.register_type::() - .add_event::() + app.add_event::() + .add_event::() + .init_resource::() .init_resource::() - .add_systems(PreUpdate, from_data_model) + .add_systems(PreUpdate, (apply_queued_commands, from_data_model)) .add_systems(PostUpdate, (spawn_entities, to_data_model)) .add_systems(Update, on_connect_to_manager_evt); } } +#[derive(Debug, Resource)] +struct NetcodeManager(#[allow(unused)] replicate_client::manager::Manager); + +/// Convenient way to receive commands sent from the async tasks. +#[derive(Debug, Resource)] +struct CommandQueueChannel { + tx: mpsc::Sender, + rx: mpsc::Receiver, +} + +impl Default for CommandQueueChannel { + fn default() -> Self { + let (tx, rx) = mpsc::channel(BOUNDED_CHAN_COMMAND_QUEUE_SIZE); + Self { tx, rx } + } +} + +fn apply_queued_commands( + mut commands: Commands, + mut chan: ResMut, +) { + while let Ok(mut command_queue) = chan.rx.try_recv() { + commands.append(&mut command_queue) + } +} + /// Other plugins create this when they want to connect to a manager. -#[derive(Debug, Reflect, Event, Eq, PartialEq)] -pub struct ConnectToManager { - /// The URL of the manager to connect to - pub manager_url: String, +#[derive(Debug, Event, Eq, PartialEq)] +pub struct ConnectToManagerRequest { + /// The URL of the manager to connect to. If `None`, locally host. + pub manager_url: Option, } +/// Produced in response to [`ConnectToManagerRequest`]. +#[derive(Debug, Event)] +pub struct ConnectToManagerResponse(pub Result<()>); + fn on_connect_to_manager_evt( - mut connect_to_manager: EventReader, - mut next_state: ResMut>, + command_queue: Res, + mut request: EventReader, + mut response: EventWriter, ) { - for ConnectToManager { manager_url: _ } in connect_to_manager.read() { - // TODO: Actually connect to the manager instead of faking it - next_state.set(GameModeState::InMinecraft); + for ConnectToManagerRequest { manager_url } in request.read() { + let Some(manager_url) = manager_url else { + response.send(ConnectToManagerResponse(Ok(()))); + continue; + }; + let pool = IoTaskPool::get(); + let manager_url = manager_url.to_owned(); + let tx = command_queue.tx.clone(); + pool.spawn(async_compat::Compat::new(async move { + let connect_result = + replicate_client::manager::Manager::connect(manager_url, None) + .await + .wrap_err("failed to connect to manager server"); + if let Err(ref err) = connect_result { + error!("{err:?}"); + } + + // We use a command queue to enqueue commands back to bevy from the + // async code. + let mut queue = CommandQueue::default(); + let response_event = ConnectToManagerResponse(connect_result.map(|mngr| { + queue.push(|w: &mut World| w.insert_resource(NetcodeManager(mngr))); + })); + queue.push(|w: &mut World| { + w.send_event(response_event).expect("failed to send event"); + }); + match tx.send(queue).await { + Ok(()) | Err(mpsc::error::SendError(_)) => (), + } + })) + // We don't need to explicitly retrieve the return value. + .detach(); } } diff --git a/apps/networked_physics_demo/client/src/title_screen.rs b/apps/networked_physics_demo/client/src/title_screen.rs index 93d89af..69142d2 100644 --- a/apps/networked_physics_demo/client/src/title_screen.rs +++ b/apps/networked_physics_demo/client/src/title_screen.rs @@ -3,9 +3,10 @@ use bevy::{ app::{Plugin, Update}, ecs::{ - event::{Event, EventWriter}, + event::{Event, EventReader, EventWriter}, schedule::{ - common_conditions::in_state, IntoSystemConfigs as _, NextState, OnEnter, + common_conditions::in_state, IntoSystemConfigs as _, + IntoSystemSetConfigs as _, NextState, OnEnter, }, system::{Commands, Res, ResMut}, }, @@ -14,7 +15,10 @@ use bevy::{ }; use bevy_inspector_egui::bevy_egui::{egui, EguiContexts}; -use crate::{netcode::ConnectToManager, AppExt, GameModeState}; +use crate::{ + netcode::{ConnectToManagerRequest, ConnectToManagerResponse}, + AppExt, GameModeState, +}; use self::ui::EventWriters; @@ -24,13 +28,20 @@ pub struct TitleScreenPlugin; impl Plugin for TitleScreenPlugin { fn build(&self, app: &mut bevy::prelude::App) { app.add_if_not_added(bevy_inspector_egui::bevy_egui::EguiPlugin) - .add_if_not_added(crate::netcode::NetcodePlugin) + .add_if_not_added(crate::netcode::NetcodePlugin::default()) .register_type::() + .configure_sets( + Update, + UiStateSystems.run_if(in_state(GameModeState::TitleScreen)), + ) .add_systems(OnEnter(GameModeState::TitleScreen), setup) .add_systems( Update, - (should_transition, draw_title_screen) - .run_if(in_state(GameModeState::TitleScreen)), + ( + handle_connect_to_manager_response.in_set(UiStateSystems), + (should_transition, draw_ui.after(UiStateSystems)) + .run_if(in_state(GameModeState::TitleScreen)), + ), ); } } @@ -39,11 +50,13 @@ impl Plugin for TitleScreenPlugin { mod ui { use bevy::{ecs::system::Resource, prelude::default}; + use crate::netcode::ConnectToManagerRequest; + use super::*; /// [`EventWriter`]s needed by the UI. pub(super) struct EventWriters<'a> { - pub connect_to_manager: EventWriter<'a, ConnectToManager>, + pub connect_to_manager: EventWriter<'a, ConnectToManagerRequest>, } impl EventWriters<'_> { @@ -59,7 +72,7 @@ mod ui { fn send(self, evw: &mut EventWriters<'_>); } - impl UiEvent for ConnectToManager { + impl UiEvent for ConnectToManagerRequest { fn send(self, evw: &mut EventWriters<'_>) { evw.connect_to_manager.send(self); } @@ -94,8 +107,15 @@ mod ui { /// User has chosen to create an instance. #[derive(Debug, Reflect, Eq, PartialEq)] pub enum CreateInstance { - Initial { manager_url: String }, - WaitingForConnection, + Initial { + manager_url: String, + /// Non-empty will display this as the error. + error_msg: String, + }, + WaitingForConnection { + /// The url given in `Initial` + manager_url: String, + }, Connected, InstanceCreated, } @@ -105,31 +125,59 @@ mod ui { match self { CreateInstance::Initial { ref mut manager_url, + ref mut error_msg, } => { - ui.add(egui::TextEdit::singleline(manager_url).hint_text( - "Manager Url (leave blank to spin up server locally)", - )); + ui.add( + egui::TextEdit::singleline(manager_url) + .hint_text( + "Manager Url (leave blank to spin up server locally)", + ) + .text_color_opt( + (!error_msg.is_empty()).then_some(egui::Color32::RED), + ), + ); let text = if manager_url.is_empty() { + error_msg.clear(); "Locally Host" + } else if !error_msg.is_empty() { + error_msg.as_str() } else { "Remotely Host" }; if ui.button(text).clicked() { - evw.send(ConnectToManager { - manager_url: manager_url.to_owned(), - }); - return CreateInstance::WaitingForConnection.into(); + match (!manager_url.is_empty()) + .then(|| manager_url.parse()) + .transpose() + { + Ok(parsed_manager_url) => { + evw.send(ConnectToManagerRequest { + manager_url: parsed_manager_url, + }); + return CreateInstance::WaitingForConnection { + manager_url: std::mem::take(manager_url), + } + .into(); + } + Err(_parse_err) => { + error_msg.clear(); + error_msg.push_str("Invalid URL"); + } + } } if ui.button("Back").clicked() { return default(); } self.into() } - CreateInstance::WaitingForConnection => { + CreateInstance::WaitingForConnection { .. } => { + ui.spinner(); + self.into() + } + CreateInstance::Connected => { + ui.label("Connected to Manager, creating instance..."); ui.spinner(); self.into() } - CreateInstance::Connected => todo!(), CreateInstance::InstanceCreated => todo!(), } } @@ -139,6 +187,7 @@ mod ui { fn default() -> Self { Self::Initial { manager_url: default(), + error_msg: String::new(), } } } @@ -191,10 +240,38 @@ fn setup(mut commands: Commands) { commands.init_resource::() } -fn draw_title_screen( +/// Systems related to transition of UI state +#[derive(bevy::prelude::SystemSet, Hash, Debug, Eq, PartialEq, Clone)] +struct UiStateSystems; + +fn handle_connect_to_manager_response( + mut ui_state: ResMut, + mut connect_to_manager_response: EventReader, +) { + let ui::TitleScreen::Create(ref mut create_state) = *ui_state else { + return; + }; + for response in connect_to_manager_response.read() { + let Err(ref _response_err) = response.0 else { + *create_state = ui::CreateInstance::Connected; + continue; + }; + if let ui::CreateInstance::WaitingForConnection { + ref mut manager_url, + } = *create_state + { + *create_state = ui::CreateInstance::Initial { + error_msg: "Could not connect to server".to_owned(), + manager_url: std::mem::take(manager_url), + } + } + } +} + +fn draw_ui( mut state: ResMut, mut contexts: EguiContexts, - connect_to_manager: EventWriter, + connect_to_manager: EventWriter, ) { egui::Window::new("Instances") .resizable(false) diff --git a/crates/replicate/client/src/lib.rs b/crates/replicate/client/src/lib.rs index 9d62009..f397887 100644 --- a/crates/replicate/client/src/lib.rs +++ b/crates/replicate/client/src/lib.rs @@ -3,9 +3,11 @@ use eyre::Result; use tracing::warn; use url::Url; -pub use replicate_common as common; use wtransport::{endpoint::ConnectOptions, ClientConfig, Endpoint}; +pub use replicate_common as common; +pub use url; + pub mod instance; pub mod manager;