Skip to content

Commit

Permalink
Merge pull request #68 from Telecominfraproject/dev-fix-compressed-event
Browse files Browse the repository at this point in the history
Send unzipped AP State Event message to Kafka
  • Loading branch information
Cahb authored Jul 30, 2024
2 parents d6f8f26 + 23d2977 commit 91f3996
Show file tree
Hide file tree
Showing 4 changed files with 41 additions and 1 deletion.
8 changes: 7 additions & 1 deletion src/cgw_connection_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use futures_util::{
stream::{SplitSink, SplitStream},
FutureExt, SinkExt, StreamExt,
};

use std::{net::SocketAddr, str::FromStr, sync::Arc};
use tokio::{
net::TcpStream,
Expand Down Expand Up @@ -268,6 +269,7 @@ impl CGWConnectionProcessor {
// Make sure we always track the as accurate as possible the time
// of receiving of the event (where needed).
let timestamp = Local::now();
let mut kafaka_msg: String = String::new();

match msg {
Ok(msg) => match msg {
Expand All @@ -278,7 +280,11 @@ impl CGWConnectionProcessor {
if let Ok(evt) =
cgw_ucentral_event_parse(&device_type, &payload, timestamp.timestamp())
{
kafaka_msg = payload.clone();
if let CGWUCentralEventType::State(_) = evt.evt_type {
if let Some(decompressed) = evt.decompressed.clone() {
kafaka_msg = decompressed;
}
if self.feature_topomap_enabled {
let topo_map = CGWUCentralTopologyMap::get_ref();
topo_map.process_state_message(&device_type, evt).await;
Expand Down Expand Up @@ -313,7 +319,7 @@ impl CGWConnectionProcessor {
}

self.cgw_server
.enqueue_mbox_message_from_device_to_nb_api_c(self.group_id, payload)?;
.enqueue_mbox_message_from_device_to_nb_api_c(self.group_id, kafaka_msg)?;
return Ok(CGWConnectionState::IsActive);
}
Ping(_t) => {
Expand Down
30 changes: 30 additions & 0 deletions src/cgw_ucentral_ap_parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -378,6 +378,29 @@ fn parse_state_event_data(map: CGWUCentralJRPCMessage, timestamp: i64) -> Result
}
}

// Replace compressed data
let mut origin_msg = map.clone();
let params_value = match Value::from_str(unzipped_data.as_str()) {
Ok(val) => val,
Err(_e) => {
return Err(Error::ConnectionProcessor(
"Failed to cast decompressed message to JSON Value",
));
}
};
if let Some(value) = origin_msg.get_mut("params") {
*value = params_value;
}

let kafka_msg = match serde_json::to_string(&origin_msg) {
Ok(msg) => msg,
Err(_e) => {
return Err(Error::ConnectionProcessor(
"Failed to create decompressed Event message",
));
}
};

let state_event = CGWUCentralEvent {
serial,
evt_type: CGWUCentralEventType::State(CGWUCentralEventState {
Expand All @@ -388,6 +411,7 @@ fn parse_state_event_data(map: CGWUCentralJRPCMessage, timestamp: i64) -> Result
links: clients_links,
},
}),
decompressed: Some(kafka_msg),
};

return Ok(state_event);
Expand Down Expand Up @@ -438,6 +462,7 @@ fn parse_state_event_data(map: CGWUCentralJRPCMessage, timestamp: i64) -> Result
links: clients_links,
},
}),
decompressed: None,
};

return Ok(state_event);
Expand Down Expand Up @@ -655,6 +680,7 @@ fn parse_realtime_event_data(
},
),
}),
decompressed: None,
})
}
"client.leave" => {
Expand Down Expand Up @@ -734,6 +760,7 @@ fn parse_realtime_event_data(
},
),
}),
decompressed: None,
})
}
_ => {
Expand Down Expand Up @@ -775,6 +802,7 @@ pub fn cgw_ucentral_ap_parse_message(message: &str, timestamp: i64) -> Result<CG
log: params["log"].to_string(),
severity: serde_json::from_value(params["severity"].clone())?,
}),
decompressed: None,
};

return Ok(log_event);
Expand Down Expand Up @@ -802,6 +830,7 @@ pub fn cgw_ucentral_ap_parse_message(message: &str, timestamp: i64) -> Result<CG
uuid: 1,
capabilities: caps,
}),
decompressed: None,
};

return Ok(connect_event);
Expand All @@ -822,6 +851,7 @@ pub fn cgw_ucentral_ap_parse_message(message: &str, timestamp: i64) -> Result<CG
let reply_event = CGWUCentralEvent {
serial: Default::default(),
evt_type: CGWUCentralEventType::Reply(CGWUCentralEventReply { id }),
decompressed: None,
};

return Ok(reply_event);
Expand Down
2 changes: 2 additions & 0 deletions src/cgw_ucentral_parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,7 @@ pub enum CGWUCentralEventType {
pub struct CGWUCentralEvent {
pub serial: MacAddress,
pub evt_type: CGWUCentralEventType,
pub decompressed: Option<String>,
}

#[derive(Deserialize, Debug, Serialize)]
Expand Down Expand Up @@ -262,6 +263,7 @@ pub fn cgw_ucentral_parse_connect_event(message: Message) -> Result<CGWUCentralE
uuid: 1,
capabilities: caps,
}),
decompressed: None,
};

Ok(event)
Expand Down
2 changes: 2 additions & 0 deletions src/cgw_ucentral_switch_parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,7 @@ pub fn cgw_ucentral_switch_parse_message(
log: params["log"].to_string(),
severity: serde_json::from_value(params["severity"].clone())?,
}),
decompressed: None,
};

return Ok(log_event);
Expand Down Expand Up @@ -216,6 +217,7 @@ pub fn cgw_ucentral_switch_parse_message(
links: clients_links,
},
}),
decompressed: None,
};

return Ok(state_event);
Expand Down

0 comments on commit 91f3996

Please sign in to comment.