Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Making Metrics single threaded #690

Merged
merged 3 commits into from
Sep 9, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ tracing-subscriber = { version = "0.3.17", features = ["json", "env-filter"] }
uuid = { version = "1.3.4", features = ["v4", "fast-rng", "macro-diagnostics", "serde"] }
void = "1.0.2"
warp = "0.3.6"
futures = { version = "0.3.15", default-features = false, features = ["std", "async-await"] }

# OpenTelemetry
opentelemetry = "0.24.0"
Expand Down
1 change: 1 addition & 0 deletions client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ better-panic = "0.3.0"
clap = { workspace = true }
color-eyre = { workspace = true }
confy = "0.4.0"
futures = { workspace = true }
hex = { workspace = true }
libp2p = { workspace = true }
strip-ansi-escapes = "0.2.0"
Expand Down
182 changes: 154 additions & 28 deletions client/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,17 @@ use avail_light_core::{
api,
consts::EXPECTED_SYSTEM_VERSION,
data::{self, ClientIdKey, Database, IsFinalitySyncedKey, IsSyncedKey, LatestHeaderKey, DB},
light_client, maintenance,
light_client::{self, OutputEvent as LcEvent},
maintenance::{self, OutputEvent as MaintenanceEvent},
network::{
self,
p2p::{self, BOOTSTRAP_LIST_EMPTY_MESSAGE},
p2p::{self, OutputEvent as P2pEvent, BOOTSTRAP_LIST_EMPTY_MESSAGE},
rpc, Network,
},
shutdown::Controller,
sync_client::SyncClient,
sync_finality::SyncFinality,
telemetry::{self, MetricCounter, Metrics},
telemetry::{self, MetricCounter, MetricValue, Metrics},
types::{
load_or_init_suri, IdentityConfig, MaintenanceConfig, MultiaddrConfig, RuntimeConfig,
SecretKey, Uuid,
Expand All @@ -32,7 +33,13 @@ use color_eyre::{
Result,
};
use std::{fs, path::Path, sync::Arc};
use tokio::sync::{broadcast, mpsc};
use tokio::{
select,
sync::{
broadcast,
mpsc::{self, UnboundedReceiver},
},
};
use tracing::{error, info, span, trace, warn, Level};

#[cfg(feature = "network-analysis")]
Expand Down Expand Up @@ -79,18 +86,16 @@ async fn run(
),
];

let ot_metrics = Arc::new(
telemetry::otlp::initialize(
metric_attributes,
cfg.project_name.clone(),
&cfg.origin,
&cfg.libp2p.kademlia.operation_mode.into(),
cfg.otel.clone(),
)
.wrap_err("Unable to initialize OpenTelemetry service")?,
);
let mut metrics = telemetry::otlp::initialize(
metric_attributes.clone(),
cfg.project_name.clone(),
&cfg.origin,
cfg.libp2p.kademlia.operation_mode.into(),
cfg.otel.clone(),
)
.wrap_err("Unable to initialize OpenTelemetry service")?;

let (p2p_client, p2p_event_loop, event_receiver) = p2p::init(
let (p2p_client, p2p_event_loop, p2p_event_receiver) = p2p::init(
cfg.libp2p.clone(),
cfg.project_name.clone(),
id_keys,
Expand All @@ -103,14 +108,6 @@ async fn run(
)
.await?;

let metrics_clone = ot_metrics.clone();
let shutdown_clone = shutdown.clone();
tokio::spawn(async move {
shutdown_clone
.with_cancel(metrics_clone.handle_event_stream(event_receiver))
.await
});

spawn_in_span(shutdown.with_cancel(p2p_event_loop.run()));

let addrs = vec![
Expand Down Expand Up @@ -289,10 +286,9 @@ async fn run(
}

let static_config_params: MaintenanceConfig = (&cfg).into();
let (maintenance_sender, _) = mpsc::channel::<maintenance::OutputEvent>(1000);
let (maintenance_sender, maintenance_receiver) = mpsc::unbounded_channel::<MaintenanceEvent>();
spawn_in_span(shutdown.with_cancel(maintenance::run(
p2p_client.clone(),
ot_metrics.clone(),
block_rx,
static_config_params,
shutdown.clone(),
Expand All @@ -305,18 +301,24 @@ async fn run(
};

let light_network_client = network::new(p2p_client, rpc_client, pp, cfg.disable_rpc);
let (lc_sender, _) = mpsc::channel::<light_client::OutputEvent>(1000);
let (lc_sender, lc_receiver) = mpsc::unbounded_channel::<LcEvent>();
spawn_in_span(shutdown.with_cancel(light_client::run(
db.clone(),
light_network_client,
(&cfg).into(),
ot_metrics.clone(),
channels,
shutdown.clone(),
lc_sender,
)));

ot_metrics.count(MetricCounter::Starts).await;
metrics.count(MetricCounter::Starts);

spawn_in_span(shutdown.with_cancel(handle_events(
metrics,
p2p_event_receiver,
maintenance_receiver,
lc_receiver,
)));

Ok(())
}
Expand Down Expand Up @@ -382,6 +384,130 @@ pub fn load_runtime_config(opts: &CliOpts) -> Result<RuntimeConfig> {
Ok(cfg)
}

async fn handle_events(
mut metrics: impl Metrics,
mut p2p_receiver: UnboundedReceiver<P2pEvent>,
mut maintenance_receiver: UnboundedReceiver<MaintenanceEvent>,
mut lc_receiver: UnboundedReceiver<LcEvent>,
) {
loop {
select! {
Some(p2p_event) = p2p_receiver.recv() => {
match p2p_event {
P2pEvent::Count => {
metrics.count(MetricCounter::EventLoopEvent);
},
P2pEvent::IncomingGetRecord => {
metrics.count(MetricCounter::IncomingGetRecord);
},
P2pEvent::IncomingPutRecord => {
metrics.count(MetricCounter::IncomingPutRecord);
},
P2pEvent::KadModeChange(mode) => {
metrics.update_operating_mode(mode);
},
P2pEvent::Ping(rtt) => {
metrics.record(MetricValue::DHTPingLatency(rtt.as_millis() as f64));
},
P2pEvent::IncomingConnection => {
metrics.count(MetricCounter::IncomingConnections);
},
P2pEvent::IncomingConnectionError => {
metrics.count(MetricCounter::IncomingConnectionErrors);
},
P2pEvent::MultiaddressUpdate(address) => {
metrics.update_multiaddress(address);
},
P2pEvent::EstablishedConnection => {
metrics.count(MetricCounter::EstablishedConnections);
},
P2pEvent::OutgoingConnectionError => {
metrics.count(MetricCounter::OutgoingConnectionErrors);
},
P2pEvent::PutRecord { block_num, records } => {
metrics.handle_new_put_record(block_num, records);
},
P2pEvent::PutRecordSuccess {
record_key,
query_stats,
} => {
if let Err(error) = metrics.handle_successful_put_record(record_key, query_stats){
error!("Could not handle Successful PUT Record event properly: {error}");
};
},
P2pEvent::PutRecordFailed {
record_key,
query_stats,
} => {
if let Err(error) = metrics.handle_failed_put_record(record_key, query_stats) {
error!("Could not handle Failed PUT Record event properly: {error}");
};
},
}
}
Some(maintenance_event) = maintenance_receiver.recv() => {
match maintenance_event {
MaintenanceEvent::FlushMetrics(block_num) => {
if let Err(error) = metrics.flush() {
error!(
block_num,
"Could not handle Flush Maintenance event properly: {error}"
);
} else {
info!(block_num, "Flushing metrics finished");
};
},
MaintenanceEvent::RecordStats {
connected_peers,
block_confidence_treshold,
replication_factor,
query_timeout,
} => {
metrics.record(MetricValue::DHTConnectedPeers(connected_peers));
metrics.record(MetricValue::BlockConfidenceThreshold(block_confidence_treshold));
metrics.record(MetricValue::DHTReplicationFactor(replication_factor));
metrics.record(MetricValue::DHTQueryTimeout(query_timeout));
},
MaintenanceEvent::CountUps => {
metrics.count(MetricCounter::Up);
},
}
}
Some(lc_event) = lc_receiver.recv() => {
match lc_event {
LcEvent::RecordBlockProcessingDelay(delay) => {
metrics.record(MetricValue::BlockProcessingDelay(delay));
},
LcEvent::CountSessionBlocks => {
metrics.count(MetricCounter::SessionBlocks);
},
LcEvent::RecordBlockHeight(block_num) => {
metrics.record(MetricValue::BlockHeight(block_num));
},
LcEvent::RecordDHTStats {
fetched, fetched_percentage, fetch_duration
} => {
metrics.record(MetricValue::DHTFetched(fetched));
metrics.record(MetricValue::DHTFetchedPercentage(fetched_percentage));
metrics.record(MetricValue::DHTFetchDuration(fetch_duration));
},
LcEvent::RecordRPCFetched(fetched) => {
metrics.record(MetricValue::RPCFetched(fetched));
},
LcEvent::RecordRPCFetchDuration(duration) => {
metrics.record(MetricValue::RPCFetchDuration(duration));
}
LcEvent::RecordBlockConfidence(confidence) => {
metrics.record(MetricValue::BlockConfidence(confidence));
}
}
}
// break the loop if all channels are closed
else => break,
}
}
}

#[tokio::main]
pub async fn main() -> Result<()> {
let shutdown = Controller::new();
Expand Down
2 changes: 1 addition & 1 deletion core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ codec = { package = "parity-scale-codec", version = "3", default-features = fals
color-eyre = { workspace = true }
confy = { workspace = true }
derive_more = { version = "0.99.17", features = ["from"] }
futures = { version = "0.3.15", default-features = false, features = ["std", "async-await"] }
futures = { workspace = true }
hex = { workspace = true }
hyper = { version = "0.14.23", features = ["full", "http1"] }
itertools = "0.10.5"
Expand Down
43 changes: 26 additions & 17 deletions core/src/crawl_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,20 +3,17 @@ use crate::{
p2p::Client,
rpc::{self, Event},
},
telemetry::{metric, otlp::Record, MetricName, Metrics},
telemetry::{metric, otlp::Record, MetricName},
types::{self, BlockVerified, Delay, Origin},
};
use avail_rust::kate_recovery::matrix::Partition;
use serde::{Deserialize, Serialize};
use std::{
sync::Arc,
time::{Duration, Instant},
};
use tokio::sync::broadcast;
use std::time::{Duration, Instant};
use tokio::sync::{broadcast, mpsc::UnboundedSender};
use tracing::{error, info};

#[derive(Clone)]
enum CrawlMetricValue {
pub enum CrawlMetricValue {
CellsSuccessRate(f64),
RowsSuccessRate(f64),
BlockDelay(f64),
Expand Down Expand Up @@ -55,6 +52,12 @@ pub enum CrawlMode {
Both,
}

/// Metric events emitted by the crawl client and consumed by the
/// metrics event handler.
pub enum OutputEvent {
	// Seconds slept before processing a block (the computed block delay),
	// sent just before the crawl loop goes to sleep.
	RecordBlockDelay(f64),
	// Success rate of fetched block cells for the crawled partition.
	RecordCellSuccessRate(f64),
	// Success rate of fetched block rows.
	RecordRowsSuccessRate(f64),
}

impl metric::Value for CrawlMetricValue {
// Metric filter for external peers
// Only the metrics we wish to send to OTel should be in this list
Expand All @@ -67,10 +70,10 @@ pub async fn run(
mut message_rx: broadcast::Receiver<Event>,
network_client: Client,
delay: u64,
metrics: Arc<impl Metrics>,
mode: CrawlMode,
partition: Partition,
block_sender: broadcast::Sender<BlockVerified>,
event_sender: UnboundedSender<OutputEvent>,
) {
info!("Starting crawl client...");

Expand All @@ -96,9 +99,11 @@ pub async fn run(

if let Some(seconds) = delay.sleep_duration(received_at) {
info!("Sleeping for {seconds:?} seconds");
let _ = metrics
.record(CrawlMetricValue::BlockDelay(seconds.as_secs_f64()))
.await;
if let Err(error) =
event_sender.send(OutputEvent::RecordBlockDelay(seconds.as_secs_f64()))
{
error!("Failed to send RecordBlockDelay event: {error}");
}
tokio::time::sleep(seconds).await;
}
let block_number = block.block_num;
Expand All @@ -125,9 +130,11 @@ pub async fn run(
block_number,
partition, success_rate, total, fetched, "Fetched block cells",
);
let _ = metrics
.record(CrawlMetricValue::CellsSuccessRate(success_rate))
.await;

if let Err(error) = event_sender.send(OutputEvent::RecordCellSuccessRate(success_rate))
{
error!("Failed to send RecordCellSuccessRate event: {error}");
}
}

if matches!(mode, CrawlMode::Cells | CrawlMode::Both) {
Expand All @@ -147,9 +154,11 @@ pub async fn run(
block_number,
success_rate, total, fetched, "Fetched block rows"
);
let _ = metrics
.record(CrawlMetricValue::RowsSuccessRate(success_rate))
.await;

if let Err(error) = event_sender.send(OutputEvent::RecordRowsSuccessRate(success_rate))
{
error!("Failed to send RecordRowsSuccessRate event: {error}");
}
}

if let Err(error) = block_sender.send(block) {
Expand Down
Loading
Loading