diff --git a/Cargo.toml b/Cargo.toml index ab597bd9..146444e6 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -9,7 +9,7 @@ resolver = "2" [workspace.package] edition = "2021" -version = "1.0.19" +version = "1.0.20" authors = ["Fred Clausen", "Mike Nye", "Alex Austin"] description = "ACARS Router: A Utility to ingest ACARS/VDLM2 from many sources, process, and feed out to many consumers." documentation = "https://github.com/sdr-enthusiasts/acars_router" diff --git a/rust/libraries/acars_connection_manager/Cargo.toml b/rust/libraries/acars_connection_manager/Cargo.toml index d3ae8dfc..03ebc289 100644 --- a/rust/libraries/acars_connection_manager/Cargo.toml +++ b/rust/libraries/acars_connection_manager/Cargo.toml @@ -6,7 +6,7 @@ edition.workspace = true # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -stubborn-io = "0.3.2" +stubborn-io = { git = "https://github.com/sdr-enthusiasts/sdre-stubborn-io.git" } log = "0.4.20" tokio = { version = "1.34.0", features = ["full", "tracing"] } tokio-util = { version = "0.7.10", features = ["full"] } diff --git a/rust/libraries/acars_connection_manager/src/lib.rs b/rust/libraries/acars_connection_manager/src/lib.rs index 7375792b..a5a62c27 100644 --- a/rust/libraries/acars_connection_manager/src/lib.rs +++ b/rust/libraries/acars_connection_manager/src/lib.rs @@ -94,10 +94,11 @@ pub(crate) struct OutputServerConfig { // to attempt to reconnect // See: https://docs.rs/stubborn-io/latest/src/stubborn_io/config.rs.html#93 -pub fn reconnect_options() -> ReconnectOptions { +pub fn reconnect_options(host: &str) -> ReconnectOptions { ReconnectOptions::new() .with_exit_if_first_connect_fails(false) .with_retries_generator(get_our_standard_reconnect_strategy) + .with_connection_name(host) } fn get_our_standard_reconnect_strategy() -> DurationIterator { diff --git a/rust/libraries/acars_connection_manager/src/message_handler.rs b/rust/libraries/acars_connection_manager/src/message_handler.rs index ee3e951e..54b8e150 100644 --- a/rust/libraries/acars_connection_manager/src/message_handler.rs +++ b/rust/libraries/acars_connection_manager/src/message_handler.rs @@ -315,6 +315,43 @@ impl MessageHandlerConfig { } } +pub fn print_formatted_stats( + total_all_time: i32, + total_since_last: i32, + frequencies: Option>, + stats_minutes: u64, + queue_type: &str, + has_counted_freqs: bool, +) -> String { + let mut output: String = String::new(); + output.push_str(&format!( + "{} in the last {} minute(s):\nTotal messages processed: {}\nTotal messages processed since last update: {}\n", + queue_type, stats_minutes, total_all_time, total_since_last + )); + + if has_counted_freqs && frequencies.is_some() { + output.push_str( + format!("{} Frequencies logged since container start:\n", queue_type).as_str(), + ); + if let Some(freqs) = frequencies { + for freq in freqs.iter() { + let percentage: f64 = (freq.count as f64 / total_all_time as f64) * 100.0; + output.push_str( + format!( + "\n{} {}: {}/{} ({:.2}%)", + queue_type, freq.freq, freq.count, total_all_time, percentage + ) + .as_str(), + ); + } + } + } else if frequencies.is_some() { + output.push_str(format!("{} No frequencies logged.\n", queue_type).as_str()); + } + + output +} + pub async fn print_stats( total_all_time: Arc>, total_since_last: Arc>, @@ -323,14 +360,12 @@ pub async fn print_stats( queue_type: &str, ) { let stats_minutes = stats_every / 60; + let mut has_counted_freqs = false; loop { sleep(Duration::from_secs(stats_every)).await; let total_all_time_locked = *total_all_time.lock().await; - let mut output: String = String::new(); - output.push_str(&format!( - "{} in the last {} minute(s):\nTotal messages processed: {}\nTotal messages processed since last update: {}\n", - queue_type, stats_minutes, total_all_time_locked, total_since_last.lock().await - )); + let total_since_last_locked = *total_since_last.lock().await; + *total_since_last.lock().await = 0; // now print the frequencies, and show each as a percentage of the total_all_time @@ -341,19 +376,38 @@ pub async fn print_stats( f.lock().await.sort_by(|a, b| b.count.cmp(&a.count)); } + if !has_counted_freqs && f.lock().await.len() > 0 { + has_counted_freqs = true; + } + } + + let mut freqs_locked = None; + + // TODO: This thing is ugly and bad + if let Some(f) = &frequencies { + // copy the contents of the frequencies vector into a new vector + let mut freqs: Vec = Vec::new(); for freq in f.lock().await.iter() { - let percentage: f64 = (freq.count as f64 / total_all_time_locked as f64) * 100.0; - output.push_str( - format!( - "{} {}: {}/{} ({:.2}%)\n", - queue_type, freq.freq, freq.count, total_all_time_locked, percentage - ) - .as_str(), - ); + freqs.push(FrequencyCount { + freq: freq.freq.clone(), + count: freq.count, + }); } + + freqs_locked = Some(freqs); } - println!("{}", output); + info!( + "{}", + print_formatted_stats( + total_all_time_locked, + total_since_last_locked, + freqs_locked, + stats_minutes, + queue_type, + has_counted_freqs + ) + ); } } @@ -411,3 +465,70 @@ fn hash_message(mut message: AcarsVdlm2Message) -> MessageResult { } } } + +// create a test for print_stats + +#[cfg(test)] +mod tests { + use super::*; + + #[tokio::test] + async fn test_print_stats() { + let total_all_time: i32 = 100; + let total_since_last: i32 = 100; + let mut frequencies: Vec = Vec::new(); + let stats_every: u64 = 1; + let queue_type: &str = "TEST"; + + // fill the frequencies with some data + + let freq_three: FrequencyCount = FrequencyCount { + freq: "134.525".to_string(), + count: 70, + }; + + frequencies.push(freq_three); + + let freq_one: FrequencyCount = FrequencyCount { + freq: "131.550".to_string(), + count: 20, + }; + + frequencies.push(freq_one); + + let freq_two: FrequencyCount = FrequencyCount { + freq: "131.525".to_string(), + count: 10, + }; + + frequencies.push(freq_two); + + let output_with_extra_stats = print_formatted_stats( + total_all_time, + total_since_last, + Some(frequencies), + stats_every, + queue_type, + true, + ); + + assert_eq!( + output_with_extra_stats, + "TEST in the last 1 minute(s):\nTotal messages processed: 100\nTotal messages processed since last update: 100\nTEST Frequencies logged since container start:\n\nTEST 134.525: 70/100 (70.00%)\nTEST 131.550: 20/100 (20.00%)\nTEST 131.525: 10/100 (10.00%)" + ); + + let output_without_extra_stats = print_formatted_stats( + total_all_time, + total_since_last, + None, + stats_every, + queue_type, + false, + ); + + assert_eq!( + output_without_extra_stats, + "TEST in the last 1 minute(s):\nTotal messages processed: 100\nTotal messages processed since last update: 100\n" + ); + } +} diff --git a/rust/libraries/acars_connection_manager/src/service_init.rs b/rust/libraries/acars_connection_manager/src/service_init.rs index 0bcbeb0a..0456e8a8 100644 --- a/rust/libraries/acars_connection_manager/src/service_init.rs +++ b/rust/libraries/acars_connection_manager/src/service_init.rs @@ -475,7 +475,8 @@ impl SenderServers for Arc>>> { async fn start_tcp(self, socket_type: &str, host: &str) { // Start a TCP sender server for {server_type} let socket: Result, io::Error> = - StubbornTcpStream::connect_with_options(host.to_string(), reconnect_options()).await; + StubbornTcpStream::connect_with_options(host.to_string(), reconnect_options(host)) + .await; match socket { Err(e) => error!("[TCP SENDER {socket_type}]: Error connecting to {host}: {e}"), Ok(socket) => { @@ -537,7 +538,7 @@ impl SocketListenerServer { let open_stream: io::Result> = StubbornTcpStream::connect_with_options( socket_address, - reconnect_options(), + reconnect_options(&self.proto_name), ) .await; match open_stream { diff --git a/rust/libraries/acars_connection_manager/src/tcp_services.rs b/rust/libraries/acars_connection_manager/src/tcp_services.rs index d079868f..905a72fe 100644 --- a/rust/libraries/acars_connection_manager/src/tcp_services.rs +++ b/rust/libraries/acars_connection_manager/src/tcp_services.rs @@ -180,7 +180,11 @@ impl TCPReceiverServer { } }; - let stream = match StubbornTcpStream::connect_with_options(addr, reconnect_options()).await + let stream = match StubbornTcpStream::connect_with_options( + addr, + reconnect_options(&self.proto_name), + ) + .await { Ok(stream) => stream, Err(e) => {