From 5b0d3bd9f000ce9690840c7777ae8ca05043c513 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Aleksandar=20Terenti=C4=87?= Date: Thu, 3 Aug 2023 12:38:53 +0200 Subject: [PATCH] Add status endpoint to V2 API. --- src/api/server.rs | 13 +++++- src/api/v2/README.md | 42 ++++++++++++++++++ src/api/v2/handlers.rs | 54 +++++++++++++++++++++++- src/api/v2/mod.rs | 96 +++++++++++++++++++++++++++++++++++++++--- src/api/v2/types.rs | 63 +++++++++++++++++++++++++++ src/main.rs | 41 +++++++++++------- src/rpc.rs | 6 +-- src/sync_client.rs | 62 ++++++++++++++++++++++----- src/types.rs | 2 +- 9 files changed, 338 insertions(+), 41 deletions(-) diff --git a/src/api/server.rs b/src/api/server.rs index 3b9bb90db..b77f611f7 100644 --- a/src/api/server.rs +++ b/src/api/server.rs @@ -31,6 +31,8 @@ pub struct Server { pub state: Arc>, pub version: String, pub network_version: String, + pub genesis_hash: String, + pub network: String, } impl Server { @@ -41,7 +43,7 @@ impl Server { http_server_port: port, app_id, .. - } = self.cfg; + } = self.cfg.clone(); let port = (port.1 > 0) .then(|| thread_rng().gen_range(port.0..=port.1)) @@ -49,7 +51,14 @@ impl Server { let v1_api = v1::routes(self.db.clone(), app_id, self.state.clone()); #[cfg(feature = "api-v2")] - let v2_api = v2::routes(self.version.clone(), self.network_version.clone()); + let v2_api = v2::routes( + self.version.clone(), + self.network_version.clone(), + self.genesis_hash, + self.network, + self.state.clone(), + self.cfg, + ); let cors = warp::cors() .allow_any_origin() diff --git a/src/api/v2/README.md b/src/api/v2/README.md index 4af6c8f43..35a67dd94 100644 --- a/src/api/v2/README.md +++ b/src/api/v2/README.md @@ -34,6 +34,48 @@ Content-Type: application/json - **version** - the Avail Light Client version - **network_version** - Avail network version supported by the Avail Light Client +## **GET** `/v2/status` + +Gets current status and active modes of the light client. + +- Use cases + - Monitoring of the active light clients + - Reconfiguration verification + - Development tooling + +Response: + +```yaml +HTTP/1.1 200 OK +Content-Type: application/json + +{ + "modes": ["light", "app", "partition"], + "app_id": {app-id}, // Optional + "genesis_hash": "{genesis-hash}", + "network": "{network}", + "latest_block": {latest-block}, // Optional + "latest_synced_block": {sync-block}, // Optional + "sync_depth": {sync-depth}, // Optional + "partition": "{partition}" // Optional +} +``` + +- **modes** - active modes +- **app_id** - if **app** mode is active, this field contains configured application ID +- **genesis_hash** - genesis hash of the network to which the light client is connected +- **network** - network host, version and spec version light client is currently con +- **latest_block** - latest processed block +- **latest_synced_block** - the latest processed block in the sync range +- **sync_depth** - number of blocks before the latest to sync on light client start +- **partition** - if configured, displays partition which light client distributes to the peer to peer network + +### Modes + +- **light** - data availability sampling mode, the light client performs random sampling and calculates confidence +- **app** - light client fetches, verifies, and stores application-related data +- **partition** - light client fetches configured block partition and publishes it to the DHT + # WebSocket API The Avail Light Client WebSocket API allows real-time communication between a client and a server over a persistent connection, enabling push notifications as an alternative to polling. Web socket API can be used on its own or in combination with HTTP API to enable different pull/push use cases. diff --git a/src/api/v2/handlers.rs b/src/api/v2/handlers.rs index 207b6fd86..86988ee49 100644 --- a/src/api/v2/handlers.rs +++ b/src/api/v2/handlers.rs @@ -1,8 +1,18 @@ +use crate::{ + api::v2::types::InternalServerError, + types::{RuntimeConfig, State}, +}; + use super::{ - types::{Client, Clients, Subscription, SubscriptionId, Version}, + types::{Client, Clients, Status, Subscription, SubscriptionId, Version}, ws, }; -use std::convert::Infallible; +use hyper::StatusCode; +use std::{ + convert::Infallible, + sync::{Arc, Mutex}, +}; +use tracing::info; use uuid::Uuid; use warp::{ws::Ws, Rejection, Reply}; @@ -28,3 +38,43 @@ pub async fn ws( // Multiple connections to the same client are currently allowed Ok(ws.on_upgrade(move |web_socket| ws::connect(subscription_id, web_socket, clients, version))) } + +pub async fn status( + config: RuntimeConfig, + genesis_hash: String, + network: String, + state: Arc>, +) -> Result { + let state = match state.lock() { + Ok(state) => state, + Err(error) => { + info!("Cannot acquire lock for last_block: {error}"); + return Err(StatusCode::INTERNAL_SERVER_ERROR); + }, + }; + + let latest_block = state.confidence_achieved.as_ref().map(|range| range.last); + let latest_synced_block = state + .sync_confidence_achieved + .as_ref() + .map(|range| range.last); + + let status = Status { + modes: (&config).into(), + app_id: config.app_id, + genesis_hash, + network, + latest_block, + latest_synced_block, + sync_start_block: config.sync_start_block, + partition: config.block_matrix_partition, + }; + Ok(status) +} + +pub async fn handle_rejection(error: Rejection) -> Result { + if error.find::().is_some() { + return Ok(StatusCode::INTERNAL_SERVER_ERROR.into_response()); + } + Err(error) +} diff --git a/src/api/v2/mod.rs b/src/api/v2/mod.rs index 1f57e759c..56ccb9886 100644 --- a/src/api/v2/mod.rs +++ b/src/api/v2/mod.rs @@ -1,5 +1,14 @@ -use self::types::{Clients, Version}; -use std::{collections::HashMap, convert::Infallible, sync::Arc}; +use crate::types::{RuntimeConfig, State}; + +use self::{ + handlers::handle_rejection, + types::{Clients, Version}, +}; +use std::{ + collections::HashMap, + convert::Infallible, + sync::{Arc, Mutex}, +}; use tokio::sync::RwLock; use warp::{Filter, Rejection, Reply}; @@ -19,6 +28,22 @@ fn version_route( .map(move || version.clone()) } +fn status_route( + config: RuntimeConfig, + genesis_hash: String, + network: String, + state: Arc>, +) -> impl Filter + Clone { + warp::path!("v2" / "status") + .and(warp::get()) + .and(warp::any().map(move || config.clone())) + .and(warp::any().map(move || genesis_hash.clone())) + .and(warp::any().map(move || network.clone())) + .and(warp::any().map(move || state.clone())) + .then(handlers::status) + .map(types::handle_result) +} + fn subscriptions_route( clients: Clients, ) -> impl Filter + Clone { @@ -43,6 +68,10 @@ fn ws_route( pub fn routes( version: String, network_version: String, + genesis_hash: String, + network: String, + state: Arc>, + config: RuntimeConfig, ) -> impl Filter + Clone { let clients: Clients = Arc::new(RwLock::new(HashMap::new())); let version = Version { @@ -50,25 +79,28 @@ pub fn routes( network_version, }; version_route(version.clone()) + .or(status_route(config, genesis_hash, network, state)) .or(subscriptions_route(clients.clone())) .or(ws_route(clients, version)) + .recover(handle_rejection) } #[cfg(test)] mod tests { - use crate::api::v2::types::{ - Clients, DataFields, Subscription, SubscriptionId, Topics, Version, + use super::types::Client; + use crate::{ + api::v2::types::{Clients, DataFields, Subscription, SubscriptionId, Topics, Version}, + types::{RuntimeConfig, State}, }; + use kate_recovery::matrix::Partition; use std::{ collections::{HashMap, HashSet}, str::FromStr, - sync::Arc, + sync::{Arc, Mutex}, }; use tokio::sync::RwLock; use uuid::Uuid; - use super::types::Client; - fn v1() -> Version { Version { version: "v1.0.0".to_string(), @@ -91,6 +123,56 @@ mod tests { ); } + #[tokio::test] + async fn status_route_defaults() { + let genesis_hash = "{genesis-hash}".to_string(); + let network = "{network}".to_string(); + let state = Arc::new(Mutex::new(State::default())); + let route = super::status_route(RuntimeConfig::default(), genesis_hash, network, state); + let response = warp::test::request() + .method("GET") + .path("/v2/status") + .reply(&route) + .await; + + assert_eq!( + response.body(), + r#"{"modes":["light"],"genesis_hash":"{genesis-hash}","network":"{network}"}"# + ); + } + + #[tokio::test] + async fn status_route() { + let runtime_config = RuntimeConfig { + app_id: Some(1), + sync_start_block: Some(10), + block_matrix_partition: Some(Partition { + number: 1, + fraction: 10, + }), + ..Default::default() + }; + let state = Arc::new(Mutex::new(State::default())); + { + let mut state = state.lock().unwrap(); + state.set_confidence_achieved(20); + state.set_sync_confidence_achieved(10); + } + let genesis_hash = "{genesis-hash}".to_string(); + let network = "{network}".to_string(); + let route = super::status_route(runtime_config, genesis_hash, network, state); + let response = warp::test::request() + .method("GET") + .path("/v2/status") + .reply(&route) + .await; + + assert_eq!( + response.body(), + r#"{"modes":["light","app","partition"],"app_id":1,"genesis_hash":"{genesis-hash}","network":"{network}","latest_block":20,"latest_synced_block":10,"sync_start_block":10,"partition":"1/10"}"# + ); + } + fn all_topics() -> HashSet { vec![ Topics::HeaderVerified, diff --git a/src/api/v2/types.rs b/src/api/v2/types.rs index 58b639723..c1bbad959 100644 --- a/src/api/v2/types.rs +++ b/src/api/v2/types.rs @@ -1,4 +1,5 @@ use anyhow::anyhow; +use kate_recovery::matrix::Partition; use serde::{Deserialize, Serialize}; use std::{ collections::{HashMap, HashSet}, @@ -7,6 +8,13 @@ use std::{ use tokio::sync::{mpsc::UnboundedSender, RwLock}; use warp::{ws, Reply}; +use crate::types::{block_matrix_partition_format, RuntimeConfig}; + +#[derive(Debug)] +pub struct InternalServerError {} + +impl warp::reject::Reject for InternalServerError {} + #[derive(Serialize, Clone)] pub struct Version { pub version: String, @@ -19,6 +27,54 @@ impl Reply for Version { } } +#[derive(Serialize)] +pub struct Status { + pub modes: Vec, + #[serde(skip_serializing_if = "Option::is_none")] + pub app_id: Option, + pub genesis_hash: String, + pub network: String, + #[serde(skip_serializing_if = "Option::is_none")] + pub latest_block: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub latest_synced_block: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub sync_start_block: Option, + #[serde( + skip_serializing_if = "Option::is_none", + with = "block_matrix_partition_format" + )] + pub partition: Option, +} + +#[derive(Serialize)] +#[serde(rename_all = "kebab-case")] +pub enum Mode { + Light, + App, + Partition, +} + +impl From<&RuntimeConfig> for Vec { + fn from(value: &RuntimeConfig) -> Self { + let mut result: Vec = vec![]; + result.push(Mode::Light); + if value.app_id.is_some() { + result.push(Mode::App); + } + if value.block_matrix_partition.is_some() { + result.push(Mode::Partition) + } + result + } +} + +impl Reply for Status { + fn into_response(self) -> warp::reply::Response { + warp::reply::json(&self).into_response() + } +} + #[derive(Serialize, Deserialize, PartialEq, Eq, Hash)] #[serde(rename_all = "kebab-case")] pub enum Topics { @@ -120,3 +176,10 @@ impl From for String { serde_json::to_string(&error).expect("Error is serializable") } } + +pub fn handle_result(result: Result) -> impl Reply { + match result { + Ok(ok) => ok.into_response(), + Err(err) => err.into_response(), + } +} diff --git a/src/main.rs b/src/main.rs index ce941892b..eef93fc71 100644 --- a/src/main.rs +++ b/src/main.rs @@ -150,19 +150,6 @@ async fn run(error_sender: Sender) -> Result<()> { let db = init_db(&cfg.avail_path).context("Cannot initialize database")?; - // Spawn tokio task which runs one http server for handling RPC - let state = Arc::new(Mutex::new(State::default())); - - let server = api::server::Server { - db: db.clone(), - cfg: cfg.clone(), - state: state.clone(), - version: format!("v{}", clap::crate_version!()), - network_version: EXPECTED_NETWORK_VERSION.to_string(), - }; - - tokio::task::spawn(server.run()); - // If in fat client mode, enable deleting local Kademlia records // This is a fat client memory optimization let kad_remove_local_record = cfg.block_matrix_partition.is_some(); @@ -247,14 +234,14 @@ async fn run(error_sender: Sender) -> Result<()> { let last_full_node_ws = data::get_last_full_node_ws_from_db(db.clone())?; - let (rpc_client, last_full_node_ws) = rpc::connect_to_the_full_node( + let (rpc_client, last_full_node_ws, node_version) = rpc::connect_to_the_full_node( &cfg.full_node_ws, last_full_node_ws, EXPECTED_NETWORK_VERSION, ) .await?; - store_last_full_node_ws_in_db(db.clone(), last_full_node_ws)?; + store_last_full_node_ws_in_db(db.clone(), last_full_node_ws.clone())?; let genesis_hash = rpc_client.genesis_hash(); info!("Genesis hash: {genesis_hash:?}"); @@ -269,6 +256,29 @@ async fn run(error_sender: Sender) -> Result<()> { data::store_genesis_hash(db.clone(), genesis_hash)?; } + let node_network = format!( + "{host}/{node_version}/{spec_name}/{spec_version}", + host = last_full_node_ws, + spec_name = EXPECTED_NETWORK_VERSION.spec_name, + spec_version = rpc_client.runtime_version().spec_version + ); + + // Spawn tokio task which runs one http server for handling RPC + let state = Arc::new(Mutex::new(State::default())); + let latest_synced_block = Arc::new(Mutex::new(0u64)); + + let server = api::server::Server { + db: db.clone(), + cfg: cfg.clone(), + state: state.clone(), + version: format!("v{}", clap::crate_version!()), + network_version: EXPECTED_NETWORK_VERSION.to_string(), + genesis_hash: genesis_hash.to_string(), + network: node_network, + }; + + tokio::task::spawn(server.run()); + let block_tx = if let Mode::AppClient(app_id) = Mode::from(cfg.app_id) { // communication channels being established for talking to // libp2p backed application client @@ -305,6 +315,7 @@ async fn run(error_sender: Sender) -> Result<()> { pp.clone(), block_tx.clone(), state.clone(), + latest_synced_block.clone(), )); } diff --git a/src/rpc.rs b/src/rpc.rs index 088c4c2e1..f524666e1 100644 --- a/src/rpc.rs +++ b/src/rpc.rs @@ -222,7 +222,7 @@ pub async fn connect_to_the_full_node( full_nodes: &[String], last_full_node: Option, expected_version: ExpectedVersion<'_>, -) -> Result<(avail::Client, String)> { +) -> Result<(avail::Client, String, String)> { for full_node_ws in shuffle_full_nodes(full_nodes, last_full_node).iter() { let log_warn = |error| { warn!("Skipping connection to {full_node_ws}: {error}"); @@ -238,13 +238,13 @@ pub async fn connect_to_the_full_node( system_version, runtime_version.spec_name, runtime_version.spec_version, ); - if !expected_version.matches(system_version, runtime_version.spec_name) { + if !expected_version.matches(system_version.clone(), runtime_version.spec_name) { log_warn(anyhow!("expected {expected_version}, found {version}")); continue; } info!("Connection established to the node: {full_node_ws} <{version}>"); - return Ok((client, full_node_ws.clone())); + return Ok((client, full_node_ws.clone(), system_version)); } Err(anyhow!("No working nodes")) } diff --git a/src/sync_client.rs b/src/sync_client.rs index 9d70b367e..ac0bdcb1b 100644 --- a/src/sync_client.rs +++ b/src/sync_client.rs @@ -132,6 +132,7 @@ async fn process_block( cfg: &SyncClientConfig, pp: Arc, block_verified_sender: Option>, + latest_synced_block: Arc>, ) -> Result<()> { if sync_client .block_header_in_db(block_number) @@ -241,6 +242,11 @@ async fn process_block( let client_msg = BlockVerified::try_from(header).context("converting to message failed")?; + { + let mut lock = latest_synced_block.lock().unwrap(); + *lock = block_number.into(); + } + if let Some(ref channel) = block_verified_sender { if let Err(error) = channel.send(client_msg).await { error!("Cannot send block verified message: {error}"); @@ -267,6 +273,7 @@ pub async fn run( pp: Arc, block_verified_sender: Option>, state: Arc>, + latest_synced_block: Arc>, ) { let rpc_client = sync_client.get_client(); @@ -343,8 +350,15 @@ pub async fn run( // TODO: Should we handle unprocessed blocks differently? let block_verified_sender = block_verified_sender.clone(); let pp = pp.clone(); - if let Err(error) = - process_block(&sync_client, block_number, &cfg, pp, block_verified_sender).await + if let Err(error) = process_block( + &sync_client, + block_number, + &cfg, + pp, + block_verified_sender, + latest_synced_block.clone(), + ) + .await { error!(block_number, "Cannot process block: {error:#}"); } else { @@ -516,9 +530,18 @@ mod tests { .expect_insert_cells_into_dht() .withf(move |x, _| *x == 42) .returning(move |_, _| Box::pin(async move { 1f32 })); - process_block(&mock_client, 42, &cfg, pp, Some(block_tx)) - .await - .unwrap(); + + let last_synced_block = Arc::new(Mutex::new(0)); + process_block( + &mock_client, + 42, + &cfg, + pp, + Some(block_tx), + last_synced_block, + ) + .await + .unwrap(); } #[tokio::test] @@ -651,9 +674,17 @@ mod tests { .expect_insert_cells_into_dht() .withf(move |x, _| *x == 42) .returning(move |_, _| Box::pin(async move { 1f32 })); - process_block(&mock_client, 42, &cfg, pp, Some(block_tx)) - .await - .unwrap(); + let last_synced_block = Arc::new(Mutex::new(0)); + process_block( + &mock_client, + 42, + &cfg, + pp, + Some(block_tx), + last_synced_block, + ) + .await + .unwrap(); } #[tokio::test] pub async fn test_header_in_dbstore() { @@ -667,8 +698,17 @@ mod tests { .returning(|_| Ok(true)); mock_client.expect_get_header_by_block_number().never(); mock_client.block_header_in_db(42).unwrap(); - process_block(&mock_client, 42, &cfg, pp, Some(block_tx)) - .await - .unwrap(); + + let last_synced_block = Arc::new(Mutex::new(0)); + process_block( + &mock_client, + 42, + &cfg, + pp, + Some(block_tx), + last_synced_block, + ) + .await + .unwrap(); } } diff --git a/src/types.rs b/src/types.rs index 442af0e02..f8b51c953 100644 --- a/src/types.rs +++ b/src/types.rs @@ -89,7 +89,7 @@ impl From> for Mode { } } -mod block_matrix_partition_format { +pub mod block_matrix_partition_format { use kate_recovery::matrix::Partition; use serde::{self, Deserialize, Deserializer, Serializer};