From 5fb0f95a756d162fd7881c832be2155b6747d306 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Aleksandar=20Terenti=C4=87?= Date: Wed, 16 Aug 2023 08:50:21 +0200 Subject: [PATCH] Add status endpoint to V2 API. --- src/api/server.rs | 12 ++++- src/api/v2/README.md | 59 ++++++++++++++++++++++ src/api/v2/handlers.rs | 67 +++++++++++++++++++++++- src/api/v2/mod.rs | 112 ++++++++++++++++++++++++++++++++++++++--- src/api/v2/types.rs | 83 ++++++++++++++++++++++++++++++ src/bin/avail-light.rs | 38 +++++++------- src/rpc.rs | 41 +++++++++++---- src/types.rs | 2 +- 8 files changed, 374 insertions(+), 40 deletions(-) diff --git a/src/api/server.rs b/src/api/server.rs index 3b9bb90db..28074d864 100644 --- a/src/api/server.rs +++ b/src/api/server.rs @@ -12,6 +12,7 @@ use crate::api::v2; use crate::{ api::v1::{self}, + rpc::Node, types::{RuntimeConfig, State}, }; use anyhow::Context; @@ -31,6 +32,7 @@ pub struct Server { pub state: Arc>, pub version: String, pub network_version: String, + pub node: Node, } impl Server { @@ -41,7 +43,7 @@ impl Server { http_server_port: port, app_id, .. - } = self.cfg; + } = self.cfg.clone(); let port = (port.1 > 0) .then(|| thread_rng().gen_range(port.0..=port.1)) @@ -49,7 +51,13 @@ impl Server { let v1_api = v1::routes(self.db.clone(), app_id, self.state.clone()); #[cfg(feature = "api-v2")] - let v2_api = v2::routes(self.version.clone(), self.network_version.clone()); + let v2_api = v2::routes( + self.version.clone(), + self.network_version.clone(), + self.node, + self.state.clone(), + self.cfg, + ); let cors = warp::cors() .allow_any_origin() diff --git a/src/api/v2/README.md b/src/api/v2/README.md index 4af6c8f43..442712dd6 100644 --- a/src/api/v2/README.md +++ b/src/api/v2/README.md @@ -34,6 +34,65 @@ Content-Type: application/json - **version** - the Avail Light Client version - **network_version** - Avail network version supported by the Avail Light Client +## **GET** `/v2/status` + +Gets current status and active modes of the light client. + +Response: + +```yaml +HTTP/1.1 200 OK +Content-Type: application/json + +{ + "modes": [ + "light", + "app", + "partition" + ], + "app_id": {app-id}, // Optional + "genesis_hash": "{genesis-hash}", + "network": "{network}", + "blocks": { + "synced": false, // Optional + "latest": {latest}, + "confidence": [ + { + "from": {from}, + "to": {to} + } // Optional + ], // Optional + "data": [ + { + "from": {from}, + "to": {to} + } // Optional + ] // Optional + }, + "partition": "{partition}" // Optional +} +``` + +- **modes** - active modes +- **app_id** - if **app** mode is active, this field contains configured application ID +- **genesis_hash** - genesis hash of the network to which the light client is connected +- **network** - network host, version and spec version light client is currently con +- **blocks** - state of processed blocks +- **partition** - if configured, displays partition which light client distributes to the peer to peer network + +### Modes + +- **light** - data availability sampling mode, the light client performs random sampling and calculates confidence +- **app** - light client fetches, verifies, and stores application-related data +- **partition** - light client fetches configured block partition and publishes it to the DHT + +### Blocks + +- **synced** - `true` if light client is synced past blocks, ommited if sync is not configured +- **latest** - number of latest block received from the node +- **confidence** - ranges of block which confidence has been achieved +- **data** - ranges of blocks which data has been retrieved and verified + # WebSocket API The Avail Light Client WebSocket API allows real-time communication between a client and a server over a persistent connection, enabling push notifications as an alternative to polling. Web socket API can be used on its own or in combination with HTTP API to enable different pull/push use cases. diff --git a/src/api/v2/handlers.rs b/src/api/v2/handlers.rs index 207b6fd86..9767477a3 100644 --- a/src/api/v2/handlers.rs +++ b/src/api/v2/handlers.rs @@ -1,8 +1,18 @@ use super::{ - types::{Client, Clients, Subscription, SubscriptionId, Version}, + types::{BlockRange, Blocks, Client, Clients, Status, Subscription, SubscriptionId, Version}, ws, }; -use std::convert::Infallible; +use crate::{ + api::v2::types::InternalServerError, + rpc::Node, + types::{RuntimeConfig, State}, +}; +use hyper::StatusCode; +use std::{ + convert::Infallible, + sync::{Arc, Mutex}, +}; +use tracing::info; use uuid::Uuid; use warp::{ws::Ws, Rejection, Reply}; @@ -28,3 +38,56 @@ pub async fn ws( // Multiple connections to the same client are currently allowed Ok(ws.on_upgrade(move |web_socket| ws::connect(subscription_id, web_socket, clients, version))) } + +pub async fn status( + config: RuntimeConfig, + node: Node, + state: Arc>, +) -> Result { + let state = match state.lock() { + Ok(state) => state, + Err(error) => { + info!("Cannot acquire lock for last_block: {error}"); + return Err(StatusCode::INTERNAL_SERVER_ERROR); + }, + }; + let mut confidence: Vec = Vec::with_capacity(2); + if let Some(range) = state.sync_confidence_achieved.as_ref().map(From::from) { + confidence.push(range); + }; + if let Some(range) = state.confidence_achieved.as_ref().map(From::from) { + confidence.push(range); + }; + + let mut data: Vec = Vec::with_capacity(2); + if let Some(range) = state.sync_data_verified.as_ref().map(From::from) { + data.push(range); + }; + if let Some(range) = state.data_verified.as_ref().map(From::from) { + data.push(range); + }; + + let blocks = Blocks { + synced: state.synced.unwrap_or(false), + latest: state.latest, + confidence, + data, + }; + + let status = Status { + modes: (&config).into(), + app_id: config.app_id, + genesis_hash: format!("{:?}", node.genesis_hash), + network: node.network(), + blocks, + partition: config.block_matrix_partition, + }; + Ok(status) +} + +pub async fn handle_rejection(error: Rejection) -> Result { + if error.find::().is_some() { + return Ok(StatusCode::INTERNAL_SERVER_ERROR.into_response()); + } + Err(error) +} diff --git a/src/api/v2/mod.rs b/src/api/v2/mod.rs index 1f57e759c..8151c5030 100644 --- a/src/api/v2/mod.rs +++ b/src/api/v2/mod.rs @@ -1,5 +1,17 @@ -use self::types::{Clients, Version}; -use std::{collections::HashMap, convert::Infallible, sync::Arc}; +use crate::{ + rpc::Node, + types::{RuntimeConfig, State}, +}; + +use self::{ + handlers::handle_rejection, + types::{Clients, Version}, +}; +use std::{ + collections::HashMap, + convert::Infallible, + sync::{Arc, Mutex}, +}; use tokio::sync::RwLock; use warp::{Filter, Rejection, Reply}; @@ -19,6 +31,20 @@ fn version_route( .map(move || version.clone()) } +fn status_route( + config: RuntimeConfig, + node: Node, + state: Arc>, +) -> impl Filter + Clone { + warp::path!("v2" / "status") + .and(warp::get()) + .and(warp::any().map(move || config.clone())) + .and(warp::any().map(move || node.clone())) + .and(warp::any().map(move || state.clone())) + .then(handlers::status) + .map(types::handle_result) +} + fn subscriptions_route( clients: Clients, ) -> impl Filter + Clone { @@ -43,6 +69,9 @@ fn ws_route( pub fn routes( version: String, network_version: String, + node: Node, + state: Arc>, + config: RuntimeConfig, ) -> impl Filter + Clone { let clients: Clients = Arc::new(RwLock::new(HashMap::new())); let version = Version { @@ -50,25 +79,30 @@ pub fn routes( network_version, }; version_route(version.clone()) + .or(status_route(config, node, state)) .or(subscriptions_route(clients.clone())) .or(ws_route(clients, version)) + .recover(handle_rejection) } #[cfg(test)] mod tests { - use crate::api::v2::types::{ - Clients, DataFields, Subscription, SubscriptionId, Topics, Version, + use super::types::Client; + use crate::{ + api::v2::types::{Clients, DataFields, Subscription, SubscriptionId, Topics, Version}, + rpc::Node, + types::{RuntimeConfig, State}, }; + use kate_recovery::matrix::Partition; + use sp_core::H256; use std::{ collections::{HashMap, HashSet}, str::FromStr, - sync::Arc, + sync::{Arc, Mutex}, }; use tokio::sync::RwLock; use uuid::Uuid; - use super::types::Client; - fn v1() -> Version { Version { version: "v1.0.0".to_string(), @@ -76,6 +110,9 @@ mod tests { } } + const GENESIS_HASH: &str = "0xc590b3c924c35c2f241746522284e4709df490d73a38aaa7d6de4ed1eac2f500"; + const NETWORK: &str = "{host}/{system_version}/data-avail/0"; + #[tokio::test] async fn version_route() { let route = super::version_route(v1()); @@ -91,6 +128,67 @@ mod tests { ); } + impl Default for Node { + fn default() -> Self { + Self { + host: "{host}".to_string(), + system_version: "{system_version}".to_string(), + spec_version: 0, + genesis_hash: H256::from_str(GENESIS_HASH).unwrap(), + } + } + } + + #[tokio::test] + async fn status_route_defaults() { + let state = Arc::new(Mutex::new(State::default())); + let route = super::status_route(RuntimeConfig::default(), Node::default(), state); + let response = warp::test::request() + .method("GET") + .path("/v2/status") + .reply(&route) + .await; + + let expected = format!( + r#"{{"modes":["light"],"genesis_hash":"{GENESIS_HASH}","network":"{NETWORK}","blocks":{{"synced":false,"latest":0}}}}"# + ); + assert_eq!(response.body(), &expected); + } + + #[tokio::test] + async fn status_route() { + let runtime_config = RuntimeConfig { + app_id: Some(1), + sync_start_block: Some(10), + block_matrix_partition: Some(Partition { + number: 1, + fraction: 10, + }), + ..Default::default() + }; + let state = Arc::new(Mutex::new(State::default())); + { + let mut state = state.lock().unwrap(); + state.latest = 30; + state.set_confidence_achieved(20); + state.set_confidence_achieved(29); + state.set_sync_confidence_achieved(10); + state.set_sync_confidence_achieved(18); + } + + let route = super::status_route(runtime_config, Node::default(), state); + let response = warp::test::request() + .method("GET") + .path("/v2/status") + .reply(&route) + .await; + + let expected = format!( + r#"{{"modes":["light","app","partition"],"app_id":1,"genesis_hash":"{GENESIS_HASH}","network":"{NETWORK}","blocks":{{"synced":false,"latest":30,"confidence":[{{"first":10,"last":18}},{{"first":20,"last":29}}]}},"partition":"1/10"}}"# + ); + assert_eq!(response.body(), &expected); + } + fn all_topics() -> HashSet { vec![ Topics::HeaderVerified, diff --git a/src/api/v2/types.rs b/src/api/v2/types.rs index 58b639723..d2d9bb255 100644 --- a/src/api/v2/types.rs +++ b/src/api/v2/types.rs @@ -1,4 +1,5 @@ use anyhow::anyhow; +use kate_recovery::matrix::Partition; use serde::{Deserialize, Serialize}; use std::{ collections::{HashMap, HashSet}, @@ -7,6 +8,13 @@ use std::{ use tokio::sync::{mpsc::UnboundedSender, RwLock}; use warp::{ws, Reply}; +use crate::types::{self, block_matrix_partition_format, RuntimeConfig}; + +#[derive(Debug)] +pub struct InternalServerError {} + +impl warp::reject::Reject for InternalServerError {} + #[derive(Serialize, Clone)] pub struct Version { pub version: String, @@ -19,6 +27,74 @@ impl Reply for Version { } } +#[derive(Serialize)] +pub struct BlockRange { + pub first: u32, + pub last: u32, +} + +impl From<&types::BlockRange> for BlockRange { + fn from(value: &types::BlockRange) -> Self { + BlockRange { + first: value.first, + last: value.last, + } + } +} + +#[derive(Serialize)] +pub struct Blocks { + pub synced: bool, + pub latest: u32, + #[serde(skip_serializing_if = "Vec::is_empty")] + pub confidence: Vec, + #[serde(skip_serializing_if = "Vec::is_empty")] + pub data: Vec, +} + +#[derive(Serialize)] +pub struct Status { + pub modes: Vec, + #[serde(skip_serializing_if = "Option::is_none")] + pub app_id: Option, + pub genesis_hash: String, + pub network: String, + pub blocks: Blocks, + #[serde( + skip_serializing_if = "Option::is_none", + with = "block_matrix_partition_format" + )] + pub partition: Option, +} + +#[derive(Serialize)] +#[serde(rename_all = "kebab-case")] +pub enum Mode { + Light, + App, + Partition, +} + +impl From<&RuntimeConfig> for Vec { + fn from(value: &RuntimeConfig) -> Self { + let mut result: Vec = vec![]; + result.push(Mode::Light); + if value.app_id.is_some() { + result.push(Mode::App); + } + if value.block_matrix_partition.is_some() { + result.push(Mode::Partition) + } + result + } +} + +impl Reply for Status { + fn into_response(self) -> warp::reply::Response { + warp::reply::json(&self).into_response() + } +} + #[derive(Serialize, Deserialize, PartialEq, Eq, Hash)] #[serde(rename_all = "kebab-case")] pub enum Topics { @@ -120,3 +196,10 @@ impl From for String { serde_json::to_string(&error).expect("Error is serializable") } } + +pub fn handle_result(result: Result) -> impl Reply { + match result { + Ok(ok) => ok.into_response(), + Err(err) => err.into_response(), + } +} diff --git a/src/bin/avail-light.rs b/src/bin/avail-light.rs index 597f73ac8..011238fdf 100644 --- a/src/bin/avail-light.rs +++ b/src/bin/avail-light.rs @@ -139,19 +139,6 @@ async fn run(error_sender: Sender) -> Result<()> { let db = init_db(&cfg.avail_path).context("Cannot initialize database")?; - // Spawn tokio task which runs one http server for handling RPC - let state = Arc::new(Mutex::new(State::default())); - - let server = avail_light::api::server::Server { - db: db.clone(), - cfg: cfg.clone(), - state: state.clone(), - version: format!("v{}", clap::crate_version!()), - network_version: EXPECTED_NETWORK_VERSION.to_string(), - }; - - tokio::task::spawn(server.run()); - // If in fat client mode, enable deleting local Kademlia records // This is a fat client memory optimization let kad_remove_local_record = cfg.block_matrix_partition.is_some(); @@ -236,28 +223,41 @@ async fn run(error_sender: Sender) -> Result<()> { let last_full_node_ws = avail_light::data::get_last_full_node_ws_from_db(db.clone())?; - let (rpc_client, last_full_node_ws) = avail_light::rpc::connect_to_the_full_node( + let (rpc_client, node) = avail_light::rpc::connect_to_the_full_node( &cfg.full_node_ws, last_full_node_ws, EXPECTED_NETWORK_VERSION, ) .await?; - store_last_full_node_ws_in_db(db.clone(), last_full_node_ws)?; + store_last_full_node_ws_in_db(db.clone(), node.host.clone())?; - let genesis_hash = rpc_client.genesis_hash(); - info!("Genesis hash: {genesis_hash:?}"); + info!("Genesis hash: {:?}", node.genesis_hash); if let Some(stored_genesis_hash) = avail_light::data::get_genesis_hash(db.clone())? { - if !genesis_hash.eq(&stored_genesis_hash) { + if !node.genesis_hash.eq(&stored_genesis_hash) { Err(anyhow!( "Genesis hash doesn't match the stored one! Clear the db or change nodes." ))? } } else { info!("No genesis hash is found in the db, storing the new hash now."); - avail_light::data::store_genesis_hash(db.clone(), genesis_hash)?; + avail_light::data::store_genesis_hash(db.clone(), node.genesis_hash)?; } + // Spawn tokio task which runs one http server for handling RPC + let state = Arc::new(Mutex::new(State::default())); + + let server = avail_light::api::server::Server { + db: db.clone(), + cfg: cfg.clone(), + state: state.clone(), + version: format!("v{}", clap::crate_version!()), + network_version: EXPECTED_NETWORK_VERSION.to_string(), + node, + }; + + tokio::task::spawn(server.run()); + let block_tx = if let Mode::AppClient(app_id) = Mode::from(cfg.app_id) { // communication channels being established for talking to // libp2p backed application client diff --git a/src/rpc.rs b/src/rpc.rs index e8d307894..de1608c4b 100644 --- a/src/rpc.rs +++ b/src/rpc.rs @@ -17,7 +17,7 @@ use rand::{seq::SliceRandom, thread_rng, Rng}; use sp_core::ed25519; use tracing::{debug, info, instrument, warn}; -use crate::types::*; +use crate::{consts::EXPECTED_NETWORK_VERSION, types::*}; pub async fn get_block_hash(client: &avail::Client, block: u32) -> Result { client @@ -205,7 +205,7 @@ impl ExpectedVersion<'_> { /// Specification name is checked for exact match. /// Since runtime `spec_version` can be changed with runtime upgrade, `spec_version` is removed. /// NOTE: Runtime compatiblity check is currently not implemented. - pub fn matches(&self, node_version: String, spec_name: String) -> bool { + pub fn matches(&self, node_version: &str, spec_name: &str) -> bool { node_version.starts_with(self.version) && self.spec_name == spec_name } } @@ -216,13 +216,33 @@ impl Display for ExpectedVersion<'_> { } } +#[derive(Clone)] +pub struct Node { + pub host: String, + pub system_version: String, + pub spec_version: u32, + pub genesis_hash: H256, +} + +impl Node { + pub fn network(&self) -> String { + format!( + "{host}/{system_version}/{spec_name}/{spec_version}", + host = self.host, + system_version = self.system_version, + spec_name = EXPECTED_NETWORK_VERSION.spec_name, + spec_version = self.spec_version, + ) + } +} + /// Connects to the random full node from the list, /// trying to connect to the last connected full node as least priority. pub async fn connect_to_the_full_node( full_nodes: &[String], last_full_node: Option, expected_version: ExpectedVersion<'_>, -) -> Result<(avail::Client, String)> { +) -> Result<(avail::Client, Node)> { for full_node_ws in shuffle_full_nodes(full_nodes, last_full_node).iter() { let log_warn = |error| { warn!("Skipping connection to {full_node_ws}: {error}"); @@ -238,13 +258,19 @@ pub async fn connect_to_the_full_node( system_version, runtime_version.spec_name, runtime_version.spec_version, ); - if !expected_version.matches(system_version, runtime_version.spec_name) { + if !expected_version.matches(&system_version, &runtime_version.spec_name) { log_warn(anyhow!("expected {expected_version}, found {version}")); continue; } info!("Connection established to the node: {full_node_ws} <{version}>"); - return Ok((client, full_node_ws.clone())); + let node = Node { + host: full_node_ws.clone(), + system_version, + spec_version: client.runtime_version().spec_version, + genesis_hash: client.genesis_hash(), + }; + return Ok((client, node)); } Err(anyhow!("No working nodes")) } @@ -312,10 +338,7 @@ mod tests { spec_name: expected_spec_name, }; - assert_eq!( - expected.matches(version.to_string(), spec_name.to_string()), - matches - ); + assert_eq!(expected.matches(version, spec_name), matches); } proptest! { diff --git a/src/types.rs b/src/types.rs index 61274fc99..a7952110e 100644 --- a/src/types.rs +++ b/src/types.rs @@ -89,7 +89,7 @@ impl From> for Mode { } } -mod block_matrix_partition_format { +pub mod block_matrix_partition_format { use kate_recovery::matrix::Partition; use serde::{self, Deserialize, Deserializer, Serializer};