Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat/topo map enhancements #77

Merged
merged 2 commits into from
Sep 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions run_cgw.sh
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ export CGW_NB_INFRA_CERTS_PATH="${CGW_NB_INFRA_CERTS_PATH:-$DEFAULT_CERTS_PATH}"
export CGW_NB_INFRA_TLS="${CGW_NB_INFRA_TLS:-$DEFAULT_NB_INFRA_TLS}"
export CGW_UCENTRAL_AP_DATAMODEL_URI="${CGW_UCENTRAL_AP_DATAMODEL_URI:-$DEFAULT_UCENTRAL_AP_DATAMODEL_URI}"
export CGW_UCENTRAL_SWITCH_DATAMODEL_URI="${CGW_UCENTRAL_SWITCH_DATAMODEL_URI:-$DEFAULT_UCENTRAL_SWITCH_DATAMODEL_URI}"
export RUST_BACKTRACE=1

if [ -z "${!CGW_REDIS_USERNAME}" ]; then
export CGW_REDIS_USERNAME="${CGW_REDIS_USERNAME}"
Expand Down
48 changes: 34 additions & 14 deletions src/cgw_connection_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ pub struct CGWConnectionProcessor {
pub idx: i64,
pub group_id: i32,
pub feature_topomap_enabled: bool,
pub device_type: CGWDeviceType,
}

impl CGWConnectionProcessor {
Expand All @@ -82,6 +83,8 @@ impl CGWConnectionProcessor {
idx: conn_idx,
group_id: 0,
feature_topomap_enabled: server.feature_topomap_enabled,
// Default to AP; it's safe, as it will be changed later on
device_type: CGWDeviceType::CGWDeviceAP,
};

conn_processor
Expand Down Expand Up @@ -187,7 +190,8 @@ impl CGWConnectionProcessor {
}

self.serial = evt.serial;
let device_type = CGWDeviceType::from_str(caps.platform.as_str())?;

self.device_type = CGWDeviceType::from_str(caps.platform.as_str())?;

// TODO: we accepted tls stream and split the WS into RX TX part,
// now we have to ASK cgw_connection_server's permission whether
Expand Down Expand Up @@ -253,16 +257,14 @@ impl CGWConnectionProcessor {
}
}

self.process_connection(stream, sink, mbox_rx, device_type)
.await;
self.process_connection(stream, sink, mbox_rx).await;

Ok(())
}

async fn process_wss_rx_msg(
&self,
msg: std::result::Result<Message, tungstenite::error::Error>,
device_type: CGWDeviceType,
fsm_state: &mut CGWUCentralMessageProcessorState,
pending_req_id: u64,
) -> Result<CGWConnectionState> {
Expand All @@ -278,20 +280,31 @@ impl CGWConnectionProcessor {
}
Text(payload) => {
if let Ok(evt) = cgw_ucentral_event_parse(
&device_type,
&self.device_type,
self.feature_topomap_enabled,
&payload,
timestamp.timestamp(),
) {
kafaka_msg = payload.clone();
kafaka_msg.clone_from(&payload);
if let CGWUCentralEventType::State(_) = evt.evt_type {
if let Some(decompressed) = evt.decompressed.clone() {
kafaka_msg = decompressed;
}
if self.feature_topomap_enabled {
let topo_map = CGWUCentralTopologyMap::get_ref();
topo_map.process_state_message(&device_type, evt).await;
topo_map.debug_dump_map().await;

// TODO: remove this Arc clone:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this comment still relevant, or should it be removed?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not really. It states that #306 should be removed, and that some other way of accessing the NB API should be implemented later on.
Currently, a copy of the Arc is being passed around and copied, and we will potentially have to come up with a new approach.

// Dirty hack for now: pass Arc ref of srv to topo map;
// Future rework and refactoring would require to separate
// NB api from being an internal obj of conn_server to be a
// standalone (singleton?) object.
topo_map.enqueue_event(
evt,
self.device_type,
self.serial,
self.group_id,
self.cgw_server.clone(),
);
}
} else if let CGWUCentralEventType::Reply(content) = evt.evt_type {
if *fsm_state != CGWUCentralMessageProcessorState::ResultPending {
Expand All @@ -313,10 +326,18 @@ impl CGWConnectionProcessor {
} else if let CGWUCentralEventType::RealtimeEvent(_) = evt.evt_type {
if self.feature_topomap_enabled {
let topo_map = CGWUCentralTopologyMap::get_ref();
topo_map
.process_device_topology_event(&device_type, evt)
.await;
topo_map.debug_dump_map().await;
// TODO: remove this Arc clone:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same as above

// Dirty hack for now: pass Arc ref of srv to topo map;
// Future rework and refactoring would require to separate
// NB api from being an internal obj of conn_server to be a
// standalone (singleton?) object.
topo_map.enqueue_event(
evt,
self.device_type,
self.serial,
self.group_id,
self.cgw_server.clone(),
);
}
}
}
Expand Down Expand Up @@ -403,7 +424,6 @@ impl CGWConnectionProcessor {
mut stream: SStream,
mut sink: SSink,
mut mbox_rx: UnboundedReceiver<CGWConnectionProcessorReqMsg>,
device_type: CGWDeviceType,
) {
#[derive(Debug)]
enum WakeupReason {
Expand Down Expand Up @@ -554,7 +574,7 @@ impl CGWConnectionProcessor {
let rc = match wakeup_reason {
WakeupReason::WSSRxMsg(res) => {
last_contact = Instant::now();
self.process_wss_rx_msg(res, device_type, &mut fsm_state, pending_req_id)
self.process_wss_rx_msg(res, &mut fsm_state, pending_req_id)
.await
}
WakeupReason::MboxRx(mbox_message) => {
Expand Down
97 changes: 74 additions & 23 deletions src/cgw_connection_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ impl CGWConnectionServer {
return Err(Error::ConnectionServer(format!(
"Failed to get runtime type {:?}, err: {}",
CGWRuntimeType::WssRxTx,
e.to_string()
e
)));
}
};
Expand All @@ -225,7 +225,7 @@ impl CGWConnectionServer {
return Err(Error::ConnectionServer(format!(
"Failed to get runtime type {:?}, err: {}",
CGWRuntimeType::WssRxTx,
e.to_string()
e
)));
}
};
Expand All @@ -244,7 +244,7 @@ impl CGWConnectionServer {
return Err(Error::ConnectionServer(format!(
"Failed to get runtime type {:?}, err: {}",
CGWRuntimeType::WssRxTx,
e.to_string()
e
)));
}
};
Expand All @@ -263,7 +263,7 @@ impl CGWConnectionServer {
return Err(Error::ConnectionServer(format!(
"Failed to get runtime type {:?}, err: {}",
CGWRuntimeType::WssRxTx,
e.to_string()
e
)));
}
};
Expand All @@ -282,7 +282,7 @@ impl CGWConnectionServer {
return Err(Error::ConnectionServer(format!(
"Failed to get runtime type {:?}, err: {}",
CGWRuntimeType::WssRxTx,
e.to_string()
e
)));
}
};
Expand All @@ -301,7 +301,7 @@ impl CGWConnectionServer {
return Err(Error::ConnectionServer(format!(
"Failed to get runtime type {:?}, err: {}",
CGWRuntimeType::WssRxTx,
e.to_string()
e
)));
}
};
Expand Down Expand Up @@ -340,20 +340,31 @@ impl CGWConnectionServer {
}
};

let config_validator =
match CGWUCentralConfigValidators::new(app_args.validation_schema.clone()) {
Ok(validator) => validator,
Err(e) => {
error!(
"Can't create CGW Connection server: Config validator create failed: {}",
e.to_string(),
);
return Err(Error::ConnectionServer(format!(
"Can't create CGW Connection server: Config validator create failed: {}",
e.to_string(),
)));
}
};
// TODO: proper fix.
// Ugly W/A for now;
// The reason behind this change (W/A) is that the underlying validator
// uses a sync call, which panics (due to it being called in an async
// context).
// The proper fix would be to refactor all constructors to be sync,
// but use spawn_blocking where needed in contexts that rely on the
// underlying async calls.
let app_args_clone = app_args.validation_schema.clone();
let get_config_validator_fut = tokio::task::spawn_blocking(move || {
CGWUCentralConfigValidators::new(app_args_clone).unwrap()
});
let config_validator = match get_config_validator_fut.await {
Ok(res) => res,
Err(e) => {
error!(
"Failed to retrieve json config validators: {}",
e.to_string()
);
return Err(Error::ConnectionServer(format!(
"Failed to retrieve json config validators: {}",
e
)));
}
};

let server = Arc::new(CGWConnectionServer {
allow_mismatch: app_args.wss_args.allow_mismatch,
Expand Down Expand Up @@ -412,6 +423,12 @@ impl CGWConnectionServer {
.await;
});

if server.feature_topomap_enabled {
info!("Topomap enabled, starting queue processor...");
let topo_map = CGWUCentralTopologyMap::get_ref();
topo_map.start(&server.wss_rx_tx_runtime).await;
}

Ok(server)
}

Expand Down Expand Up @@ -828,6 +845,32 @@ impl CGWConnectionServer {
.await
{
Ok(()) => {
// We try to help free topomap memory usage
// by notifying it whenever a GID gets destroyed.
// However, for allocation we let topo map
// handle its mem alloc whenever necessary
// on its own, when data from a specific gid
// arrives - we rely on topo map to
// allocate necessary structures on its own.
//
// In this way, we make sure that we properly
// handle the GID restoration scenario:
// if CGW restarts and loads GID info from
// DB, there would be no notification about
// create/del, and there's no need to
// offload this responsibility to the
// remote_discovery module for example,
// due to the fact that CGW is not designed
// for management of redis without CGW knowledge:
// if something disrupts the redis state / sql
// state without CGW's prior knowledge,
// it's not a responsibility of CGW to be aware
// of such changes and handle them correspondingly.
if self.feature_topomap_enabled {
let topo_map = CGWUCentralTopologyMap::get_ref();
topo_map.remove_gid(gid).await;
}

if let Ok(resp) =
cgw_construct_infra_group_delete_response(gid, uuid, true, None)
{
Expand Down Expand Up @@ -1237,7 +1280,7 @@ impl CGWConnectionServer {
if let Ok(resp) = cgw_construct_device_enqueue_response(
uuid,
false,
Some(format!("Failed to validate config message! Invalid configure message for device: {device_mac}, uuid {uuid}\n{}", e.to_string())),
Some(format!("Failed to validate config message! Invalid configure message for device: {device_mac}, uuid {uuid}\n{}", e)),
) {
self.enqueue_mbox_message_from_cgw_to_nb_api(gid, resp);
} else {
Expand Down Expand Up @@ -1377,6 +1420,8 @@ impl CGWConnectionServer {
conn_processor_mbox_tx,
) = msg
{
let device_platform: String = caps.platform.clone();

// if connection is unique: simply insert new conn
//
// if duplicate exists: notify server about such incident.
Expand Down Expand Up @@ -1540,7 +1585,9 @@ impl CGWConnectionServer {

if self.feature_topomap_enabled {
let topo_map = CGWUCentralTopologyMap::get_ref();
topo_map.insert_device(&device_mac).await;
topo_map
.insert_device(&device_mac, device_platform.as_str(), device_group_id)
.await;
}

connmap_w_lock.insert(device_mac, conn_processor_mbox_tx);
Expand All @@ -1551,6 +1598,7 @@ impl CGWConnectionServer {
let _ = conn_processor_mbox_tx_clone.send(msg);
});
} else if let CGWConnectionServerReqMsg::ConnectionClosed(device_mac) = msg {
let mut device_group_id: i32 = 0;
// Insert device to disconnected device list
{
let queue_lock = CGW_MESSAGES_QUEUE.read().await;
Expand All @@ -1565,6 +1613,7 @@ impl CGWConnectionServer {

let mut devices_cache = self.devices_cache.write().await;
if let Some(device) = devices_cache.get_device_mut(&device_mac) {
device_group_id = device.get_device_group_id();
if device.get_device_remains_in_db() {
device.set_device_state(CGWDeviceState::CGWDeviceDisconnected);
} else {
Expand All @@ -1574,7 +1623,9 @@ impl CGWConnectionServer {

if self.feature_topomap_enabled {
let topo_map = CGWUCentralTopologyMap::get_ref();
topo_map.remove_device(&device_mac).await;
topo_map
.remove_device(&device_mac, device_group_id, self.clone())
.await;
}

CGWMetrics::get_ref().change_counter(
Expand Down
11 changes: 6 additions & 5 deletions src/cgw_device.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,12 +48,13 @@ pub struct CGWDeviceCapabilities {

impl CGWDeviceCapabilities {
pub fn update_device_capabilities(&mut self, new_capabilities: &CGWDeviceCapabilities) {
self.firmware = new_capabilities.firmware.clone();
self.firmware.clone_from(&new_capabilities.firmware);
self.uuid = new_capabilities.uuid;
self.compatible = new_capabilities.compatible.clone();
self.model = new_capabilities.model.clone();
self.platform = new_capabilities.platform.clone();
self.label_macaddr = new_capabilities.label_macaddr.clone();
self.compatible.clone_from(&new_capabilities.compatible);
self.model.clone_from(&new_capabilities.model);
self.platform.clone_from(&new_capabilities.platform);
self.label_macaddr
.clone_from(&new_capabilities.label_macaddr);
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/cgw_devices_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ impl CGWDevicesCache {
}

pub fn check_device_exists(&self, key: &MacAddress) -> bool {
self.cache.get(key).is_some()
self.cache.contains_key(key)
}

pub fn get_device_mut(&mut self, key: &MacAddress) -> Option<&mut CGWDevice> {
Expand Down
9 changes: 4 additions & 5 deletions src/cgw_errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,12 +85,11 @@ pub enum Error {
Empty(()),
}

impl ToString for Error {
fn to_string(&self) -> String {
impl std::fmt::Display for Error {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Error::AppArgsParser(message) => message.clone(),
Error::Tls(message) => message.clone(),
_ => format!("{:?}", self),
Error::AppArgsParser(message) | Error::Tls(message) => write!(f, "{}", message),
_ => write!(f, "{:?}", self),
}
}
}
Expand Down
Loading