Skip to content

Commit

Permalink
More verbose logging (#266)
Browse files Browse the repository at this point in the history
* refactor stats, add in testing, and add make logs clearer it's showing stats from container start

* formatting changes

* use info and not println (duh)

* bump version

* pass in host name to updated stubborn io
  • Loading branch information
fredclausen authored Nov 18, 2023
1 parent 0d96bf7 commit d5cf682
Show file tree
Hide file tree
Showing 6 changed files with 147 additions and 20 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ resolver = "2"

[workspace.package]
edition = "2021"
version = "1.0.19"
version = "1.0.20"
authors = ["Fred Clausen", "Mike Nye", "Alex Austin"]
description = "ACARS Router: A Utility to ingest ACARS/VDLM2 from many sources, process, and feed out to many consumers."
documentation = "https://github.com/sdr-enthusiasts/acars_router"
Expand Down
2 changes: 1 addition & 1 deletion rust/libraries/acars_connection_manager/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ edition.workspace = true
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
stubborn-io = "0.3.2"
stubborn-io = { git = "https://github.com/sdr-enthusiasts/sdre-stubborn-io.git" }
log = "0.4.20"
tokio = { version = "1.34.0", features = ["full", "tracing"] }
tokio-util = { version = "0.7.10", features = ["full"] }
Expand Down
3 changes: 2 additions & 1 deletion rust/libraries/acars_connection_manager/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,10 +94,11 @@ pub(crate) struct OutputServerConfig {
// to attempt to reconnect
// See: https://docs.rs/stubborn-io/latest/src/stubborn_io/config.rs.html#93

pub fn reconnect_options() -> ReconnectOptions {
pub fn reconnect_options(host: &str) -> ReconnectOptions {
ReconnectOptions::new()
.with_exit_if_first_connect_fails(false)
.with_retries_generator(get_our_standard_reconnect_strategy)
.with_connection_name(host)
}

fn get_our_standard_reconnect_strategy() -> DurationIterator {
Expand Down
149 changes: 135 additions & 14 deletions rust/libraries/acars_connection_manager/src/message_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -315,6 +315,43 @@ impl MessageHandlerConfig {
}
}

pub fn print_formatted_stats(
total_all_time: i32,
total_since_last: i32,
frequencies: Option<Vec<FrequencyCount>>,
stats_minutes: u64,
queue_type: &str,
has_counted_freqs: bool,
) -> String {
let mut output: String = String::new();
output.push_str(&format!(
"{} in the last {} minute(s):\nTotal messages processed: {}\nTotal messages processed since last update: {}\n",
queue_type, stats_minutes, total_all_time, total_since_last
));

if has_counted_freqs && frequencies.is_some() {
output.push_str(
format!("{} Frequencies logged since container start:\n", queue_type).as_str(),
);
if let Some(freqs) = frequencies {
for freq in freqs.iter() {
let percentage: f64 = (freq.count as f64 / total_all_time as f64) * 100.0;
output.push_str(
format!(
"\n{} {}: {}/{} ({:.2}%)",
queue_type, freq.freq, freq.count, total_all_time, percentage
)
.as_str(),
);
}
}
} else if frequencies.is_some() {
output.push_str(format!("{} No frequencies logged.\n", queue_type).as_str());
}

output
}

pub async fn print_stats(
total_all_time: Arc<Mutex<i32>>,
total_since_last: Arc<Mutex<i32>>,
Expand All @@ -323,14 +360,12 @@ pub async fn print_stats(
queue_type: &str,
) {
let stats_minutes = stats_every / 60;
let mut has_counted_freqs = false;
loop {
sleep(Duration::from_secs(stats_every)).await;
let total_all_time_locked = *total_all_time.lock().await;
let mut output: String = String::new();
output.push_str(&format!(
"{} in the last {} minute(s):\nTotal messages processed: {}\nTotal messages processed since last update: {}\n",
queue_type, stats_minutes, total_all_time_locked, total_since_last.lock().await
));
let total_since_last_locked = *total_since_last.lock().await;

*total_since_last.lock().await = 0;

// now print the frequencies, and show each as a percentage of the total_all_time
Expand All @@ -341,19 +376,38 @@ pub async fn print_stats(
f.lock().await.sort_by(|a, b| b.count.cmp(&a.count));
}

if !has_counted_freqs && f.lock().await.len() > 0 {
has_counted_freqs = true;
}
}

let mut freqs_locked = None;

// TODO: This thing is ugly and bad
if let Some(f) = &frequencies {
// copy the contents of the frequencies vector into a new vector
let mut freqs: Vec<FrequencyCount> = Vec::new();
for freq in f.lock().await.iter() {
let percentage: f64 = (freq.count as f64 / total_all_time_locked as f64) * 100.0;
output.push_str(
format!(
"{} {}: {}/{} ({:.2}%)\n",
queue_type, freq.freq, freq.count, total_all_time_locked, percentage
)
.as_str(),
);
freqs.push(FrequencyCount {
freq: freq.freq.clone(),
count: freq.count,
});
}

freqs_locked = Some(freqs);
}

println!("{}", output);
info!(
"{}",
print_formatted_stats(
total_all_time_locked,
total_since_last_locked,
freqs_locked,
stats_minutes,
queue_type,
has_counted_freqs
)
);
}
}

Expand Down Expand Up @@ -411,3 +465,70 @@ fn hash_message(mut message: AcarsVdlm2Message) -> MessageResult<u64> {
}
}
}

// create a test for print_stats

#[cfg(test)]
mod tests {
use super::*;

#[tokio::test]
async fn test_print_stats() {
let total_all_time: i32 = 100;
let total_since_last: i32 = 100;
let mut frequencies: Vec<FrequencyCount> = Vec::new();
let stats_every: u64 = 1;
let queue_type: &str = "TEST";

// fill the frequencies with some data

let freq_three: FrequencyCount = FrequencyCount {
freq: "134.525".to_string(),
count: 70,
};

frequencies.push(freq_three);

let freq_one: FrequencyCount = FrequencyCount {
freq: "131.550".to_string(),
count: 20,
};

frequencies.push(freq_one);

let freq_two: FrequencyCount = FrequencyCount {
freq: "131.525".to_string(),
count: 10,
};

frequencies.push(freq_two);

let output_with_extra_stats = print_formatted_stats(
total_all_time,
total_since_last,
Some(frequencies),
stats_every,
queue_type,
true,
);

assert_eq!(
output_with_extra_stats,
"TEST in the last 1 minute(s):\nTotal messages processed: 100\nTotal messages processed since last update: 100\nTEST Frequencies logged since container start:\n\nTEST 134.525: 70/100 (70.00%)\nTEST 131.550: 20/100 (20.00%)\nTEST 131.525: 10/100 (10.00%)"
);

let output_without_extra_stats = print_formatted_stats(
total_all_time,
total_since_last,
None,
stats_every,
queue_type,
false,
);

assert_eq!(
output_without_extra_stats,
"TEST in the last 1 minute(s):\nTotal messages processed: 100\nTotal messages processed since last update: 100\n"
);
}
}
5 changes: 3 additions & 2 deletions rust/libraries/acars_connection_manager/src/service_init.rs
Original file line number Diff line number Diff line change
Expand Up @@ -475,7 +475,8 @@ impl SenderServers for Arc<Mutex<Vec<Sender<AcarsVdlm2Message>>>> {
async fn start_tcp(self, socket_type: &str, host: &str) {
// Start a TCP sender server for {server_type}
let socket: Result<StubbornIo<TcpStream, String>, io::Error> =
StubbornTcpStream::connect_with_options(host.to_string(), reconnect_options()).await;
StubbornTcpStream::connect_with_options(host.to_string(), reconnect_options(host))
.await;
match socket {
Err(e) => error!("[TCP SENDER {socket_type}]: Error connecting to {host}: {e}"),
Ok(socket) => {
Expand Down Expand Up @@ -537,7 +538,7 @@ impl SocketListenerServer {
let open_stream: io::Result<StubbornIo<TcpStream, SocketAddr>> =
StubbornTcpStream::connect_with_options(
socket_address,
reconnect_options(),
reconnect_options(&self.proto_name),
)
.await;
match open_stream {
Expand Down
6 changes: 5 additions & 1 deletion rust/libraries/acars_connection_manager/src/tcp_services.rs
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,11 @@ impl TCPReceiverServer {
}
};

let stream = match StubbornTcpStream::connect_with_options(addr, reconnect_options()).await
let stream = match StubbornTcpStream::connect_with_options(
addr,
reconnect_options(&self.proto_name),
)
.await
{
Ok(stream) => stream,
Err(e) => {
Expand Down

0 comments on commit d5cf682

Please sign in to comment.