Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

physnet: Events and UI for creating an instance #114

Merged
merged 1 commit into from
Jun 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
89 changes: 76 additions & 13 deletions apps/networked_physics_demo/client/src/netcode.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
use std::ops::{Deref, DerefMut};
use std::{
ops::{Deref, DerefMut},
sync::Arc,
};

use bevy::{
app::{Plugin, PostUpdate, PreUpdate, Update},
Expand All @@ -7,15 +10,19 @@ use bevy::{
entity::Entity,
event::{Event, EventReader, EventWriter},
query::{Added, With, Without},
schedule::{common_conditions::resource_exists, IntoSystemConfigs as _},
system::{CommandQueue, Commands, Query, Res, ResMut, Resource},
world::World,
},
log::{error, trace},
log::{debug, error, trace},
tasks::IoTaskPool,
transform::components::{GlobalTransform, Transform},
};
use color_eyre::eyre::{Result, WrapErr as _};
use replicate_client::common::data_model::{DataModel, Entity as DmEntity};
use replicate_client::{
common::data_model::{DataModel, Entity as DmEntity},
url::Url,
};
use tokio::sync::mpsc;

const BOUNDED_CHAN_COMMAND_QUEUE_SIZE: usize = 16;
Expand All @@ -27,16 +34,25 @@ impl Plugin for NetcodePlugin {
fn build(&self, app: &mut bevy::prelude::App) {
app.add_event::<ConnectToManagerRequest>()
.add_event::<ConnectToManagerResponse>()
.add_event::<CreateInstanceRequest>()
.add_event::<CreateInstanceResponse>()
.init_resource::<CommandQueueChannel>()
.init_resource::<NetcodeDataModel>()
.add_systems(PreUpdate, (apply_queued_commands, from_data_model))
.add_systems(PostUpdate, (spawn_entities, to_data_model))
.add_systems(Update, on_connect_to_manager_evt);
.add_systems(
Update,
(
handle_connect_to_manager_evt,
handle_create_instance_evt
.run_if(resource_exists::<NetcodeManager>),
),
);
}
}

#[derive(Debug, Resource)]
struct NetcodeManager(#[allow(unused)] replicate_client::manager::Manager);
#[derive(Debug, Resource, derive_more::Deref)]
pub struct NetcodeManager(Arc<replicate_client::manager::Manager>);

/// Convenient way to receive commands sent from the async tasks.
#[derive(Debug, Resource)]
Expand Down Expand Up @@ -65,14 +81,14 @@ fn apply_queued_commands(
#[derive(Debug, Event, Eq, PartialEq)]
pub struct ConnectToManagerRequest {
/// The URL of the manager to connect to. If `None`, locally host.
pub manager_url: Option<replicate_client::url::Url>,
pub manager_url: Option<Url>,
}

/// Produced in response to [`ConnectToManagerRequest`].
#[derive(Debug, Event)]
pub struct ConnectToManagerResponse(pub Result<()>);

fn on_connect_to_manager_evt(
fn handle_connect_to_manager_evt(
command_queue: Res<CommandQueueChannel>,
mut request: EventReader<ConnectToManagerRequest>,
mut response: EventWriter<ConnectToManagerResponse>,
Expand All @@ -82,9 +98,10 @@ fn on_connect_to_manager_evt(
response.send(ConnectToManagerResponse(Ok(())));
continue;
};
let pool = IoTaskPool::get();
let manager_url = manager_url.to_owned();
let tx = command_queue.tx.clone();
let pool = IoTaskPool::get();
debug!("spawned async task for connecting to manager");
pool.spawn(async_compat::Compat::new(async move {
let connect_result =
replicate_client::manager::Manager::connect(manager_url, None)
Expand All @@ -98,20 +115,66 @@ fn on_connect_to_manager_evt(
// async code.
let mut queue = CommandQueue::default();
let response_event = ConnectToManagerResponse(connect_result.map(|mngr| {
queue.push(|w: &mut World| w.insert_resource(NetcodeManager(mngr)));
queue.push(|w: &mut World| {
w.insert_resource(NetcodeManager(Arc::new(mngr)))
});
}));
queue.push(|w: &mut World| {
w.send_event(response_event).expect("failed to send event");
});
match tx.send(queue).await {
Ok(()) | Err(mpsc::error::SendError(_)) => (),
}
let _ = tx.send(queue).await;
}))
// We don't need to explicitly retrieve the return value.
.detach();
}
}

/// Other plugins can send this to create and then connect to a new instance.
#[derive(Debug, Event, Eq, PartialEq)]
pub struct CreateInstanceRequest;

/// Produced in response to [`CreateInstanceRequest`].
#[derive(Debug, Event)]
pub struct CreateInstanceResponse(pub Result<Url>);

fn handle_create_instance_evt(
command_queue: Res<CommandQueueChannel>,
manager: Res<NetcodeManager>,
mut request: EventReader<CreateInstanceRequest>,
) {
for _ in request.read() {
let mngr = manager.0.clone();
let url_fut = async move {
let id = mngr
.instance_create()
.await
.wrap_err("failed to create instance")?;
mngr.instance_url(id)
.await
.wrap_err("failed to get instance url")
};
let tx = command_queue.tx.clone();
let pool = IoTaskPool::get();
debug!("spawned async task for creating instance");
pool.spawn(async_compat::Compat::new(async move {
let url_result = url_fut.await;
if let Err(ref err) = url_result {
error!("{err:?}");
}

// We use a command queue to enqueue commands back to bevy from the
// async code.
let mut queue = CommandQueue::default();
let response = CreateInstanceResponse(url_result);
queue.push(|w: &mut World| {
w.send_event(response).expect("failed to send event");
});
let _ = tx.send(queue).await;
}))
.detach()
}
}

/// Add this to entities that should be synchronized over the network
#[derive(Debug, Eq, PartialEq, Component)]
pub struct Synchronized(pub DmEntity);
Expand Down
77 changes: 71 additions & 6 deletions apps/networked_physics_demo/client/src/title_screen.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,16 @@ use bevy::{
system::{Commands, Res, ResMut},
},
input::{keyboard::KeyCode, ButtonInput},
log::trace,
reflect::Reflect,
};
use bevy_inspector_egui::bevy_egui::{egui, EguiContexts};

use crate::{
netcode::{ConnectToManagerRequest, ConnectToManagerResponse},
netcode::{
ConnectToManagerRequest, ConnectToManagerResponse, CreateInstanceRequest,
CreateInstanceResponse,
},
AppExt, GameModeState,
};

Expand All @@ -38,7 +42,11 @@ impl Plugin for TitleScreenPlugin {
.add_systems(
Update,
(
handle_connect_to_manager_response.in_set(UiStateSystems),
(
handle_connect_to_manager_response,
handle_create_instance_response,
)
.in_set(UiStateSystems),
(should_transition, draw_ui.after(UiStateSystems))
.run_if(in_state(GameModeState::TitleScreen)),
),
Expand All @@ -50,13 +58,14 @@ impl Plugin for TitleScreenPlugin {
mod ui {
use bevy::{ecs::system::Resource, prelude::default};

use crate::netcode::ConnectToManagerRequest;
use crate::netcode::{ConnectToManagerRequest, CreateInstanceRequest};

use super::*;

/// [`EventWriter`]s needed by the UI.
pub(super) struct EventWriters<'a> {
pub connect_to_manager: EventWriter<'a, ConnectToManagerRequest>,
pub create_instance: EventWriter<'a, CreateInstanceRequest>,
}

impl EventWriters<'_> {
Expand All @@ -78,6 +87,12 @@ mod ui {
}
}

impl UiEvent for CreateInstanceRequest {
fn send(self, evw: &mut EventWriters<'_>) {
evw.create_instance.send(self);
}
}

#[derive(Debug, Default, Resource, Reflect, Eq, PartialEq, derive_more::From)]
pub enum TitleScreen {
#[default]
Expand Down Expand Up @@ -117,7 +132,9 @@ mod ui {
manager_url: String,
},
Connected,
InstanceCreated,
InstanceCreated {
instance_url: String,
},
}

impl CreateInstance {
Expand Down Expand Up @@ -178,7 +195,18 @@ mod ui {
ui.spinner();
self.into()
}
CreateInstance::InstanceCreated => todo!(),
CreateInstance::InstanceCreated {
ref mut instance_url,
} => {
ui.label("Created instance!");
ui.label(&*instance_url);
ui.output_mut(|o| instance_url.clone_into(&mut o.copied_text));
if ui.button("Join Instance").clicked() {
// TODO: Spawn event to connect to instance
return JoinInstance::WaitingForConnection.into();
}
self.into()
}
}
}
}
Expand Down Expand Up @@ -211,6 +239,7 @@ mod ui {
.hint_text("Instance Url"),
);
if ui.button("Submit").clicked() {
// TODO: spawn event for joining instance
return JoinInstance::WaitingForConnection.into();
}
if ui.button("Back").clicked() {
Expand Down Expand Up @@ -247,13 +276,16 @@ struct UiStateSystems;
fn handle_connect_to_manager_response(
mut ui_state: ResMut<ui::TitleScreen>,
mut connect_to_manager_response: EventReader<ConnectToManagerResponse>,
mut create_instance_request: EventWriter<CreateInstanceRequest>,
) {
let ui::TitleScreen::Create(ref mut create_state) = *ui_state else {
return;
};
for response in connect_to_manager_response.read() {
trace!("handling ConnectToManagerResponse");
let Err(ref _response_err) = response.0 else {
*create_state = ui::CreateInstance::Connected;
create_instance_request.send(CreateInstanceRequest);
continue;
};
if let ui::CreateInstance::WaitingForConnection {
Expand All @@ -268,18 +300,51 @@ fn handle_connect_to_manager_response(
}
}

fn handle_create_instance_response(
manager: Option<Res<crate::netcode::NetcodeManager>>,
mut ui_state: ResMut<ui::TitleScreen>,
mut create_instance_response: EventReader<CreateInstanceResponse>,
) {
let ui::TitleScreen::Create(ref mut create_state) = *ui_state else {
return;
};
for response in create_instance_response.read() {
trace!("handling CreateInstanceResponse");
match &response.0 {
Ok(url) => {
*create_state = ui::CreateInstance::InstanceCreated {
instance_url: url.to_string(),
}
}
Err(_err) => {
*create_state = ui::CreateInstance::Initial {
error_msg: "error while creating instance".to_owned(),
manager_url: manager
.as_ref()
.map(|m| m.url().to_string())
.unwrap_or_default(),
}
}
}
}
}

fn draw_ui(
mut state: ResMut<ui::TitleScreen>,
mut contexts: EguiContexts,
connect_to_manager: EventWriter<ConnectToManagerRequest>,
create_instance: EventWriter<CreateInstanceRequest>,
) {
egui::Window::new("Instances")
.resizable(false)
.movable(false)
.collapsible(false)
.anchor(egui::Align2::CENTER_CENTER, egui::Vec2::ZERO)
.show(contexts.ctx_mut(), |ui: &mut egui::Ui| {
let evw = EventWriters { connect_to_manager };
let evw = EventWriters {
connect_to_manager,
create_instance,
};
// need ownership of state, so replace with the default temporarily
let stolen = std::mem::take(state.as_mut());
*state = stolen.draw(ui, evw);
Expand Down
9 changes: 7 additions & 2 deletions crates/replicate/client/src/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ type Framed = replicate_common::Framed<wtransport::stream::BiStream, Cb, Sb>;
#[derive(Debug)]
pub struct Manager {
_conn: wtransport::Connection,
_url: Url,
url: Url,
task: tokio::task::JoinHandle<Result<()>>,
request_tx: mpsc::Sender<(Sb, oneshot::Sender<Cb>)>,
}
Expand Down Expand Up @@ -80,7 +80,7 @@ impl Manager {

Ok(Self {
_conn: conn,
_url: url,
url,
task,
request_tx,
})
Expand Down Expand Up @@ -121,6 +121,11 @@ impl Manager {
.wrap_err("panic in manager task, file a bug report on github uwu")?
.wrap_err("error in task")
}

/// The url of this Manager.
pub fn url(&self) -> &Url {
&self.url
}
}

async fn manager_task(
Expand Down
Loading