From 986c3db56d2998f1d49c2b50ed1158e3167e900f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Aleksandar=20Terenti=C4=87?= Date: Tue, 3 Dec 2024 11:17:33 +0100 Subject: [PATCH] Support WASM in the v2 API --- Cargo.lock | 1 - core/Cargo.toml | 2 +- core/src/api/mod.rs | 3 + core/src/api/types.rs | 38 +- core/src/api/v2/messages.rs | 46 +++ core/src/api/v2/mod.rs | 793 ++---------------------------------- core/src/api/v2/routes.rs | 774 +++++++++++++++++++++++++++++++++++ core/src/api/v2/ws.rs | 37 +- core/src/lib.rs | 1 - 9 files changed, 880 insertions(+), 815 deletions(-) create mode 100644 core/src/api/v2/messages.rs create mode 100644 core/src/api/v2/routes.rs diff --git a/Cargo.lock b/Cargo.lock index 433a4f7e0..a8440b26e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1062,7 +1062,6 @@ dependencies = [ "uuid", "void", "warp", - "wasm-bindgen", "web-time", ] diff --git a/core/Cargo.toml b/core/Cargo.toml index 0f7cedd05..cfc481fcc 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -78,12 +78,12 @@ color-eyre = { workspace = true } ed25519-compact = "2.1.1" # NOTE: This is used due bug explained at: https://github.com/tomaka/wasm-timer/pull/13 fluvio-wasm-timer = "0.2.5" +hyper = { version = "0.14.23", features = ["http1"] } libp2p = { workspace = true, features = ["wasm-bindgen"] } libp2p-webrtc-websys = { workspace = true } rand = { workspace = true, features = ["std_rng"] } thiserror-no-std = "2.0.2" tokio_with_wasm = { version = "0.7.1", default-features = false, features = ["sync", "macros", "rt", "time"] } -wasm-bindgen = "0.2.90" web-time = "1.1.0" [target.'cfg(not(target_arch = "wasm32"))'.dev-dependencies] diff --git a/core/src/api/mod.rs b/core/src/api/mod.rs index dd000c7eb..e50695161 100644 --- a/core/src/api/mod.rs +++ b/core/src/api/mod.rs @@ -1,6 +1,9 @@ pub mod configuration; +#[cfg(not(target_arch = "wasm32"))] pub mod diagnostics; +#[cfg(not(target_arch = "wasm32"))] pub mod server; pub mod types; +#[cfg(not(target_arch = "wasm32"))] mod v1; pub mod v2; diff --git a/core/src/api/types.rs b/core/src/api/types.rs index c3d25f311..bc00b7b80 100644 --- a/core/src/api/types.rs +++ b/core/src/api/types.rs @@ -8,19 +8,21 @@ use avail_rust::{ AvailHeader, H256, }; use codec::Encode; -use color_eyre::{ - eyre::{eyre, WrapErr}, - Report, Result, -}; +#[cfg(not(target_arch = "wasm32"))] +use color_eyre::eyre::eyre; +use color_eyre::{eyre::WrapErr, Report, Result}; use derive_more::From; -use hyper::{http, StatusCode}; +#[cfg(not(target_arch = "wasm32"))] +use hyper::http; +use hyper::StatusCode; use serde::{de, Deserialize, Deserializer, Serialize, Serializer}; -use std::{ - collections::{HashMap, HashSet}, - sync::Arc, -}; +use std::collections::HashSet; +#[cfg(not(target_arch = "wasm32"))] +use std::{collections::HashMap, sync::Arc}; +#[cfg(not(target_arch = "wasm32"))] use tokio::sync::{mpsc::UnboundedSender, RwLock}; use uuid::Uuid; +#[cfg(not(target_arch = "wasm32"))] use warp::{ ws::{self, Message}, Reply, @@ -41,6 +43,7 @@ use crate::{ #[derive(Debug)] pub struct InternalServerError {} +#[cfg(not(target_arch = "wasm32"))] impl warp::reject::Reject for InternalServerError {} #[derive(Serialize, Deserialize, Clone, Debug)] @@ -49,6 +52,7 @@ pub struct Version { pub network_version: String, } +#[cfg(not(target_arch = "wasm32"))] impl Reply for Version { fn into_response(self) -> warp::reply::Response { warp::reply::json(&self).into_response() @@ -124,6 +128,7 @@ pub struct SubmitResponse { pub index: u32, } +#[cfg(not(target_arch = "wasm32"))] impl Reply for SubmitResponse { fn into_response(self) -> warp::reply::Response { warp::reply::json(&self).into_response() @@ -174,6 +179,7 @@ impl From<&SharedConfig> for Vec { } } +#[cfg(not(target_arch = "wasm32"))] impl Reply for Status { fn into_response(self) -> warp::reply::Response { warp::reply::json(&self).into_response() @@ -318,6 +324,7 @@ impl Block { } } +#[cfg(not(target_arch = "wasm32"))] impl Reply for Block { fn into_response(self) -> warp::reply::Response { warp::reply::json(&self).into_response() @@ -347,6 +354,7 @@ pub struct Header { digest: Digest, } +#[cfg(not(target_arch = "wasm32"))] impl Reply for Header { fn into_response(self) -> warp::reply::Response { warp::reply::json(&self).into_response() @@ -541,6 +549,7 @@ pub struct DataResponse { pub data_transactions: Vec, } +#[cfg(not(target_arch = "wasm32"))] impl Reply for DataResponse { fn into_response(self) -> warp::reply::Response { warp::reply::json(&self).into_response() @@ -612,6 +621,7 @@ pub enum PublishMessage { DataVerified(DataMessage), } +#[cfg(not(target_arch = "wasm32"))] impl PublishMessage { fn apply_filter(&mut self, fields: &HashSet) { match self { @@ -624,6 +634,7 @@ impl PublishMessage { } } +#[cfg(not(target_arch = "wasm32"))] impl TryFrom for Message { type Error = Report; fn try_from(value: PublishMessage) -> Result { @@ -633,13 +644,16 @@ impl TryFrom for Message { } } +#[cfg(not(target_arch = "wasm32"))] pub type Sender = UnboundedSender>; +#[cfg(not(target_arch = "wasm32"))] pub struct WsClient { pub subscription: Subscription, pub sender: Option, } +#[cfg(not(target_arch = "wasm32"))] impl WsClient { pub fn new(subscription: Subscription) -> Self { WsClient { @@ -659,9 +673,11 @@ impl WsClient { } } +#[cfg(not(target_arch = "wasm32"))] #[derive(Clone)] pub struct WsClients(pub Arc>>); +#[cfg(not(target_arch = "wasm32"))] impl WsClients { pub async fn set_sender(&self, subscription_id: &str, sender: Sender) -> Result<()> { let mut clients = self.0.write().await; @@ -701,6 +717,7 @@ impl WsClients { } } +#[cfg(not(target_arch = "wasm32"))] impl Default for WsClients { fn default() -> Self { Self(Arc::new(RwLock::new(HashMap::new()))) @@ -712,6 +729,7 @@ pub struct SubscriptionId { pub subscription_id: String, } +#[cfg(not(target_arch = "wasm32"))] impl Reply for SubscriptionId { fn into_response(self) -> warp::reply::Response { warp::reply::json(&self).into_response() @@ -748,6 +766,7 @@ impl Response { } } +#[cfg(not(target_arch = "wasm32"))] impl TryFrom for Request { type Error = Report; @@ -819,6 +838,7 @@ impl Error { } } +#[cfg(not(target_arch = "wasm32"))] impl Reply for Error { fn into_response(self) -> warp::reply::Response { http::Response::builder() diff --git a/core/src/api/v2/messages.rs b/core/src/api/v2/messages.rs new file mode 100644 index 000000000..a77f00689 --- /dev/null +++ b/core/src/api/v2/messages.rs @@ -0,0 +1,46 @@ +use super::transactions; +use crate::{ + api::{ + configuration::SharedConfig, + types::{Error, Payload, Request, Response, Status, Version, WsResponse}, + }, + data::{Database, RpcNodeKey}, +}; +use std::sync::Arc; + +pub async fn handle_request( + request: Request, + version: &str, + config: &SharedConfig, + submitter: Option>, + db: impl Database, +) -> Result { + let request_id = request.request_id; + match request.payload { + Payload::Version => { + let version = Version { + version: version.to_string(), + network_version: db.get(RpcNodeKey).unwrap_or_default().system_version, + }; + Ok(Response::new(request_id, version).into()) + }, + Payload::Status => { + let status = Status::new(config, db); + Ok(Response::new(request_id, status).into()) + }, + Payload::Submit(transaction) => { + let Some(submitter) = submitter else { + return Err(Error::bad_request(request_id, "Submit is not configured.")); + }; + if transaction.is_empty() { + return Err(Error::bad_request(request_id, "Transaction is empty.")); + } + + submitter + .submit(transaction) + .await + .map(|response| Response::new(request_id, response).into()) + .map_err(Error::internal_server_error) + }, + } +} diff --git a/core/src/api/v2/mod.rs b/core/src/api/v2/mod.rs index 98a68b189..0cb9ea981 100644 --- a/core/src/api/v2/mod.rs +++ b/core/src/api/v2/mod.rs @@ -1,139 +1,36 @@ +#[cfg(not(target_arch = "wasm32"))] use super::configuration::SharedConfig; -use std::{convert::Infallible, fmt::Display, sync::Arc}; +#[cfg(not(target_arch = "wasm32"))] +use crate::{ + api::types::{PublishMessage, Topic}, + data::Database, + network::rpc::Client, +}; +#[cfg(not(target_arch = "wasm32"))] +use std::{fmt::Display, sync::Arc}; +#[cfg(not(target_arch = "wasm32"))] use tokio::sync::broadcast; +#[cfg(not(target_arch = "wasm32"))] use tracing::{debug, error, info}; +#[cfg(not(target_arch = "wasm32"))] use warp::{Filter, Rejection, Reply}; +#[cfg(not(target_arch = "wasm32"))] use crate::{ - api::{ - server::{handle_rejection, log_internal_server_error}, - types::{DataQuery, PublishMessage, Topic, WsClients}, - }, - data::Database, - network::rpc::Client, + api::{server::handle_rejection, types::WsClients}, types::IdentityConfig, }; +#[cfg(not(target_arch = "wasm32"))] mod handlers; +mod messages; +#[cfg(not(target_arch = "wasm32"))] +mod routes; mod transactions; +#[cfg(not(target_arch = "wasm32"))] mod ws; -async fn optionally(value: Option) -> Result { - match value { - Some(value) => Ok(value), - None => Err(warp::reject::not_found()), - } -} - -fn with_db( - db: T, -) -> impl Filter + Clone { - warp::any().map(move || db.clone()) -} - -fn with_ws_clients( - clients: WsClients, -) -> impl Filter + Clone { - warp::any().map(move || clients.clone()) -} - -fn version_route( - version: String, - db: impl Database + Clone + Send, -) -> impl Filter + Clone { - warp::path!("v2" / "version") - .and(warp::get()) - .map(move || version.clone()) - .and(with_db(db)) - .map(handlers::version) -} - -fn status_route( - config: SharedConfig, - db: impl Database + Clone + Send, -) -> impl Filter + Clone { - warp::path!("v2" / "status") - .and(warp::get()) - .and(warp::any().map(move || config.clone())) - .and(with_db(db)) - .map(handlers::status) -} - -fn block_route( - config: SharedConfig, - db: impl Database + Clone + Send, -) -> impl Filter + Clone { - warp::path!("v2" / "blocks" / u32) - .and(warp::get()) - .and(warp::any().map(move || config.clone())) - .and(with_db(db)) - .then(handlers::block) - .map(log_internal_server_error) -} - -fn block_header_route( - config: SharedConfig, - db: impl Database + Clone + Send, -) -> impl Filter + Clone { - warp::path!("v2" / "blocks" / u32 / "header") - .and(warp::get()) - .and(warp::any().map(move || config.clone())) - .and(with_db(db)) - .then(handlers::block_header) - .map(log_internal_server_error) -} - -fn block_data_route( - config: SharedConfig, - db: impl Database + Clone + Send, -) -> impl Filter + Clone { - warp::path!("v2" / "blocks" / u32 / "data") - .and(warp::get()) - .and(warp::query::()) - .and(warp::any().map(move || config.clone())) - .and(with_db(db)) - .then(handlers::block_data) - .map(log_internal_server_error) -} - -fn submit_route( - submitter: Option>, -) -> impl Filter + Clone { - warp::path!("v2" / "submit") - .and(warp::post()) - .and_then(move || optionally(submitter.clone())) - .and(warp::body::json()) - .then(handlers::submit) - .map(log_internal_server_error) -} - -fn subscriptions_route( - clients: WsClients, -) -> impl Filter + Clone { - warp::path!("v2" / "subscriptions") - .and(warp::post()) - .and(warp::body::json()) - .and(with_ws_clients(clients)) - .and_then(handlers::subscriptions) -} - -fn ws_route( - clients: WsClients, - version: String, - config: SharedConfig, - submitter: Option>, - db: impl Database + Clone + Send + 'static, -) -> impl Filter + Clone { - warp::path!("v2" / "ws" / String) - .and(warp::ws()) - .and(with_ws_clients(clients)) - .and(warp::any().map(move || version.clone())) - .and(warp::any().map(move || config.clone())) - .and(warp::any().map(move || submitter.clone())) - .and(with_db(db)) - .and_then(handlers::ws) -} - +#[cfg(not(target_arch = "wasm32"))] pub async fn publish>>( topic: Topic, mut receiver: broadcast::Receiver, @@ -174,6 +71,7 @@ pub async fn publish>>( } } +#[cfg(not(target_arch = "wasm32"))] #[allow(clippy::too_many_arguments)] pub fn routes( version: String, @@ -193,6 +91,8 @@ pub fn routes( }) }); + use routes::*; + version_route(version.clone(), db.clone()) .or(status_route(config.clone(), db.clone())) .or(block_route(config.clone(), db.clone())) @@ -203,650 +103,3 @@ pub fn routes( .or(ws_route(ws_clients, version, config, submitter, db.clone())) .recover(handle_rejection) } - -#[cfg(test)] -mod tests { - use super::transactions; - use crate::{ - api::{ - configuration::SharedConfig, - types::{ - DataField, ErrorCode, SubmitResponse, Subscription, SubscriptionId, Topic, - Transaction, WsClients, WsError, WsResponse, - }, - }, - data::{ - self, AchievedConfidenceKey, AchievedSyncConfidenceKey, AppDataKey, BlockHeaderKey, - Database, IsSyncedKey, LatestHeaderKey, LatestSyncKey, MemoryDB, RpcNodeKey, - VerifiedCellCountKey, VerifiedDataKey, VerifiedHeaderKey, VerifiedSyncDataKey, - }, - network::rpc::Node, - types::BlockRange, - }; - use async_trait::async_trait; - use avail_rust::{ - avail::runtime_types::avail_core::{ - data_lookup::compact::{CompactDataLookup, DataLookupItem}, - header::extension::{v3, HeaderExtension}, - kate_commitment::v3::KateCommitment, - AppId, - }, - subxt::config::substrate::Digest, - AvailHeader, H256, - }; - use hyper::StatusCode; - use std::{collections::HashSet, str::FromStr, sync::Arc}; - use test_case::test_case; - use uuid::Uuid; - - fn default_node() -> Node { - Node { - host: "host".to_string(), - system_version: "nv1.0.0".to_string(), - spec_version: 0, - genesis_hash: H256::zero(), - } - } - - #[tokio::test] - async fn version_route() { - let db = MemoryDB::default(); - db.put(RpcNodeKey, default_node()); - let route = super::version_route("v1.0.0".to_string(), db); - let response = warp::test::request() - .method("GET") - .path("/v2/version") - .reply(&route) - .await; - - assert_eq!( - response.body(), - r#"{"version":"v1.0.0","network_version":"nv1.0.0"}"# - ); - } - - #[tokio::test] - async fn status_route_defaults() { - let db = MemoryDB::default(); - db.put(RpcNodeKey, default_node()); - let route = super::status_route(SharedConfig::default(), db); - let response = warp::test::request() - .method("GET") - .path("/v2/status") - .reply(&route) - .await; - - let gen_hash = H256::default(); - let expected = format!( - r#"{{"modes":["light"],"genesis_hash":"{:x?}","network":"host/nv1.0.0/0","blocks":{{"latest":0}}}}"#, - gen_hash - ); - assert_eq!(response.body(), &expected); - } - - #[tokio::test] - async fn status_route() { - let runtime_config = SharedConfig { - app_id: Some(1), - sync_start_block: Some(10), - ..Default::default() - }; - let db = MemoryDB::default(); - db.put(RpcNodeKey, default_node()); - - db.put(IsSyncedKey, false); - db.put(LatestHeaderKey, 30); - - let mut achieved_confidence = BlockRange::init(20); - achieved_confidence.last = 29; - db.put(AchievedConfidenceKey, achieved_confidence); - - let mut verified_sync_data = BlockRange::init(10); - verified_sync_data.last = 18; - db.put(VerifiedSyncDataKey, verified_sync_data); - - let mut verified_data = BlockRange::init(20); - verified_data.last = 29; - db.put(VerifiedDataKey, verified_data.clone()); - - let mut achieved_sync_confidence = BlockRange::init(10); - achieved_sync_confidence.last = 19; - db.put(AchievedSyncConfidenceKey, achieved_sync_confidence); - - let route = super::status_route(runtime_config, db); - let response = warp::test::request() - .method("GET") - .path("/v2/status") - .reply(&route) - .await; - - let gen_hash = H256::default(); - let expected = format!( - r#"{{"modes":["light","app"],"app_id":1,"genesis_hash":"{:#x}","network":"host/nv1.0.0/0","blocks":{{"latest":30,"available":{{"first":20,"last":29}},"app_data":{{"first":20,"last":29}},"historical_sync":{{"synced":false,"available":{{"first":10,"last":19}},"app_data":{{"first":10,"last":18}}}}}}}}"#, - gen_hash - ); - assert_eq!(response.body(), &expected); - } - - #[test_case(1, 2)] - #[test_case(10, 11)] - #[test_case(10, 20)] - #[tokio::test] - async fn block_route_not_found(latest: u32, block_number: u32) { - let config = SharedConfig::default(); - let db = data::MemoryDB::default(); - db.put(LatestHeaderKey, latest); - let route = super::block_route(config, db); - let response = warp::test::request() - .method("GET") - .path(&format!("/v2/blocks/{block_number}")) - .reply(&route) - .await; - - assert_eq!(response.status(), StatusCode::NOT_FOUND); - } - - #[tokio::test] - async fn block_route_incomplete() { - let config = SharedConfig::default(); - let db = data::MemoryDB::default(); - db.put(LatestHeaderKey, 10); - db.put(VerifiedHeaderKey, BlockRange::init(10)); - db.put(VerifiedDataKey, BlockRange::init(10)); - db.put(BlockHeaderKey(10), incomplete_header()); - let route = super::block_route(config, db); - let response = warp::test::request() - .method("GET") - .path("/v2/blocks/10") - .reply(&route) - .await; - - assert_eq!(response.status(), StatusCode::OK); - assert_eq!( - response.body(), - r#"{"status":"incomplete","confidence":null}"# - ); - } - - #[tokio::test] - async fn block_route_finished() { - let config = SharedConfig::default(); - let db = data::MemoryDB::default(); - db.put(LatestHeaderKey, 10); - db.put(VerifiedHeaderKey, BlockRange::init(10)); - db.put(VerifiedDataKey, BlockRange::init(10)); - db.put(VerifiedCellCountKey(10), 4); - db.put(BlockHeaderKey(10), header()); - let route = super::block_route(config, db); - let response = warp::test::request() - .method("GET") - .path("/v2/blocks/10") - .reply(&route) - .await; - - assert_eq!(response.status(), StatusCode::OK); - assert_eq!( - response.body(), - r#"{"status":"finished","confidence":93.75}"# - ); - } - - #[test_case(0, r#"Block header is not available"# ; "Block is unavailable")] - #[test_case(6, r#"Block header is not available"# ; "Block is pending")] - #[test_case(10, r#"Block header is not available"# ; "Block is in verifying-header state")] - #[tokio::test] - async fn block_header_route_bad_request(block_number: u32, expected: &str) { - let config = SharedConfig { - sync_start_block: Some(1), - ..Default::default() - }; - let db = data::MemoryDB::default(); - db.put(LatestHeaderKey, 10); - db.put(VerifiedHeaderKey, BlockRange::init(9)); - db.put(LatestSyncKey, 5); - db.put(BlockHeaderKey(block_number), header()); - let route = super::block_header_route(config, db); - let response = warp::test::request() - .method("GET") - .path(&format!("/v2/blocks/{block_number}/header")) - .reply(&route) - .await; - assert_eq!(response.status(), StatusCode::BAD_REQUEST); - assert_eq!(response.body(), expected); - } - - #[tokio::test] - async fn block_header_route_not_found() { - let config = SharedConfig::default(); - let db = data::MemoryDB::default(); - db.put(LatestHeaderKey, 10); - let route = super::block_header_route(config, db); - let response = warp::test::request() - .method("GET") - .path("/v2/blocks/11/header") - .reply(&route) - .await; - assert_eq!(response.status(), StatusCode::NOT_FOUND); - } - - fn header() -> AvailHeader { - AvailHeader { - parent_hash: H256::default(), - number: 1, - state_root: H256::default(), - extrinsics_root: H256::default(), - extension: HeaderExtension::V3(v3::HeaderExtension { - commitment: KateCommitment::default(), - app_lookup: CompactDataLookup { - size: 1, - index: vec![], - }, - }), - digest: Digest { logs: vec![] }, - } - } - - fn incomplete_header() -> AvailHeader { - AvailHeader { - parent_hash: H256::default(), - number: 1, - state_root: H256::default(), - extrinsics_root: H256::default(), - extension: HeaderExtension::V3(v3::HeaderExtension { - commitment: KateCommitment::default(), - app_lookup: CompactDataLookup { - size: 0, - index: vec![DataLookupItem { - app_id: AppId(0), - start: 0, - }], - }, - }), - digest: Digest { logs: vec![] }, - } - } - - #[tokio::test] - async fn block_header_route_ok() { - let config = SharedConfig::default(); - let db = data::MemoryDB::default(); - db.put(LatestHeaderKey, 1); - db.put(VerifiedHeaderKey, BlockRange::init(1)); - db.put(BlockHeaderKey(1), header()); - let route = super::block_header_route(config, db); - let response = warp::test::request() - .method("GET") - .path("/v2/blocks/1/header") - .reply(&route) - .await; - assert_eq!( - response.body(), - r#"{"hash":"0xadf25a1a5d969bb9c9bb9b2e95fe74b0093f0a49ac61e96a1cf41783127f9d1b","parent_hash":"0x0000000000000000000000000000000000000000000000000000000000000000","number":1,"state_root":"0x0000000000000000000000000000000000000000000000000000000000000000","extrinsics_root":"0x0000000000000000000000000000000000000000000000000000000000000000","extension":{"rows":0,"cols":0,"data_root":"0x0000000000000000000000000000000000000000000000000000000000000000","commitments":[],"app_lookup":{"size":1,"index":[]}},"digest":{"logs":[]}}"# - ); - } - - #[test_case(0, r#"Block data is not available"# ; "Block is unavailable")] - #[test_case(6, r#"Block data is not available"# ; "Block is pending")] - #[test_case(8, r#"Block data is not available"# ; "Block is in verifying-data state")] - #[test_case(9, r#"Block data is not available"# ; "Block is in verifying-confidence state")] - #[test_case(10, r#"Block data is not available"# ; "Block is in verifying-header state")] - #[tokio::test] - async fn block_data_route_bad_request(block_number: u32, expected: &str) { - let config = SharedConfig { - app_id: Some(1), - sync_start_block: Some(1), - ..Default::default() - }; - let db = data::MemoryDB::default(); - db.put(LatestHeaderKey, 10); - db.put(VerifiedHeaderKey, BlockRange::init(10)); - db.put(AchievedConfidenceKey, BlockRange::init(9)); - db.put(VerifiedDataKey, BlockRange::init(8)); - db.put(LatestSyncKey, 5); - db.put(BlockHeaderKey(block_number), header()); - let route = super::block_data_route(config, db); - let response = warp::test::request() - .method("GET") - .path(&format!("/v2/blocks/{block_number}/data")) - .reply(&route) - .await; - assert_eq!(response.status(), StatusCode::BAD_REQUEST); - assert_eq!(response.body(), expected); - } - - #[tokio::test] - async fn block_data_route_not_found() { - let config = SharedConfig::default(); - let db = data::MemoryDB::default(); - db.put(LatestHeaderKey, 10); - let route = super::block_data_route(config, db); - let response = warp::test::request() - .method("GET") - .path("/v2/blocks/11/data") - .reply(&route) - .await; - assert_eq!(response.status(), StatusCode::NOT_FOUND); - } - - #[tokio::test] - async fn block_data_route_ok_empty() { - let config = SharedConfig { - app_id: Some(1), - ..Default::default() - }; - let db = data::MemoryDB::default(); - db.put(LatestHeaderKey, 10); - db.put(VerifiedHeaderKey, BlockRange::init(5)); - db.put(AchievedConfidenceKey, BlockRange::init(5)); - db.put(VerifiedDataKey, BlockRange::init(5)); - db.put(BlockHeaderKey(5), header()); - let route = super::block_data_route(config, db); - let response = warp::test::request() - .method("GET") - .path("/v2/blocks/5/data") - .reply(&route) - .await; - assert_eq!(response.status(), StatusCode::OK); - assert_eq!( - response.body(), - r#"{"block_number":5,"data_transactions":[]}"# - ); - } - - #[tokio::test] - async fn block_data_route_ok() { - let config = SharedConfig { - app_id: Some(1), - ..Default::default() - }; - let db = data::MemoryDB::default(); - db.put(LatestHeaderKey, 10); - db.put(VerifiedHeaderKey, BlockRange::init(5)); - db.put(AchievedConfidenceKey, BlockRange::init(5)); - db.put(VerifiedDataKey, BlockRange::init(5)); - db.put( - AppDataKey(1, 5), - vec![vec![ - 189, 1, 132, 0, 212, 53, 147, 199, 21, 253, 211, 28, 97, 20, 26, 189, 4, 169, 159, - 214, 130, 44, 133, 88, 133, 76, 205, 227, 154, 86, 132, 231, 165, 109, 162, 125, 1, - 50, 12, 43, 176, 19, 42, 23, 73, 70, 223, 198, 180, 103, 34, 60, 246, 184, 49, 140, - 113, 174, 234, 229, 95, 71, 18, 92, 158, 185, 168, 140, 126, 12, 191, 156, 50, 234, - 8, 4, 68, 137, 5, 156, 94, 209, 7, 169, 105, 62, 63, 1, 122, 253, 195, 112, 173, - 239, 21, 73, 163, 240, 106, 109, 131, 0, 4, 0, 4, 29, 1, 20, 116, 101, 115, 116, - 10, - ]], - ); - db.put(BlockHeaderKey(5), header()); - let route = super::block_data_route(config, db); - let response = warp::test::request() - .method("GET") - .path("/v2/blocks/5/data") - .reply(&route) - .await; - assert_eq!(response.status(), StatusCode::OK); - assert_eq!( - response.body(), - r#"{"block_number":5,"data_transactions":[{"data":"dGVzdAo=","extrinsic":"vQGEANQ1k8cV/dMcYRQavQSpn9aCLIVYhUzN45pWhOelbaJ9ATIMK7ATKhdJRt/GtGciPPa4MYxxrurlX0cSXJ65qIx+DL+cMuoIBESJBZxe0QepaT4/AXr9w3Ct7xVJo/BqbYMABAAEHQEUdGVzdAo="}]}"# - ); - } - - fn all_topics() -> HashSet { - vec![ - Topic::HeaderVerified, - Topic::ConfidenceAchieved, - Topic::DataVerified, - ] - .into_iter() - .collect() - } - - fn all_data_fields() -> HashSet { - vec![DataField::Extrinsic, DataField::Data] - .into_iter() - .collect() - } - - #[derive(Clone)] - struct MockSubmitter {} - - #[async_trait] - impl transactions::Submit for MockSubmitter { - async fn submit(&self, _: Transaction) -> color_eyre::Result { - Ok(SubmitResponse { - block_number: 0, - block_hash: H256::random(), - hash: H256::random(), - index: 0, - }) - } - } - - #[test_case(r#"{"raw":""}"#, b"Request body deserialize error: unknown variant `raw`" ; "Invalid json schema")] - #[test_case(r#"{"data":"dHJhbnooNhY3Rpb24:"}"#, b"Request body deserialize error: Invalid byte" ; "Invalid base64 value")] - #[tokio::test] - async fn submit_route_bad_request(json: &str, message: &[u8]) { - let route = super::submit_route(Some(Arc::new(MockSubmitter {}))); - let response = warp::test::request() - .method("POST") - .path("/v2/submit") - .body(json) - .reply(&route) - .await; - assert_eq!(response.status(), StatusCode::BAD_REQUEST); - assert!(response.body().starts_with(message)); - } - - #[test_case(r#"{"data":"dHJhbnNhY3Rpb24K"}"# ; "No errors in case of submitted data")] - #[test_case(r#"{"extrinsic":"dHJhbnNhY3Rpb24K"}"# ; "No errors in case of submitted extrinsic")] - #[tokio::test] - async fn submit_route_extrinsic(body: &str) { - let route = super::submit_route(Some(Arc::new(MockSubmitter {}))); - let response = warp::test::request() - .method("POST") - .path("/v2/submit") - .body(body) - .reply(&route) - .await; - assert_eq!(response.status(), StatusCode::OK); - let response: SubmitResponse = serde_json::from_slice(response.body()).unwrap(); - let _ = serde_json::to_string(&response).unwrap(); - } - - #[tokio::test] - async fn subscriptions_route() { - let clients = WsClients::default(); - let route = super::subscriptions_route(clients.clone()); - - let body = r#"{"topics":["confidence-achieved","data-verified","header-verified"],"data_fields":["data","extrinsic"]}"#; - let response = warp::test::request() - .method("POST") - .body(body) - .path("/v2/subscriptions") - .reply(&route) - .await; - - let SubscriptionId { subscription_id } = serde_json::from_slice(response.body()).unwrap(); - assert!(uuid::Uuid::from_str(&subscription_id).is_ok()); - - let clients = clients.0.read().await; - let client = clients.get(&subscription_id).unwrap(); - - let expected = Subscription { - topics: all_topics(), - data_fields: all_data_fields(), - }; - assert!(client.subscription == expected); - } - - struct MockSetup { - ws_client: warp::test::WsClient, - db: MemoryDB, - } - - impl MockSetup { - async fn new(config: SharedConfig, submitter: Option) -> Self { - let client_uuid = uuid::Uuid::new_v4().to_string(); - let clients = WsClients::default(); - clients - .subscribe(&client_uuid, Subscription::default()) - .await; - - let db = MemoryDB::default(); - - let node = Node { - host: "host".to_string(), - system_version: "nv1.0.0".to_string(), - spec_version: 0, - genesis_hash: H256::zero(), - }; - - db.put(RpcNodeKey, node); - - let route = super::ws_route( - clients.clone(), - "v1.0.0".to_string(), - config.clone(), - submitter.map(Arc::new), - db.clone(), - ); - let ws_client = warp::test::ws() - .path(&format!("/v2/ws/{client_uuid}")) - .handshake(route) - .await - .expect("handshake"); - - MockSetup { ws_client, db } - } - - async fn ws_send_text(&mut self, message: &str) -> String { - self.ws_client.send_text(message).await; - let message = self.ws_client.recv().await.unwrap(); - message.to_str().unwrap().to_string() - } - } - - #[tokio::test] - async fn ws_route_version() { - let mut test = MockSetup::new(SharedConfig::default(), None).await; - let request = r#"{"type":"version","request_id":"cae63fff-c4b8-4af9-b4fe-0605a5329aa0"}"#; - let response = test.ws_send_text(request).await; - assert_eq!( - r#"{"topic":"version","request_id":"cae63fff-c4b8-4af9-b4fe-0605a5329aa0","message":{"version":"v1.0.0","network_version":"nv1.0.0"}}"#, - response - ); - } - - #[tokio::test] - async fn ws_route_status() { - let config = SharedConfig { - app_id: Some(1), - sync_start_block: Some(10), - ..Default::default() - }; - - let mut test = MockSetup::new(config, None).await; - - test.db.put(LatestHeaderKey, 30); - test.db.put(IsSyncedKey, false); - - let mut achieved_confidence = BlockRange::init(20); - achieved_confidence.last = 29; - test.db.put(AchievedConfidenceKey, achieved_confidence); - - let mut verified_sync_data = BlockRange::init(10); - verified_sync_data.last = 18; - test.db.put(VerifiedSyncDataKey, verified_sync_data); - - let mut verified_data = BlockRange::init(20); - verified_data.last = 29; - test.db.put(VerifiedDataKey, verified_data); - - let mut achieved_sync_confidence = BlockRange::init(10); - achieved_sync_confidence.last = 19; - test.db - .put(AchievedSyncConfidenceKey, achieved_sync_confidence); - - let gen_hash = H256::default(); - let expected = format!( - r#"{{"topic":"status","request_id":"363c71fc-90f7-4276-a5b6-bec688bf01e2","message":{{"modes":["light","app"],"app_id":1,"genesis_hash":"{:x?}","network":"host/nv1.0.0/0","blocks":{{"latest":30,"available":{{"first":20,"last":29}},"app_data":{{"first":20,"last":29}},"historical_sync":{{"synced":false,"available":{{"first":10,"last":19}},"app_data":{{"first":10,"last":18}}}}}}}}}}"#, - gen_hash - ); - - let status_request = - r#"{"type":"status","request_id":"363c71fc-90f7-4276-a5b6-bec688bf01e2"}"#; - assert_eq!(expected, test.ws_send_text(status_request).await); - } - - #[test_case("", "Failed to parse request" ; "Empty request")] - #[test_case("abcd", "Failed to parse request" ; "Invalid json")] - #[test_case("{}", "Failed to parse request" ; "Empty json")] - #[test_case(r#"{"type":"unknown","request_id":"11043443-7e4c-4485-a21c-304b457b6cc7","message":""}"#, "Failed to parse request: Cannot parse json" ; "Wrong request type")] - #[tokio::test] - async fn ws_route_bad_request(request: &str, expected: &str) { - let mut test = MockSetup::new(SharedConfig::default(), None).await; - let response = test.ws_send_text(request).await; - assert!(response.contains(expected)); - } - - fn to_uuid(uuid: &str) -> Uuid { - Uuid::try_parse(uuid).unwrap() - } - - #[test_case(r#"{"type":"submit","request_id":"16b24956-2e01-4ba8-bad5-456c561c87d7","message":{"data":""}}"#, false, Some("16b24956-2e01-4ba8-bad5-456c561c87d7"), "Submit is not configured" ; "No submitter")] - #[test_case(r#"{"type":"submit","request_id":"36bc1f28-e093-422f-964b-1cb1b3882baf","message":{"extrinsic":""}}"#, true, Some("36bc1f28-e093-422f-964b-1cb1b3882baf"), "Transaction is empty" ; "Empty extrinsic")] - #[test_case(r#"{"type":"submit","request_id":"cc60b2f3-d9ff-4c73-9632-d21d07f7b620","message":{"data":""}}"#, true, Some("cc60b2f3-d9ff-4c73-9632-d21d07f7b620"), "Transaction is empty" ; "Empty data")] - #[test_case(r#"{"type":"submit","request_id":"9181df86-22f0-42a1-a965-60adb9fc6bdc","message":{"extrinsic":"bad"}}"#, true, None, "Failed to parse request" ; "Bad extrinsic")] - #[test_case(r#"{"type":"submit","request_id":"78cd7b7b-ba70-48e9-a1da-96b370db4d8f","message":{"data":"bad"}}"#, true, None, "Failed to parse request" ; "Bad data")] - #[tokio::test] - async fn ws_route_submit_bad_requests( - request: &str, - submitter: bool, - expected_request_id: Option<&str>, - expected: &str, - ) { - let submitter = submitter.then_some(MockSubmitter {}); - let expected_request_id = expected_request_id.map(to_uuid); - let mut test = MockSetup::new(SharedConfig::default(), submitter).await; - let response = test.ws_send_text(request).await; - let WsError::Error(error) = serde_json::from_str(&response).unwrap(); - assert_eq!(error.error_code, ErrorCode::BadRequest); - assert_eq!(error.request_id, expected_request_id); - assert!(error.message.contains(expected)); - } - - #[tokio::test] - async fn ws_route_submit_data() { - let submitter = Some(MockSubmitter {}); - let mut test = MockSetup::new(SharedConfig::default(), submitter).await; - - let request = r#"{"type":"submit","request_id":"fca2ff0c-7a26-42a2-a6f0-d0aeeaba8a9a","message":{"data":"dHJhbnNhY3Rpb24K"}}"#; - let response = test.ws_send_text(request).await; - - let WsResponse::DataTransactionSubmitted(response) = - serde_json::from_str(&response).unwrap() - else { - panic!("Invalid response"); - }; - let expected_request_id = to_uuid("fca2ff0c-7a26-42a2-a6f0-d0aeeaba8a9a"); - assert_eq!(response.request_id, expected_request_id); - assert_eq!(response.message.index, 0); - } - - #[tokio::test] - async fn ws_route_submit_extrinsic() { - let submitter = Some(MockSubmitter {}); - let mut test = MockSetup::new(SharedConfig::default(), submitter).await; - - let request = r#"{"type":"submit","request_id":"fca2ff0c-7a26-42a2-a6f0-d0aeeaba8a9a","message":{"extrinsic":"dHJhbnNhY3Rpb24K"}}"#; - let response = test.ws_send_text(request).await; - - let WsResponse::DataTransactionSubmitted(response) = - serde_json::from_str(&response).unwrap() - else { - panic!("Invalid response"); - }; - let expected_request_id = to_uuid("fca2ff0c-7a26-42a2-a6f0-d0aeeaba8a9a"); - assert_eq!(response.request_id, expected_request_id); - assert_eq!(response.message.index, 0); - } -} diff --git a/core/src/api/v2/routes.rs b/core/src/api/v2/routes.rs new file mode 100644 index 000000000..f8d5604a2 --- /dev/null +++ b/core/src/api/v2/routes.rs @@ -0,0 +1,774 @@ +use super::{handlers, transactions}; +use crate::{ + api::{ + configuration::SharedConfig, + server::log_internal_server_error, + types::{DataQuery, WsClients}, + }, + data::Database, +}; +use std::{convert::Infallible, sync::Arc}; +use warp::{Filter, Rejection, Reply}; + +async fn optionally(value: Option) -> Result { + match value { + Some(value) => Ok(value), + None => Err(warp::reject::not_found()), + } +} + +fn with_db( + db: T, +) -> impl Filter + Clone { + warp::any().map(move || db.clone()) +} + +fn with_ws_clients( + clients: WsClients, +) -> impl Filter + Clone { + warp::any().map(move || clients.clone()) +} + +pub fn version_route( + version: String, + db: impl Database + Clone + Send, +) -> impl Filter + Clone { + warp::path!("v2" / "version") + .and(warp::get()) + .map(move || version.clone()) + .and(with_db(db)) + .map(handlers::version) +} + +pub fn status_route( + config: SharedConfig, + db: impl Database + Clone + Send, +) -> impl Filter + Clone { + warp::path!("v2" / "status") + .and(warp::get()) + .and(warp::any().map(move || config.clone())) + .and(with_db(db)) + .map(handlers::status) +} + +pub fn block_route( + config: SharedConfig, + db: impl Database + Clone + Send, +) -> impl Filter + Clone { + warp::path!("v2" / "blocks" / u32) + .and(warp::get()) + .and(warp::any().map(move || config.clone())) + .and(with_db(db)) + .then(handlers::block) + .map(log_internal_server_error) +} + +pub fn block_header_route( + config: SharedConfig, + db: impl Database + Clone + Send, +) -> impl Filter + Clone { + warp::path!("v2" / "blocks" / u32 / "header") + .and(warp::get()) + .and(warp::any().map(move || config.clone())) + .and(with_db(db)) + .then(handlers::block_header) + .map(log_internal_server_error) +} + +pub fn block_data_route( + config: SharedConfig, + db: impl Database + Clone + Send, +) -> impl Filter + Clone { + warp::path!("v2" / "blocks" / u32 / "data") + .and(warp::get()) + .and(warp::query::()) + .and(warp::any().map(move || config.clone())) + .and(with_db(db)) + .then(handlers::block_data) + .map(log_internal_server_error) +} + +pub fn submit_route( + submitter: Option>, +) -> impl Filter + Clone { + warp::path!("v2" / "submit") + .and(warp::post()) + .and_then(move || optionally(submitter.clone())) + .and(warp::body::json()) + .then(handlers::submit) + .map(log_internal_server_error) +} + +pub fn subscriptions_route( + clients: WsClients, +) -> impl Filter + Clone { + warp::path!("v2" / "subscriptions") + .and(warp::post()) + .and(warp::body::json()) + .and(with_ws_clients(clients)) + .and_then(handlers::subscriptions) +} + +pub fn ws_route( + clients: WsClients, + version: String, + config: SharedConfig, + submitter: Option>, + db: impl Database + Clone + Send + 'static, +) -> impl Filter + Clone { + warp::path!("v2" / "ws" / String) + .and(warp::ws()) + .and(with_ws_clients(clients)) + .and(warp::any().map(move || version.clone())) + .and(warp::any().map(move || config.clone())) + .and(warp::any().map(move || submitter.clone())) + .and(with_db(db)) + .and_then(handlers::ws) +} + +#[cfg(test)] +mod tests { + use crate::{ + api::{ + configuration::SharedConfig, + types::{ + DataField, ErrorCode, SubmitResponse, Subscription, SubscriptionId, Topic, + Transaction, WsClients, WsError, WsResponse, + }, + v2::transactions, + }, + data::{ + self, AchievedConfidenceKey, AchievedSyncConfidenceKey, AppDataKey, BlockHeaderKey, + Database, IsSyncedKey, LatestHeaderKey, LatestSyncKey, MemoryDB, RpcNodeKey, + VerifiedCellCountKey, VerifiedDataKey, VerifiedHeaderKey, VerifiedSyncDataKey, + }, + network::rpc::Node, + types::BlockRange, + }; + use async_trait::async_trait; + use avail_rust::{ + avail::runtime_types::avail_core::{ + data_lookup::compact::{CompactDataLookup, DataLookupItem}, + header::extension::{v3, HeaderExtension}, + kate_commitment::v3::KateCommitment, + AppId, + }, + subxt::config::substrate::Digest, + AvailHeader, H256, + }; + use hyper::StatusCode; + use std::{collections::HashSet, str::FromStr, sync::Arc}; + use test_case::test_case; + use uuid::Uuid; + + fn default_node() -> Node { + Node { + host: "host".to_string(), + system_version: "nv1.0.0".to_string(), + spec_version: 0, + genesis_hash: H256::zero(), + } + } + + #[tokio::test] + async fn version_route() { + let db = MemoryDB::default(); + db.put(RpcNodeKey, default_node()); + let route = super::version_route("v1.0.0".to_string(), db); + let response = warp::test::request() + .method("GET") + .path("/v2/version") + .reply(&route) + .await; + + assert_eq!( + response.body(), + r#"{"version":"v1.0.0","network_version":"nv1.0.0"}"# + ); + } + + #[tokio::test] + async fn status_route_defaults() { + let db = MemoryDB::default(); + db.put(RpcNodeKey, default_node()); + let route = super::status_route(SharedConfig::default(), db); + let response = warp::test::request() + .method("GET") + .path("/v2/status") + .reply(&route) + .await; + + let gen_hash = H256::default(); + let expected = format!( + r#"{{"modes":["light"],"genesis_hash":"{:x?}","network":"host/nv1.0.0/0","blocks":{{"latest":0}}}}"#, + gen_hash + ); + assert_eq!(response.body(), &expected); + } + + #[tokio::test] + async fn status_route() { + let runtime_config = SharedConfig { + app_id: Some(1), + sync_start_block: Some(10), + ..Default::default() + }; + let db = MemoryDB::default(); + db.put(RpcNodeKey, default_node()); + + db.put(IsSyncedKey, false); + db.put(LatestHeaderKey, 30); + + let mut achieved_confidence = BlockRange::init(20); + achieved_confidence.last = 29; + db.put(AchievedConfidenceKey, achieved_confidence); + + let mut verified_sync_data = BlockRange::init(10); + verified_sync_data.last = 18; + db.put(VerifiedSyncDataKey, verified_sync_data); + + let mut verified_data = BlockRange::init(20); + verified_data.last = 29; + db.put(VerifiedDataKey, verified_data.clone()); + + let mut achieved_sync_confidence = BlockRange::init(10); + achieved_sync_confidence.last = 19; + db.put(AchievedSyncConfidenceKey, achieved_sync_confidence); + + let route = super::status_route(runtime_config, db); + let response = warp::test::request() + .method("GET") + .path("/v2/status") + .reply(&route) + .await; + + let gen_hash = H256::default(); + let expected = format!( + r#"{{"modes":["light","app"],"app_id":1,"genesis_hash":"{:#x}","network":"host/nv1.0.0/0","blocks":{{"latest":30,"available":{{"first":20,"last":29}},"app_data":{{"first":20,"last":29}},"historical_sync":{{"synced":false,"available":{{"first":10,"last":19}},"app_data":{{"first":10,"last":18}}}}}}}}"#, + gen_hash + ); + assert_eq!(response.body(), &expected); + } + + #[test_case(1, 2)] + #[test_case(10, 11)] + #[test_case(10, 20)] + #[tokio::test] + async fn block_route_not_found(latest: u32, block_number: u32) { + let config = SharedConfig::default(); + let db = data::MemoryDB::default(); + db.put(LatestHeaderKey, latest); + let route = super::block_route(config, db); + let response = warp::test::request() + .method("GET") + .path(&format!("/v2/blocks/{block_number}")) + .reply(&route) + .await; + + assert_eq!(response.status(), StatusCode::NOT_FOUND); + } + + #[tokio::test] + async fn block_route_incomplete() { + let config = SharedConfig::default(); + let db = data::MemoryDB::default(); + db.put(LatestHeaderKey, 10); + db.put(VerifiedHeaderKey, BlockRange::init(10)); + db.put(VerifiedDataKey, BlockRange::init(10)); + db.put(BlockHeaderKey(10), incomplete_header()); + let route = super::block_route(config, db); + let response = warp::test::request() + .method("GET") + .path("/v2/blocks/10") + .reply(&route) + .await; + + assert_eq!(response.status(), StatusCode::OK); + assert_eq!( + response.body(), + r#"{"status":"incomplete","confidence":null}"# + ); + } + + #[tokio::test] + async fn block_route_finished() { + let config = SharedConfig::default(); + let db = data::MemoryDB::default(); + db.put(LatestHeaderKey, 10); + db.put(VerifiedHeaderKey, BlockRange::init(10)); + db.put(VerifiedDataKey, BlockRange::init(10)); + db.put(VerifiedCellCountKey(10), 4); + db.put(BlockHeaderKey(10), header()); + let route = super::block_route(config, db); + let response = warp::test::request() + .method("GET") + .path("/v2/blocks/10") + .reply(&route) + .await; + + assert_eq!(response.status(), StatusCode::OK); + assert_eq!( + response.body(), + r#"{"status":"finished","confidence":93.75}"# + ); + } + + #[test_case(0, r#"Block header is not available"# ; "Block is unavailable")] + #[test_case(6, r#"Block header is not available"# ; "Block is pending")] + #[test_case(10, r#"Block header is not available"# ; "Block is in verifying-header state")] + #[tokio::test] + async fn block_header_route_bad_request(block_number: u32, expected: &str) { + let config = SharedConfig { + sync_start_block: Some(1), + ..Default::default() + }; + let db = data::MemoryDB::default(); + db.put(LatestHeaderKey, 10); + db.put(VerifiedHeaderKey, BlockRange::init(9)); + db.put(LatestSyncKey, 5); + db.put(BlockHeaderKey(block_number), header()); + let route = super::block_header_route(config, db); + let response = warp::test::request() + .method("GET") + .path(&format!("/v2/blocks/{block_number}/header")) + .reply(&route) + .await; + assert_eq!(response.status(), StatusCode::BAD_REQUEST); + assert_eq!(response.body(), expected); + } + + #[tokio::test] + async fn block_header_route_not_found() { + let config = SharedConfig::default(); + let db = data::MemoryDB::default(); + db.put(LatestHeaderKey, 10); + let route = super::block_header_route(config, db); + let response = warp::test::request() + .method("GET") + .path("/v2/blocks/11/header") + .reply(&route) + .await; + assert_eq!(response.status(), StatusCode::NOT_FOUND); + } + + fn header() -> AvailHeader { + AvailHeader { + parent_hash: H256::default(), + number: 1, + state_root: H256::default(), + extrinsics_root: H256::default(), + extension: HeaderExtension::V3(v3::HeaderExtension { + commitment: KateCommitment::default(), + app_lookup: CompactDataLookup { + size: 1, + index: vec![], + }, + }), + digest: Digest { logs: vec![] }, + } + } + + fn incomplete_header() -> AvailHeader { + AvailHeader { + parent_hash: H256::default(), + number: 1, + state_root: H256::default(), + extrinsics_root: H256::default(), + extension: HeaderExtension::V3(v3::HeaderExtension { + commitment: KateCommitment::default(), + app_lookup: CompactDataLookup { + size: 0, + index: vec![DataLookupItem { + app_id: AppId(0), + start: 0, + }], + }, + }), + digest: Digest { logs: vec![] }, + } + } + + #[tokio::test] + async fn block_header_route_ok() { + let config = SharedConfig::default(); + let db = data::MemoryDB::default(); + db.put(LatestHeaderKey, 1); + db.put(VerifiedHeaderKey, BlockRange::init(1)); + db.put(BlockHeaderKey(1), header()); + let route = super::block_header_route(config, db); + let response = warp::test::request() + .method("GET") + .path("/v2/blocks/1/header") + .reply(&route) + .await; + assert_eq!( + response.body(), + r#"{"hash":"0xadf25a1a5d969bb9c9bb9b2e95fe74b0093f0a49ac61e96a1cf41783127f9d1b","parent_hash":"0x0000000000000000000000000000000000000000000000000000000000000000","number":1,"state_root":"0x0000000000000000000000000000000000000000000000000000000000000000","extrinsics_root":"0x0000000000000000000000000000000000000000000000000000000000000000","extension":{"rows":0,"cols":0,"data_root":"0x0000000000000000000000000000000000000000000000000000000000000000","commitments":[],"app_lookup":{"size":1,"index":[]}},"digest":{"logs":[]}}"# + ); + } + + #[test_case(0, r#"Block data is not available"# ; "Block is unavailable")] + #[test_case(6, r#"Block data is not available"# ; "Block is pending")] + #[test_case(8, r#"Block data is not available"# ; "Block is in verifying-data state")] + #[test_case(9, r#"Block data is not available"# ; "Block is in verifying-confidence state")] + #[test_case(10, r#"Block data is not available"# ; "Block is in verifying-header state")] + #[tokio::test] + async fn block_data_route_bad_request(block_number: u32, expected: &str) { + let config = SharedConfig { + app_id: Some(1), + sync_start_block: Some(1), + ..Default::default() + }; + let db = data::MemoryDB::default(); + db.put(LatestHeaderKey, 10); + db.put(VerifiedHeaderKey, BlockRange::init(10)); + db.put(AchievedConfidenceKey, BlockRange::init(9)); + db.put(VerifiedDataKey, BlockRange::init(8)); + db.put(LatestSyncKey, 5); + db.put(BlockHeaderKey(block_number), header()); + let route = super::block_data_route(config, db); + let response = warp::test::request() + .method("GET") + .path(&format!("/v2/blocks/{block_number}/data")) + .reply(&route) + .await; + assert_eq!(response.status(), StatusCode::BAD_REQUEST); + assert_eq!(response.body(), expected); + } + + #[tokio::test] + async fn block_data_route_not_found() { + let config = SharedConfig::default(); + let db = data::MemoryDB::default(); + db.put(LatestHeaderKey, 10); + let route = super::block_data_route(config, db); + let response = warp::test::request() + .method("GET") + .path("/v2/blocks/11/data") + .reply(&route) + .await; + assert_eq!(response.status(), StatusCode::NOT_FOUND); + } + + #[tokio::test] + async fn block_data_route_ok_empty() { + let config = SharedConfig { + app_id: Some(1), + ..Default::default() + }; + let db = data::MemoryDB::default(); + db.put(LatestHeaderKey, 10); + db.put(VerifiedHeaderKey, BlockRange::init(5)); + db.put(AchievedConfidenceKey, BlockRange::init(5)); + db.put(VerifiedDataKey, BlockRange::init(5)); + db.put(BlockHeaderKey(5), header()); + let route = super::block_data_route(config, db); + let response = warp::test::request() + .method("GET") + .path("/v2/blocks/5/data") + .reply(&route) + .await; + assert_eq!(response.status(), StatusCode::OK); + assert_eq!( + response.body(), + r#"{"block_number":5,"data_transactions":[]}"# + ); + } + + #[tokio::test] + async fn block_data_route_ok() { + let config = SharedConfig { + app_id: Some(1), + ..Default::default() + }; + let db = data::MemoryDB::default(); + db.put(LatestHeaderKey, 10); + db.put(VerifiedHeaderKey, BlockRange::init(5)); + db.put(AchievedConfidenceKey, BlockRange::init(5)); + db.put(VerifiedDataKey, BlockRange::init(5)); + db.put( + AppDataKey(1, 5), + vec![vec![ + 189, 1, 132, 0, 212, 53, 147, 199, 21, 253, 211, 28, 97, 20, 26, 189, 4, 169, 159, + 214, 130, 44, 133, 88, 133, 76, 205, 227, 154, 86, 132, 231, 165, 109, 162, 125, 1, + 50, 12, 43, 176, 19, 42, 23, 73, 70, 223, 198, 180, 103, 34, 60, 246, 184, 49, 140, + 113, 174, 234, 229, 95, 71, 18, 92, 158, 185, 168, 140, 126, 12, 191, 156, 50, 234, + 8, 4, 68, 137, 5, 156, 94, 209, 7, 169, 105, 62, 63, 1, 122, 253, 195, 112, 173, + 239, 21, 73, 163, 240, 106, 109, 131, 0, 4, 0, 4, 29, 1, 20, 116, 101, 115, 116, + 10, + ]], + ); + db.put(BlockHeaderKey(5), header()); + let route = super::block_data_route(config, db); + let response = warp::test::request() + .method("GET") + .path("/v2/blocks/5/data") + .reply(&route) + .await; + assert_eq!(response.status(), StatusCode::OK); + assert_eq!( + response.body(), + r#"{"block_number":5,"data_transactions":[{"data":"dGVzdAo=","extrinsic":"vQGEANQ1k8cV/dMcYRQavQSpn9aCLIVYhUzN45pWhOelbaJ9ATIMK7ATKhdJRt/GtGciPPa4MYxxrurlX0cSXJ65qIx+DL+cMuoIBESJBZxe0QepaT4/AXr9w3Ct7xVJo/BqbYMABAAEHQEUdGVzdAo="}]}"# + ); + } + + fn all_topics() -> HashSet { + vec![ + Topic::HeaderVerified, + Topic::ConfidenceAchieved, + Topic::DataVerified, + ] + .into_iter() + .collect() + } + + fn all_data_fields() -> HashSet { + vec![DataField::Extrinsic, DataField::Data] + .into_iter() + .collect() + } + + #[derive(Clone)] + struct MockSubmitter {} + + #[async_trait] + impl transactions::Submit for MockSubmitter { + async fn submit(&self, _: Transaction) -> color_eyre::Result { + Ok(SubmitResponse { + block_number: 0, + block_hash: H256::random(), + hash: H256::random(), + index: 0, + }) + } + } + + #[test_case(r#"{"raw":""}"#, b"Request body deserialize error: unknown variant `raw`" ; "Invalid json schema")] + #[test_case(r#"{"data":"dHJhbnooNhY3Rpb24:"}"#, b"Request body deserialize error: Invalid byte" ; "Invalid base64 value")] + #[tokio::test] + async fn submit_route_bad_request(json: &str, message: &[u8]) { + let route = super::submit_route(Some(Arc::new(MockSubmitter {}))); + let response = warp::test::request() + .method("POST") + .path("/v2/submit") + .body(json) + .reply(&route) + .await; + assert_eq!(response.status(), StatusCode::BAD_REQUEST); + assert!(response.body().starts_with(message)); + } + + #[test_case(r#"{"data":"dHJhbnNhY3Rpb24K"}"# ; "No errors in case of submitted data")] + #[test_case(r#"{"extrinsic":"dHJhbnNhY3Rpb24K"}"# ; "No errors in case of submitted extrinsic")] + #[tokio::test] + async fn submit_route_extrinsic(body: &str) { + let route = super::submit_route(Some(Arc::new(MockSubmitter {}))); + let response = warp::test::request() + .method("POST") + .path("/v2/submit") + .body(body) + .reply(&route) + .await; + assert_eq!(response.status(), StatusCode::OK); + let response: SubmitResponse = serde_json::from_slice(response.body()).unwrap(); + let _ = serde_json::to_string(&response).unwrap(); + } + + #[tokio::test] + async fn subscriptions_route() { + let clients = WsClients::default(); + let route = super::subscriptions_route(clients.clone()); + + let body = r#"{"topics":["confidence-achieved","data-verified","header-verified"],"data_fields":["data","extrinsic"]}"#; + let response = warp::test::request() + .method("POST") + .body(body) + .path("/v2/subscriptions") + .reply(&route) + .await; + + let SubscriptionId { subscription_id } = serde_json::from_slice(response.body()).unwrap(); + assert!(uuid::Uuid::from_str(&subscription_id).is_ok()); + + let clients = clients.0.read().await; + let client = clients.get(&subscription_id).unwrap(); + + let expected = Subscription { + topics: all_topics(), + data_fields: all_data_fields(), + }; + assert!(client.subscription == expected); + } + + struct MockSetup { + ws_client: warp::test::WsClient, + db: MemoryDB, + } + + impl MockSetup { + async fn new(config: SharedConfig, submitter: Option) -> Self { + let client_uuid = uuid::Uuid::new_v4().to_string(); + let clients = WsClients::default(); + clients + .subscribe(&client_uuid, Subscription::default()) + .await; + + let db = MemoryDB::default(); + + let node = Node { + host: "host".to_string(), + system_version: "nv1.0.0".to_string(), + spec_version: 0, + genesis_hash: H256::zero(), + }; + + db.put(RpcNodeKey, node); + + let route = super::ws_route( + clients.clone(), + "v1.0.0".to_string(), + config.clone(), + submitter.map(Arc::new), + db.clone(), + ); + let ws_client = warp::test::ws() + .path(&format!("/v2/ws/{client_uuid}")) + .handshake(route) + .await + .expect("handshake"); + + MockSetup { ws_client, db } + } + + async fn ws_send_text(&mut self, message: &str) -> String { + self.ws_client.send_text(message).await; + let message = self.ws_client.recv().await.unwrap(); + message.to_str().unwrap().to_string() + } + } + + #[tokio::test] + async fn ws_route_version() { + let mut test = MockSetup::new(SharedConfig::default(), None).await; + let request = r#"{"type":"version","request_id":"cae63fff-c4b8-4af9-b4fe-0605a5329aa0"}"#; + let response = test.ws_send_text(request).await; + assert_eq!( + r#"{"topic":"version","request_id":"cae63fff-c4b8-4af9-b4fe-0605a5329aa0","message":{"version":"v1.0.0","network_version":"nv1.0.0"}}"#, + response + ); + } + + #[tokio::test] + async fn ws_route_status() { + let config = SharedConfig { + app_id: Some(1), + sync_start_block: Some(10), + ..Default::default() + }; + + let mut test = MockSetup::new(config, None).await; + + test.db.put(LatestHeaderKey, 30); + test.db.put(IsSyncedKey, false); + + let mut achieved_confidence = BlockRange::init(20); + achieved_confidence.last = 29; + test.db.put(AchievedConfidenceKey, achieved_confidence); + + let mut verified_sync_data = BlockRange::init(10); + verified_sync_data.last = 18; + test.db.put(VerifiedSyncDataKey, verified_sync_data); + + let mut verified_data = BlockRange::init(20); + verified_data.last = 29; + test.db.put(VerifiedDataKey, verified_data); + + let mut achieved_sync_confidence = BlockRange::init(10); + achieved_sync_confidence.last = 19; + test.db + .put(AchievedSyncConfidenceKey, achieved_sync_confidence); + + let gen_hash = H256::default(); + let expected = format!( + r#"{{"topic":"status","request_id":"363c71fc-90f7-4276-a5b6-bec688bf01e2","message":{{"modes":["light","app"],"app_id":1,"genesis_hash":"{:x?}","network":"host/nv1.0.0/0","blocks":{{"latest":30,"available":{{"first":20,"last":29}},"app_data":{{"first":20,"last":29}},"historical_sync":{{"synced":false,"available":{{"first":10,"last":19}},"app_data":{{"first":10,"last":18}}}}}}}}}}"#, + gen_hash + ); + + let status_request = + r#"{"type":"status","request_id":"363c71fc-90f7-4276-a5b6-bec688bf01e2"}"#; + assert_eq!(expected, test.ws_send_text(status_request).await); + } + + #[test_case("", "Failed to parse request" ; "Empty request")] + #[test_case("abcd", "Failed to parse request" ; "Invalid json")] + #[test_case("{}", "Failed to parse request" ; "Empty json")] + #[test_case(r#"{"type":"unknown","request_id":"11043443-7e4c-4485-a21c-304b457b6cc7","message":""}"#, "Failed to parse request: Cannot parse json" ; "Wrong request type")] + #[tokio::test] + async fn ws_route_bad_request(request: &str, expected: &str) { + let mut test = MockSetup::new(SharedConfig::default(), None).await; + let response = test.ws_send_text(request).await; + assert!(response.contains(expected)); + } + + fn to_uuid(uuid: &str) -> Uuid { + Uuid::try_parse(uuid).unwrap() + } + + #[test_case(r#"{"type":"submit","request_id":"16b24956-2e01-4ba8-bad5-456c561c87d7","message":{"data":""}}"#, false, Some("16b24956-2e01-4ba8-bad5-456c561c87d7"), "Submit is not configured" ; "No submitter")] + #[test_case(r#"{"type":"submit","request_id":"36bc1f28-e093-422f-964b-1cb1b3882baf","message":{"extrinsic":""}}"#, true, Some("36bc1f28-e093-422f-964b-1cb1b3882baf"), "Transaction is empty" ; "Empty extrinsic")] + #[test_case(r#"{"type":"submit","request_id":"cc60b2f3-d9ff-4c73-9632-d21d07f7b620","message":{"data":""}}"#, true, Some("cc60b2f3-d9ff-4c73-9632-d21d07f7b620"), "Transaction is empty" ; "Empty data")] + #[test_case(r#"{"type":"submit","request_id":"9181df86-22f0-42a1-a965-60adb9fc6bdc","message":{"extrinsic":"bad"}}"#, true, None, "Failed to parse request" ; "Bad extrinsic")] + #[test_case(r#"{"type":"submit","request_id":"78cd7b7b-ba70-48e9-a1da-96b370db4d8f","message":{"data":"bad"}}"#, true, None, "Failed to parse request" ; "Bad data")] + #[tokio::test] + async fn ws_route_submit_bad_requests( + request: &str, + submitter: bool, + expected_request_id: Option<&str>, + expected: &str, + ) { + let submitter = submitter.then_some(MockSubmitter {}); + let expected_request_id = expected_request_id.map(to_uuid); + let mut test = MockSetup::new(SharedConfig::default(), submitter).await; + let response = test.ws_send_text(request).await; + let WsError::Error(error) = serde_json::from_str(&response).unwrap(); + assert_eq!(error.error_code, ErrorCode::BadRequest); + assert_eq!(error.request_id, expected_request_id); + assert!(error.message.contains(expected)); + } + + #[tokio::test] + async fn ws_route_submit_data() { + let submitter = Some(MockSubmitter {}); + let mut test = MockSetup::new(SharedConfig::default(), submitter).await; + + let request = r#"{"type":"submit","request_id":"fca2ff0c-7a26-42a2-a6f0-d0aeeaba8a9a","message":{"data":"dHJhbnNhY3Rpb24K"}}"#; + let response = test.ws_send_text(request).await; + + let WsResponse::DataTransactionSubmitted(response) = + serde_json::from_str(&response).unwrap() + else { + panic!("Invalid response"); + }; + let expected_request_id = to_uuid("fca2ff0c-7a26-42a2-a6f0-d0aeeaba8a9a"); + assert_eq!(response.request_id, expected_request_id); + assert_eq!(response.message.index, 0); + } + + #[tokio::test] + async fn ws_route_submit_extrinsic() { + let submitter = Some(MockSubmitter {}); + let mut test = MockSetup::new(SharedConfig::default(), submitter).await; + + let request = r#"{"type":"submit","request_id":"fca2ff0c-7a26-42a2-a6f0-d0aeeaba8a9a","message":{"extrinsic":"dHJhbnNhY3Rpb24K"}}"#; + let response = test.ws_send_text(request).await; + + let WsResponse::DataTransactionSubmitted(response) = + serde_json::from_str(&response).unwrap() + else { + panic!("Invalid response"); + }; + let expected_request_id = to_uuid("fca2ff0c-7a26-42a2-a6f0-d0aeeaba8a9a"); + assert_eq!(response.request_id, expected_request_id); + assert_eq!(response.message.index, 0); + } +} diff --git a/core/src/api/v2/ws.rs b/core/src/api/v2/ws.rs index 9a59e3c3f..bc3b0e664 100644 --- a/core/src/api/v2/ws.rs +++ b/core/src/api/v2/ws.rs @@ -2,12 +2,10 @@ use super::transactions; use crate::{ api::{ configuration::SharedConfig, - types::{ - Error, Payload, Request, Response, Sender, Status, Version, WsClients, WsError, - WsResponse, - }, + types::{Error, Request, Sender, WsClients, WsError, WsResponse}, + v2::messages, }, - data::{Database, RpcNodeKey}, + data::Database, utils::spawn_in_span, }; use color_eyre::{eyre::WrapErr, Result}; @@ -94,32 +92,5 @@ async fn handle_request( Error::bad_request_unknown(&format!("Failed to parse request: {error}")) })?; - let request_id = request.request_id; - match request.payload { - Payload::Version => { - let version = Version { - version: version.to_string(), - network_version: db.get(RpcNodeKey).unwrap_or_default().system_version, - }; - Ok(Response::new(request_id, version).into()) - }, - Payload::Status => { - let status = Status::new(config, db); - Ok(Response::new(request_id, status).into()) - }, - Payload::Submit(transaction) => { - let Some(submitter) = submitter else { - return Err(Error::bad_request(request_id, "Submit is not configured.")); - }; - if transaction.is_empty() { - return Err(Error::bad_request(request_id, "Transaction is empty.")); - } - - submitter - .submit(transaction) - .await - .map(|response| Response::new(request_id, response).into()) - .map_err(Error::internal_server_error) - }, - } + messages::handle_request(request, version, config, submitter, db).await } diff --git a/core/src/lib.rs b/core/src/lib.rs index 0b79a0ea5..1a38ce253 100644 --- a/core/src/lib.rs +++ b/core/src/lib.rs @@ -1,4 +1,3 @@ -#[cfg(not(target_arch = "wasm32"))] pub mod api; #[cfg(not(target_arch = "wasm32"))] pub mod app_client;