Skip to content

Commit

Permalink
Improve the MasterServer,Client, and add Events + DashMap.
Browse files Browse the repository at this point in the history
  • Loading branch information
Makosai committed Nov 6, 2024
1 parent e27162f commit fed2799
Show file tree
Hide file tree
Showing 15 changed files with 636 additions and 87 deletions.
1 change: 1 addition & 0 deletions rust/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ path = "src/bin/client.rs"

[dependencies]
ctrlc = "3.4.5"
dashmap = "6.1.0"
lazy_static = "1.5.0"
num_cpus = "1.16.0"
quaternion = "2.0.0"
Expand Down
12 changes: 6 additions & 6 deletions rust/src/app/master.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
//! 2. Accept clients and authenticate or sign them up.
//! 3. Send clients to either a low population cluster or a specific one if provided.
use crate::{ master_debug, master_info, master_success, master_warning };
use crate::{ master::MasterServer, master_debug, master_info, master_success, master_warning };

use tokio::{
io::{ AsyncBufReadExt, AsyncWriteExt, BufReader },
Expand All @@ -15,17 +15,17 @@ use tokio::{
/// Start the Master Server and handles shutdown signals.
pub async fn start() {
let mut shutdown_rx = crate::app::shutdown_channel().unwrap();
let mut is_running = true;
let mut master_server = MasterServer::new(None, None).await.unwrap();

select! {
_ = shutdown_rx.recv() => {
is_running = false;
master_server.base.is_running = false;
master_warning!("Shutting down...");
}
_ = run(&mut is_running) => {}
_ = master_server.start() => {}
}

if !is_running {
if !master_server.base.is_running {
cleanup().await;
master_success!("Master Server has been shut down.");
}
Expand All @@ -39,7 +39,7 @@ async fn cleanup() {

/// Entrypoint for the Master Server.
#[inline(always)]
async fn run(is_running: &mut bool) {
async fn run(is_running: &bool) {
master_info!("Starting the Master Server...");

// TODO: Read from config.
Expand Down
12 changes: 11 additions & 1 deletion rust/src/clients/client.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
use std::{ net::Ipv4Addr, str::FromStr };

use tokio::sync::mpsc::{self, Receiver, Sender};

use crate::events::Event;
use crate::transport::BaseClient;
use crate::utils::constants;
use crate::world::ClusterInfo;
Expand All @@ -24,12 +27,16 @@ pub struct Client {
on_cluster_server_list: Vec<Box<dyn Fn(ClusterInfo) + Send>>,

base: BaseClient,

event_receiver: Receiver<Event>,
event_sender: Sender<Event>,
}

impl Client {
// TODO: ip string and port
pub fn new(ip: Option<Ipv4Addr>, port: Option<u16>) -> Client {
let base_client = BaseClient::new(None);
let (event_sender, event_receiver) = mpsc::channel(100); // TODO: Could be a different channel type.
let base_client = BaseClient::new(None, None, event_sender.clone());

return Client {
active_connection: ConnectionType::None,
Expand All @@ -48,6 +55,9 @@ impl Client {
on_cluster_server_list: vec![],

base: base_client,

event_receiver,
event_sender,
};
}

Expand Down
5 changes: 5 additions & 0 deletions rust/src/events/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pub enum Event {
Connection(u32),
Disconnection(u32),
ReceivedData(u32, Vec<u8>),
}
2 changes: 2 additions & 0 deletions rust/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
pub mod app;
pub mod clients;
pub mod core;
pub mod events;
pub mod macros;
pub mod master;
pub mod network;
pub mod options;
Expand Down
15 changes: 15 additions & 0 deletions rust/src/macros/defer.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
pub struct ScopeCall<F: FnMut()> {
pub c: F
}
impl<F: FnMut()> Drop for ScopeCall<F> {
fn drop(&mut self) {
(self.c)();
}
}

#[macro_export]
macro_rules! defer {
($e:expr) => (
let _scope_call = $crate::macros::defer::ScopeCall { c: || -> () { $e; } };
)
}
4 changes: 2 additions & 2 deletions rust/src/utils/macros.rs → rust/src/macros/logging.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ macro_rules! log_message {
$($arg:tt)*
) => {
{
use $crate::utils::macros::LogLevel;
use $crate::utils::macros::LogType;
use $crate::macros::logging::LogLevel;
use $crate::macros::logging::LogType;
use $crate::utils::constants::{
TERMINAL_BLUE,
TERMINAL_GREEN,
Expand Down
2 changes: 2 additions & 0 deletions rust/src/macros/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
pub mod defer;
pub mod logging;
11 changes: 6 additions & 5 deletions rust/src/master/master_server.rs
Original file line number Diff line number Diff line change
@@ -1,22 +1,23 @@
use crate::{ transport::{BaseServer, ServerType}, utils::constants };

pub struct MasterServer {
base: BaseServer,
pub base: BaseServer,
}

#[derive(Debug)]
pub enum MasterServerError {
// TODO Implement
}

// TODO Implement
impl MasterServer {
pub async fn new() -> Result<Self, (MasterServerError, String)> {
pub async fn new(max_connections: Option<u32>, port: Option<u16>) -> Result<Self, (MasterServerError, String)> {
Ok(MasterServer {
base: BaseServer::new(ServerType::MasterServer, 0, Some(constants::MASTER_PORT)).await.unwrap(),
base: BaseServer::new(ServerType::MasterServer, max_connections, port).await.unwrap(),
})
}

pub fn run(&self) {
println!("Master server started");
pub async fn start(&mut self) {
self.base.start().await;
}
}
187 changes: 172 additions & 15 deletions rust/src/network/packet.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,18 @@
// TODO Implement

// List of possible errors
#[derive(Debug)]
pub enum PacketError {
ReadError,
}
impl From<PacketError> for String {
fn from(error: PacketError) -> String {
match error {
PacketError::ReadError => "Failed to read packet.".to_string(),
}
}
}

pub struct Packet {
buffer: Vec<u8>,
readable_buffer: Option<Vec<u8>>,
Expand Down Expand Up @@ -30,7 +43,7 @@ impl Packet {
packet
}

//#region Packet Functions
// region: Packet Functions
/// Sets the packet's content and prepares it to be read.
pub fn set_bytes(&mut self, data: Vec<u8>) {
self.buffer.clear();
Expand Down Expand Up @@ -67,9 +80,9 @@ impl Packet {
self.read_pos = self.read_pos.saturating_sub(4);
}
}
//#endregion
// endregion

//#region Write Functions
// region: Write Functions
pub fn write_byte(&mut self, data: u8) {
self.buffer.push(data)
}
Expand All @@ -79,31 +92,175 @@ impl Packet {
}

pub fn write_short(&mut self, data: i16) {
let bytes = data.to_be_bytes();
self.buffer.extend(&bytes);
self.buffer.extend(&data.to_be_bytes());
}

pub fn write_ushort(&mut self, data: u16) {
let bytes = data.to_be_bytes();
self.buffer.extend(&bytes);
self.buffer.extend(&data.to_be_bytes());
}

pub fn write_int(&mut self, data: i32) {
let bytes = data.to_be_bytes();
self.buffer.extend(&bytes);
self.buffer.extend(&data.to_be_bytes());
}

pub fn write_uint(&mut self, data: u32) {
self.buffer.extend(&data.to_be_bytes());
}

pub fn write_long(&mut self, data: i64) {
self.buffer.extend(&data.to_be_bytes());
}

pub fn write_float(&mut self, data: f32) {
self.buffer.extend(&data.to_be_bytes());
}

pub fn write_double(&mut self, data: f64) {
self.buffer.extend(&data.to_be_bytes());
}
//#endregion

//#region Read Functions
// TODO Implement
//endregion
pub fn write_bool(&mut self, data: bool) {
self.buffer.push(data as u8);
}

pub fn write_string(&mut self, data: String) {
self.write_int(data.len() as i32);
self.write_bytes(data.into_bytes());
}
// endregion

// region: Read Functions
/// Reads a byte from the packet.
///
/// * `move_read_pos` - If the buffer's read position should be incremented. Defaults to true.
///
/// Returns the byte that was read.
pub fn read_byte(&mut self, move_read_pos: Option<bool>) -> Result<u8, PacketError> {
// Check if there are bytes to read.
if self.buffer.len() < self.read_pos + 1 {
return Err(PacketError::ReadError);
}

let data = self.buffer[self.read_pos]; // Get the byte at the current read_pos.

if move_read_pos.unwrap_or(true) {
self.read_pos += 1;
}

Ok(data)
}

/// Reads a range of bytes from the packet.
///
/// * `length` - The length of the array to read.
/// * `move_read_pos` - If the buffer's read position should be incremented. Defaults to true.
///
/// Returns the rarnge of bytes that were read.
pub fn read_bytes(&mut self, length: usize, move_read_pos: Option<bool>) -> Result<Vec<u8>, PacketError> {
// Check if there are bytes to read.
if self.buffer.len() < self.read_pos + length {
return Err(PacketError::ReadError);
}

let data = self.buffer[self.read_pos..self.read_pos + length].to_vec();

if move_read_pos.unwrap_or(true) {
self.read_pos += length;
}

Ok(data)
}

/// Reads a short from the packet.
///
/// * `move_read_pos` - If the buffer's read position should be incremented. Defaults to true.
/// Returns the short that was read.
pub fn read_short(&mut self, move_read_pos: Option<bool>) -> Result<i16, PacketError> {
let data = self.read_bytes(2, move_read_pos)?;
Ok(i16::from_be_bytes(data.try_into().unwrap()))
}

/// Reads an unsigned short from the packet.
///
/// * `move_read_pos` - If the buffer's read position should be incremented. Defaults to true.
/// Returns the unsigned short that was read.
pub fn read_ushort(&mut self, move_read_pos: Option<bool>) -> Result<u16, PacketError> {
let data = self.read_bytes(2, move_read_pos)?;
Ok(u16::from_be_bytes(data.try_into().unwrap()))
}

/// Reads an integer from the packet.
///
/// * `move_read_pos` - If the buffer's read position should be incremented. Defaults to true.
/// Returns the integer that was read.
pub fn read_int(&mut self, move_read_pos: Option<bool>) -> Result<i32, PacketError> {
let data = self.read_bytes(4, move_read_pos)?;
Ok(i32::from_be_bytes(data.try_into().unwrap()))
}

/// Reads a unsigned integer from the packet.
///
/// * `move_read_pos` - If the buffer's read position should be incremented. Defaults to true.
/// Returns the unsigned integer that was read.
pub fn read_uint(&mut self, move_read_pos: Option<bool>) -> Result<u32, PacketError> {
let data = self.read_bytes(4, move_read_pos)?;
Ok(u32::from_be_bytes(data.try_into().unwrap()))
}

/// Reads a long from the packet.
///
/// * `move_read_pos` - If the buffer's read position should be incremented. Defaults to true.
/// Returns the long that was read.
pub fn read_long(&mut self, move_read_pos: Option<bool>) -> Result<i64, PacketError> {
let data = self.read_bytes(8, move_read_pos)?;
Ok(i64::from_be_bytes(data.try_into().unwrap()))
}


/// Reads a float from the packet.
///
/// * `move_read_pos` - If the buffer's read position should be incremented. Defaults to true.
/// Returns the float that was read.
pub fn read_float(&mut self, move_read_pos: Option<bool>) -> Result<f32, PacketError> {
let data = self.read_bytes(4, move_read_pos)?;
Ok(f32::from_be_bytes(data.try_into().unwrap()))
}

/// Reads a double from the packet.
///
/// * `move_read_pos` - If the buffer's read position should be incremented. Defaults to true.
/// Returns the double that was read.
pub fn read_double(&mut self, move_read_pos: Option<bool>) -> Result<f64, PacketError> {
let data = self.read_bytes(8, move_read_pos)?;
Ok(f64::from_be_bytes(data.try_into().unwrap()))
}

/// Reads a boolean from the packet.
///
/// * `move_read_pos` - If the buffer's read position should be incremented. Defaults to true.
/// Returns the boolean that was read.
pub fn read_bool(&mut self, move_read_pos: Option<bool>) -> Result<bool, PacketError> {
let data = self.read_byte(move_read_pos)?;
Ok(data != 0)
}

/// Reads a string from the packet.
///
/// * `move_read_pos` - If the buffer's read position should be incremented. Defaults to true.
/// Returns the string that was read.
pub fn read_string(&mut self, move_read_pos: Option<bool>) -> Result<String, PacketError> {
let length = self.read_int(move_read_pos)? as usize;
let data = self.read_bytes(length, move_read_pos)?;
Ok(String::from_utf8(data).unwrap())
}
// endregion

//#region Memory Functions
// region: Memory Functions
/// Deinitializes the packet.
pub fn deinit(&mut self) {
self.buffer.clear();
self.readable_buffer = None;
self.read_pos = 0;
}
//#endregion
// endregion
}
Loading

0 comments on commit fed2799

Please sign in to comment.