From fed2799aca144bb38da272adb0caf52ccd38c680 Mon Sep 17 00:00:00 2001 From: Makosai Date: Tue, 5 Nov 2024 22:16:19 -0500 Subject: [PATCH] Improve the MasterServer,Client, and add Events + DashMap. --- rust/Cargo.toml | 1 + rust/src/app/master.rs | 12 +- rust/src/clients/client.rs | 12 +- rust/src/events/mod.rs | 5 + rust/src/lib.rs | 2 + rust/src/macros/defer.rs | 15 + .../{utils/macros.rs => macros/logging.rs} | 4 +- rust/src/macros/mod.rs | 2 + rust/src/master/master_server.rs | 11 +- rust/src/network/packet.rs | 187 +++++++- rust/src/transport/base_client.rs | 33 +- rust/src/transport/base_server.rs | 420 ++++++++++++++++-- rust/src/transport/thread_manager.rs | 16 +- rust/src/utils/constants.rs | 2 +- rust/src/utils/mod.rs | 1 - 15 files changed, 636 insertions(+), 87 deletions(-) create mode 100644 rust/src/events/mod.rs create mode 100644 rust/src/macros/defer.rs rename rust/src/{utils/macros.rs => macros/logging.rs} (97%) create mode 100644 rust/src/macros/mod.rs diff --git a/rust/Cargo.toml b/rust/Cargo.toml index b09c028..d223ae9 100644 --- a/rust/Cargo.toml +++ b/rust/Cargo.toml @@ -22,6 +22,7 @@ path = "src/bin/client.rs" [dependencies] ctrlc = "3.4.5" +dashmap = "6.1.0" lazy_static = "1.5.0" num_cpus = "1.16.0" quaternion = "2.0.0" diff --git a/rust/src/app/master.rs b/rust/src/app/master.rs index 7d2c605..8ff2dc0 100644 --- a/rust/src/app/master.rs +++ b/rust/src/app/master.rs @@ -3,7 +3,7 @@ //! 2. Accept clients and authenticate or sign them up. //! 3. Send clients to either a low population cluster or a specific one if provided. -use crate::{ master_debug, master_info, master_success, master_warning }; +use crate::{ master::MasterServer, master_debug, master_info, master_success, master_warning }; use tokio::{ io::{ AsyncBufReadExt, AsyncWriteExt, BufReader }, @@ -15,17 +15,17 @@ use tokio::{ /// Start the Master Server and handles shutdown signals. pub async fn start() { let mut shutdown_rx = crate::app::shutdown_channel().unwrap(); - let mut is_running = true; + let mut master_server = MasterServer::new(None, None).await.unwrap(); select! { _ = shutdown_rx.recv() => { - is_running = false; + master_server.base.is_running = false; master_warning!("Shutting down..."); } - _ = run(&mut is_running) => {} + _ = master_server.start() => {} } - if !is_running { + if !master_server.base.is_running { cleanup().await; master_success!("Master Server has been shut down."); } @@ -39,7 +39,7 @@ async fn cleanup() { /// Entrypoint for the Master Server. #[inline(always)] -async fn run(is_running: &mut bool) { +async fn run(is_running: &bool) { master_info!("Starting the Master Server..."); // TODO: Read from config. diff --git a/rust/src/clients/client.rs b/rust/src/clients/client.rs index 331c4b5..2acc8c5 100644 --- a/rust/src/clients/client.rs +++ b/rust/src/clients/client.rs @@ -1,5 +1,8 @@ use std::{ net::Ipv4Addr, str::FromStr }; +use tokio::sync::mpsc::{self, Receiver, Sender}; + +use crate::events::Event; use crate::transport::BaseClient; use crate::utils::constants; use crate::world::ClusterInfo; @@ -24,12 +27,16 @@ pub struct Client { on_cluster_server_list: Vec>, base: BaseClient, + + event_receiver: Receiver, + event_sender: Sender, } impl Client { // TODO: ip string and port pub fn new(ip: Option, port: Option) -> Client { - let base_client = BaseClient::new(None); + let (event_sender, event_receiver) = mpsc::channel(100); // TODO: Could be a different channel type. + let base_client = BaseClient::new(None, None, event_sender.clone()); return Client { active_connection: ConnectionType::None, @@ -48,6 +55,9 @@ impl Client { on_cluster_server_list: vec![], base: base_client, + + event_receiver, + event_sender, }; } diff --git a/rust/src/events/mod.rs b/rust/src/events/mod.rs new file mode 100644 index 0000000..d081bf1 --- /dev/null +++ b/rust/src/events/mod.rs @@ -0,0 +1,5 @@ +pub enum Event { + Connection(u32), + Disconnection(u32), + ReceivedData(u32, Vec), +} \ No newline at end of file diff --git a/rust/src/lib.rs b/rust/src/lib.rs index 5bac141..693ab17 100644 --- a/rust/src/lib.rs +++ b/rust/src/lib.rs @@ -1,6 +1,8 @@ pub mod app; pub mod clients; pub mod core; +pub mod events; +pub mod macros; pub mod master; pub mod network; pub mod options; diff --git a/rust/src/macros/defer.rs b/rust/src/macros/defer.rs new file mode 100644 index 0000000..67b5e9c --- /dev/null +++ b/rust/src/macros/defer.rs @@ -0,0 +1,15 @@ +pub struct ScopeCall { + pub c: F +} +impl Drop for ScopeCall { + fn drop(&mut self) { + (self.c)(); + } +} + +#[macro_export] +macro_rules! defer { + ($e:expr) => ( + let _scope_call = $crate::macros::defer::ScopeCall { c: || -> () { $e; } }; + ) +} \ No newline at end of file diff --git a/rust/src/utils/macros.rs b/rust/src/macros/logging.rs similarity index 97% rename from rust/src/utils/macros.rs rename to rust/src/macros/logging.rs index 19045fe..2aa3689 100644 --- a/rust/src/utils/macros.rs +++ b/rust/src/macros/logging.rs @@ -21,8 +21,8 @@ macro_rules! log_message { $($arg:tt)* ) => { { - use $crate::utils::macros::LogLevel; - use $crate::utils::macros::LogType; + use $crate::macros::logging::LogLevel; + use $crate::macros::logging::LogType; use $crate::utils::constants::{ TERMINAL_BLUE, TERMINAL_GREEN, diff --git a/rust/src/macros/mod.rs b/rust/src/macros/mod.rs new file mode 100644 index 0000000..2cbd754 --- /dev/null +++ b/rust/src/macros/mod.rs @@ -0,0 +1,2 @@ +pub mod defer; +pub mod logging; diff --git a/rust/src/master/master_server.rs b/rust/src/master/master_server.rs index 94a9f86..513ad7e 100644 --- a/rust/src/master/master_server.rs +++ b/rust/src/master/master_server.rs @@ -1,22 +1,23 @@ use crate::{ transport::{BaseServer, ServerType}, utils::constants }; pub struct MasterServer { - base: BaseServer, + pub base: BaseServer, } +#[derive(Debug)] pub enum MasterServerError { // TODO Implement } // TODO Implement impl MasterServer { - pub async fn new() -> Result { + pub async fn new(max_connections: Option, port: Option) -> Result { Ok(MasterServer { - base: BaseServer::new(ServerType::MasterServer, 0, Some(constants::MASTER_PORT)).await.unwrap(), + base: BaseServer::new(ServerType::MasterServer, max_connections, port).await.unwrap(), }) } - pub fn run(&self) { - println!("Master server started"); + pub async fn start(&mut self) { + self.base.start().await; } } diff --git a/rust/src/network/packet.rs b/rust/src/network/packet.rs index bf44900..40e5a0a 100644 --- a/rust/src/network/packet.rs +++ b/rust/src/network/packet.rs @@ -1,5 +1,18 @@ // TODO Implement +// List of possible errors +#[derive(Debug)] +pub enum PacketError { + ReadError, +} +impl From for String { + fn from(error: PacketError) -> String { + match error { + PacketError::ReadError => "Failed to read packet.".to_string(), + } + } +} + pub struct Packet { buffer: Vec, readable_buffer: Option>, @@ -30,7 +43,7 @@ impl Packet { packet } - //#region Packet Functions + // region: Packet Functions /// Sets the packet's content and prepares it to be read. pub fn set_bytes(&mut self, data: Vec) { self.buffer.clear(); @@ -67,9 +80,9 @@ impl Packet { self.read_pos = self.read_pos.saturating_sub(4); } } - //#endregion + // endregion - //#region Write Functions + // region: Write Functions pub fn write_byte(&mut self, data: u8) { self.buffer.push(data) } @@ -79,31 +92,175 @@ impl Packet { } pub fn write_short(&mut self, data: i16) { - let bytes = data.to_be_bytes(); - self.buffer.extend(&bytes); + self.buffer.extend(&data.to_be_bytes()); } pub fn write_ushort(&mut self, data: u16) { - let bytes = data.to_be_bytes(); - self.buffer.extend(&bytes); + self.buffer.extend(&data.to_be_bytes()); } pub fn write_int(&mut self, data: i32) { - let bytes = data.to_be_bytes(); - self.buffer.extend(&bytes); + self.buffer.extend(&data.to_be_bytes()); + } + + pub fn write_uint(&mut self, data: u32) { + self.buffer.extend(&data.to_be_bytes()); + } + + pub fn write_long(&mut self, data: i64) { + self.buffer.extend(&data.to_be_bytes()); + } + + pub fn write_float(&mut self, data: f32) { + self.buffer.extend(&data.to_be_bytes()); + } + + pub fn write_double(&mut self, data: f64) { + self.buffer.extend(&data.to_be_bytes()); } - //#endregion - //#region Read Functions - // TODO Implement - //endregion + pub fn write_bool(&mut self, data: bool) { + self.buffer.push(data as u8); + } + + pub fn write_string(&mut self, data: String) { + self.write_int(data.len() as i32); + self.write_bytes(data.into_bytes()); + } + // endregion + + // region: Read Functions + /// Reads a byte from the packet. + /// + /// * `move_read_pos` - If the buffer's read position should be incremented. Defaults to true. + /// + /// Returns the byte that was read. + pub fn read_byte(&mut self, move_read_pos: Option) -> Result { + // Check if there are bytes to read. + if self.buffer.len() < self.read_pos + 1 { + return Err(PacketError::ReadError); + } + + let data = self.buffer[self.read_pos]; // Get the byte at the current read_pos. + + if move_read_pos.unwrap_or(true) { + self.read_pos += 1; + } + + Ok(data) + } + + /// Reads a range of bytes from the packet. + /// + /// * `length` - The length of the array to read. + /// * `move_read_pos` - If the buffer's read position should be incremented. Defaults to true. + /// + /// Returns the rarnge of bytes that were read. + pub fn read_bytes(&mut self, length: usize, move_read_pos: Option) -> Result, PacketError> { + // Check if there are bytes to read. + if self.buffer.len() < self.read_pos + length { + return Err(PacketError::ReadError); + } + + let data = self.buffer[self.read_pos..self.read_pos + length].to_vec(); + + if move_read_pos.unwrap_or(true) { + self.read_pos += length; + } + + Ok(data) + } + + /// Reads a short from the packet. + /// + /// * `move_read_pos` - If the buffer's read position should be incremented. Defaults to true. + /// Returns the short that was read. + pub fn read_short(&mut self, move_read_pos: Option) -> Result { + let data = self.read_bytes(2, move_read_pos)?; + Ok(i16::from_be_bytes(data.try_into().unwrap())) + } + + /// Reads an unsigned short from the packet. + /// + /// * `move_read_pos` - If the buffer's read position should be incremented. Defaults to true. + /// Returns the unsigned short that was read. + pub fn read_ushort(&mut self, move_read_pos: Option) -> Result { + let data = self.read_bytes(2, move_read_pos)?; + Ok(u16::from_be_bytes(data.try_into().unwrap())) + } + + /// Reads an integer from the packet. + /// + /// * `move_read_pos` - If the buffer's read position should be incremented. Defaults to true. + /// Returns the integer that was read. + pub fn read_int(&mut self, move_read_pos: Option) -> Result { + let data = self.read_bytes(4, move_read_pos)?; + Ok(i32::from_be_bytes(data.try_into().unwrap())) + } + + /// Reads a unsigned integer from the packet. + /// + /// * `move_read_pos` - If the buffer's read position should be incremented. Defaults to true. + /// Returns the unsigned integer that was read. + pub fn read_uint(&mut self, move_read_pos: Option) -> Result { + let data = self.read_bytes(4, move_read_pos)?; + Ok(u32::from_be_bytes(data.try_into().unwrap())) + } + + /// Reads a long from the packet. + /// + /// * `move_read_pos` - If the buffer's read position should be incremented. Defaults to true. + /// Returns the long that was read. + pub fn read_long(&mut self, move_read_pos: Option) -> Result { + let data = self.read_bytes(8, move_read_pos)?; + Ok(i64::from_be_bytes(data.try_into().unwrap())) + } + + + /// Reads a float from the packet. + /// + /// * `move_read_pos` - If the buffer's read position should be incremented. Defaults to true. + /// Returns the float that was read. + pub fn read_float(&mut self, move_read_pos: Option) -> Result { + let data = self.read_bytes(4, move_read_pos)?; + Ok(f32::from_be_bytes(data.try_into().unwrap())) + } + + /// Reads a double from the packet. + /// + /// * `move_read_pos` - If the buffer's read position should be incremented. Defaults to true. + /// Returns the double that was read. + pub fn read_double(&mut self, move_read_pos: Option) -> Result { + let data = self.read_bytes(8, move_read_pos)?; + Ok(f64::from_be_bytes(data.try_into().unwrap())) + } + + /// Reads a boolean from the packet. + /// + /// * `move_read_pos` - If the buffer's read position should be incremented. Defaults to true. + /// Returns the boolean that was read. + pub fn read_bool(&mut self, move_read_pos: Option) -> Result { + let data = self.read_byte(move_read_pos)?; + Ok(data != 0) + } + + /// Reads a string from the packet. + /// + /// * `move_read_pos` - If the buffer's read position should be incremented. Defaults to true. + /// Returns the string that was read. + pub fn read_string(&mut self, move_read_pos: Option) -> Result { + let length = self.read_int(move_read_pos)? as usize; + let data = self.read_bytes(length, move_read_pos)?; + Ok(String::from_utf8(data).unwrap()) + } + // endregion - //#region Memory Functions + // region: Memory Functions /// Deinitializes the packet. pub fn deinit(&mut self) { self.buffer.clear(); self.readable_buffer = None; self.read_pos = 0; } - //#endregion + // endregion } diff --git a/rust/src/transport/base_client.rs b/rust/src/transport/base_client.rs index a543d09..a9fb54d 100644 --- a/rust/src/transport/base_client.rs +++ b/rust/src/transport/base_client.rs @@ -1,30 +1,32 @@ use tokio::io::{ self, AsyncReadExt, AsyncWriteExt }; use tokio::net::{ TcpStream, UdpSocket }; +use tokio::sync::mpsc::Sender; use crate::core::spawning::Player; +use crate::events::Event; use crate::network::Packet; use crate::transport::Protocols; +use super::BaseServer; + pub struct BaseClient { - id: Option, - name: Option, + pub id: Option, + pub name: Option, - tcp: TcpHandler, - udp: UdpHandler, + pub tcp: TcpHandler, + pub udp: UdpHandler, - received_data: Packet, + pub(crate) received_data: Packet, - on_connected: Vec>, - on_disconnected: Vec>, - on_received: Vec>, + pub player: Option, - player: Option, + pub event_sender: Sender, } impl BaseClient { pub const BUFFER_SIZE: usize = 4096; - pub fn new(id: Option) -> Self { + pub fn new(id: Option, name: Option, event_sender: Sender) -> Self { BaseClient { id, name: None, @@ -37,11 +39,9 @@ impl BaseClient { received_data: Packet::new(), - on_connected: vec![], - on_disconnected: vec![], - on_received: vec![], - player: None, + + event_sender, } } @@ -50,16 +50,13 @@ impl BaseClient { self.tcp.deinit(); self.udp.deinit(); self.received_data.deinit(); - self.on_connected.clear(); - self.on_disconnected.clear(); - self.on_received.clear(); } } /// Handles events for connecting, receiving, and debugging. /// Also controls the socket connection. pub struct TcpHandler { - socket: Option, + pub(crate) socket: Option, receive_buffer: Option<[u8; BaseClient::BUFFER_SIZE]>, } diff --git a/rust/src/transport/base_server.rs b/rust/src/transport/base_server.rs index db63e89..e87e397 100644 --- a/rust/src/transport/base_server.rs +++ b/rust/src/transport/base_server.rs @@ -1,96 +1,452 @@ -use std::collections::HashMap; +use std::borrow::BorrowMut; +use std::collections::BTreeSet; +use std::sync::Arc; -use tokio::net::TcpListener; +use dashmap::DashMap; +use tokio::io::{ AsyncBufReadExt, AsyncWriteExt, BufReader }; +use tokio::net::{ TcpListener, TcpStream }; +use tokio::sync::mpsc::{ Receiver, Sender }; +use tokio::sync::{ broadcast, mpsc, Mutex }; +use crate::events::Event; +use crate::network::Packet; use crate::utils::constants; use crate::utils::constants::DEFAULT_IP; -use super::BaseClient; +use super::{ BaseClient, Protocols, ThreadManager }; +#[derive(Clone, Copy, Debug)] pub enum ServerType { ClusterServer, MasterServer, } +impl From for String { + fn from(server_type: ServerType) -> String { + match server_type { + ServerType::ClusterServer => "Cluster Server".to_string(), + ServerType::MasterServer => "Master Server".to_string(), + } + } +} // List of possible errors #[derive(Debug)] pub enum BaseServerError { ClientMissing, + ClientMissingID, + TCPError, + AddClientError, } - impl From for String { fn from(error: BaseServerError) -> String { match error { BaseServerError::ClientMissing => "Client is missing".to_string(), + BaseServerError::ClientMissingID => "Client is missing an ID".to_string(), + BaseServerError::TCPError => "Failed to read TCP data".to_string(), + BaseServerError::AddClientError => "Failed to add client".to_string(), } } } +type PacketHandler = fn(from_client: u32, packet: u32); -type PacketHandler = fn(from_client: i32, packet: i32); - +/// The base of all server types. Takes in clients. pub struct BaseServer { + pub is_running: bool, + // Packet Handlers - pub packet_handlers: Option>, + pub packet_handlers: DashMap, // Network tcp_listener: TcpListener, // UDP equivalent is in BaseClient.UdpHandler.socket // Server Info - server_type: ServerType, - server_type_name: String, - max_connections: u32, - port: u16, + pub server_type: ServerType, + pub max_connections: u32, + pub port: u16, // Data - clients: HashMap, - released_ids: Vec, + pub clients: DashMap, + pub released_ids: Arc>>, // Events - on_connection: Vec>, - on_disconnection: Vec>, - on_received: Vec>, + pub event_receiver: mpsc::Receiver, + pub event_sender: mpsc::Sender, } impl BaseServer { + /// Creates a new BaseServer. Defaults the port to `utils::constants::MASTER_PORT`. pub async fn new( server_type: ServerType, - max_connections: u32, + max_connections: Option, port: Option ) -> Result { + let max_connections = max_connections.unwrap_or(0); let port = port.unwrap_or(constants::MASTER_PORT); - let server_type_name = ( - match server_type { - ServerType::ClusterServer => "Cluster Server", - ServerType::MasterServer => "Master Server", - } - ).to_string(); + + let (event_sender, event_receiver) = mpsc::channel(100); Ok(BaseServer { - packet_handlers: None, + is_running: false, + + packet_handlers: DashMap::new(), tcp_listener: TcpListener::bind(format!("{DEFAULT_IP}:{port}")).await.unwrap(), server_type, - server_type_name, max_connections, port, - clients: HashMap::new(), - released_ids: Vec::new(), + clients: DashMap::new(), + released_ids: Arc::new(Mutex::new(BTreeSet::new())), - on_connection: Vec::new(), - on_disconnection: Vec::new(), - on_received: Vec::new(), + event_receiver, + event_sender, }) } + // region: Connection Functions + /// Starts a server. pub async fn start(&mut self) { - println!("Starting the {}...", self.server_type_name); + self.is_running = true; + + { + let max_connections_str = match self.max_connections { + 0 => "unlimited".to_string(), + 1 => "max connection".to_string(), + _ => format!("max connections: {}", self.max_connections), + }; + + BaseServer::debug( + self.server_type, + format!( + "Starting the {:?} on port {} with {} max connections...", + self.server_type, + self.port, + max_connections_str + ) + ); + } + + self.listen().await; + } + + #[inline(always)] + async fn listen(&mut self) { + self.process_events(); + + let (tx, _rx) = broadcast::channel(10); + + BaseServer::success(self.server_type, "Now listening for connections.".to_string()); + + while self.is_running { + let (mut socket, addr) = self.tcp_listener.accept().await.unwrap(); + + BaseServer::debug(self.server_type, format!("Accepted connection from {:?}", addr)); + + self.on_tcp_connection(&socket).await.ok().expect("Failed to handle TCP connection."); + + let tx = tx.clone(); + let mut rx = tx.subscribe(); + + tokio::spawn(async move { + let (reader, mut writer) = socket.split(); + + let mut reader = BufReader::new(reader); + let mut line = String::new(); + + loop { + tokio::select! { + result = reader.read_line(&mut line) => { + // Break if the line is empty. + if result.unwrap() == 0 { + break; + } + + // Send the line to the channel. + tx.send((line.clone(), addr)).unwrap(); + line.clear(); + } + result = rx.recv() => { + // Write the message to the writer. + let (msg, msg_addr) = result.unwrap(); + + if addr != msg_addr { + writer.write_all(&msg.as_bytes()).await.unwrap(); + } + } + } + } + }); + } + } + + async fn on_tcp_connection(&mut self, client: &TcpStream) -> Result<(), BaseServerError> { + self.add_client(client).await.ok().expect("Failed to add client."); + + Ok(()) + } + + async fn on_udp_received(&mut self) -> Result<(), BaseServerError> { + todo!(); + } + + /// Adds a client to the server, generating or reusing an ID for them. + async fn add_client(&mut self, client: &TcpStream) -> Result<(), BaseServerError> { + let mut id: Option = None; + + { + let mut released_ids = self.released_ids.lock().await; + if self.max_connections == 0 || self.clients.len() < (self.max_connections as usize) { + // Loop until an ID is found. + while id.is_none() { + // If there are released IDs, use one. + if released_ids.len() > 0 { + id = released_ids.pop_last(); + if !self.clients.contains_key(&id.unwrap()) { + self.clients.insert(id.unwrap(), BaseClient::new(id, None, self.event_sender.clone())); + // Reserve this spot. + } else { + id = None; + } + continue; + } else { + // Assign the next highest client ID if there's no released IDs. + id = Some(self.clients.len() as u32); + + if !self.clients.contains_key(&id.unwrap()) { + self.clients.insert(id.unwrap(), BaseClient::new(id, None, self.event_sender.clone())); + // Reserve this spot here too. + } else { + id = None; + continue; + } + } + } + + { + // Check if the client was added successfully. + let mut client = match self.clients.get_mut(&id.unwrap()) { + Some(client) => client, + None => { + return Err(BaseServerError::ClientMissing); + } + }; + client.received_data = Packet::new(); + } + } + } + + // If the id was never reset. + // That means that a client may still exist. + if id != None { + self.disconnect_client(id.unwrap()); + } + + Ok(()) + } + + fn disconnect_client(&mut self, client_id: u32) { + self.clear_client(client_id); + } + + async fn clear_client(&mut self, client_id: u32) { + self.clients.remove(&client_id); + + if self.clients.len() == 0 { + self.released_ids.lock().await.clear(); + } else if self.clients.len() > (client_id as usize) { + // If the client is not the last one. + self.released_ids.lock().await.insert(client_id); + } + + BaseServer::debug(self.server_type, format!("Disconnected Client#{client_id}")); + } + // endregion - loop { + // region: Data Functions + async fn handle_tcp_data( + &self, + client: &mut BaseClient, + data: &[u8] + ) -> Result { + if client.id == None { + return Err(BaseServerError::ClientMissingID); + } + + let mut packet_length = 0; + + client.received_data.set_bytes(data.to_vec()); + + if client.received_data.unread_length() >= 4 { + packet_length = match client.received_data.read_uint(None) { + Ok(length) => length, + Err(_) => { + return Err(BaseServerError::TCPError); + } + }; + if packet_length <= 0 { + return Ok(true); + } + } + + while + packet_length > 0 && + packet_length <= client.received_data.unread_length().try_into().unwrap() + { + let packet_bytes = match + client.received_data.read_bytes((packet_length - 4).try_into().unwrap(), None) + { + Ok(bytes) => bytes, + Err(_) => { + return Err(BaseServerError::TCPError); + } + }; + + let thread_manager = ThreadManager::get_instance(); + + // TODO: Fix lifetime. + // thread_manager.execute_on_side_thread(Box::new(move || + { + if client.id == None { + return Ok(false); + } + + let mut packet = Packet::new_with_data(packet_bytes); + let packet_id = match packet.read_uint(None) { + Ok(id) => id, + Err(_) => { + return Err(BaseServerError::TCPError); + } + }; + + if !self.packet_handlers.contains_key(&packet_id) { + return Err(BaseServerError::TCPError); + } + + // Call the packet handler. + self.packet_handlers.get(&packet_id).unwrap()(client.id.unwrap(), packet_id); + } + // )); + + packet_length = 0; + + if client.received_data.unread_length() >= 4 { + packet_length = match client.received_data.read_uint(None) { + Ok(length) => length, + Err(_) => { + return Err(BaseServerError::TCPError); + } + }; + if packet_length <= 0 { + return Ok(true); + } + } + } + + if packet_length <= 1 { + return Ok(true); + } + + Ok(false) + } + // endregion + + // region: Event Functions + pub async fn process_events(&mut self) { + while let Some(event) = self.event_receiver.recv().await { + match event { + Event::Connection(id) => self.on_connection(id).await, + Event::Disconnection(id) => self.on_disconnection(id).await, + Event::ReceivedData(id, data) => self.on_received_data(id, &data).await, + } + } + } + pub async fn on_connection(&mut self, id: u32) { + BaseServer::debug(self.server_type, format!("Client#{} connected", id)); + } + + pub async fn on_disconnection(&mut self, id: u32) { + BaseServer::debug(self.server_type, format!("Client#{} disconnected", id)); + } + + pub async fn on_received_data(&mut self, id: u32, data: &[u8]) { + BaseServer::debug(self.server_type, format!("{:?} received data", self.server_type)); + } + + pub async fn on_client_connected(&mut self, id: u32) { + BaseServer::debug(self.server_type, format!("Client#{} connected", id)); + } + + pub async fn on_client_disconnected(&mut self, id: u32, protocol: Protocols) { + BaseServer::debug(self.server_type, format!("Client#{} disconnected", id)); + } + + pub async fn on_client_received_data(&mut self, id: u32, protocol: Protocols, data: &[u8]) { + BaseServer::debug(self.server_type, format!("Client#{} received data", id)); + + match protocol { + Protocols::TCP => { + // TODO: Instead of unwrapping, be mindful of thread safety and handle the error. + // let client = self.clients.get_mut(&id).unwrap(); + + let mut client_ref = match self.clients.get_mut(&id) { + Some(client) => client, + None => { + BaseServer::debug(self.server_type, "Client not found".to_string()); + return; + } + }; + let client = client_ref.value_mut(); + + let full_reset = match self.handle_tcp_data(client, data).await { + Ok(reset) => reset, + Err(e) => { + BaseServer::debug( + self.server_type, + format!("Error handling TCP data: {:?}", e) + ); + return; + } + }; + + client.received_data.reset(full_reset); + } + Protocols::UDP => { + // Extra things to do goes here. + return; + } + } + } + // endregion + + pub(crate) fn debug(server_type: ServerType, message: String) { + if !constants::DEBUGGING { + return; + } + + match server_type { + ServerType::ClusterServer => crate::cluster_debug!("{}", message), + ServerType::MasterServer => crate::master_debug!("{}", message), + } + } + + pub(crate) fn success(server_type: ServerType, message: String) { + match server_type { + ServerType::ClusterServer => crate::cluster_success!("{}", message), + ServerType::MasterServer => crate::master_success!("{}", message), } } } + +// +// Test: +// Does released_ids get populated with the correct values? +// Does it get smaller when clients are added? +// Does it get bigger when clients are removed anywhere not at the end? +// Does it remove elements when the client list shrinks to the next threshold? +// (i.e. 10, 20, 30, 40, 50 then if there are 51 clients and a person joins, then the id 50 is used.) +// (but if 51, 50, 49...39 then released_ids should delete anything higher than the largest index) +// std::collections::BTreeSet could make this simpler by just checking if the last element is larger than the size of the list. diff --git a/rust/src/transport/thread_manager.rs b/rust/src/transport/thread_manager.rs index 28283bc..47de29c 100644 --- a/rust/src/transport/thread_manager.rs +++ b/rust/src/transport/thread_manager.rs @@ -52,9 +52,11 @@ impl ThreadManager { drop(main_pool); - while let Some(action) = main_pool_copied.pop_front() { - action(); - } + tokio::task::spawn(async move { + while let Some(action) = main_pool_copied.pop_front() { + action(); + } + }); } } @@ -79,9 +81,11 @@ impl ThreadManager { drop(side_pool); - while let Some(action) = side_pool_copied.pop_front() { - action(); - } + tokio::task::spawn(async move { + while let Some(action) = side_pool_copied.pop_front() { + action(); + } + }); } } diff --git a/rust/src/utils/constants.rs b/rust/src/utils/constants.rs index b043870..9804440 100644 --- a/rust/src/utils/constants.rs +++ b/rust/src/utils/constants.rs @@ -1,6 +1,6 @@ pub const VERSION: &str = "0.1.0"; -pub(crate) const DEBUGGING: bool = false; +pub(crate) const DEBUGGING: bool = true; /// How many ticks are in a second. pub const TICK_RATE: i32 = 30; diff --git a/rust/src/utils/mod.rs b/rust/src/utils/mod.rs index 4fa7990..0b6d2c4 100644 --- a/rust/src/utils/mod.rs +++ b/rust/src/utils/mod.rs @@ -1,2 +1 @@ pub mod constants; -pub mod macros;