Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add RPC endpoint to metric attributes #727

Open
wants to merge 12 commits into
base: main
Choose a base branch
from
52 changes: 45 additions & 7 deletions client/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,16 @@ use crate::cli::CliOpts;
use avail_light_core::{
api,
data::{
self, ClientIdKey, Database, IsFinalitySyncedKey, IsSyncedKey, LatestHeaderKey,
self, ClientIdKey, Database, IsFinalitySyncedKey, IsSyncedKey, LatestHeaderKey, RpcNodeKey,
SignerNonceKey, DB,
},
light_client::{self, OutputEvent as LcEvent},
maintenance::{self, OutputEvent as MaintenanceEvent},
network::{
self,
p2p::{self, extract_block_num, OutputEvent as P2pEvent, BOOTSTRAP_LIST_EMPTY_MESSAGE},
rpc, Network,
rpc::{self, OutputEvent as RpcEvent},
Network,
},
shutdown::Controller,
sync_client::SyncClient,
Expand Down Expand Up @@ -137,8 +138,15 @@ async fn run(
let public_params_len = hex::encode(raw_pp).len();
trace!("Public params ({public_params_len}): hash: {public_params_hash}");

let (rpc_client, rpc_events, rpc_subscriptions) =
rpc::init(db.clone(), &cfg.genesis_hash, &cfg.rpc, shutdown.clone()).await?;
let (rpc_sender, rpc_receiver) = mpsc::unbounded_channel::<RpcEvent>();
let (rpc_client, rpc_events, rpc_subscriptions) = rpc::init(
db.clone(),
&cfg.genesis_hash,
&cfg.rpc,
shutdown.clone(),
Some(rpc_sender),
)
.await?;

let account_id = identity_cfg.avail_key_pair.public_key().to_account_id();
let client = rpc_client.current_client().await;
Expand Down Expand Up @@ -318,20 +326,31 @@ async fn run(
),
];

let host = db
.get(RpcNodeKey)
.map(|connected_ws| connected_ws.host)
.ok_or_else(|| eyre!("No connected host found"))?;

let metrics =
telemetry::otlp::initialize(cfg.project_name.clone(), &cfg.origin, cfg.otel.clone())
.wrap_err("Unable to initialize OpenTelemetry service")?;

let mut state = ClientState::new(
metrics,
cfg.libp2p.kademlia.operation_mode.into(),
host,
Multiaddr::empty(),
metric_attributes,
);

spawn_in_span(shutdown.with_cancel(async move {
state
.handle_events(p2p_event_receiver, maintenance_receiver, lc_receiver)
.handle_events(
p2p_event_receiver,
maintenance_receiver,
lc_receiver,
rpc_receiver,
)
.await;
}));

Expand Down Expand Up @@ -448,6 +467,7 @@ struct ClientState {
metrics: Metrics,
kad_mode: Mode,
multiaddress: Multiaddr,
connected_host: String,
metric_attributes: Vec<(String, String)>,
active_blocks: HashMap<u32, BlockStat>,
}
Expand All @@ -456,13 +476,15 @@ impl ClientState {
fn new(
metrics: Metrics,
kad_mode: Mode,
connected_host: String,
multiaddress: Multiaddr,
metric_attributes: Vec<(String, String)>,
) -> Self {
ClientState {
metrics,
kad_mode,
multiaddress,
connected_host,
metric_attributes,
active_blocks: Default::default(),
}
Expand All @@ -476,10 +498,18 @@ impl ClientState {
self.kad_mode = value;
}

fn update_connected_host(&mut self, value: String) {
self.connected_host = value;
}

fn attributes(&self) -> Vec<(String, String)> {
let mut attrs = vec![
("operating_mode".to_string(), self.kad_mode.to_string()),
("multiaddress".to_string(), self.multiaddress.to_string()),
(
"connected_host".to_string(),
self.connected_host.to_string(),
),
];

attrs.extend(self.metric_attributes.clone());
Expand Down Expand Up @@ -566,6 +596,7 @@ impl ClientState {
mut p2p_receiver: UnboundedReceiver<P2pEvent>,
mut maintenance_receiver: UnboundedReceiver<MaintenanceEvent>,
mut lc_receiver: UnboundedReceiver<LcEvent>,
mut rpc_receiver: UnboundedReceiver<RpcEvent>,
) {
self.metrics.count(MetricCounter::Starts, self.attributes());
loop {
Expand Down Expand Up @@ -674,10 +705,17 @@ impl ClientState {
},
LcEvent::RecordRPCFetchDuration(duration) => {
self.metrics.record(MetricValue::RPCFetchDuration(duration));
}
},
LcEvent::RecordBlockConfidence(confidence) => {
self.metrics.record(MetricValue::BlockConfidence(confidence));
}
},
}
}
Some(rpc_event) = rpc_receiver.recv() => {
match rpc_event {
RpcEvent::ConnectedHost(host) => {
self.update_connected_host(host);
},
}
}
// break the loop if all channels are closed
Expand Down
2 changes: 1 addition & 1 deletion compatibility-tests/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ async fn main() -> Result<()> {
};

let shutdown = Controller::new();
let (rpc_client, _, subscriptions) = rpc::init(db, "DEV", &rpc_cfg, shutdown).await?;
let (rpc_client, _, subscriptions) = rpc::init(db, "DEV", &rpc_cfg, shutdown, None).await?;
tokio::spawn(subscriptions.run());

let mut correct: bool = true;
Expand Down
1 change: 0 additions & 1 deletion core/src/network/p2p.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,6 @@ pub enum OutputEvent {
IncomingGetRecord,
IncomingPutRecord,
KadModeChange(Mode),

Ping(Duration),
IncomingConnection,
IncomingConnectionError,
Expand Down
16 changes: 14 additions & 2 deletions core/src/network/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,17 @@ use rand::{seq::SliceRandom, thread_rng, Rng};
use serde::{de, Deserialize, Serialize};
use std::{collections::HashSet, fmt::Display};
use tokio::{
sync::broadcast,
sync::{broadcast, mpsc::UnboundedSender},
time::{self, timeout},
};
use tracing::{debug, info};

use crate::{data::Database, network::rpc, shutdown::Controller, types::GrandpaJustification};
use crate::{
data::Database,
network::rpc::{self, OutputEvent as RpcEvent},
shutdown::Controller,
types::GrandpaJustification,
};

mod client;
pub mod configuration;
Expand All @@ -34,6 +39,11 @@ pub enum Subscription {
Justification(GrandpaJustification),
}

#[derive(Clone, Debug)]
pub enum OutputEvent {
ConnectedHost(String),
}

#[derive(Debug, Deserialize, Clone)]
pub struct WrappedJustification(pub GrandpaJustification);

Expand Down Expand Up @@ -188,13 +198,15 @@ pub async fn init<T: Database + Clone>(
genesis_hash: &str,
rpc: &RPCConfig,
shutdown: Controller<String>,
rpc_sender: Option<UnboundedSender<RpcEvent>>,
) -> Result<(Client<T>, broadcast::Sender<Event>, SubscriptionLoop<T>)> {
let rpc_client = Client::new(
db.clone(),
Nodes::new(&rpc.full_node_ws),
genesis_hash,
rpc.retry.clone(),
shutdown,
rpc_sender,
)
.await?;
// create output channel for RPC Subscription Events
Expand Down
28 changes: 23 additions & 5 deletions core/src/network/rpc/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use std::{
sync::Arc,
task::{Context, Poll},
};
use tokio::sync::RwLock;
use tokio::sync::{mpsc::UnboundedSender, RwLock};
use tokio_retry::Retry;
use tokio_stream::StreamExt;
use tracing::{debug, error, info, trace, warn};
Expand All @@ -31,6 +31,7 @@ use super::{configuration::RetryConfig, Node, Nodes, Subscription, WrappedProof}
use crate::{
api::v2::types::Base64,
data::{Database, RpcNodeKey, SignerNonceKey},
network::rpc::OutputEvent as RpcEvent,
shutdown::Controller,
types::DEV_FLAG_GENHASH,
};
Expand Down Expand Up @@ -202,6 +203,7 @@ pub struct Client<T: Database> {
retry_config: RetryConfig,
expected_genesis_hash: String,
shutdown: Controller<String>,
rpc_sender: Option<UnboundedSender<RpcEvent>>,
}

pub struct SubmitResponse {
Expand All @@ -218,6 +220,7 @@ impl<D: Database> Client<D> {
expected_genesis_hash: &str,
retry_config: RetryConfig,
shutdown: Controller<String>,
rpc_sender: Option<UnboundedSender<RpcEvent>>,
) -> Result<Self> {
let (client, node) = Self::initialize_connection(
&nodes,
Expand All @@ -234,6 +237,7 @@ impl<D: Database> Client<D> {
retry_config,
expected_genesis_hash: expected_genesis_hash.to_string(),
shutdown,
rpc_sender,
};

client.db.put(RpcNodeKey, node);
Expand Down Expand Up @@ -270,8 +274,13 @@ impl<D: Database> Client<D> {
expected_genesis_hash: &str,
) -> Result<ConnectionAttempt<()>> {
// Not passing any RPC function calls since this is a first try of connecting RPC nodes
Self::try_connect_and_execute(nodes, expected_genesis_hash, |_| futures::future::ok(()))
.await
Self::try_connect_and_execute(
nodes,
expected_genesis_hash,
|_| futures::future::ok(()),
None,
)
.await
}

// Iterates through the RPC nodes, tries to create the first successful connection, verifies the genesis hash,
Expand All @@ -280,6 +289,7 @@ impl<D: Database> Client<D> {
nodes: &[Node],
expected_genesis_hash: &str,
f: F,
rpc_sender: Option<UnboundedSender<RpcEvent>>,
) -> Result<ConnectionAttempt<T>>
where
F: FnMut(SDK) -> Fut + Copy,
Expand All @@ -294,6 +304,9 @@ impl<D: Database> Client<D> {
match Self::try_node_connection_and_exec(node, expected_genesis_hash, f).await {
Ok(attempt) => {
info!("Successfully connected to RPC: {}", node.host);
if let Some(sender) = rpc_sender {
sender.send(RpcEvent::ConnectedHost(node.host.clone()))?;
}
return Ok(attempt);
},
Err(err) => {
Expand Down Expand Up @@ -444,8 +457,13 @@ impl<D: Database> Client<D> {
F: FnMut(SDK) -> Fut + Copy,
Fut: std::future::Future<Output = Result<T>>,
{
let nodes_fn =
|| async { Self::try_connect_and_execute(nodes, &self.expected_genesis_hash, f).await };
let nodes_fn = move || {
let rpc_sender = self.rpc_sender.clone();
async move {
Self::try_connect_and_execute(nodes, &self.expected_genesis_hash, f, rpc_sender)
.await
}
};

match self
.shutdown
Expand Down
1 change: 1 addition & 0 deletions crawler/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ async fn run(config: Config, db: DB, shutdown: Controller<String>) -> Result<()>
&config.genesis_hash,
&config.rpc,
shutdown.clone(),
None,
)
.await?;

Expand Down
1 change: 1 addition & 0 deletions fat/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,7 @@ async fn run(config: Config, db: DB, shutdown: Controller<String>) -> Result<()>
&config.genesis_hash,
&config.rpc,
shutdown.clone(),
None,
)
.await?;

Expand Down
Loading