Skip to content

Commit

Permalink
Alternative loop 1
Browse files Browse the repository at this point in the history
  • Loading branch information
migomipo committed Jan 18, 2024
1 parent 446d5ae commit d792922
Show file tree
Hide file tree
Showing 3 changed files with 116 additions and 33 deletions.
94 changes: 84 additions & 10 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ chrono = "0.4"
uuid = { version = "1.4", features = ["v4"] }
smallvec = { version = "1.11", features = ["union", "const_generics"]}
systemctl = "0.1.9"
futures = "0.3"
async-stream = { version = "0.3.5", features = [] }

[profile.dev]
opt-level = 2
Expand Down
53 changes: 30 additions & 23 deletions src/hqm_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@ use tokio::time::MissedTickBehavior;
use tracing::{info, warn};
use uuid::Uuid;

use async_stream::stream;
use futures::{StreamExt};

use crate::hqm_game::{
HQMGameValues, HQMGameWorld, HQMObjectIndex, HQMPhysicsConfiguration, HQMPlayerInput,
HQMRulesState, HQMSkater, HQMSkaterHand,
Expand All @@ -33,11 +36,6 @@ use crate::hqm_parse::{

pub(crate) const GAME_HEADER: &[u8] = b"Hock";

struct HQMServerReceivedData {
addr: SocketAddr,
data: HQMClientToServerMessage,
}

#[derive(Copy, Clone, PartialEq, Eq)]
pub enum HQMClientVersion {
Vanilla,
Expand Down Expand Up @@ -1485,11 +1483,21 @@ pub async fn run_server<B: HQMServerBehaviour>(
}
});
}
let (msg_sender, mut msg_receiver) = tokio::sync::mpsc::channel(256);
{
let socket = socket.clone();
enum Msg {
Time,
Message(SocketAddr, HQMClientToServerMessage),
}

tokio::spawn(async move {
let timeout_stream = stream! {
loop {
tick_timer.tick().await;
yield Msg::Time;
}
};
tokio::pin!(timeout_stream);
let packet_stream = {
let socket = socket.clone();
stream! {
let mut buf = BytesMut::with_capacity(512);
let codec = HQMMessageCodec;
loop {
Expand All @@ -1498,30 +1506,29 @@ pub async fn run_server<B: HQMServerBehaviour>(
match socket.recv_buf_from(&mut buf).await {
Ok((_, addr)) => {
if let Ok(data) = codec.parse_message(&buf) {
let _ = msg_sender.send(HQMServerReceivedData { addr, data }).await;
yield Msg::Message(addr, data)
}
}
Err(_) => {}
}
}
});
}
};
tokio::pin!(packet_stream);

let mut stream = futures::stream_select!(timeout_stream, packet_stream);
let mut write_buf = BytesMut::with_capacity(4096);
loop {
tokio::select! {
_ = tick_timer.tick() => {
server.tick(& socket, & mut behaviour, & mut write_buf).await;
}
x = msg_receiver.recv() => {
if let Some (HQMServerReceivedData {
addr,
data
}) = x {
server.handle_message(addr, & socket, data, & mut behaviour, & mut write_buf).await;
}
while let Some(msg) = stream.next().await {
match msg {
Msg::Time => server.tick(&socket, &mut behaviour, &mut write_buf).await,
Msg::Message(addr, data) => {
server
.handle_message(addr, &socket, data, &mut behaviour, &mut write_buf)
.await
}
}
}
Ok(())
}

async fn send_updates(
Expand Down

0 comments on commit d792922

Please sign in to comment.