Skip to content

Commit

Permalink
Merge pull request #77 from Telecominfraproject/feat/topo_map_enhance…
Browse files Browse the repository at this point in the history
…ments

Feat/topo map enhancements
  • Loading branch information
SviatoslavBoichuk authored Sep 19, 2024
2 parents 6ec99c7 + 3bc34fc commit 99241ad
Show file tree
Hide file tree
Showing 11 changed files with 1,590 additions and 713 deletions.
1 change: 1 addition & 0 deletions run_cgw.sh
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ export CGW_NB_INFRA_CERTS_PATH="${CGW_NB_INFRA_CERTS_PATH:-$DEFAULT_CERTS_PATH}"
export CGW_NB_INFRA_TLS="${CGW_NB_INFRA_TLS:-$DEFAULT_NB_INFRA_TLS}"
export CGW_UCENTRAL_AP_DATAMODEL_URI="${CGW_UCENTRAL_AP_DATAMODEL_URI:-$DEFAULT_UCENTRAL_AP_DATAMODEL_URI}"
export CGW_UCENTRAL_SWITCH_DATAMODEL_URI="${CGW_UCENTRAL_SWITCH_DATAMODEL_URI:-$DEFAULT_UCENTRAL_SWITCH_DATAMODEL_URI}"
export RUST_BACKTRACE=1

if [ -z "${!CGW_REDIS_USERNAME}" ]; then
export CGW_REDIS_USERNAME="${CGW_REDIS_USERNAME}"
Expand Down
48 changes: 34 additions & 14 deletions src/cgw_connection_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ pub struct CGWConnectionProcessor {
pub idx: i64,
pub group_id: i32,
pub feature_topomap_enabled: bool,
pub device_type: CGWDeviceType,
}

impl CGWConnectionProcessor {
Expand All @@ -82,6 +83,8 @@ impl CGWConnectionProcessor {
idx: conn_idx,
group_id: 0,
feature_topomap_enabled: server.feature_topomap_enabled,
// Default to AP, it's safe, as later-on it will be changed
device_type: CGWDeviceType::CGWDeviceAP,
};

conn_processor
Expand Down Expand Up @@ -187,7 +190,8 @@ impl CGWConnectionProcessor {
}

self.serial = evt.serial;
let device_type = CGWDeviceType::from_str(caps.platform.as_str())?;

self.device_type = CGWDeviceType::from_str(caps.platform.as_str())?;

// TODO: we accepted tls stream and split the WS into RX TX part,
// now we have to ASK cgw_connection_server's permission whether
Expand Down Expand Up @@ -253,16 +257,14 @@ impl CGWConnectionProcessor {
}
}

self.process_connection(stream, sink, mbox_rx, device_type)
.await;
self.process_connection(stream, sink, mbox_rx).await;

Ok(())
}

async fn process_wss_rx_msg(
&self,
msg: std::result::Result<Message, tungstenite::error::Error>,
device_type: CGWDeviceType,
fsm_state: &mut CGWUCentralMessageProcessorState,
pending_req_id: u64,
) -> Result<CGWConnectionState> {
Expand All @@ -278,20 +280,31 @@ impl CGWConnectionProcessor {
}
Text(payload) => {
if let Ok(evt) = cgw_ucentral_event_parse(
&device_type,
&self.device_type,
self.feature_topomap_enabled,
&payload,
timestamp.timestamp(),
) {
kafaka_msg = payload.clone();
kafaka_msg.clone_from(&payload);
if let CGWUCentralEventType::State(_) = evt.evt_type {
if let Some(decompressed) = evt.decompressed.clone() {
kafaka_msg = decompressed;
}
if self.feature_topomap_enabled {
let topo_map = CGWUCentralTopologyMap::get_ref();
topo_map.process_state_message(&device_type, evt).await;
topo_map.debug_dump_map().await;

// TODO: remove this Arc clone:
// Dirty hack for now: pass Arc ref of srv to topo map;
// Future rework and refactoring would require to separate
// NB api from being an internal obj of conn_server to be a
// standalone (singleton?) object.
topo_map.enqueue_event(
evt,
self.device_type,
self.serial,
self.group_id,
self.cgw_server.clone(),
);
}
} else if let CGWUCentralEventType::Reply(content) = evt.evt_type {
if *fsm_state != CGWUCentralMessageProcessorState::ResultPending {
Expand All @@ -313,10 +326,18 @@ impl CGWConnectionProcessor {
} else if let CGWUCentralEventType::RealtimeEvent(_) = evt.evt_type {
if self.feature_topomap_enabled {
let topo_map = CGWUCentralTopologyMap::get_ref();
topo_map
.process_device_topology_event(&device_type, evt)
.await;
topo_map.debug_dump_map().await;
// TODO: remove this Arc clone:
// Dirty hack for now: pass Arc ref of srv to topo map;
// Future rework and refactoring would require to separate
// NB api from being an internal obj of conn_server to be a
// standalone (singleton?) object.
topo_map.enqueue_event(
evt,
self.device_type,
self.serial,
self.group_id,
self.cgw_server.clone(),
);
}
}
}
Expand Down Expand Up @@ -403,7 +424,6 @@ impl CGWConnectionProcessor {
mut stream: SStream,
mut sink: SSink,
mut mbox_rx: UnboundedReceiver<CGWConnectionProcessorReqMsg>,
device_type: CGWDeviceType,
) {
#[derive(Debug)]
enum WakeupReason {
Expand Down Expand Up @@ -554,7 +574,7 @@ impl CGWConnectionProcessor {
let rc = match wakeup_reason {
WakeupReason::WSSRxMsg(res) => {
last_contact = Instant::now();
self.process_wss_rx_msg(res, device_type, &mut fsm_state, pending_req_id)
self.process_wss_rx_msg(res, &mut fsm_state, pending_req_id)
.await
}
WakeupReason::MboxRx(mbox_message) => {
Expand Down
97 changes: 74 additions & 23 deletions src/cgw_connection_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ impl CGWConnectionServer {
return Err(Error::ConnectionServer(format!(
"Failed to get runtime type {:?}, err: {}",
CGWRuntimeType::WssRxTx,
e.to_string()
e
)));
}
};
Expand All @@ -225,7 +225,7 @@ impl CGWConnectionServer {
return Err(Error::ConnectionServer(format!(
"Failed to get runtime type {:?}, err: {}",
CGWRuntimeType::WssRxTx,
e.to_string()
e
)));
}
};
Expand All @@ -244,7 +244,7 @@ impl CGWConnectionServer {
return Err(Error::ConnectionServer(format!(
"Failed to get runtime type {:?}, err: {}",
CGWRuntimeType::WssRxTx,
e.to_string()
e
)));
}
};
Expand All @@ -263,7 +263,7 @@ impl CGWConnectionServer {
return Err(Error::ConnectionServer(format!(
"Failed to get runtime type {:?}, err: {}",
CGWRuntimeType::WssRxTx,
e.to_string()
e
)));
}
};
Expand All @@ -282,7 +282,7 @@ impl CGWConnectionServer {
return Err(Error::ConnectionServer(format!(
"Failed to get runtime type {:?}, err: {}",
CGWRuntimeType::WssRxTx,
e.to_string()
e
)));
}
};
Expand All @@ -301,7 +301,7 @@ impl CGWConnectionServer {
return Err(Error::ConnectionServer(format!(
"Failed to get runtime type {:?}, err: {}",
CGWRuntimeType::WssRxTx,
e.to_string()
e
)));
}
};
Expand Down Expand Up @@ -340,20 +340,31 @@ impl CGWConnectionServer {
}
};

let config_validator =
match CGWUCentralConfigValidators::new(app_args.validation_schema.clone()) {
Ok(validator) => validator,
Err(e) => {
error!(
"Can't create CGW Connection server: Config validator create failed: {}",
e.to_string(),
);
return Err(Error::ConnectionServer(format!(
"Can't create CGW Connection server: Config validator create failed: {}",
e.to_string(),
)));
}
};
// TODO: proper fix.
// Ugly W/A for now;
// The reason behind this change (W/A), is that underlying validator
// uses sync call, which panics (due to it being called in async
// context).
// The proper fix would to be refactor all constructors to be sync,
// but use spawn_blocking where needed in contextes that rely on the
// underlying async calls.
let app_args_clone = app_args.validation_schema.clone();
let get_config_validator_fut = tokio::task::spawn_blocking(move || {
CGWUCentralConfigValidators::new(app_args_clone).unwrap()
});
let config_validator = match get_config_validator_fut.await {
Ok(res) => res,
Err(e) => {
error!(
"Failed to retrieve json config validators: {}",
e.to_string()
);
return Err(Error::ConnectionServer(format!(
"Failed to retrieve json config validators: {}",
e
)));
}
};

let server = Arc::new(CGWConnectionServer {
allow_mismatch: app_args.wss_args.allow_mismatch,
Expand Down Expand Up @@ -412,6 +423,12 @@ impl CGWConnectionServer {
.await;
});

if server.feature_topomap_enabled {
info!("Topomap enabled, starting queue processor...");
let topo_map = CGWUCentralTopologyMap::get_ref();
topo_map.start(&server.wss_rx_tx_runtime).await;
}

Ok(server)
}

Expand Down Expand Up @@ -828,6 +845,32 @@ impl CGWConnectionServer {
.await
{
Ok(()) => {
// We try to help free topomap memory usage
// by notifying it whenever GID get's destroyed.
// Howover, for allocation we let topo map
// handle it's mem alloc whenever necessary
// on it's own, when data from specific gid
// arrives - we rely on topo map to
// allocate necessary structures on it's own.
//
// In this way, we make sure that we handle
// properly the GID resoration scenario:
// if CGW restarts and loads GID info from
// DB, there would be no notification about
// create/del, and there's no need to
// oflload this responsibility to
// remote_discovery module for example,
// due to the fact that CGW is not designed
// for management of redis without CGW knowledge:
// if something disrupts the redis state / sql
// state without CGW's prior knowledge,
// it's not a responsibility of CGW to be aware
// of such changes and handle it correspondingly.
if self.feature_topomap_enabled {
let topo_map = CGWUCentralTopologyMap::get_ref();
topo_map.remove_gid(gid).await;
}

if let Ok(resp) =
cgw_construct_infra_group_delete_response(gid, uuid, true, None)
{
Expand Down Expand Up @@ -1237,7 +1280,7 @@ impl CGWConnectionServer {
if let Ok(resp) = cgw_construct_device_enqueue_response(
uuid,
false,
Some(format!("Failed to validate config message! Invalid configure message for device: {device_mac}, uuid {uuid}\n{}", e.to_string())),
Some(format!("Failed to validate config message! Invalid configure message for device: {device_mac}, uuid {uuid}\n{}", e)),
) {
self.enqueue_mbox_message_from_cgw_to_nb_api(gid, resp);
} else {
Expand Down Expand Up @@ -1377,6 +1420,8 @@ impl CGWConnectionServer {
conn_processor_mbox_tx,
) = msg
{
let device_platform: String = caps.platform.clone();

// if connection is unique: simply insert new conn
//
// if duplicate exists: notify server about such incident.
Expand Down Expand Up @@ -1540,7 +1585,9 @@ impl CGWConnectionServer {

if self.feature_topomap_enabled {
let topo_map = CGWUCentralTopologyMap::get_ref();
topo_map.insert_device(&device_mac).await;
topo_map
.insert_device(&device_mac, device_platform.as_str(), device_group_id)
.await;
}

connmap_w_lock.insert(device_mac, conn_processor_mbox_tx);
Expand All @@ -1551,6 +1598,7 @@ impl CGWConnectionServer {
let _ = conn_processor_mbox_tx_clone.send(msg);
});
} else if let CGWConnectionServerReqMsg::ConnectionClosed(device_mac) = msg {
let mut device_group_id: i32 = 0;
// Insert device to disconnected device list
{
let queue_lock = CGW_MESSAGES_QUEUE.read().await;
Expand All @@ -1565,6 +1613,7 @@ impl CGWConnectionServer {

let mut devices_cache = self.devices_cache.write().await;
if let Some(device) = devices_cache.get_device_mut(&device_mac) {
device_group_id = device.get_device_group_id();
if device.get_device_remains_in_db() {
device.set_device_state(CGWDeviceState::CGWDeviceDisconnected);
} else {
Expand All @@ -1574,7 +1623,9 @@ impl CGWConnectionServer {

if self.feature_topomap_enabled {
let topo_map = CGWUCentralTopologyMap::get_ref();
topo_map.remove_device(&device_mac).await;
topo_map
.remove_device(&device_mac, device_group_id, self.clone())
.await;
}

CGWMetrics::get_ref().change_counter(
Expand Down
11 changes: 6 additions & 5 deletions src/cgw_device.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,12 +48,13 @@ pub struct CGWDeviceCapabilities {

impl CGWDeviceCapabilities {
pub fn update_device_capabilities(&mut self, new_capabilities: &CGWDeviceCapabilities) {
self.firmware = new_capabilities.firmware.clone();
self.firmware.clone_from(&new_capabilities.firmware);
self.uuid = new_capabilities.uuid;
self.compatible = new_capabilities.compatible.clone();
self.model = new_capabilities.model.clone();
self.platform = new_capabilities.platform.clone();
self.label_macaddr = new_capabilities.label_macaddr.clone();
self.compatible.clone_from(&new_capabilities.compatible);
self.model.clone_from(&new_capabilities.model);
self.platform.clone_from(&new_capabilities.platform);
self.label_macaddr
.clone_from(&new_capabilities.label_macaddr);
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/cgw_devices_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ impl CGWDevicesCache {
}

pub fn check_device_exists(&self, key: &MacAddress) -> bool {
self.cache.get(key).is_some()
self.cache.contains_key(key)
}

pub fn get_device_mut(&mut self, key: &MacAddress) -> Option<&mut CGWDevice> {
Expand Down
9 changes: 4 additions & 5 deletions src/cgw_errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,12 +85,11 @@ pub enum Error {
Empty(()),
}

impl ToString for Error {
fn to_string(&self) -> String {
impl std::fmt::Display for Error {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Error::AppArgsParser(message) => message.clone(),
Error::Tls(message) => message.clone(),
_ => format!("{:?}", self),
Error::AppArgsParser(message) | Error::Tls(message) => write!(f, "{}", message),
_ => write!(f, "{:?}", self),
}
}
}
Expand Down
Loading

0 comments on commit 99241ad

Please sign in to comment.