Skip to content

Commit

Permalink
physnet: create instance ui and IO
Browse files Browse the repository at this point in the history
  • Loading branch information
TheButlah committed Jun 20, 2024
1 parent bd07767 commit 56aab4d
Show file tree
Hide file tree
Showing 3 changed files with 154 additions and 21 deletions.
89 changes: 76 additions & 13 deletions apps/networked_physics_demo/client/src/netcode.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
use std::ops::{Deref, DerefMut};
use std::{
ops::{Deref, DerefMut},
sync::Arc,
};

use bevy::{
app::{Plugin, PostUpdate, PreUpdate, Update},
Expand All @@ -7,15 +10,19 @@ use bevy::{
entity::Entity,
event::{Event, EventReader, EventWriter},
query::{Added, With, Without},
schedule::{common_conditions::resource_exists, IntoSystemConfigs as _},
system::{CommandQueue, Commands, Query, Res, ResMut, Resource},
world::World,
},
log::{error, trace},
log::{debug, error, trace},
tasks::IoTaskPool,
transform::components::{GlobalTransform, Transform},
};
use color_eyre::eyre::{Result, WrapErr as _};
use replicate_client::common::data_model::{DataModel, Entity as DmEntity};
use replicate_client::{
common::data_model::{DataModel, Entity as DmEntity},
url::Url,
};
use tokio::sync::mpsc;

const BOUNDED_CHAN_COMMAND_QUEUE_SIZE: usize = 16;
Expand All @@ -27,16 +34,25 @@ impl Plugin for NetcodePlugin {
fn build(&self, app: &mut bevy::prelude::App) {
app.add_event::<ConnectToManagerRequest>()
.add_event::<ConnectToManagerResponse>()
.add_event::<CreateInstanceRequest>()
.add_event::<CreateInstanceResponse>()
.init_resource::<CommandQueueChannel>()
.init_resource::<NetcodeDataModel>()
.add_systems(PreUpdate, (apply_queued_commands, from_data_model))
.add_systems(PostUpdate, (spawn_entities, to_data_model))
.add_systems(Update, on_connect_to_manager_evt);
.add_systems(
Update,
(
handle_connect_to_manager_evt,
handle_create_instance_evt
.run_if(resource_exists::<NetcodeManager>),
),
);
}
}

#[derive(Debug, Resource)]
struct NetcodeManager(#[allow(unused)] replicate_client::manager::Manager);
#[derive(Debug, Resource, derive_more::Deref)]
pub struct NetcodeManager(Arc<replicate_client::manager::Manager>);

/// Convenient way to receive commands sent from the async tasks.
#[derive(Debug, Resource)]
Expand Down Expand Up @@ -65,14 +81,14 @@ fn apply_queued_commands(
#[derive(Debug, Event, Eq, PartialEq)]
pub struct ConnectToManagerRequest {
/// The URL of the manager to connect to. If `None`, locally host.
pub manager_url: Option<replicate_client::url::Url>,
pub manager_url: Option<Url>,
}

/// Produced in response to [`ConnectToManagerRequest`].
#[derive(Debug, Event)]
pub struct ConnectToManagerResponse(pub Result<()>);

fn on_connect_to_manager_evt(
fn handle_connect_to_manager_evt(
command_queue: Res<CommandQueueChannel>,
mut request: EventReader<ConnectToManagerRequest>,
mut response: EventWriter<ConnectToManagerResponse>,
Expand All @@ -82,9 +98,10 @@ fn on_connect_to_manager_evt(
response.send(ConnectToManagerResponse(Ok(())));
continue;
};
let pool = IoTaskPool::get();
let manager_url = manager_url.to_owned();
let tx = command_queue.tx.clone();
let pool = IoTaskPool::get();
debug!("spawned async task for connecting to manager");
pool.spawn(async_compat::Compat::new(async move {
let connect_result =
replicate_client::manager::Manager::connect(manager_url, None)
Expand All @@ -98,20 +115,66 @@ fn on_connect_to_manager_evt(
// async code.
let mut queue = CommandQueue::default();
let response_event = ConnectToManagerResponse(connect_result.map(|mngr| {
queue.push(|w: &mut World| w.insert_resource(NetcodeManager(mngr)));
queue.push(|w: &mut World| {
w.insert_resource(NetcodeManager(Arc::new(mngr)))
});
}));
queue.push(|w: &mut World| {
w.send_event(response_event).expect("failed to send event");
});
match tx.send(queue).await {
Ok(()) | Err(mpsc::error::SendError(_)) => (),
}
let _ = tx.send(queue).await;
}))
// We don't need to explicitly retrieve the return value.
.detach();
}
}

/// Other plugins can send this to create and then connect to a new instance.
#[derive(Debug, Event, Eq, PartialEq)]
pub struct CreateInstanceRequest;

/// Produced in response to [`CreateInstanceRequest`].
#[derive(Debug, Event)]
pub struct CreateInstanceResponse(pub Result<Url>);

fn handle_create_instance_evt(
command_queue: Res<CommandQueueChannel>,
manager: Res<NetcodeManager>,
mut request: EventReader<CreateInstanceRequest>,
) {
for _ in request.read() {
let mngr = manager.0.clone();
let url_fut = async move {
let id = mngr
.instance_create()
.await
.wrap_err("failed to create instance")?;
mngr.instance_url(id)
.await
.wrap_err("failed to get instance url")
};
let tx = command_queue.tx.clone();
let pool = IoTaskPool::get();
debug!("spawned async task for creating instance");
pool.spawn(async_compat::Compat::new(async move {
let url_result = url_fut.await;
if let Err(ref err) = url_result {
error!("{err:?}");
}

// We use a command queue to enqueue commands back to bevy from the
// async code.
let mut queue = CommandQueue::default();
let response = CreateInstanceResponse(url_result);
queue.push(|w: &mut World| {
w.send_event(response).expect("failed to send event");
});
let _ = tx.send(queue).await;
}))
.detach()
}
}

/// Add this to entities that should be synchronized over the network
#[derive(Debug, Eq, PartialEq, Component)]
pub struct Synchronized(pub DmEntity);
Expand Down
77 changes: 71 additions & 6 deletions apps/networked_physics_demo/client/src/title_screen.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,16 @@ use bevy::{
system::{Commands, Res, ResMut},
},
input::{keyboard::KeyCode, ButtonInput},
log::trace,
reflect::Reflect,
};
use bevy_inspector_egui::bevy_egui::{egui, EguiContexts};

use crate::{
netcode::{ConnectToManagerRequest, ConnectToManagerResponse},
netcode::{
ConnectToManagerRequest, ConnectToManagerResponse, CreateInstanceRequest,
CreateInstanceResponse,
},
AppExt, GameModeState,
};

Expand All @@ -38,7 +42,11 @@ impl Plugin for TitleScreenPlugin {
.add_systems(
Update,
(
handle_connect_to_manager_response.in_set(UiStateSystems),
(
handle_connect_to_manager_response,
handle_create_instance_response,
)
.in_set(UiStateSystems),
(should_transition, draw_ui.after(UiStateSystems))
.run_if(in_state(GameModeState::TitleScreen)),
),
Expand All @@ -50,13 +58,14 @@ impl Plugin for TitleScreenPlugin {
mod ui {
use bevy::{ecs::system::Resource, prelude::default};

use crate::netcode::ConnectToManagerRequest;
use crate::netcode::{ConnectToManagerRequest, CreateInstanceRequest};

use super::*;

/// [`EventWriter`]s needed by the UI.
pub(super) struct EventWriters<'a> {
pub connect_to_manager: EventWriter<'a, ConnectToManagerRequest>,
pub create_instance: EventWriter<'a, CreateInstanceRequest>,
}

impl EventWriters<'_> {
Expand All @@ -78,6 +87,12 @@ mod ui {
}
}

impl UiEvent for CreateInstanceRequest {
fn send(self, evw: &mut EventWriters<'_>) {
evw.create_instance.send(self);
}
}

#[derive(Debug, Default, Resource, Reflect, Eq, PartialEq, derive_more::From)]
pub enum TitleScreen {
#[default]
Expand Down Expand Up @@ -117,7 +132,9 @@ mod ui {
manager_url: String,
},
Connected,
InstanceCreated,
InstanceCreated {
instance_url: String,
},
}

impl CreateInstance {
Expand Down Expand Up @@ -178,7 +195,18 @@ mod ui {
ui.spinner();
self.into()
}
CreateInstance::InstanceCreated => todo!(),
CreateInstance::InstanceCreated {
ref mut instance_url,
} => {
ui.label("Created instance!");
ui.label(&*instance_url);
ui.output_mut(|o| instance_url.clone_into(&mut o.copied_text));
if ui.button("Join Instance").clicked() {
// TODO: Spawn event to connect to instance
return JoinInstance::WaitingForConnection.into();
}
self.into()
}
}
}
}
Expand Down Expand Up @@ -211,6 +239,7 @@ mod ui {
.hint_text("Instance Url"),
);
if ui.button("Submit").clicked() {
// TODO: spawn event for joining instance
return JoinInstance::WaitingForConnection.into();
}
if ui.button("Back").clicked() {
Expand Down Expand Up @@ -247,13 +276,16 @@ struct UiStateSystems;
fn handle_connect_to_manager_response(
mut ui_state: ResMut<ui::TitleScreen>,
mut connect_to_manager_response: EventReader<ConnectToManagerResponse>,
mut create_instance_request: EventWriter<CreateInstanceRequest>,
) {
let ui::TitleScreen::Create(ref mut create_state) = *ui_state else {
return;
};
for response in connect_to_manager_response.read() {
trace!("handling ConnectToManagerResponse");
let Err(ref _response_err) = response.0 else {
*create_state = ui::CreateInstance::Connected;
create_instance_request.send(CreateInstanceRequest);
continue;
};
if let ui::CreateInstance::WaitingForConnection {
Expand All @@ -268,18 +300,51 @@ fn handle_connect_to_manager_response(
}
}

fn handle_create_instance_response(
manager: Option<Res<crate::netcode::NetcodeManager>>,
mut ui_state: ResMut<ui::TitleScreen>,
mut create_instance_response: EventReader<CreateInstanceResponse>,
) {
let ui::TitleScreen::Create(ref mut create_state) = *ui_state else {
return;
};
for response in create_instance_response.read() {
trace!("handling CreateInstanceResponse");
match &response.0 {
Ok(url) => {
*create_state = ui::CreateInstance::InstanceCreated {
instance_url: url.to_string(),
}
}
Err(_err) => {
*create_state = ui::CreateInstance::Initial {
error_msg: "error while creating instance".to_owned(),
manager_url: manager
.as_ref()
.map(|m| m.url().to_string())
.unwrap_or_default(),
}
}
}
}
}

fn draw_ui(
mut state: ResMut<ui::TitleScreen>,
mut contexts: EguiContexts,
connect_to_manager: EventWriter<ConnectToManagerRequest>,
create_instance: EventWriter<CreateInstanceRequest>,
) {
egui::Window::new("Instances")
.resizable(false)
.movable(false)
.collapsible(false)
.anchor(egui::Align2::CENTER_CENTER, egui::Vec2::ZERO)
.show(contexts.ctx_mut(), |ui: &mut egui::Ui| {
let evw = EventWriters { connect_to_manager };
let evw = EventWriters {
connect_to_manager,
create_instance,
};
// need ownership of state, so replace with the default temporarily
let stolen = std::mem::take(state.as_mut());
*state = stolen.draw(ui, evw);
Expand Down
9 changes: 7 additions & 2 deletions crates/replicate/client/src/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ type Framed = replicate_common::Framed<wtransport::stream::BiStream, Cb, Sb>;
#[derive(Debug)]
pub struct Manager {
_conn: wtransport::Connection,
_url: Url,
url: Url,
task: tokio::task::JoinHandle<Result<()>>,
request_tx: mpsc::Sender<(Sb, oneshot::Sender<Cb>)>,
}
Expand Down Expand Up @@ -80,7 +80,7 @@ impl Manager {

Ok(Self {
_conn: conn,
_url: url,
url,
task,
request_tx,
})
Expand Down Expand Up @@ -121,6 +121,11 @@ impl Manager {
.wrap_err("panic in manager task, file a bug report on github uwu")?
.wrap_err("error in task")
}

/// The url of this Manager.
pub fn url(&self) -> &Url {
&self.url
}
}

async fn manager_task(
Expand Down

0 comments on commit 56aab4d

Please sign in to comment.