diff --git a/client/src/main.rs b/client/src/main.rs index 123cbe071..89c1589ab 100644 --- a/client/src/main.rs +++ b/client/src/main.rs @@ -4,7 +4,7 @@ use crate::cli::CliOpts; use avail_light_core::{ api, data::{ - self, ClientIdKey, Database, IsFinalitySyncedKey, IsSyncedKey, LatestHeaderKey, + self, ClientIdKey, Database, IsFinalitySyncedKey, IsSyncedKey, LatestHeaderKey, RpcNodeKey, SignerNonceKey, DB, }, light_client::{self, OutputEvent as LcEvent}, @@ -12,7 +12,8 @@ use avail_light_core::{ network::{ self, p2p::{self, extract_block_num, OutputEvent as P2pEvent, BOOTSTRAP_LIST_EMPTY_MESSAGE}, - rpc, Network, + rpc::{self, OutputEvent as RpcEvent}, + Network, }, shutdown::Controller, sync_client::SyncClient, @@ -137,8 +138,15 @@ async fn run( let public_params_len = hex::encode(raw_pp).len(); trace!("Public params ({public_params_len}): hash: {public_params_hash}"); - let (rpc_client, rpc_events, rpc_subscriptions) = - rpc::init(db.clone(), &cfg.genesis_hash, &cfg.rpc, shutdown.clone()).await?; + let (rpc_sender, rpc_receiver) = mpsc::unbounded_channel::(); + let (rpc_client, rpc_events, rpc_subscriptions) = rpc::init( + db.clone(), + &cfg.genesis_hash, + &cfg.rpc, + shutdown.clone(), + Some(rpc_sender), + ) + .await?; let account_id = identity_cfg.avail_key_pair.public_key().to_account_id(); let client = rpc_client.current_client().await; @@ -318,6 +326,11 @@ async fn run( ), ]; + let host = db + .get(RpcNodeKey) + .map(|connected_ws| connected_ws.host) + .ok_or_else(|| eyre!("No connected host found"))?; + let metrics = telemetry::otlp::initialize(cfg.project_name.clone(), &cfg.origin, cfg.otel.clone()) .wrap_err("Unable to initialize OpenTelemetry service")?; @@ -325,13 +338,19 @@ async fn run( let mut state = ClientState::new( metrics, cfg.libp2p.kademlia.operation_mode.into(), + host, Multiaddr::empty(), metric_attributes, ); spawn_in_span(shutdown.with_cancel(async move { state - .handle_events(p2p_event_receiver, maintenance_receiver, lc_receiver) + .handle_events( + p2p_event_receiver, + maintenance_receiver, + lc_receiver, + rpc_receiver, + ) .await; })); @@ -448,6 +467,7 @@ struct ClientState { metrics: Metrics, kad_mode: Mode, multiaddress: Multiaddr, + connected_host: String, metric_attributes: Vec<(String, String)>, active_blocks: HashMap, } @@ -456,6 +476,7 @@ impl ClientState { fn new( metrics: Metrics, kad_mode: Mode, + connected_host: String, multiaddress: Multiaddr, metric_attributes: Vec<(String, String)>, ) -> Self { @@ -463,6 +484,7 @@ impl ClientState { metrics, kad_mode, multiaddress, + connected_host, metric_attributes, active_blocks: Default::default(), } @@ -476,10 +498,18 @@ impl ClientState { self.kad_mode = value; } + fn update_connected_host(&mut self, value: String) { + self.connected_host = value; + } + fn attributes(&self) -> Vec<(String, String)> { let mut attrs = vec![ ("operating_mode".to_string(), self.kad_mode.to_string()), ("multiaddress".to_string(), self.multiaddress.to_string()), + ( + "connected_host".to_string(), + self.connected_host.to_string(), + ), ]; attrs.extend(self.metric_attributes.clone()); @@ -566,6 +596,7 @@ impl ClientState { mut p2p_receiver: UnboundedReceiver, mut maintenance_receiver: UnboundedReceiver, mut lc_receiver: UnboundedReceiver, + mut rpc_receiver: UnboundedReceiver, ) { self.metrics.count(MetricCounter::Starts, self.attributes()); loop { @@ -674,10 +705,17 @@ impl ClientState { }, LcEvent::RecordRPCFetchDuration(duration) => { self.metrics.record(MetricValue::RPCFetchDuration(duration)); - } + }, LcEvent::RecordBlockConfidence(confidence) => { self.metrics.record(MetricValue::BlockConfidence(confidence)); - } + }, + } + } + Some(rpc_event) = rpc_receiver.recv() => { + match rpc_event { + RpcEvent::ConnectedHost(host) => { + self.update_connected_host(host); + }, } } // break the loop if all channels are closed diff --git a/compatibility-tests/src/main.rs b/compatibility-tests/src/main.rs index 085fc8468..988d7497a 100644 --- a/compatibility-tests/src/main.rs +++ b/compatibility-tests/src/main.rs @@ -40,7 +40,7 @@ async fn main() -> Result<()> { }; let shutdown = Controller::new(); - let (rpc_client, _, subscriptions) = rpc::init(db, "DEV", &rpc_cfg, shutdown).await?; + let (rpc_client, _, subscriptions) = rpc::init(db, "DEV", &rpc_cfg, shutdown, None).await?; tokio::spawn(subscriptions.run()); let mut correct: bool = true; diff --git a/core/src/network/p2p.rs b/core/src/network/p2p.rs index dc4207d74..eb72b0757 100644 --- a/core/src/network/p2p.rs +++ b/core/src/network/p2p.rs @@ -72,7 +72,6 @@ pub enum OutputEvent { IncomingGetRecord, IncomingPutRecord, KadModeChange(Mode), - Ping(Duration), IncomingConnection, IncomingConnectionError, diff --git a/core/src/network/rpc.rs b/core/src/network/rpc.rs index f624c748e..176fc3077 100644 --- a/core/src/network/rpc.rs +++ b/core/src/network/rpc.rs @@ -10,12 +10,17 @@ use rand::{seq::SliceRandom, thread_rng, Rng}; use serde::{de, Deserialize, Serialize}; use std::{collections::HashSet, fmt::Display}; use tokio::{ - sync::broadcast, + sync::{broadcast, mpsc::UnboundedSender}, time::{self, timeout}, }; use tracing::{debug, info}; -use crate::{data::Database, network::rpc, shutdown::Controller, types::GrandpaJustification}; +use crate::{ + data::Database, + network::rpc::{self, OutputEvent as RpcEvent}, + shutdown::Controller, + types::GrandpaJustification, +}; mod client; pub mod configuration; @@ -34,6 +39,11 @@ pub enum Subscription { Justification(GrandpaJustification), } +#[derive(Clone, Debug)] +pub enum OutputEvent { + ConnectedHost(String), +} + #[derive(Debug, Deserialize, Clone)] pub struct WrappedJustification(pub GrandpaJustification); @@ -188,6 +198,7 @@ pub async fn init( genesis_hash: &str, rpc: &RPCConfig, shutdown: Controller, + rpc_sender: Option>, ) -> Result<(Client, broadcast::Sender, SubscriptionLoop)> { let rpc_client = Client::new( db.clone(), @@ -195,6 +206,7 @@ pub async fn init( genesis_hash, rpc.retry.clone(), shutdown, + rpc_sender, ) .await?; // create output channel for RPC Subscription Events diff --git a/core/src/network/rpc/client.rs b/core/src/network/rpc/client.rs index 697a2deb3..1331d9500 100644 --- a/core/src/network/rpc/client.rs +++ b/core/src/network/rpc/client.rs @@ -22,7 +22,7 @@ use std::{ sync::Arc, task::{Context, Poll}, }; -use tokio::sync::RwLock; +use tokio::sync::{mpsc::UnboundedSender, RwLock}; use tokio_retry::Retry; use tokio_stream::StreamExt; use tracing::{debug, error, info, trace, warn}; @@ -31,6 +31,7 @@ use super::{configuration::RetryConfig, Node, Nodes, Subscription, WrappedProof} use crate::{ api::v2::types::Base64, data::{Database, RpcNodeKey, SignerNonceKey}, + network::rpc::OutputEvent as RpcEvent, shutdown::Controller, types::DEV_FLAG_GENHASH, }; @@ -202,6 +203,7 @@ pub struct Client { retry_config: RetryConfig, expected_genesis_hash: String, shutdown: Controller, + rpc_sender: Option>, } pub struct SubmitResponse { @@ -218,6 +220,7 @@ impl Client { expected_genesis_hash: &str, retry_config: RetryConfig, shutdown: Controller, + rpc_sender: Option>, ) -> Result { let (client, node) = Self::initialize_connection( &nodes, @@ -234,6 +237,7 @@ impl Client { retry_config, expected_genesis_hash: expected_genesis_hash.to_string(), shutdown, + rpc_sender, }; client.db.put(RpcNodeKey, node); @@ -270,8 +274,13 @@ impl Client { expected_genesis_hash: &str, ) -> Result> { // Not passing any RPC function calls since this is a first try of connecting RPC nodes - Self::try_connect_and_execute(nodes, expected_genesis_hash, |_| futures::future::ok(())) - .await + Self::try_connect_and_execute( + nodes, + expected_genesis_hash, + |_| futures::future::ok(()), + None, + ) + .await } // Iterates through the RPC nodes, tries to create the first successful connection, verifies the genesis hash, @@ -280,6 +289,7 @@ impl Client { nodes: &[Node], expected_genesis_hash: &str, f: F, + rpc_sender: Option>, ) -> Result> where F: FnMut(SDK) -> Fut + Copy, @@ -294,6 +304,9 @@ impl Client { match Self::try_node_connection_and_exec(node, expected_genesis_hash, f).await { Ok(attempt) => { info!("Successfully connected to RPC: {}", node.host); + if let Some(sender) = rpc_sender { + sender.send(RpcEvent::ConnectedHost(node.host.clone()))?; + } return Ok(attempt); }, Err(err) => { @@ -444,8 +457,13 @@ impl Client { F: FnMut(SDK) -> Fut + Copy, Fut: std::future::Future>, { - let nodes_fn = - || async { Self::try_connect_and_execute(nodes, &self.expected_genesis_hash, f).await }; + let nodes_fn = move || { + let rpc_sender = self.rpc_sender.clone(); + async move { + Self::try_connect_and_execute(nodes, &self.expected_genesis_hash, f, rpc_sender) + .await + } + }; match self .shutdown diff --git a/crawler/src/main.rs b/crawler/src/main.rs index 494ab801a..8654e7ff5 100644 --- a/crawler/src/main.rs +++ b/crawler/src/main.rs @@ -122,6 +122,7 @@ async fn run(config: Config, db: DB, shutdown: Controller) -> Result<()> &config.genesis_hash, &config.rpc, shutdown.clone(), + None, ) .await?; diff --git a/fat/src/main.rs b/fat/src/main.rs index 72a9bcb60..d948b1c1c 100644 --- a/fat/src/main.rs +++ b/fat/src/main.rs @@ -124,6 +124,7 @@ async fn run(config: Config, db: DB, shutdown: Controller) -> Result<()> &config.genesis_hash, &config.rpc, shutdown.clone(), + None, ) .await?;