Skip to content

Commit

Permalink
Move macros to shared lib. Setup the listener for the Cluster. Improv…
Browse files Browse the repository at this point in the history
…e Client to Cluster interactions.
  • Loading branch information
Makosai committed Dec 28, 2024
1 parent cd48aa0 commit 5807324
Show file tree
Hide file tree
Showing 4 changed files with 158 additions and 35 deletions.
92 changes: 70 additions & 22 deletions rust/client/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,11 @@ use tokio::net::TcpStream;
use tokio::sync::mpsc::Sender;
use tokio::sync::{ mpsc, Mutex, RwLock };

use shared::log_message;
use shared::packets::cluster::ToClient;
use shared::packets::master::{ FromUnknown, ToUnknown };
use shared::utils::constants::{ DEFAULT_IP, MASTER_PORT };
use shared::utils::{ self, constants };

pub mod macros;
use shared::{ log_message, lread_string, lselect };

lazy_static::lazy_static! {
static ref CLUSTER_SERVERS: Arc<RwLock<Vec<ClusterInfo>>> = Arc::new(RwLock::new(Vec::new()));
Expand Down Expand Up @@ -53,13 +51,23 @@ impl From<ClusterInfo> for Connection {
}
}

#[derive(Clone, Copy)]
#[derive(Clone, Copy, Eq, PartialEq)]
pub enum ConnectionType {
MasterServer,
ClusterServer,
None,
}

impl std::fmt::Display for ConnectionType {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
match self {
ConnectionType::MasterServer => write!(f, "Master Server"),
ConnectionType::ClusterServer => write!(f, "Cluster Server"),
ConnectionType::None => write!(f, "Unknown"),
}
}
}

#[tokio::main]
async fn main() {
let mut shutdown_rx = utils::shutdown_channel().expect("Error creating shutdown channel.");
Expand Down Expand Up @@ -108,18 +116,11 @@ async fn start() {
}

let handler = tokio::spawn(async move {
warning(format!("Connecting to the {connection_type}...").as_str());
let mut stream = TcpStream::connect(format!("{}:{}", ip, port)).await.expect(
format!(
"Failed to connect to the {} Server at {}:{}.",
match connection_type {
ConnectionType::MasterServer => "Master",
ConnectionType::ClusterServer => "Cluster",
ConnectionType::None => "Unknown",
},
ip,
port
).as_str()
format!("Failed to connect to the {connection_type} at {ip}:{port}.").as_str()
);
success(format!("Connected to the {connection_type} at {ip}:{port}.").as_str());

let (reader, mut writer) = stream.split();
let mut reader = BufReader::new(reader);
Expand All @@ -130,12 +131,11 @@ async fn start() {
continue;
}

debug(format!("Client received data: {:?}", command).as_str());
info(format!("Received data: {:?}", command).as_str());

match connection_type {
ConnectionType::MasterServer => match command.unwrap() {
x if x == ToUnknown::SendClusters as u8 => {
//amount
let amount = match reader.read_u8().await {
Ok(amount) => amount,
Err(_) => {
Expand Down Expand Up @@ -176,7 +176,7 @@ async fn start() {
let mut cluster_servers = CLUSTER_SERVERS.write().await;
*cluster_servers = cluster_servers_tmp;

success("Client received the cluster servers from the Master Server.");
success(format!("Received {amount} Cluster servers from the {connection_type}.").as_str());
println!("{:?}", *cluster_servers);
}
join_cluster(0).await;
Expand All @@ -185,7 +185,53 @@ async fn start() {
_ => (),
}
ConnectionType::ClusterServer => match command.unwrap() {
x if x == ToClient::SendClusters as u8 => todo!(),
x if x == ToClient::SendClusters as u8 => {
let amount = match reader.read_u8().await {
Ok(amount) => amount,
Err(_) => {
error("Failed to read the amount of clusters.");
continue;
}
};

let mut cluster_servers_tmp = Vec::new();
for _ in 0..amount {
let name = lread_string!(reader, error, "cluster name");
let ip = lread_string!(reader, error, "cluster IP");
let port = match reader.read_u16().await {
Ok(port) => port,
Err(_) => {
error("Failed to read the cluster port.");
continue;
}
};
let max_connections = match reader.read_u32().await {
Ok(max_connections) => max_connections,
Err(_) => {
error("Failed to read the cluster max connections.");
continue;
}
};

cluster_servers_tmp.push(ClusterInfo {
name,
ip,
port,
max_connections,
});
}

{
{
let mut cluster_servers = CLUSTER_SERVERS.write().await;
*cluster_servers = cluster_servers_tmp;

success(format!("Received {amount} Cluster servers from the {connection_type}.").as_str());
println!("{:?}", *cluster_servers);
}
join_cluster(0).await;
}
},
x if x == ToClient::DisconnectCluster as u8 => todo!(),
x if x == ToClient::LeaveCluster as u8 => todo!(),

Expand All @@ -203,23 +249,25 @@ async fn start() {
if let Some(data) = result {
if data.is_empty() {
writer.shutdown().await.expect("Failed to shutdown the writer.");
info("Client is shutting down its writer.");
info("Closing connection...");
break;
}

writer.write_all(&data).await.expect("Failed to write to the Server.");
writer.flush().await.expect("Failed to flush the writer.");
success(format!("Client sent {data:?} as data to the Master Server.").as_str());
info(format!("Sent {data:?} as data to the {connection_type}.").as_str());
} else {
writer.shutdown().await.expect("Failed to shutdown the writer.");
info("Client is shutting down its writer.");
info("Shutting down connection...");
break;
}
}
}
});

send_data(Box::new([FromUnknown::RequestClusters as u8])).await;
if connection_type == ConnectionType::MasterServer {
send_data(Box::new([FromUnknown::RequestClusters as u8])).await;
}

let _ = handler.await;
}
Expand Down
96 changes: 85 additions & 11 deletions rust/cluster/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use std::{ net::Ipv4Addr, str::FromStr };

use dashmap::DashMap;

use shared::lselect;
use tokio::io::{ AsyncReadExt, AsyncWriteExt, BufReader };
use tokio::net::{ TcpListener, TcpStream };
use tokio::select;
Expand Down Expand Up @@ -82,7 +83,8 @@ async fn start() {
let (tx, mut rx) = mpsc::channel::<Box<[u8]>>(10);
let tx_clone = tx.clone();

let handler = tokio::spawn(async move {
// Cluster Server's connection to the Master Server.
tokio::spawn(async move {
let mut stream = TcpStream::connect(
format!("{}:{}", get_ip(&master_ip), master_port)
).await.expect("Failed to connect to the Master Server.");
Expand Down Expand Up @@ -210,19 +212,91 @@ async fn start() {
}
});

let command = FromUnknown::BecomeCluster as u8;
// Send a request to the Master Server to become a cluster.
{
let command = FromUnknown::BecomeCluster as u8;

let mut data = [command].to_vec();
data.push(key_name.len() as u8);
data.extend_from_slice(key_name.as_bytes());
let mut data = [command].to_vec();
data.push(key_name.len() as u8);
data.extend_from_slice(key_name.as_bytes());

let data = data.into_boxed_slice();
send_data(&tx_clone, data).await;
let data = data.into_boxed_slice();
send_data(&tx_clone, data).await;
}

// Cluster Server Listener
{
let (event_sender, mut event_receiver) = mpsc::channel::<Event>(100);

let clients: DashMap<u32, ServerClient> = DashMap::new();
let released_ids: Arc<Mutex<BTreeSet<u32>>> = Arc::new(Mutex::new(BTreeSet::new()));

{
let max_connections_str = match max_connections {
0 => "unlimited max connections".to_string(),
1 => "1 max connection".to_string(),
_ => format!("{} max connections", max_connections),
};

debug(
format!("Starting the Master Server on port {} with {max_connections_str}...", port).as_str()
);
}

// Listen
{
let tcp_listener = TcpListener::bind(
format!("{}:{}", constants::DEFAULT_IP, port)
).await.expect("Failed to bind to the specified port.");

match handler.await {
Ok(_) => {}
Err(e) => {
error(format!("Error: {:?}", e).as_str());
lselect! {
event = event_receiver.recv() => {
if let Some(event) = event {
match event {
Event::Connection(id) => on_connection(id),
Event::Disconnection(id) => {
debug(format!("Client#{id} disconnected.").as_str());
clients.remove(&id);

if id >= clients.len() as u32 {
info(format!("Client#{id} wasn't added to the released IDs list.").as_str());
continue;
}

let mut ids = released_ids.lock().await;
if !(*ids).insert(id) {
error(format!("ID {} already exists in the released IDs.", id).as_str());
continue;
};
},
Event::ReceivedData(id, data) => on_received_data(id, &data),
}
}
}
// Listen and add clients.
res = tcp_listener.accept() => {
if let Ok((stream, addr)) = res {
debug(format!("Accepted connection from {:?}", addr).as_str());

// If the max_connections is reached, return an error.
if max_connections != 0 && clients.len() >= (max_connections as usize) {
error("Max connections reached.");
continue;
}

// Get the next available ID and insert it.
let released_id: u32 = released_ids
.lock().await
.pop_first()
.unwrap_or(clients.len() as u32);
let mut client = ServerClient::new(released_id);
client.handle_data(event_sender.clone(), stream).await;
clients.insert(released_id, client);

event_sender.send(Event::Connection(released_id)).await.unwrap();
}
}
}
}
}
}
Expand Down
5 changes: 3 additions & 2 deletions rust/shared/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
pub mod config;
pub mod network;
pub mod logging;
pub mod network;
pub mod packets;
pub mod utils;


pub mod security;

pub mod macros;
File renamed without changes.

0 comments on commit 5807324

Please sign in to comment.