From 3f82f025225c71715fd0774f54c34ff845643a10 Mon Sep 17 00:00:00 2001 From: Fred Clausen <43556888+fredclausen@users.noreply.github.com> Date: Thu, 16 Nov 2023 05:16:10 -0700 Subject: [PATCH 1/8] Update message_handler.rs --- .../src/message_handler.rs | 71 ++++++++++++++++++- 1 file changed, 70 insertions(+), 1 deletion(-) diff --git a/rust/libraries/acars_connection_manager/src/message_handler.rs b/rust/libraries/acars_connection_manager/src/message_handler.rs index 09c7661b..9f3c7612 100644 --- a/rust/libraries/acars_connection_manager/src/message_handler.rs +++ b/rust/libraries/acars_connection_manager/src/message_handler.rs @@ -17,6 +17,11 @@ use tokio::sync::mpsc::{Receiver, Sender}; use tokio::sync::Mutex; use tokio::time::{sleep, Duration}; +pub struct FrequencyCount { + freq: String, + count: u32, +} + #[derive(Clone, Debug, Default)] pub struct MessageHandlerConfig { pub add_proxy_id: bool, @@ -65,6 +70,8 @@ impl MessageHandlerConfig { Arc::new(Mutex::new(VecDeque::with_capacity(100))); let total_messages_processed: Arc> = Arc::new(Mutex::new(0)); let total_messages_since_last: Arc> = Arc::new(Mutex::new(0)); + let all_frequencies_logged: Arc>> = + Arc::new(Mutex::new(Vec::new())); let queue_type_stats: String = self.queue_type.clone(); let queue_type_dedupe: String = self.queue_type.clone(); let stats_every: u64 = self.stats_every * 60; // Value has to be in seconds. Input is in minutes. @@ -77,11 +84,14 @@ impl MessageHandlerConfig { let stats_total_messages_context: Arc> = Arc::clone(&total_messages_processed); let stats_total_messages_since_last_context: Arc> = Arc::clone(&total_messages_since_last); + let stats_frequency_context: Arc>> = + Arc::clone(&all_frequencies_logged); tokio::spawn(async move { print_stats( stats_total_messages_context, stats_total_messages_since_last_context, + stats_frequency_context, stats_every, queue_type_stats.as_str(), ) @@ -132,6 +142,56 @@ impl MessageHandlerConfig { Err(_) => f64::default(), }; + // See if the frequency is in the list of frequencies we've seen + // If not, add it to the list and log it + // match the message type + + match &message { + AcarsVdlm2Message::Vdlm2Message(m) => { + // get the freq from Vdlm2Message::Vdlm2Body + let frequency: String = m.vdl2.freq.to_string(); + // check and see if we have the frequency in all_frequencies_logged. If so, increment the count. + // if not, add it + let mut found: bool = false; + for freq in all_frequencies_logged.lock().await.iter_mut() { + if freq.freq == frequency { + freq.count += 1; + found = true; + break; + } + } + + if !found { + let new_frequency: FrequencyCount = FrequencyCount { + freq: frequency, + count: 1, + }; + all_frequencies_logged.lock().await.push(new_frequency); + } + } + AcarsVdlm2Message::AcarsMessage(m) => { + // get the freq from AcarsMessage::AcarsBody + let frequency: String = m.freq.to_string(); + + let mut found: bool = false; + for freq in all_frequencies_logged.lock().await.iter_mut() { + if freq.freq == frequency { + freq.count += 1; + found = true; + break; + } + } + + if !found { + let new_frequency: FrequencyCount = FrequencyCount { + freq: frequency, + count: 1, + }; + all_frequencies_logged.lock().await.push(new_frequency); + } + } + } + let get_message_time: Option = message.get_time(); match get_message_time { @@ -251,15 +311,24 @@ impl MessageHandlerConfig { pub async fn print_stats( total_all_time: Arc>, total_since_last: Arc>, + frequencies: Arc>>, stats_every: u64, queue_type: &str, ) { let stats_minutes = stats_every / 60; loop { sleep(Duration::from_secs(stats_every)).await; + let total_all_time_locked = *total_all_time.lock().await; info!("{} in the last {} minute(s):\nTotal messages processed: {}\nTotal messages processed since last update: {}", - queue_type, stats_minutes, total_all_time.lock().await, total_since_last.lock().await); + queue_type, stats_minutes, total_all_time_locked, total_since_last.lock().await); *total_since_last.lock().await = 0; + + // now print the frequencies, and show each as a percentage of the total_all_time + + for freq in frequencies.lock().await.iter_mut() { + let percentage: f64 = (freq.count as f64 / total_all_time_locked as f64) * 100.0; + info!("{}: {} ({:.2}%)", freq.freq, freq.count, percentage); + } } } From f912794b54434e68ce492f3148ccf387693c9c21 Mon Sep 17 00:00:00 2001 From: Fred Clausen <43556888+fredclausen@users.noreply.github.com> Date: Thu, 16 Nov 2023 18:24:09 -0700 Subject: [PATCH 2/8] create ability to quiet extra stat logging --- README.md | 1 + rust/libraries/acars_config/src/lib.rs | 3 +++ rust/libraries/acars_connection_manager/src/message_handler.rs | 3 +++ 3 files changed, 7 insertions(+) diff --git a/README.md b/README.md index dde04398..a45ffea8 100644 --- a/README.md +++ b/README.md @@ -58,6 +58,7 @@ services: - AR_SEND_UDP_VDLM2=acarshub:5555 - AR_RECV_ZMQ_VDLM2=dumpvdl2:45555 - AR_OVERRIDE_STATION_NAME=${FEEDER_NAME} + - AR_STATS_VERBOSE=false tmpfs: - /run:exec,size=64M - /var/log diff --git a/rust/libraries/acars_config/src/lib.rs b/rust/libraries/acars_config/src/lib.rs index b5a290fd..55ecaf3b 100644 --- a/rust/libraries/acars_config/src/lib.rs +++ b/rust/libraries/acars_config/src/lib.rs @@ -44,6 +44,9 @@ pub struct Input { /// Print statistics every N minutes #[clap(long, env = "AR_STATS_EVERY", value_parser, default_value = "5")] pub stats_every: u64, + /// Chatty logging of stats + #[clap(long, env = "AR_STATS_VERBOSE", value_parser)] + pub stats_verbose: bool, /// Attempt message reassembly on incomplete messages within the specified number of seconds #[clap( long, diff --git a/rust/libraries/acars_connection_manager/src/message_handler.rs b/rust/libraries/acars_connection_manager/src/message_handler.rs index 9f3c7612..f9116089 100644 --- a/rust/libraries/acars_connection_manager/src/message_handler.rs +++ b/rust/libraries/acars_connection_manager/src/message_handler.rs @@ -32,6 +32,7 @@ pub struct MessageHandlerConfig { pub should_override_station_name: bool, pub station_name: String, pub stats_every: u64, + pub stats_verbose: bool, } impl MessageHandlerConfig { @@ -46,6 +47,7 @@ impl MessageHandlerConfig { should_override_station_name: args.override_station_name.is_some(), station_name: station_name.to_string(), stats_every: args.stats_every, + stats_verbose: args.stats_verbose, } } else { Self { @@ -57,6 +59,7 @@ impl MessageHandlerConfig { should_override_station_name: false, station_name: Default::default(), stats_every: args.stats_every, + stats_verbose: args.stats_verbose, } } } From cefad19a3a829d9bf2021ba109cef05e3bb15fb9 Mon Sep 17 00:00:00 2001 From: Fred Clausen <43556888+fredclausen@users.noreply.github.com> Date: Thu, 16 Nov 2023 18:28:20 -0700 Subject: [PATCH 3/8] reformat output to be nicer --- .../acars_connection_manager/src/message_handler.rs | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/rust/libraries/acars_connection_manager/src/message_handler.rs b/rust/libraries/acars_connection_manager/src/message_handler.rs index f9116089..97d2b04e 100644 --- a/rust/libraries/acars_connection_manager/src/message_handler.rs +++ b/rust/libraries/acars_connection_manager/src/message_handler.rs @@ -322,16 +322,21 @@ pub async fn print_stats( loop { sleep(Duration::from_secs(stats_every)).await; let total_all_time_locked = *total_all_time.lock().await; - info!("{} in the last {} minute(s):\nTotal messages processed: {}\nTotal messages processed since last update: {}", - queue_type, stats_minutes, total_all_time_locked, total_since_last.lock().await); + let mut output: String = String::new(); + output.push_str(&format!( + "{} in the last {} minute(s):\nTotal messages processed: {}\nTotal messages processed since last update: {}\n", + queue_type, stats_minutes, total_all_time_locked, total_since_last.lock().await + )); *total_since_last.lock().await = 0; // now print the frequencies, and show each as a percentage of the total_all_time for freq in frequencies.lock().await.iter_mut() { let percentage: f64 = (freq.count as f64 / total_all_time_locked as f64) * 100.0; - info!("{}: {} ({:.2}%)", freq.freq, freq.count, percentage); + output.push_str(format!("{} {}: {}/{} ({:.2}%)\n", queue_type, freq.freq, freq.count, total_all_time_locked, percentage)); } + + println!("{}", output); } } From 475075fda770bd77c492c7a8fff57b228cf5b158 Mon Sep 17 00:00:00 2001 From: Fred Clausen <43556888+fredclausen@users.noreply.github.com> Date: Thu, 16 Nov 2023 18:35:07 -0700 Subject: [PATCH 4/8] Update message_handler.rs --- .../acars_connection_manager/src/message_handler.rs | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/rust/libraries/acars_connection_manager/src/message_handler.rs b/rust/libraries/acars_connection_manager/src/message_handler.rs index 97d2b04e..ec50a7e9 100644 --- a/rust/libraries/acars_connection_manager/src/message_handler.rs +++ b/rust/libraries/acars_connection_manager/src/message_handler.rs @@ -333,7 +333,13 @@ pub async fn print_stats( for freq in frequencies.lock().await.iter_mut() { let percentage: f64 = (freq.count as f64 / total_all_time_locked as f64) * 100.0; - output.push_str(format!("{} {}: {}/{} ({:.2}%)\n", queue_type, freq.freq, freq.count, total_all_time_locked, percentage)); + output.push_str( + format!( + "{} {}: {}/{} ({:.2}%)\n", + queue_type, freq.freq, freq.count, total_all_time_locked, percentage + ) + .as_str(), + ); } println!("{}", output); From 711a5c8d3dce020ed5d8980edf820347b6075d34 Mon Sep 17 00:00:00 2001 From: Fred Clausen <43556888+fredclausen@users.noreply.github.com> Date: Thu, 16 Nov 2023 22:50:01 -0700 Subject: [PATCH 5/8] Actually silence output of extra stats --- .../src/message_handler.rs | 35 ++++++++++++------- 1 file changed, 23 insertions(+), 12 deletions(-) diff --git a/rust/libraries/acars_connection_manager/src/message_handler.rs b/rust/libraries/acars_connection_manager/src/message_handler.rs index ec50a7e9..06fb47ab 100644 --- a/rust/libraries/acars_connection_manager/src/message_handler.rs +++ b/rust/libraries/acars_connection_manager/src/message_handler.rs @@ -87,8 +87,12 @@ impl MessageHandlerConfig { let stats_total_messages_context: Arc> = Arc::clone(&total_messages_processed); let stats_total_messages_since_last_context: Arc> = Arc::clone(&total_messages_since_last); - let stats_frequency_context: Arc>> = - Arc::clone(&all_frequencies_logged); + let stats_frequency_context: Option(Arc>>) = if self.stats_verbose + { + Some(Arc::clone(&all_frequencies_logged)) + } else { + None + }; tokio::spawn(async move { print_stats( @@ -314,7 +318,7 @@ impl MessageHandlerConfig { pub async fn print_stats( total_all_time: Arc>, total_since_last: Arc>, - frequencies: Arc>>, + frequencies: Option(Arc>>), stats_every: u64, queue_type: &str, ) { @@ -331,15 +335,22 @@ pub async fn print_stats( // now print the frequencies, and show each as a percentage of the total_all_time - for freq in frequencies.lock().await.iter_mut() { - let percentage: f64 = (freq.count as f64 / total_all_time_locked as f64) * 100.0; - output.push_str( - format!( - "{} {}: {}/{} ({:.2}%)\n", - queue_type, freq.freq, freq.count, total_all_time_locked, percentage - ) - .as_str(), - ); + match frequencies { + None => {} + Some(frequencies) => { + // now print the frequencies, and show each as a percentage of the total_all_time + for freq in frequencies.lock().await.iter_mut() { + let percentage: f64 = + (freq.count as f64 / total_all_time_locked as f64) * 100.0; + output.push_str( + format!( + "{} {}: {}/{} ({:.2}%)\n", + queue_type, freq.freq, freq.count, total_all_time_locked, percentage + ) + .as_str(), + ); + } + } } println!("{}", output); From e66323b303f076700e877d228afbdc4e4e61506c Mon Sep 17 00:00:00 2001 From: Fred Clausen <43556888+fredclausen@users.noreply.github.com> Date: Thu, 16 Nov 2023 22:54:32 -0700 Subject: [PATCH 6/8] fix issues --- .../src/message_handler.rs | 29 ++++++++----------- 1 file changed, 12 insertions(+), 17 deletions(-) diff --git a/rust/libraries/acars_connection_manager/src/message_handler.rs b/rust/libraries/acars_connection_manager/src/message_handler.rs index 06fb47ab..97012f5f 100644 --- a/rust/libraries/acars_connection_manager/src/message_handler.rs +++ b/rust/libraries/acars_connection_manager/src/message_handler.rs @@ -87,7 +87,7 @@ impl MessageHandlerConfig { let stats_total_messages_context: Arc> = Arc::clone(&total_messages_processed); let stats_total_messages_since_last_context: Arc> = Arc::clone(&total_messages_since_last); - let stats_frequency_context: Option(Arc>>) = if self.stats_verbose + let stats_frequency_context: Option>>> = if self.stats_verbose { Some(Arc::clone(&all_frequencies_logged)) } else { @@ -318,7 +318,7 @@ impl MessageHandlerConfig { pub async fn print_stats( total_all_time: Arc>, total_since_last: Arc>, - frequencies: Option(Arc>>), + frequencies: Option>>>, stats_every: u64, queue_type: &str, ) { @@ -335,21 +335,16 @@ pub async fn print_stats( // now print the frequencies, and show each as a percentage of the total_all_time - match frequencies { - None => {} - Some(frequencies) => { - // now print the frequencies, and show each as a percentage of the total_all_time - for freq in frequencies.lock().await.iter_mut() { - let percentage: f64 = - (freq.count as f64 / total_all_time_locked as f64) * 100.0; - output.push_str( - format!( - "{} {}: {}/{} ({:.2}%)\n", - queue_type, freq.freq, freq.count, total_all_time_locked, percentage - ) - .as_str(), - ); - } + if let Some(f) = &frequencies { + for freq in f.lock().await.iter_mut() { + let percentage: f64 = (freq.count as f64 / total_all_time_locked as f64) * 100.0; + output.push_str( + format!( + "{} {}: {}/{} ({:.2}%)\n", + queue_type, freq.freq, freq.count, total_all_time_locked, percentage + ) + .as_str(), + ); } } From b96b40d436ab9642ed79b02067a21affd6098512 Mon Sep 17 00:00:00 2001 From: Fred Clausen <43556888+fredclausen@users.noreply.github.com> Date: Thu, 16 Nov 2023 23:00:34 -0700 Subject: [PATCH 7/8] bump version --- Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Cargo.toml b/Cargo.toml index b7398a13..ab597bd9 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -9,7 +9,7 @@ resolver = "2" [workspace.package] edition = "2021" -version = "1.0.18" +version = "1.0.19" authors = ["Fred Clausen", "Mike Nye", "Alex Austin"] description = "ACARS Router: A Utility to ingest ACARS/VDLM2 from many sources, process, and feed out to many consumers." documentation = "https://github.com/sdr-enthusiasts/acars_router" From 4e2e1bdf1a86a4792b2dc85ea44f237212f5e3d4 Mon Sep 17 00:00:00 2001 From: Fred Clausen <43556888+fredclausen@users.noreply.github.com> Date: Thu, 16 Nov 2023 23:10:51 -0700 Subject: [PATCH 8/8] sort by count --- .../acars_connection_manager/src/message_handler.rs | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/rust/libraries/acars_connection_manager/src/message_handler.rs b/rust/libraries/acars_connection_manager/src/message_handler.rs index 97012f5f..ee3e951e 100644 --- a/rust/libraries/acars_connection_manager/src/message_handler.rs +++ b/rust/libraries/acars_connection_manager/src/message_handler.rs @@ -336,7 +336,12 @@ pub async fn print_stats( // now print the frequencies, and show each as a percentage of the total_all_time if let Some(f) = &frequencies { - for freq in f.lock().await.iter_mut() { + // sort the frequencies by count + if let Some(f) = &frequencies { + f.lock().await.sort_by(|a, b| b.count.cmp(&a.count)); + } + + for freq in f.lock().await.iter() { let percentage: f64 = (freq.count as f64 / total_all_time_locked as f64) * 100.0; output.push_str( format!(