From adcfd80817a65ab68c9e5f41106e75c0a254653e Mon Sep 17 00:00:00 2001 From: vbhattaccmu <53374818+vbhattaccmu@users.noreply.github.com> Date: Mon, 21 Oct 2024 13:19:56 +0530 Subject: [PATCH 01/11] feat: add fullnode_ws to metrics attributes --- client/src/main.rs | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/client/src/main.rs b/client/src/main.rs index 56dfbd691..cfde56d26 100644 --- a/client/src/main.rs +++ b/client/src/main.rs @@ -4,7 +4,7 @@ use crate::cli::CliOpts; use avail_light_core::{ api, data::{ - self, ClientIdKey, Database, IsFinalitySyncedKey, IsSyncedKey, LatestHeaderKey, + self, ClientIdKey, Database, IsFinalitySyncedKey, IsSyncedKey, LatestHeaderKey, RpcNodeKey, SignerNonceKey, DB, }, light_client::{self, OutputEvent as LcEvent}, @@ -303,7 +303,7 @@ async fn run( ))); // construct Metric Attributes and initialize Metrics - let metric_attributes = vec![ + let mut metric_attributes = vec![ ("version".to_string(), version.to_string()), ("role".to_string(), "lightnode".to_string()), ("peerID".to_string(), peer_id.to_string()), @@ -317,6 +317,10 @@ async fn run( ), ]; + if let Some(connected_ws) = db.get(RpcNodeKey) { + metric_attributes.push(("fullnode_ws".to_string(), connected_ws.host)); + } + let metrics = telemetry::otlp::initialize(cfg.project_name.clone(), &cfg.origin, cfg.otel.clone()) .wrap_err("Unable to initialize OpenTelemetry service")?; From 676fef8c228b22b392717c846479a65db7d6e205 Mon Sep 17 00:00:00 2001 From: vbhattaccmu <53374818+vbhattaccmu@users.noreply.github.com> Date: Mon, 21 Oct 2024 18:52:18 +0530 Subject: [PATCH 02/11] add host to attributes --- client/src/main.rs | 17 ++++++++++++++--- core/src/light_client.rs | 7 ++++++- 2 files changed, 20 insertions(+), 4 deletions(-) diff --git a/client/src/main.rs b/client/src/main.rs index cfde56d26..2c827984c 100644 --- a/client/src/main.rs +++ b/client/src/main.rs @@ -318,7 +318,7 @@ async fn run( ]; if let Some(connected_ws) = db.get(RpcNodeKey) { - metric_attributes.push(("fullnode_ws".to_string(), connected_ws.host)); + metric_attributes.push(("connected_host".to_string(), connected_ws.host)); } let metrics = @@ -662,6 +662,17 @@ impl ClientState { LcEvent::CountSessionBlocks => { self.metrics.count(MetricCounter::SessionBlocks,self.attributes()); }, + LcEvent::ConnectedHost(host) => { + if let Some((_, v)) = self.metric_attributes.iter_mut().find(|(k, _)| *k == "connected_host") { + *v = host; + } + + if let Err(error) = self.metrics.flush(self.attributes()) { + error!("Could not handle flush event properly: {error}"); + } else { + info!("Flushing metrics finished"); + }; + }, LcEvent::RecordBlockHeight(block_num) => { self.metrics.record(MetricValue::BlockHeight(block_num)); }, @@ -677,10 +688,10 @@ impl ClientState { }, LcEvent::RecordRPCFetchDuration(duration) => { self.metrics.record(MetricValue::RPCFetchDuration(duration)); - } + }, LcEvent::RecordBlockConfidence(confidence) => { self.metrics.record(MetricValue::BlockConfidence(confidence)); - } + }, } } // break the loop if all channels are closed diff --git a/core/src/light_client.rs b/core/src/light_client.rs index e09ee7186..cf7fd5006 100644 --- a/core/src/light_client.rs +++ b/core/src/light_client.rs @@ -29,7 +29,7 @@ use tokio::sync::mpsc::UnboundedSender; use tracing::{error, info}; use crate::{ - data::{AchievedConfidenceKey, BlockHeaderKey, Database, VerifiedCellCountKey}, + data::{AchievedConfidenceKey, BlockHeaderKey, Database, RpcNodeKey, VerifiedCellCountKey}, network::{ self, rpc::{self, Event}, @@ -43,6 +43,7 @@ pub enum OutputEvent { RecordBlockProcessingDelay(f64), CountSessionBlocks, RecordBlockHeight(u32), + ConnectedHost(String), RecordDHTStats { fetched: f64, fetched_percentage: f64, @@ -130,6 +131,10 @@ pub async fn process_block( event_sender.send(OutputEvent::RecordRPCFetched(rpc_fetched))?; } + if let Some(connected_host) = db.get(RpcNodeKey) { + event_sender.send(OutputEvent::ConnectedHost(connected_host.host))?; + } + if let Some(rpc_fetch_duration) = fetch_stats.rpc_fetch_duration { event_sender.send(OutputEvent::RecordRPCFetchDuration(rpc_fetch_duration))?; } From 2e39339b6af19388a7a6de2efe10a06acc05a550 Mon Sep 17 00:00:00 2001 From: vbhattaccmu <53374818+vbhattaccmu@users.noreply.github.com> Date: Mon, 21 Oct 2024 20:11:09 +0530 Subject: [PATCH 03/11] add connected host to client state --- client/src/main.rs | 33 +++++++++++++++++++-------------- 1 file changed, 19 insertions(+), 14 deletions(-) diff --git a/client/src/main.rs b/client/src/main.rs index 2c827984c..0b95b2767 100644 --- a/client/src/main.rs +++ b/client/src/main.rs @@ -303,7 +303,7 @@ async fn run( ))); // construct Metric Attributes and initialize Metrics - let mut metric_attributes = vec![ + let metric_attributes = vec![ ("version".to_string(), version.to_string()), ("role".to_string(), "lightnode".to_string()), ("peerID".to_string(), peer_id.to_string()), @@ -317,9 +317,10 @@ async fn run( ), ]; - if let Some(connected_ws) = db.get(RpcNodeKey) { - metric_attributes.push(("connected_host".to_string(), connected_ws.host)); - } + let host = db + .get(RpcNodeKey) + .map(|connected_ws| connected_ws.host) + .ok_or_else(|| eyre!("No connected host found"))?; let metrics = telemetry::otlp::initialize(cfg.project_name.clone(), &cfg.origin, cfg.otel.clone()) @@ -328,6 +329,7 @@ async fn run( let mut state = ClientState::new( metrics, cfg.libp2p.kademlia.operation_mode.into(), + host, Multiaddr::empty(), metric_attributes, ); @@ -451,6 +453,7 @@ struct ClientState { metrics: Metrics, kad_mode: Mode, multiaddress: Multiaddr, + connected_host: String, metric_attributes: Vec<(String, String)>, active_blocks: HashMap, } @@ -459,6 +462,7 @@ impl ClientState { fn new( metrics: Metrics, kad_mode: Mode, + connected_host: String, multiaddress: Multiaddr, metric_attributes: Vec<(String, String)>, ) -> Self { @@ -466,6 +470,7 @@ impl ClientState { metrics, kad_mode, multiaddress, + connected_host, metric_attributes, active_blocks: Default::default(), } @@ -479,10 +484,18 @@ impl ClientState { self.kad_mode = value; } + fn update_connected_host(&mut self, value: String) { + self.connected_host = value; + } + fn attributes(&self) -> Vec<(String, String)> { let mut attrs = vec![ ("operating_mode".to_string(), self.kad_mode.to_string()), ("multiaddress".to_string(), self.multiaddress.to_string()), + ( + "connected_host".to_string(), + self.connected_host.to_string(), + ), ]; attrs.extend(self.metric_attributes.clone()); @@ -663,16 +676,8 @@ impl ClientState { self.metrics.count(MetricCounter::SessionBlocks,self.attributes()); }, LcEvent::ConnectedHost(host) => { - if let Some((_, v)) = self.metric_attributes.iter_mut().find(|(k, _)| *k == "connected_host") { - *v = host; - } - - if let Err(error) = self.metrics.flush(self.attributes()) { - error!("Could not handle flush event properly: {error}"); - } else { - info!("Flushing metrics finished"); - }; - }, + self.update_connected_host(host); + } LcEvent::RecordBlockHeight(block_num) => { self.metrics.record(MetricValue::BlockHeight(block_num)); }, From 45159fd484722ead20b46dc346dd72947d2c1f96 Mon Sep 17 00:00:00 2001 From: vbhattaccmu <53374818+vbhattaccmu@users.noreply.github.com> Date: Tue, 22 Oct 2024 16:53:29 +0530 Subject: [PATCH 04/11] push changes --- client/src/main.rs | 12 +++++++++--- compatibility-tests/src/main.rs | 2 +- core/src/network/rpc.rs | 9 +++++++-- core/src/network/rpc/client.rs | 10 +++++++++- crawler/src/main.rs | 1 + fat/src/main.rs | 1 + 6 files changed, 28 insertions(+), 7 deletions(-) diff --git a/client/src/main.rs b/client/src/main.rs index 0b95b2767..59e0e3dd6 100644 --- a/client/src/main.rs +++ b/client/src/main.rs @@ -136,9 +136,16 @@ async fn run( let public_params_hash = hex::encode(blake2_128(&raw_pp)); let public_params_len = hex::encode(raw_pp).len(); trace!("Public params ({public_params_len}): hash: {public_params_hash}"); + let (lc_sender, lc_receiver) = mpsc::unbounded_channel::(); - let (rpc_client, rpc_events, rpc_subscriptions) = - rpc::init(db.clone(), &cfg.genesis_hash, &cfg.rpc, shutdown.clone()).await?; + let (rpc_client, rpc_events, rpc_subscriptions) = rpc::init( + db.clone(), + &cfg.genesis_hash, + &cfg.rpc, + shutdown.clone(), + Some(lc_sender.clone()), + ) + .await?; let account_id = identity_cfg.avail_key_pair.public_key().to_account_id(); let client = rpc_client.current_client().await; @@ -291,7 +298,6 @@ async fn run( }; let light_network_client = network::new(p2p_client, rpc_client, pp, cfg.disable_rpc); - let (lc_sender, lc_receiver) = mpsc::unbounded_channel::(); spawn_in_span(shutdown.with_cancel(light_client::run( db.clone(), light_network_client, diff --git a/compatibility-tests/src/main.rs b/compatibility-tests/src/main.rs index 085fc8468..988d7497a 100644 --- a/compatibility-tests/src/main.rs +++ b/compatibility-tests/src/main.rs @@ -40,7 +40,7 @@ async fn main() -> Result<()> { }; let shutdown = Controller::new(); - let (rpc_client, _, subscriptions) = rpc::init(db, "DEV", &rpc_cfg, shutdown).await?; + let (rpc_client, _, subscriptions) = rpc::init(db, "DEV", &rpc_cfg, shutdown, None).await?; tokio::spawn(subscriptions.run()); let mut correct: bool = true; diff --git a/core/src/network/rpc.rs b/core/src/network/rpc.rs index f624c748e..83c6cedec 100644 --- a/core/src/network/rpc.rs +++ b/core/src/network/rpc.rs @@ -10,12 +10,15 @@ use rand::{seq::SliceRandom, thread_rng, Rng}; use serde::{de, Deserialize, Serialize}; use std::{collections::HashSet, fmt::Display}; use tokio::{ - sync::broadcast, + sync::{broadcast, mpsc::UnboundedSender}, time::{self, timeout}, }; use tracing::{debug, info}; -use crate::{data::Database, network::rpc, shutdown::Controller, types::GrandpaJustification}; +use crate::{ + data::Database, light_client::OutputEvent as LcEvent, network::rpc, shutdown::Controller, + types::GrandpaJustification, +}; mod client; pub mod configuration; @@ -188,6 +191,7 @@ pub async fn init( genesis_hash: &str, rpc: &RPCConfig, shutdown: Controller, + client_sender: Option>, ) -> Result<(Client, broadcast::Sender, SubscriptionLoop)> { let rpc_client = Client::new( db.clone(), @@ -195,6 +199,7 @@ pub async fn init( genesis_hash, rpc.retry.clone(), shutdown, + client_sender, ) .await?; // create output channel for RPC Subscription Events diff --git a/core/src/network/rpc/client.rs b/core/src/network/rpc/client.rs index 1bc88cff8..4cb79ac21 100644 --- a/core/src/network/rpc/client.rs +++ b/core/src/network/rpc/client.rs @@ -16,7 +16,7 @@ use color_eyre::{ }; use futures::{Stream, TryFutureExt, TryStreamExt}; use std::sync::Arc; -use tokio::sync::RwLock; +use tokio::sync::{mpsc::UnboundedSender, RwLock}; use tokio_retry::Retry; use tokio_stream::StreamExt; use tracing::{info, warn}; @@ -25,6 +25,7 @@ use super::{configuration::RetryConfig, Node, Nodes, Subscription, WrappedProof} use crate::{ api::v2::types::Base64, data::{Database, RpcNodeKey, SignerNonceKey}, + light_client::OutputEvent as LcEvent, shutdown::Controller, types::DEV_FLAG_GENHASH, }; @@ -37,6 +38,7 @@ pub struct Client { retry_config: RetryConfig, expected_genesis_hash: String, shutdown: Controller, + client_sender: Option>, } pub struct SubmitResponse { @@ -52,6 +54,7 @@ impl Client { expected_genesis_hash: &str, retry_config: RetryConfig, shutdown: Controller, + client_sender: Option>, ) -> Result { // try and connect appropriate Node from the provided list // will do retries with the provided Retry Config @@ -85,6 +88,7 @@ impl Client { retry_config, expected_genesis_hash: expected_genesis_hash.to_string(), shutdown, + client_sender, }) } @@ -217,6 +221,10 @@ impl Client { // retries gave results, // update db with currently connected Node and keep a reference to the created Client *self.subxt_client.write().await = client; + if let Some(event_sender) = &self.client_sender { + let connected_host: String = node.host.clone(); + event_sender.send(LcEvent::ConnectedHost(connected_host))?; + } self.db.put(RpcNodeKey, node); return Ok(result); }, diff --git a/crawler/src/main.rs b/crawler/src/main.rs index 0e7bcf686..460e2dbd3 100644 --- a/crawler/src/main.rs +++ b/crawler/src/main.rs @@ -122,6 +122,7 @@ async fn run(config: Config, db: DB, shutdown: Controller) -> Result<()> &config.genesis_hash, &config.rpc, shutdown.clone(), + None, ) .await?; diff --git a/fat/src/main.rs b/fat/src/main.rs index 9370f7ae8..b313cda6a 100644 --- a/fat/src/main.rs +++ b/fat/src/main.rs @@ -124,6 +124,7 @@ async fn run(config: Config, db: DB, shutdown: Controller) -> Result<()> &config.genesis_hash, &config.rpc, shutdown.clone(), + None, ) .await?; From f969717b73d928031216e21e87d1d96c22472480 Mon Sep 17 00:00:00 2001 From: vbhattaccmu <53374818+vbhattaccmu@users.noreply.github.com> Date: Tue, 22 Oct 2024 19:04:44 +0530 Subject: [PATCH 05/11] remove rpc node key on process block --- core/src/light_client.rs | 4 ---- 1 file changed, 4 deletions(-) diff --git a/core/src/light_client.rs b/core/src/light_client.rs index cf7fd5006..287d7abfc 100644 --- a/core/src/light_client.rs +++ b/core/src/light_client.rs @@ -131,10 +131,6 @@ pub async fn process_block( event_sender.send(OutputEvent::RecordRPCFetched(rpc_fetched))?; } - if let Some(connected_host) = db.get(RpcNodeKey) { - event_sender.send(OutputEvent::ConnectedHost(connected_host.host))?; - } - if let Some(rpc_fetch_duration) = fetch_stats.rpc_fetch_duration { event_sender.send(OutputEvent::RecordRPCFetchDuration(rpc_fetch_duration))?; } From 2f3e1c2b451ab044ab766cfbf9fa71bc8f8fff03 Mon Sep 17 00:00:00 2001 From: vbhattaccmu <53374818+vbhattaccmu@users.noreply.github.com> Date: Tue, 22 Oct 2024 19:10:00 +0530 Subject: [PATCH 06/11] Update main.rs --- client/src/main.rs | 10 ++-------- 1 file changed, 2 insertions(+), 8 deletions(-) diff --git a/client/src/main.rs b/client/src/main.rs index 59e0e3dd6..3f2faafb9 100644 --- a/client/src/main.rs +++ b/client/src/main.rs @@ -138,14 +138,8 @@ async fn run( trace!("Public params ({public_params_len}): hash: {public_params_hash}"); let (lc_sender, lc_receiver) = mpsc::unbounded_channel::(); - let (rpc_client, rpc_events, rpc_subscriptions) = rpc::init( - db.clone(), - &cfg.genesis_hash, - &cfg.rpc, - shutdown.clone(), - Some(lc_sender.clone()), - ) - .await?; + let (rpc_client, rpc_events, rpc_subscriptions) = + rpc::init(db.clone(), &cfg.genesis_hash, &cfg.rpc, shutdown.clone(), Some(lc_sender.clone())).await?; let account_id = identity_cfg.avail_key_pair.public_key().to_account_id(); let client = rpc_client.current_client().await; From 337eb1367f15aa4722cf1db427f723cc78331faa Mon Sep 17 00:00:00 2001 From: vbhattaccmu <53374818+vbhattaccmu@users.noreply.github.com> Date: Thu, 24 Oct 2024 15:10:50 +0530 Subject: [PATCH 07/11] merge main --- core/src/network/rpc.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/core/src/network/rpc.rs b/core/src/network/rpc.rs index 83c6cedec..82e584de5 100644 --- a/core/src/network/rpc.rs +++ b/core/src/network/rpc.rs @@ -199,7 +199,6 @@ pub async fn init( genesis_hash, rpc.retry.clone(), shutdown, - client_sender, ) .await?; // create output channel for RPC Subscription Events From 274cbbc236a873e05c7f31ac09e78ee746f0586b Mon Sep 17 00:00:00 2001 From: vbhattaccmu <53374818+vbhattaccmu@users.noreply.github.com> Date: Thu, 24 Oct 2024 16:11:51 +0530 Subject: [PATCH 08/11] add switching event sender inside try and exec --- client/src/main.rs | 10 ++++++++-- core/src/light_client.rs | 2 +- core/src/network/rpc.rs | 1 + core/src/network/rpc/client.rs | 30 ++++++++++++++++++++++++------ 4 files changed, 34 insertions(+), 9 deletions(-) diff --git a/client/src/main.rs b/client/src/main.rs index a4745322c..d8bd39f10 100644 --- a/client/src/main.rs +++ b/client/src/main.rs @@ -138,8 +138,14 @@ async fn run( trace!("Public params ({public_params_len}): hash: {public_params_hash}"); let (lc_sender, lc_receiver) = mpsc::unbounded_channel::(); - let (rpc_client, rpc_events, rpc_subscriptions) = - rpc::init(db.clone(), &cfg.genesis_hash, &cfg.rpc, shutdown.clone(), Some(lc_sender.clone())).await?; + let (rpc_client, rpc_events, rpc_subscriptions) = rpc::init( + db.clone(), + &cfg.genesis_hash, + &cfg.rpc, + shutdown.clone(), + Some(lc_sender.clone()), + ) + .await?; let account_id = identity_cfg.avail_key_pair.public_key().to_account_id(); let client = rpc_client.current_client().await; diff --git a/core/src/light_client.rs b/core/src/light_client.rs index 287d7abfc..758973306 100644 --- a/core/src/light_client.rs +++ b/core/src/light_client.rs @@ -29,7 +29,7 @@ use tokio::sync::mpsc::UnboundedSender; use tracing::{error, info}; use crate::{ - data::{AchievedConfidenceKey, BlockHeaderKey, Database, RpcNodeKey, VerifiedCellCountKey}, + data::{AchievedConfidenceKey, BlockHeaderKey, Database, VerifiedCellCountKey}, network::{ self, rpc::{self, Event}, diff --git a/core/src/network/rpc.rs b/core/src/network/rpc.rs index 82e584de5..83c6cedec 100644 --- a/core/src/network/rpc.rs +++ b/core/src/network/rpc.rs @@ -199,6 +199,7 @@ pub async fn init( genesis_hash, rpc.retry.clone(), shutdown, + client_sender, ) .await?; // create output channel for RPC Subscription Events diff --git a/core/src/network/rpc/client.rs b/core/src/network/rpc/client.rs index 46c42e7ad..ecbe233e1 100644 --- a/core/src/network/rpc/client.rs +++ b/core/src/network/rpc/client.rs @@ -22,7 +22,7 @@ use std::{ sync::Arc, task::{Context, Poll}, }; -use tokio::sync::RwLock; +use tokio::sync::{mpsc::UnboundedSender, RwLock}; use tokio_retry::Retry; use tokio_stream::StreamExt; use tracing::{debug, error, info, trace, warn}; @@ -31,6 +31,7 @@ use super::{configuration::RetryConfig, Node, Nodes, Subscription, WrappedProof} use crate::{ api::v2::types::Base64, data::{Database, RpcNodeKey, SignerNonceKey}, + light_client::OutputEvent as LcEvent, shutdown::Controller, types::DEV_FLAG_GENHASH, }; @@ -202,6 +203,7 @@ pub struct Client { retry_config: RetryConfig, expected_genesis_hash: String, shutdown: Controller, + client_sender: Option>, } pub struct SubmitResponse { @@ -218,6 +220,7 @@ impl Client { expected_genesis_hash: &str, retry_config: RetryConfig, shutdown: Controller, + client_sender: Option>, ) -> Result { let (client, node) = Self::initialize_connection( &nodes, @@ -234,6 +237,7 @@ impl Client { retry_config, expected_genesis_hash: expected_genesis_hash.to_string(), shutdown, + client_sender, }; client.db.put(RpcNodeKey, node); @@ -270,8 +274,13 @@ impl Client { expected_genesis_hash: &str, ) -> Result> { // Not passing any RPC function calls since this is a first try of connecting RPC nodes - Self::try_connect_and_execute(nodes, expected_genesis_hash, |_| futures::future::ok(())) - .await + Self::try_connect_and_execute( + nodes, + expected_genesis_hash, + |_| futures::future::ok(()), + None, + ) + .await } // Iterates through the RPC nodes, tries to create the first successful connection, verifies the genesis hash, @@ -280,6 +289,7 @@ impl Client { nodes: &[Node], expected_genesis_hash: &str, f: F, + client_sender: Option>, ) -> Result> where F: FnMut(SDK) -> Fut + Copy, @@ -294,6 +304,9 @@ impl Client { match Self::try_node_connection_and_exec(node, expected_genesis_hash, f).await { Ok(attempt) => { info!("Successfully connected to RPC: {}", node.host); + if let Some(event_sender) = client_sender.as_ref() { + event_sender.send(LcEvent::ConnectedHost(node.host.clone()))?; + } return Ok(attempt); }, Err(err) => { @@ -444,8 +457,13 @@ impl Client { F: FnMut(SDK) -> Fut + Copy, Fut: std::future::Future>, { - let nodes_fn = - || async { Self::try_connect_and_execute(nodes, &self.expected_genesis_hash, f).await }; + let nodes_fn = move || { + let client_sender = self.client_sender.clone(); + async move { + Self::try_connect_and_execute(nodes, &self.expected_genesis_hash, f, client_sender) + .await + } + }; match self .shutdown @@ -893,4 +911,4 @@ impl Client { Ok(gen_hash) } -} \ No newline at end of file +} From c181be66de24858aeb70f95de9ad1a17f3c91939 Mon Sep 17 00:00:00 2001 From: vbhattaccmu <53374818+vbhattaccmu@users.noreply.github.com> Date: Thu, 24 Oct 2024 16:14:03 +0530 Subject: [PATCH 09/11] fmt --- client/src/main.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/client/src/main.rs b/client/src/main.rs index d8bd39f10..ca3716fd9 100644 --- a/client/src/main.rs +++ b/client/src/main.rs @@ -684,7 +684,7 @@ impl ClientState { }, LcEvent::ConnectedHost(host) => { self.update_connected_host(host); - } + }, LcEvent::RecordBlockHeight(block_num) => { self.metrics.record(MetricValue::BlockHeight(block_num)); }, From 4600bd5d6ecd15aad0896d6ea92faf75b0ddc6de Mon Sep 17 00:00:00 2001 From: vbhattaccmu <53374818+vbhattaccmu@users.noreply.github.com> Date: Thu, 24 Oct 2024 18:56:51 +0530 Subject: [PATCH 10/11] move rpc event to its own mod --- client/src/main.rs | 26 +++++++++++++++++++------- core/src/light_client.rs | 1 - core/src/network/p2p.rs | 1 - core/src/network/rpc.rs | 13 ++++++++++--- core/src/network/rpc/client.rs | 18 +++++++++--------- crawler/src/main.rs | 3 ++- 6 files changed, 40 insertions(+), 22 deletions(-) diff --git a/client/src/main.rs b/client/src/main.rs index ca3716fd9..89c1589ab 100644 --- a/client/src/main.rs +++ b/client/src/main.rs @@ -12,7 +12,8 @@ use avail_light_core::{ network::{ self, p2p::{self, extract_block_num, OutputEvent as P2pEvent, BOOTSTRAP_LIST_EMPTY_MESSAGE}, - rpc, Network, + rpc::{self, OutputEvent as RpcEvent}, + Network, }, shutdown::Controller, sync_client::SyncClient, @@ -136,14 +137,14 @@ async fn run( let public_params_hash = hex::encode(blake2_128(&raw_pp)); let public_params_len = hex::encode(raw_pp).len(); trace!("Public params ({public_params_len}): hash: {public_params_hash}"); - let (lc_sender, lc_receiver) = mpsc::unbounded_channel::(); + let (rpc_sender, rpc_receiver) = mpsc::unbounded_channel::(); let (rpc_client, rpc_events, rpc_subscriptions) = rpc::init( db.clone(), &cfg.genesis_hash, &cfg.rpc, shutdown.clone(), - Some(lc_sender.clone()), + Some(rpc_sender), ) .await?; @@ -298,6 +299,7 @@ async fn run( }; let light_network_client = network::new(p2p_client, rpc_client, pp, cfg.disable_rpc); + let (lc_sender, lc_receiver) = mpsc::unbounded_channel::(); spawn_in_span(shutdown.with_cancel(light_client::run( db.clone(), light_network_client, @@ -343,7 +345,12 @@ async fn run( spawn_in_span(shutdown.with_cancel(async move { state - .handle_events(p2p_event_receiver, maintenance_receiver, lc_receiver) + .handle_events( + p2p_event_receiver, + maintenance_receiver, + lc_receiver, + rpc_receiver, + ) .await; })); @@ -589,6 +596,7 @@ impl ClientState { mut p2p_receiver: UnboundedReceiver, mut maintenance_receiver: UnboundedReceiver, mut lc_receiver: UnboundedReceiver, + mut rpc_receiver: UnboundedReceiver, ) { self.metrics.count(MetricCounter::Starts, self.attributes()); loop { @@ -682,9 +690,6 @@ impl ClientState { LcEvent::CountSessionBlocks => { self.metrics.count(MetricCounter::SessionBlocks,self.attributes()); }, - LcEvent::ConnectedHost(host) => { - self.update_connected_host(host); - }, LcEvent::RecordBlockHeight(block_num) => { self.metrics.record(MetricValue::BlockHeight(block_num)); }, @@ -706,6 +711,13 @@ impl ClientState { }, } } + Some(rpc_event) = rpc_receiver.recv() => { + match rpc_event { + RpcEvent::ConnectedHost(host) => { + self.update_connected_host(host); + }, + } + } // break the loop if all channels are closed else => break, } diff --git a/core/src/light_client.rs b/core/src/light_client.rs index 758973306..e09ee7186 100644 --- a/core/src/light_client.rs +++ b/core/src/light_client.rs @@ -43,7 +43,6 @@ pub enum OutputEvent { RecordBlockProcessingDelay(f64), CountSessionBlocks, RecordBlockHeight(u32), - ConnectedHost(String), RecordDHTStats { fetched: f64, fetched_percentage: f64, diff --git a/core/src/network/p2p.rs b/core/src/network/p2p.rs index dc4207d74..eb72b0757 100644 --- a/core/src/network/p2p.rs +++ b/core/src/network/p2p.rs @@ -72,7 +72,6 @@ pub enum OutputEvent { IncomingGetRecord, IncomingPutRecord, KadModeChange(Mode), - Ping(Duration), IncomingConnection, IncomingConnectionError, diff --git a/core/src/network/rpc.rs b/core/src/network/rpc.rs index 83c6cedec..176fc3077 100644 --- a/core/src/network/rpc.rs +++ b/core/src/network/rpc.rs @@ -16,7 +16,9 @@ use tokio::{ use tracing::{debug, info}; use crate::{ - data::Database, light_client::OutputEvent as LcEvent, network::rpc, shutdown::Controller, + data::Database, + network::rpc::{self, OutputEvent as RpcEvent}, + shutdown::Controller, types::GrandpaJustification, }; @@ -37,6 +39,11 @@ pub enum Subscription { Justification(GrandpaJustification), } +#[derive(Clone, Debug)] +pub enum OutputEvent { + ConnectedHost(String), +} + #[derive(Debug, Deserialize, Clone)] pub struct WrappedJustification(pub GrandpaJustification); @@ -191,7 +198,7 @@ pub async fn init( genesis_hash: &str, rpc: &RPCConfig, shutdown: Controller, - client_sender: Option>, + rpc_sender: Option>, ) -> Result<(Client, broadcast::Sender, SubscriptionLoop)> { let rpc_client = Client::new( db.clone(), @@ -199,7 +206,7 @@ pub async fn init( genesis_hash, rpc.retry.clone(), shutdown, - client_sender, + rpc_sender, ) .await?; // create output channel for RPC Subscription Events diff --git a/core/src/network/rpc/client.rs b/core/src/network/rpc/client.rs index ecbe233e1..1331d9500 100644 --- a/core/src/network/rpc/client.rs +++ b/core/src/network/rpc/client.rs @@ -31,7 +31,7 @@ use super::{configuration::RetryConfig, Node, Nodes, Subscription, WrappedProof} use crate::{ api::v2::types::Base64, data::{Database, RpcNodeKey, SignerNonceKey}, - light_client::OutputEvent as LcEvent, + network::rpc::OutputEvent as RpcEvent, shutdown::Controller, types::DEV_FLAG_GENHASH, }; @@ -203,7 +203,7 @@ pub struct Client { retry_config: RetryConfig, expected_genesis_hash: String, shutdown: Controller, - client_sender: Option>, + rpc_sender: Option>, } pub struct SubmitResponse { @@ -220,7 +220,7 @@ impl Client { expected_genesis_hash: &str, retry_config: RetryConfig, shutdown: Controller, - client_sender: Option>, + rpc_sender: Option>, ) -> Result { let (client, node) = Self::initialize_connection( &nodes, @@ -237,7 +237,7 @@ impl Client { retry_config, expected_genesis_hash: expected_genesis_hash.to_string(), shutdown, - client_sender, + rpc_sender, }; client.db.put(RpcNodeKey, node); @@ -289,7 +289,7 @@ impl Client { nodes: &[Node], expected_genesis_hash: &str, f: F, - client_sender: Option>, + rpc_sender: Option>, ) -> Result> where F: FnMut(SDK) -> Fut + Copy, @@ -304,8 +304,8 @@ impl Client { match Self::try_node_connection_and_exec(node, expected_genesis_hash, f).await { Ok(attempt) => { info!("Successfully connected to RPC: {}", node.host); - if let Some(event_sender) = client_sender.as_ref() { - event_sender.send(LcEvent::ConnectedHost(node.host.clone()))?; + if let Some(sender) = rpc_sender { + sender.send(RpcEvent::ConnectedHost(node.host.clone()))?; } return Ok(attempt); }, @@ -458,9 +458,9 @@ impl Client { Fut: std::future::Future>, { let nodes_fn = move || { - let client_sender = self.client_sender.clone(); + let rpc_sender = self.rpc_sender.clone(); async move { - Self::try_connect_and_execute(nodes, &self.expected_genesis_hash, f, client_sender) + Self::try_connect_and_execute(nodes, &self.expected_genesis_hash, f, rpc_sender) .await } }; diff --git a/crawler/src/main.rs b/crawler/src/main.rs index 8654e7ff5..83c75f592 100644 --- a/crawler/src/main.rs +++ b/crawler/src/main.rs @@ -3,7 +3,8 @@ use avail_light_core::{ data::{Database, LatestHeaderKey, DB}, network::{ p2p::{self, OutputEvent as P2pEvent}, - rpc, Network, + rpc::{self, OutputEvent as RpcEvent}, + Network, }, shutdown::Controller, telemetry::{ From 35a60be85bf684838572e004c4b7fdbe1914140b Mon Sep 17 00:00:00 2001 From: vbhattaccmu <53374818+vbhattaccmu@users.noreply.github.com> Date: Thu, 24 Oct 2024 19:00:06 +0530 Subject: [PATCH 11/11] clippy fixes --- crawler/src/main.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/crawler/src/main.rs b/crawler/src/main.rs index 83c75f592..8654e7ff5 100644 --- a/crawler/src/main.rs +++ b/crawler/src/main.rs @@ -3,8 +3,7 @@ use avail_light_core::{ data::{Database, LatestHeaderKey, DB}, network::{ p2p::{self, OutputEvent as P2pEvent}, - rpc::{self, OutputEvent as RpcEvent}, - Network, + rpc, Network, }, shutdown::Controller, telemetry::{