From c7a23c26e087dbe9163ea68dde089ebfe3aefc29 Mon Sep 17 00:00:00 2001 From: momosh Date: Sat, 31 Aug 2024 10:42:08 +0200 Subject: [PATCH] outputting events from light client --- client/src/main.rs | 10 ++++--- core/src/light_client.rs | 59 ++++++++++++++++++++++++++++++++++++---- 2 files changed, 60 insertions(+), 9 deletions(-) diff --git a/client/src/main.rs b/client/src/main.rs index 1dcb10853..8b2a3d90d 100644 --- a/client/src/main.rs +++ b/client/src/main.rs @@ -5,6 +5,7 @@ use avail_light_core::{ api, consts::EXPECTED_SYSTEM_VERSION, data::{self, ClientIdKey, Database, IsFinalitySyncedKey, IsSyncedKey, LatestHeaderKey, DB}, + light_client, maintenance, network::{ self, p2p::{self, BOOTSTRAP_LIST_EMPTY_MESSAGE}, @@ -31,7 +32,7 @@ use color_eyre::{ Result, }; use std::{fs, path::Path, sync::Arc}; -use tokio::sync::broadcast; +use tokio::sync::{broadcast, mpsc}; use tracing::{error, info, span, trace, warn, Level}; #[cfg(feature = "network-analysis")] @@ -288,7 +289,7 @@ async fn run( } let static_config_params: MaintenanceConfig = (&cfg).into(); - spawn_in_span(shutdown.with_cancel(avail_light_core::maintenance::run( + spawn_in_span(shutdown.with_cancel(maintenance::run( p2p_client.clone(), ot_metrics.clone(), block_rx, @@ -302,14 +303,15 @@ async fn run( }; let light_network_client = network::new(p2p_client, rpc_client, pp, cfg.disable_rpc); - - spawn_in_span(shutdown.with_cancel(avail_light_core::light_client::run( + let (lc_sender, _) = mpsc::channel::(1000); + spawn_in_span(shutdown.with_cancel(light_client::run( db.clone(), light_network_client, (&cfg).into(), ot_metrics.clone(), channels, shutdown.clone(), + lc_sender, ))); ot_metrics.count(MetricCounter::Starts).await; diff --git a/core/src/light_client.rs b/core/src/light_client.rs index 7d3f2fc3f..35abc57ee 100644 --- a/core/src/light_client.rs +++ b/core/src/light_client.rs @@ -25,6 +25,7 @@ use avail_rust::{ use codec::Encode; use color_eyre::Result; use std::{sync::Arc, time::Instant}; +use tokio::sync::mpsc; use tracing::{error, info}; use crate::{ @@ -39,6 +40,20 @@ use crate::{ utils::{calculate_confidence, extract_kate}, }; +pub enum OutputEvent { + RecordBlockProcessingDelay(f64), + CountSessionBlocks, + RecordBlockHeight(u32), + RecordDHTStats { + fetched: f64, + fetched_percentage: f64, + fetch_duration: f64, + }, + RecordRPCFetched(f64), + RecordRPCFetchDuration(f64), + RecordBlockConfidence(f64), +} + pub async fn process_block( db: impl Database, network_client: &impl network::Client, @@ -46,15 +61,18 @@ pub async fn process_block( cfg: &LightClientConfig, header: AvailHeader, received_at: Instant, + event_sender: mpsc::Sender, ) -> Result> { - metrics.count(MetricCounter::SessionBlocks).await; - metrics - .record(MetricValue::BlockHeight(header.number)) - .await; - let block_number = header.number; let header_hash: H256 = Encode::using_encoded(&header, blake2_256).into(); + metrics.count(MetricCounter::SessionBlocks).await; + event_sender.send(OutputEvent::CountSessionBlocks).await?; + metrics.record(MetricValue::BlockHeight(block_number)).await; + event_sender + .send(OutputEvent::RecordBlockHeight(block_number)) + .await?; + info!( { block_number, block_delay = received_at.elapsed().as_secs()}, "Processing finalized block", @@ -124,14 +142,28 @@ pub async fn process_block( )) .await; + event_sender + .send(OutputEvent::RecordDHTStats { + fetched: fetch_stats.dht_fetched, + fetched_percentage: fetch_stats.dht_fetched_percentage, + fetch_duration: fetch_stats.dht_fetch_duration, + }) + .await?; + if let Some(rpc_fetched) = fetch_stats.rpc_fetched { metrics.record(MetricValue::RPCFetched(rpc_fetched)).await; + _ = event_sender + .send(OutputEvent::RecordRPCFetched(rpc_fetched)) + .await; } if let Some(rpc_fetch_duration) = fetch_stats.rpc_fetch_duration { metrics .record(MetricValue::RPCFetchDuration(rpc_fetch_duration)) .await; + event_sender + .send(OutputEvent::RecordRPCFetchDuration(rpc_fetch_duration)) + .await?; } (positions.len(), fetched.len(), unfetched.len()) }, @@ -164,6 +196,9 @@ pub async fn process_block( metrics .record(MetricValue::BlockConfidence(confidence)) .await; + event_sender + .send(OutputEvent::RecordBlockConfidence(confidence)) + .await?; // push latest mined block's header into column family specified // for keeping block headers, to be used @@ -195,10 +230,12 @@ pub async fn run( metrics: Arc, mut channels: ClientChannels, shutdown: Controller, + event_sender: mpsc::Sender, ) { info!("Starting light client..."); loop { + let event_sender = event_sender.clone(); let (header, received_at) = match channels.rpc_event_receiver.recv().await { Ok(event) => match event { Event::HeaderUpdate { @@ -216,6 +253,14 @@ pub async fn run( metrics .record(MetricValue::BlockProcessingDelay(seconds.as_secs_f64())) .await; + if let Err(error) = event_sender + .send(OutputEvent::RecordBlockProcessingDelay( + seconds.as_secs_f64(), + )) + .await + { + error!("Cannot send OutputEvent message: {error}"); + } info!("Sleeping for {seconds:?} seconds"); tokio::time::sleep(seconds).await; } @@ -227,6 +272,7 @@ pub async fn run( &cfg, header.clone(), received_at, + event_sender, ) .await; let confidence = match process_block_result { @@ -275,6 +321,7 @@ mod tests { }; use hex_literal::hex; use test_case::test_case; + use tokio::sync::mpsc; #[test_case(99.9 => 10)] #[test_case(99.99 => CELL_COUNT_99_99)] @@ -337,6 +384,7 @@ mod tests { }), }; let recv = Instant::now(); + let (sender, _) = mpsc::channel::(1); mock_network_client .expect_fetch_verified() .returning(move |_, _, _, _, positions| { @@ -358,6 +406,7 @@ mod tests { &cfg, header, recv, + sender, ) .await .unwrap();