Skip to content

Commit

Permalink
outputting events from light client
Browse files Browse the repository at this point in the history
  • Loading branch information
momosh-ethernal committed Aug 31, 2024
1 parent b857a35 commit c7a23c2
Show file tree
Hide file tree
Showing 2 changed files with 60 additions and 9 deletions.
10 changes: 6 additions & 4 deletions client/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use avail_light_core::{
api,
consts::EXPECTED_SYSTEM_VERSION,
data::{self, ClientIdKey, Database, IsFinalitySyncedKey, IsSyncedKey, LatestHeaderKey, DB},
light_client, maintenance,
network::{
self,
p2p::{self, BOOTSTRAP_LIST_EMPTY_MESSAGE},
Expand All @@ -31,7 +32,7 @@ use color_eyre::{
Result,
};
use std::{fs, path::Path, sync::Arc};
use tokio::sync::broadcast;
use tokio::sync::{broadcast, mpsc};
use tracing::{error, info, span, trace, warn, Level};

#[cfg(feature = "network-analysis")]
Expand Down Expand Up @@ -288,7 +289,7 @@ async fn run(
}

let static_config_params: MaintenanceConfig = (&cfg).into();
spawn_in_span(shutdown.with_cancel(avail_light_core::maintenance::run(
spawn_in_span(shutdown.with_cancel(maintenance::run(
p2p_client.clone(),
ot_metrics.clone(),
block_rx,
Expand All @@ -302,14 +303,15 @@ async fn run(
};

let light_network_client = network::new(p2p_client, rpc_client, pp, cfg.disable_rpc);

spawn_in_span(shutdown.with_cancel(avail_light_core::light_client::run(
let (lc_sender, _) = mpsc::channel::<light_client::OutputEvent>(1000);
spawn_in_span(shutdown.with_cancel(light_client::run(
db.clone(),
light_network_client,
(&cfg).into(),
ot_metrics.clone(),
channels,
shutdown.clone(),
lc_sender,
)));

ot_metrics.count(MetricCounter::Starts).await;
Expand Down
59 changes: 54 additions & 5 deletions core/src/light_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ use avail_rust::{
use codec::Encode;
use color_eyre::Result;
use std::{sync::Arc, time::Instant};
use tokio::sync::mpsc;
use tracing::{error, info};

use crate::{
Expand All @@ -39,22 +40,39 @@ use crate::{
utils::{calculate_confidence, extract_kate},
};

pub enum OutputEvent {
RecordBlockProcessingDelay(f64),
CountSessionBlocks,
RecordBlockHeight(u32),
RecordDHTStats {
fetched: f64,
fetched_percentage: f64,
fetch_duration: f64,
},
RecordRPCFetched(f64),
RecordRPCFetchDuration(f64),
RecordBlockConfidence(f64),
}

pub async fn process_block(
db: impl Database,
network_client: &impl network::Client,
metrics: &Arc<impl Metrics>,
cfg: &LightClientConfig,
header: AvailHeader,
received_at: Instant,
event_sender: mpsc::Sender<OutputEvent>,
) -> Result<Option<f64>> {
metrics.count(MetricCounter::SessionBlocks).await;
metrics
.record(MetricValue::BlockHeight(header.number))
.await;

let block_number = header.number;
let header_hash: H256 = Encode::using_encoded(&header, blake2_256).into();

metrics.count(MetricCounter::SessionBlocks).await;
event_sender.send(OutputEvent::CountSessionBlocks).await?;
metrics.record(MetricValue::BlockHeight(block_number)).await;
event_sender
.send(OutputEvent::RecordBlockHeight(block_number))
.await?;

info!(
{ block_number, block_delay = received_at.elapsed().as_secs()},
"Processing finalized block",
Expand Down Expand Up @@ -124,14 +142,28 @@ pub async fn process_block(
))
.await;

event_sender
.send(OutputEvent::RecordDHTStats {
fetched: fetch_stats.dht_fetched,
fetched_percentage: fetch_stats.dht_fetched_percentage,
fetch_duration: fetch_stats.dht_fetch_duration,
})
.await?;

if let Some(rpc_fetched) = fetch_stats.rpc_fetched {
metrics.record(MetricValue::RPCFetched(rpc_fetched)).await;
_ = event_sender
.send(OutputEvent::RecordRPCFetched(rpc_fetched))
.await;
}

if let Some(rpc_fetch_duration) = fetch_stats.rpc_fetch_duration {
metrics
.record(MetricValue::RPCFetchDuration(rpc_fetch_duration))
.await;
event_sender
.send(OutputEvent::RecordRPCFetchDuration(rpc_fetch_duration))
.await?;
}
(positions.len(), fetched.len(), unfetched.len())
},
Expand Down Expand Up @@ -164,6 +196,9 @@ pub async fn process_block(
metrics
.record(MetricValue::BlockConfidence(confidence))
.await;
event_sender
.send(OutputEvent::RecordBlockConfidence(confidence))
.await?;

// push latest mined block's header into column family specified
// for keeping block headers, to be used
Expand Down Expand Up @@ -195,10 +230,12 @@ pub async fn run(
metrics: Arc<impl Metrics>,
mut channels: ClientChannels,
shutdown: Controller<String>,
event_sender: mpsc::Sender<OutputEvent>,
) {
info!("Starting light client...");

loop {
let event_sender = event_sender.clone();
let (header, received_at) = match channels.rpc_event_receiver.recv().await {
Ok(event) => match event {
Event::HeaderUpdate {
Expand All @@ -216,6 +253,14 @@ pub async fn run(
metrics
.record(MetricValue::BlockProcessingDelay(seconds.as_secs_f64()))
.await;
if let Err(error) = event_sender
.send(OutputEvent::RecordBlockProcessingDelay(
seconds.as_secs_f64(),
))
.await
{
error!("Cannot send OutputEvent message: {error}");
}
info!("Sleeping for {seconds:?} seconds");
tokio::time::sleep(seconds).await;
}
Expand All @@ -227,6 +272,7 @@ pub async fn run(
&cfg,
header.clone(),
received_at,
event_sender,
)
.await;
let confidence = match process_block_result {
Expand Down Expand Up @@ -275,6 +321,7 @@ mod tests {
};
use hex_literal::hex;
use test_case::test_case;
use tokio::sync::mpsc;

#[test_case(99.9 => 10)]
#[test_case(99.99 => CELL_COUNT_99_99)]
Expand Down Expand Up @@ -337,6 +384,7 @@ mod tests {
}),
};
let recv = Instant::now();
let (sender, _) = mpsc::channel::<OutputEvent>(1);
mock_network_client
.expect_fetch_verified()
.returning(move |_, _, _, _, positions| {
Expand All @@ -358,6 +406,7 @@ mod tests {
&cfg,
header,
recv,
sender,
)
.await
.unwrap();
Expand Down

0 comments on commit c7a23c2

Please sign in to comment.