From d7929225c679fb9e9960df4b15e553ecc74c57e8 Mon Sep 17 00:00:00 2001 From: John Eriksson Date: Wed, 17 Jan 2024 23:37:36 +0100 Subject: [PATCH] Alternative loop 1 --- Cargo.lock | 94 ++++++++++++++++++++++++++++++++++++++++++----- Cargo.toml | 2 + src/hqm_server.rs | 53 ++++++++++++++------------ 3 files changed, 116 insertions(+), 33 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 18a2031..ac169cf 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -74,6 +74,28 @@ dependencies = [ "syn 1.0.109", ] +[[package]] +name = "async-stream" +version = "0.3.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cd56dd203fef61ac097dd65721a419ddccb106b2d2b70ba60a6b529f03961a51" +dependencies = [ + "async-stream-impl", + "futures-core", + "pin-project-lite", +] + +[[package]] +name = "async-stream-impl" +version = "0.3.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "16e62a023e7c117e27523144c5d2459f4397fcc3cab0085af8e2224f643a0193" +dependencies = [ + "proc-macro2 1.0.70", + "quote 1.0.33", + "syn 2.0.39", +] + [[package]] name = "autocfg" version = "1.1.0" @@ -251,43 +273,93 @@ dependencies = [ "percent-encoding", ] +[[package]] +name = "futures" +version = "0.3.30" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "645c6916888f6cb6350d2550b80fb63e734897a8498abe35cfb732b6487804b0" +dependencies = [ + "futures-channel", + "futures-core", + "futures-executor", + "futures-io", + "futures-sink", + "futures-task", + "futures-util", +] + [[package]] name = "futures-channel" -version = "0.3.29" +version = "0.3.30" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ff4dd66668b557604244583e3e1e1eada8c5c2e96a6d0d6653ede395b78bbacb" +checksum = "eac8f7d7865dcb88bd4373ab671c8cf4508703796caa2b1985a9ca867b3fcb78" dependencies = [ "futures-core", + "futures-sink", ] [[package]] name = "futures-core" -version = "0.3.29" +version = "0.3.30" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "eb1d22c66e66d9d72e1758f0bd7d4fd0bee04cad842ee34587d68c07e45d088c" +checksum = "dfc6580bb841c5a68e9ef15c77ccc837b40a7504914d52e47b8b0e9bbda25a1d" + +[[package]] +name = "futures-executor" +version = "0.3.30" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a576fc72ae164fca6b9db127eaa9a9dda0d61316034f33a0a0d4eda41f02b01d" +dependencies = [ + "futures-core", + "futures-task", + "futures-util", +] + +[[package]] +name = "futures-io" +version = "0.3.30" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a44623e20b9681a318efdd71c299b6b222ed6f231972bfe2f224ebad6311f0c1" + +[[package]] +name = "futures-macro" +version = "0.3.30" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "87750cf4b7a4c0625b1529e4c543c2182106e4dedc60a2a6455e00d212c489ac" +dependencies = [ + "proc-macro2 1.0.70", + "quote 1.0.33", + "syn 2.0.39", +] [[package]] name = "futures-sink" -version = "0.3.29" +version = "0.3.30" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e36d3378ee38c2a36ad710c5d30c2911d752cb941c00c72dbabfb786a7970817" +checksum = "9fb8e00e87438d937621c1c6269e53f536c14d3fbd6a042bb24879e57d474fb5" [[package]] name = "futures-task" -version = "0.3.29" +version = "0.3.30" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "efd193069b0ddadc69c46389b740bbccdd97203899b48d09c5f7969591d6bae2" +checksum = "38d84fa142264698cdce1a9f9172cf383a0c82de1bddcf3092901442c4097004" [[package]] name = "futures-util" -version = "0.3.29" +version = "0.3.30" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a19526d624e703a3179b3d322efec918b6246ea0fa51d41124525f00f1cc8104" +checksum = "3d6401deb83407ab3da39eba7e33987a73c3df0c82b4bb5813ee871c19c41d48" dependencies = [ + "futures-channel", "futures-core", + "futures-io", + "futures-macro", + "futures-sink", "futures-task", + "memchr", "pin-project-lite", "pin-utils", + "slab", ] [[package]] @@ -547,8 +619,10 @@ name = "migo-hqm-server" version = "1.8.0" dependencies = [ "arr_macro", + "async-stream", "bytes", "chrono", + "futures", "nalgebra", "reqwest", "rust-ini", diff --git a/Cargo.toml b/Cargo.toml index 5061a8a..62998df 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -18,6 +18,8 @@ chrono = "0.4" uuid = { version = "1.4", features = ["v4"] } smallvec = { version = "1.11", features = ["union", "const_generics"]} systemctl = "0.1.9" +futures = "0.3" +async-stream = { version = "0.3.5", features = [] } [profile.dev] opt-level = 2 diff --git a/src/hqm_server.rs b/src/hqm_server.rs index 287dac5..98e4ec6 100644 --- a/src/hqm_server.rs +++ b/src/hqm_server.rs @@ -21,6 +21,9 @@ use tokio::time::MissedTickBehavior; use tracing::{info, warn}; use uuid::Uuid; +use async_stream::stream; +use futures::{StreamExt}; + use crate::hqm_game::{ HQMGameValues, HQMGameWorld, HQMObjectIndex, HQMPhysicsConfiguration, HQMPlayerInput, HQMRulesState, HQMSkater, HQMSkaterHand, @@ -33,11 +36,6 @@ use crate::hqm_parse::{ pub(crate) const GAME_HEADER: &[u8] = b"Hock"; -struct HQMServerReceivedData { - addr: SocketAddr, - data: HQMClientToServerMessage, -} - #[derive(Copy, Clone, PartialEq, Eq)] pub enum HQMClientVersion { Vanilla, @@ -1485,11 +1483,21 @@ pub async fn run_server( } }); } - let (msg_sender, mut msg_receiver) = tokio::sync::mpsc::channel(256); - { - let socket = socket.clone(); + enum Msg { + Time, + Message(SocketAddr, HQMClientToServerMessage), + } - tokio::spawn(async move { + let timeout_stream = stream! { + loop { + tick_timer.tick().await; + yield Msg::Time; + } + }; + tokio::pin!(timeout_stream); + let packet_stream = { + let socket = socket.clone(); + stream! { let mut buf = BytesMut::with_capacity(512); let codec = HQMMessageCodec; loop { @@ -1498,30 +1506,29 @@ pub async fn run_server( match socket.recv_buf_from(&mut buf).await { Ok((_, addr)) => { if let Ok(data) = codec.parse_message(&buf) { - let _ = msg_sender.send(HQMServerReceivedData { addr, data }).await; + yield Msg::Message(addr, data) } } Err(_) => {} } } - }); + } }; + tokio::pin!(packet_stream); + + let mut stream = futures::stream_select!(timeout_stream, packet_stream); let mut write_buf = BytesMut::with_capacity(4096); - loop { - tokio::select! { - _ = tick_timer.tick() => { - server.tick(& socket, & mut behaviour, & mut write_buf).await; - } - x = msg_receiver.recv() => { - if let Some (HQMServerReceivedData { - addr, - data - }) = x { - server.handle_message(addr, & socket, data, & mut behaviour, & mut write_buf).await; - } + while let Some(msg) = stream.next().await { + match msg { + Msg::Time => server.tick(&socket, &mut behaviour, &mut write_buf).await, + Msg::Message(addr, data) => { + server + .handle_message(addr, &socket, data, &mut behaviour, &mut write_buf) + .await } } } + Ok(()) } async fn send_updates(