diff --git a/src/cgw_connection_server.rs b/src/cgw_connection_server.rs index 288ebf6..f219d06 100644 --- a/src/cgw_connection_server.rs +++ b/src/cgw_connection_server.rs @@ -8,6 +8,7 @@ use crate::cgw_nb_api_listener::{ cgw_construct_infra_group_device_del_response, cgw_construct_rebalance_group_response, cgw_construct_unassigned_infra_connection_msg, }; +use crate::cgw_runtime::{cgw_get_runtime, CGWRuntimeType}; use crate::cgw_tls::cgw_tls_get_cn_from_stream; use crate::cgw_ucentral_messages_queue_manager::{ CGWUCentralMessagesQueueItem, CGW_MESSAGES_QUEUE, @@ -33,7 +34,7 @@ use crate::cgw_errors::{Error, Result}; use std::{collections::HashMap, net::SocketAddr, sync::Arc}; use tokio::{ net::TcpStream, - runtime::{Builder, Runtime}, + runtime::Runtime, sync::{ mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender}, RwLock, @@ -41,8 +42,6 @@ use tokio::{ time::{sleep, Duration}, }; -use std::sync::atomic::{AtomicUsize, Ordering}; - use serde_json::{Map, Value}; use serde::{Deserialize, Serialize}; @@ -185,58 +184,119 @@ impl CGWNBApiParsedMsg { impl CGWConnectionServer { pub async fn new(app_args: &AppArgs) -> Result> { - let wss_runtime_handle = Arc::new( - Builder::new_multi_thread() - .worker_threads(app_args.wss_t_num) - .thread_name_fn(|| { - static ATOMIC_ID: AtomicUsize = AtomicUsize::new(0); - let id = ATOMIC_ID.fetch_add(1, Ordering::SeqCst); - format!("cgw-wss-t-{}", id) - }) - .thread_stack_size(3 * 1024 * 1024) - .enable_all() - .build()?, - ); - let internal_mbox_runtime_handle = Arc::new( - Builder::new_multi_thread() - .worker_threads(1) - .thread_name("cgw-mbox") - .thread_stack_size(1024 * 1024) - .enable_all() - .build()?, - ); - let nb_api_mbox_runtime_handle = Arc::new( - Builder::new_multi_thread() - .worker_threads(1) - .thread_name("cgw-mbox-nbapi") - .thread_stack_size(1024 * 1024) - .enable_all() - .build()?, - ); - let relay_msg_mbox_runtime_handle = Arc::new( - Builder::new_multi_thread() - .worker_threads(1) - .thread_name("cgw-relay-mbox-nbapi") - .thread_stack_size(1024 * 1024) - .enable_all() - .build()?, - ); - let nb_api_mbox_tx_runtime_handle = Arc::new( - Builder::new_multi_thread() - .worker_threads(1) - .thread_name("cgw-mbox-nbapi-tx") - .thread_stack_size(1024 * 1024) - .enable_all() - .build()?, - ); - let queue_timeout_handle = Arc::new( - Builder::new_multi_thread() - .worker_threads(1) - .thread_name("cgw-queue-timeout") - .thread_stack_size(1024 * 1024) - .enable_all() - .build()?, - ); + let wss_runtime_handle = match cgw_get_runtime(CGWRuntimeType::WssRxTx) { + Ok(ret_runtime) => match ret_runtime { + Some(runtime) => runtime, + None => { + return Err(Error::ConnectionServer(format!( + "Failed to find runtime type {:?}", + CGWRuntimeType::WssRxTx + ))); + } + }, + Err(e) => { + return Err(Error::ConnectionServer(format!( + "Failed to get runtime type {:?}, err: {}", + CGWRuntimeType::WssRxTx, + e.to_string() + ))); + } + }; + + let internal_mbox_runtime_handle = match cgw_get_runtime(CGWRuntimeType::MboxInternal) { + Ok(ret_runtime) => match ret_runtime { + Some(runtime) => runtime, + None => { + return Err(Error::ConnectionServer(format!( + "Failed to find runtime type {:?}", + CGWRuntimeType::WssRxTx + ))); + } + }, + Err(e) => { + return Err(Error::ConnectionServer(format!( + "Failed to get runtime type {:?}, err: {}", + CGWRuntimeType::WssRxTx, + e.to_string() + ))); + } + }; + + let nb_api_mbox_runtime_handle = match cgw_get_runtime(CGWRuntimeType::MboxNbApiRx) { + Ok(ret_runtime) => match ret_runtime { + Some(runtime) => runtime, + None => { + return Err(Error::ConnectionServer(format!( + "Failed to find runtime type {:?}", + CGWRuntimeType::WssRxTx + ))); + } + }, + Err(e) => { + return Err(Error::ConnectionServer(format!( + "Failed to get runtime type {:?}, err: {}", + CGWRuntimeType::WssRxTx, + e.to_string() + ))); + } + }; + + let nb_api_mbox_tx_runtime_handle = match cgw_get_runtime(CGWRuntimeType::MboxNbApiTx) { + Ok(ret_runtime) => match ret_runtime { + Some(runtime) => runtime, + None => { + return Err(Error::ConnectionServer(format!( + "Failed to find runtime type {:?}", + CGWRuntimeType::WssRxTx + ))); + } + }, + Err(e) => { + return Err(Error::ConnectionServer(format!( + "Failed to get runtime type {:?}, err: {}", + CGWRuntimeType::WssRxTx, + e.to_string() + ))); + } + }; + + let relay_msg_mbox_runtime_handle = match cgw_get_runtime(CGWRuntimeType::MboxRelay) { + Ok(ret_runtime) => match ret_runtime { + Some(runtime) => runtime, + None => { + return Err(Error::ConnectionServer(format!( + "Failed to find runtime type {:?}", + CGWRuntimeType::WssRxTx + ))); + } + }, + Err(e) => { + return Err(Error::ConnectionServer(format!( + "Failed to get runtime type {:?}, err: {}", + CGWRuntimeType::WssRxTx, + e.to_string() + ))); + } + }; + + let queue_timeout_handle = match cgw_get_runtime(CGWRuntimeType::QueueTimeout) { + Ok(ret_runtime) => match ret_runtime { + Some(runtime) => runtime, + None => { + return Err(Error::ConnectionServer(format!( + "Failed to find runtime type {:?}", + CGWRuntimeType::WssRxTx + ))); + } + }, + Err(e) => { + return Err(Error::ConnectionServer(format!( + "Failed to get runtime type {:?}, err: {}", + CGWRuntimeType::WssRxTx, + e.to_string() + ))); + } + }; let (internal_tx, internal_rx) = unbounded_channel::(); let (nb_api_tx, nb_api_rx) = unbounded_channel::(); diff --git a/src/cgw_errors.rs b/src/cgw_errors.rs index b93f50d..7747208 100644 --- a/src/cgw_errors.rs +++ b/src/cgw_errors.rs @@ -23,6 +23,8 @@ pub enum Error { AppArgsParser(String), + Runtime(String), + // -- Externals #[from] Io(std::io::Error), diff --git a/src/cgw_runtime.rs b/src/cgw_runtime.rs new file mode 100644 index 0000000..8349fbc --- /dev/null +++ b/src/cgw_runtime.rs @@ -0,0 +1,115 @@ +use crate::cgw_errors::{Error, Result}; + +use lazy_static::lazy_static; + +use std::{ + collections::HashMap, + sync::{ + atomic::{AtomicUsize, Ordering}, + Arc, Mutex, + }, +}; + +use tokio::runtime::{Builder, Runtime}; + +#[derive(Hash, Eq, PartialEq, Debug)] +pub enum CGWRuntimeType { + WssRxTx, + MboxInternal, + MboxNbApiRx, + MboxNbApiTx, + MboxRelay, + QueueTimeout, +} + +lazy_static! { + static ref RUNTIMES: Mutex>> = Mutex::new(HashMap::new()); +} + +pub fn cgw_initialize_runtimes(wss_t_num: usize) -> Result<()> { + let wss_runtime_handle = Arc::new( + Builder::new_multi_thread() + .worker_threads(wss_t_num) + .thread_name_fn(|| { + static ATOMIC_ID: AtomicUsize = AtomicUsize::new(0); + let id = ATOMIC_ID.fetch_add(1, Ordering::SeqCst); + format!("cgw-wss-t-{}", id) + }) + .thread_stack_size(3 * 1024 * 1024) + .enable_all() + .build()?, + ); + let internal_mbox_runtime_handle = Arc::new( + Builder::new_multi_thread() + .worker_threads(1) + .thread_name("cgw-mbox") + .thread_stack_size(1024 * 1024) + .enable_all() + .build()?, + ); + let nb_api_mbox_rx_runtime_handle = Arc::new( + Builder::new_multi_thread() + .worker_threads(1) + .thread_name("cgw-mbox-nbapi") + .thread_stack_size(1024 * 1024) + .enable_all() + .build()?, + ); + let nb_api_mbox_tx_runtime_handle = Arc::new( + Builder::new_multi_thread() + .worker_threads(1) + .thread_name("cgw-mbox-nbapi-tx") + .thread_stack_size(1024 * 1024) + .enable_all() + .build()?, + ); + let relay_msg_mbox_runtime_handle = Arc::new( + Builder::new_multi_thread() + .worker_threads(1) + .thread_name("cgw-relay-mbox-nbapi") + .thread_stack_size(1024 * 1024) + .enable_all() + .build()?, + ); + let queue_timeout_handle = Arc::new( + Builder::new_multi_thread() + .worker_threads(1) + .thread_name("cgw-queue-timeout") + .thread_stack_size(1024 * 1024) + .enable_all() + .build()?, + ); + + let mut runtimes = match RUNTIMES.lock() { + Ok(runtimes_lock) => runtimes_lock, + Err(e) => { + return Err(Error::Runtime(format!( + "Failed to get runtimes lock: {}", + e + ))); + } + }; + + runtimes.insert(CGWRuntimeType::WssRxTx, wss_runtime_handle); + runtimes.insert(CGWRuntimeType::MboxInternal, internal_mbox_runtime_handle); + runtimes.insert(CGWRuntimeType::MboxNbApiRx, nb_api_mbox_rx_runtime_handle); + runtimes.insert(CGWRuntimeType::MboxNbApiTx, nb_api_mbox_tx_runtime_handle); + runtimes.insert(CGWRuntimeType::MboxRelay, relay_msg_mbox_runtime_handle); + runtimes.insert(CGWRuntimeType::QueueTimeout, queue_timeout_handle); + + Ok(()) +} + +pub fn cgw_get_runtime(runtime_type: CGWRuntimeType) -> Result>> { + let runtimes = match RUNTIMES.lock() { + Ok(runtimes_lock) => runtimes_lock, + Err(e) => { + return Err(Error::Runtime(format!( + "Failed to get runtimes lock: {}", + e + ))); + } + }; + + Ok(runtimes.get(&runtime_type).cloned()) +} diff --git a/src/main.rs b/src/main.rs index a50a710..dcb791c 100644 --- a/src/main.rs +++ b/src/main.rs @@ -10,6 +10,7 @@ mod cgw_nb_api_listener; mod cgw_remote_client; mod cgw_remote_discovery; mod cgw_remote_server; +mod cgw_runtime; mod cgw_tls; mod cgw_ucentral_ap_parser; mod cgw_ucentral_messages_queue_manager; @@ -23,6 +24,8 @@ extern crate log; #[macro_use] extern crate lazy_static; +use cgw_runtime::cgw_initialize_runtimes; + use tokio::{ net::TcpListener, runtime::{Builder, Handle, Runtime}, @@ -637,6 +640,12 @@ async fn main() -> Result<()> { // Configure logger setup_logger(args.log_level); + // Initialize runtimes + if let Err(e) = cgw_initialize_runtimes(args.wss_t_num) { + error!("Failed to initialize CGW runtimes: {}", e.to_string()); + return Err(e); + } + if args.feature_topomap_enabled { warn!("CGW_FEATURE_TOPOMAP_ENABLE is set, TOPO MAP feature (unstable) will be enabled (realtime events / state processing) - heavy performance drop with high number of devices connected could be observed"); }