Skip to content

Commit

Permalink
Move CGW runtion mcreation to init stage
Browse files Browse the repository at this point in the history
  • Loading branch information
SviatoslavBoichuk committed Jul 31, 2024
1 parent 91f3996 commit 25f471d
Show file tree
Hide file tree
Showing 4 changed files with 241 additions and 55 deletions.
170 changes: 115 additions & 55 deletions src/cgw_connection_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use crate::cgw_nb_api_listener::{
cgw_construct_infra_group_device_del_response, cgw_construct_rebalance_group_response,
cgw_construct_unassigned_infra_connection_msg,
};
use crate::cgw_runtime::{cgw_get_runtime, CGWRuntimeType};
use crate::cgw_tls::cgw_tls_get_cn_from_stream;
use crate::cgw_ucentral_messages_queue_manager::{
CGWUCentralMessagesQueueItem, CGW_MESSAGES_QUEUE,
Expand All @@ -33,16 +34,14 @@ use crate::cgw_errors::{Error, Result};
use std::{collections::HashMap, net::SocketAddr, sync::Arc};
use tokio::{
net::TcpStream,
runtime::{Builder, Runtime},
runtime::Runtime,
sync::{
mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender},
RwLock,
},
time::{sleep, Duration},
};

use std::sync::atomic::{AtomicUsize, Ordering};

use serde_json::{Map, Value};

use serde::{Deserialize, Serialize};
Expand Down Expand Up @@ -185,58 +184,119 @@ impl CGWNBApiParsedMsg {

impl CGWConnectionServer {
pub async fn new(app_args: &AppArgs) -> Result<Arc<Self>> {
let wss_runtime_handle = Arc::new(
Builder::new_multi_thread()
.worker_threads(app_args.wss_args.wss_t_num)
.thread_name_fn(|| {
static ATOMIC_ID: AtomicUsize = AtomicUsize::new(0);
let id = ATOMIC_ID.fetch_add(1, Ordering::SeqCst);
format!("cgw-wss-t-{}", id)
})
.thread_stack_size(3 * 1024 * 1024)
.enable_all()
.build()?,
);
let internal_mbox_runtime_handle = Arc::new(
Builder::new_multi_thread()
.worker_threads(1)
.thread_name("cgw-mbox")
.thread_stack_size(1024 * 1024)
.enable_all()
.build()?,
);
let nb_api_mbox_runtime_handle = Arc::new(
Builder::new_multi_thread()
.worker_threads(1)
.thread_name("cgw-mbox-nbapi")
.thread_stack_size(1024 * 1024)
.enable_all()
.build()?,
);
let relay_msg_mbox_runtime_handle = Arc::new(
Builder::new_multi_thread()
.worker_threads(1)
.thread_name("cgw-relay-mbox-nbapi")
.thread_stack_size(1024 * 1024)
.enable_all()
.build()?,
);
let nb_api_mbox_tx_runtime_handle = Arc::new(
Builder::new_multi_thread()
.worker_threads(1)
.thread_name("cgw-mbox-nbapi-tx")
.thread_stack_size(1024 * 1024)
.enable_all()
.build()?,
);
let queue_timeout_handle = Arc::new(
Builder::new_multi_thread()
.worker_threads(1)
.thread_name("cgw-queue-timeout")
.thread_stack_size(1024 * 1024)
.enable_all()
.build()?,
);
let wss_runtime_handle = match cgw_get_runtime(CGWRuntimeType::WssRxTx) {
Ok(ret_runtime) => match ret_runtime {
Some(runtime) => runtime,
None => {
return Err(Error::ConnectionServer(format!(
"Failed to find runtime type {:?}",
CGWRuntimeType::WssRxTx
)));
}
},
Err(e) => {
return Err(Error::ConnectionServer(format!(
"Failed to get runtime type {:?}, err: {}",
CGWRuntimeType::WssRxTx,
e.to_string()
)));
}
};

let internal_mbox_runtime_handle = match cgw_get_runtime(CGWRuntimeType::MboxInternal) {
Ok(ret_runtime) => match ret_runtime {
Some(runtime) => runtime,
None => {
return Err(Error::ConnectionServer(format!(
"Failed to find runtime type {:?}",
CGWRuntimeType::WssRxTx
)));
}
},
Err(e) => {
return Err(Error::ConnectionServer(format!(
"Failed to get runtime type {:?}, err: {}",
CGWRuntimeType::WssRxTx,
e.to_string()
)));
}
};

let nb_api_mbox_runtime_handle = match cgw_get_runtime(CGWRuntimeType::MboxNbApiRx) {
Ok(ret_runtime) => match ret_runtime {
Some(runtime) => runtime,
None => {
return Err(Error::ConnectionServer(format!(
"Failed to find runtime type {:?}",
CGWRuntimeType::WssRxTx
)));
}
},
Err(e) => {
return Err(Error::ConnectionServer(format!(
"Failed to get runtime type {:?}, err: {}",
CGWRuntimeType::WssRxTx,
e.to_string()
)));
}
};

let nb_api_mbox_tx_runtime_handle = match cgw_get_runtime(CGWRuntimeType::MboxNbApiTx) {
Ok(ret_runtime) => match ret_runtime {
Some(runtime) => runtime,
None => {
return Err(Error::ConnectionServer(format!(
"Failed to find runtime type {:?}",
CGWRuntimeType::WssRxTx
)));
}
},
Err(e) => {
return Err(Error::ConnectionServer(format!(
"Failed to get runtime type {:?}, err: {}",
CGWRuntimeType::WssRxTx,
e.to_string()
)));
}
};

let relay_msg_mbox_runtime_handle = match cgw_get_runtime(CGWRuntimeType::MboxRelay) {
Ok(ret_runtime) => match ret_runtime {
Some(runtime) => runtime,
None => {
return Err(Error::ConnectionServer(format!(
"Failed to find runtime type {:?}",
CGWRuntimeType::WssRxTx
)));
}
},
Err(e) => {
return Err(Error::ConnectionServer(format!(
"Failed to get runtime type {:?}, err: {}",
CGWRuntimeType::WssRxTx,
e.to_string()
)));
}
};

let queue_timeout_handle = match cgw_get_runtime(CGWRuntimeType::QueueTimeout) {
Ok(ret_runtime) => match ret_runtime {
Some(runtime) => runtime,
None => {
return Err(Error::ConnectionServer(format!(
"Failed to find runtime type {:?}",
CGWRuntimeType::WssRxTx
)));
}
},
Err(e) => {
return Err(Error::ConnectionServer(format!(
"Failed to get runtime type {:?}, err: {}",
CGWRuntimeType::WssRxTx,
e.to_string()
)));
}
};

let (internal_tx, internal_rx) = unbounded_channel::<CGWConnectionServerReqMsg>();
let (nb_api_tx, nb_api_rx) = unbounded_channel::<CGWConnectionNBAPIReqMsg>();
Expand Down
2 changes: 2 additions & 0 deletions src/cgw_errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ pub enum Error {

AppArgsParser(String),

Runtime(String),

// -- Externals
#[from]
Io(std::io::Error),
Expand Down
115 changes: 115 additions & 0 deletions src/cgw_runtime.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
use crate::cgw_errors::{Error, Result};

use lazy_static::lazy_static;

use std::{
collections::HashMap,
sync::{
atomic::{AtomicUsize, Ordering},
Arc, Mutex,
},
};

use tokio::runtime::{Builder, Runtime};

#[derive(Hash, Eq, PartialEq, Debug)]
pub enum CGWRuntimeType {
WssRxTx,
MboxInternal,
MboxNbApiRx,
MboxNbApiTx,
MboxRelay,
QueueTimeout,
}

lazy_static! {
static ref RUNTIMES: Mutex<HashMap<CGWRuntimeType, Arc<Runtime>>> = Mutex::new(HashMap::new());
}

pub fn cgw_initialize_runtimes(wss_t_num: usize) -> Result<()> {
let wss_runtime_handle = Arc::new(
Builder::new_multi_thread()
.worker_threads(wss_t_num)
.thread_name_fn(|| {
static ATOMIC_ID: AtomicUsize = AtomicUsize::new(0);
let id = ATOMIC_ID.fetch_add(1, Ordering::SeqCst);
format!("cgw-wss-t-{}", id)
})
.thread_stack_size(3 * 1024 * 1024)
.enable_all()
.build()?,
);
let internal_mbox_runtime_handle = Arc::new(
Builder::new_multi_thread()
.worker_threads(1)
.thread_name("cgw-mbox")
.thread_stack_size(1024 * 1024)
.enable_all()
.build()?,
);
let nb_api_mbox_rx_runtime_handle = Arc::new(
Builder::new_multi_thread()
.worker_threads(1)
.thread_name("cgw-mbox-nbapi")
.thread_stack_size(1024 * 1024)
.enable_all()
.build()?,
);
let nb_api_mbox_tx_runtime_handle = Arc::new(
Builder::new_multi_thread()
.worker_threads(1)
.thread_name("cgw-mbox-nbapi-tx")
.thread_stack_size(1024 * 1024)
.enable_all()
.build()?,
);
let relay_msg_mbox_runtime_handle = Arc::new(
Builder::new_multi_thread()
.worker_threads(1)
.thread_name("cgw-relay-mbox-nbapi")
.thread_stack_size(1024 * 1024)
.enable_all()
.build()?,
);
let queue_timeout_handle = Arc::new(
Builder::new_multi_thread()
.worker_threads(1)
.thread_name("cgw-queue-timeout")
.thread_stack_size(1024 * 1024)
.enable_all()
.build()?,
);

let mut runtimes = match RUNTIMES.lock() {
Ok(runtimes_lock) => runtimes_lock,
Err(e) => {
return Err(Error::Runtime(format!(
"Failed to get runtimes lock: {}",
e
)));
}
};

runtimes.insert(CGWRuntimeType::WssRxTx, wss_runtime_handle);
runtimes.insert(CGWRuntimeType::MboxInternal, internal_mbox_runtime_handle);
runtimes.insert(CGWRuntimeType::MboxNbApiRx, nb_api_mbox_rx_runtime_handle);
runtimes.insert(CGWRuntimeType::MboxNbApiTx, nb_api_mbox_tx_runtime_handle);
runtimes.insert(CGWRuntimeType::MboxRelay, relay_msg_mbox_runtime_handle);
runtimes.insert(CGWRuntimeType::QueueTimeout, queue_timeout_handle);

Ok(())
}

pub fn cgw_get_runtime(runtime_type: CGWRuntimeType) -> Result<Option<Arc<Runtime>>> {
let runtimes = match RUNTIMES.lock() {
Ok(runtimes_lock) => runtimes_lock,
Err(e) => {
return Err(Error::Runtime(format!(
"Failed to get runtimes lock: {}",
e
)));
}
};

Ok(runtimes.get(&runtime_type).cloned())
}
9 changes: 9 additions & 0 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ mod cgw_nb_api_listener;
mod cgw_remote_client;
mod cgw_remote_discovery;
mod cgw_remote_server;
mod cgw_runtime;
mod cgw_tls;
mod cgw_ucentral_ap_parser;
mod cgw_ucentral_messages_queue_manager;
Expand All @@ -25,6 +26,8 @@ extern crate log;
extern crate lazy_static;

use cgw_app_args::AppArgs;
use cgw_runtime::cgw_initialize_runtimes;

use tokio::{
net::TcpListener,
runtime::{Builder, Handle, Runtime},
Expand Down Expand Up @@ -271,6 +274,12 @@ async fn main() -> Result<()> {
// Configure logger
setup_logger(args.log_level);

// Initialize runtimes
if let Err(e) = cgw_initialize_runtimes(args.wss_args.wss_t_num) {
error!("Failed to initialize CGW runtimes: {}", e.to_string());
return Err(e);
}

if args.feature_topomap_enabled {
warn!("CGW_FEATURE_TOPOMAP_ENABLE is set, TOPO MAP feature (unstable) will be enabled (realtime events / state processing) - heavy performance drop with high number of devices connected could be observed");
}
Expand Down

0 comments on commit 25f471d

Please sign in to comment.