Skip to content

Commit

Permalink
outputting maintenance events
Browse files Browse the repository at this point in the history
  • Loading branch information
momosh-ethernal authored Sep 2, 2024
1 parent c9b31e5 commit 1d7ca61
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 3 deletions.
2 changes: 2 additions & 0 deletions client/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -289,12 +289,14 @@ async fn run(
}

let static_config_params: MaintenanceConfig = (&cfg).into();
let (maintenance_sender, _) = mpsc::channel::<maintenance::OutputEvent>(1000);
spawn_in_span(shutdown.with_cancel(maintenance::run(
p2p_client.clone(),
ot_metrics.clone(),
block_rx,
static_config_params,
shutdown.clone(),
maintenance_sender,
)));

let channels = avail_light_core::types::ClientChannels {
Expand Down
36 changes: 33 additions & 3 deletions core/src/maintenance.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use color_eyre::{eyre::WrapErr, Result};
use std::sync::Arc;
use std::time::Instant;
use tokio::sync::broadcast;
use tokio::sync::{broadcast, mpsc};
use tracing::{debug, error, info};

use crate::{
Expand All @@ -11,11 +11,23 @@ use crate::{
types::{BlockVerified, MaintenanceConfig},
};

pub enum OutputEvent {
FlushMetrics,
RecordStats {
connected_peers: usize,
block_confidence_treshold: f64,
replication_factor: u16,
query_timeout: u32,
},
CountUps,
}

pub async fn process_block(
block_number: u32,
p2p_client: &P2pClient,
maintenance_config: MaintenanceConfig,
metrics: &Arc<impl Metrics>,
event_sender: mpsc::Sender<OutputEvent>,
) -> Result<()> {
if cfg!(not(feature = "rocksdb")) && block_number % maintenance_config.pruning_interval == 0 {
info!(block_number, "Pruning...");
Expand All @@ -31,6 +43,7 @@ pub async fn process_block(
Ok(()) => info!(block_number, "Flushing metrics finished"),
Err(error) => error!(block_number, "Flushing metrics failed: {error:#}"),
}
event_sender.send(OutputEvent::FlushMetrics).await?;
}

p2p_client
Expand Down Expand Up @@ -78,7 +91,17 @@ pub async fn process_block(
maintenance_config.query_timeout.as_secs() as u32,
))
.await;
event_sender
.send(OutputEvent::RecordStats {
connected_peers: peers_num,
block_confidence_treshold: maintenance_config.block_confidence_treshold,
replication_factor: maintenance_config.replication_factor,
query_timeout: maintenance_config.query_timeout.as_secs() as u32,
})
.await?;

metrics.count(MetricCounter::Up).await;
event_sender.send(OutputEvent::CountUps).await?;

info!(block_number, map_size, "Maintenance completed");
Ok(())
Expand All @@ -90,13 +113,20 @@ pub async fn run(
mut block_receiver: broadcast::Receiver<BlockVerified>,
static_config_params: MaintenanceConfig,
shutdown: Controller<String>,
event_sender: mpsc::Sender<OutputEvent>,
) {
info!("Starting maintenance...");

loop {
let result = match block_receiver.recv().await {
Ok(block) => {
process_block(block.block_num, &p2p_client, static_config_params, &metrics).await
process_block(
block.block_num,
&p2p_client,
static_config_params,
&metrics,
event_sender.clone(),
)
.await
},
Err(error) => Err(error.into()),
};
Expand Down

0 comments on commit 1d7ca61

Please sign in to comment.