diff --git a/src/api/v2/README.md b/src/api/v2/README.md index 4234a515b..40db217a0 100644 --- a/src/api/v2/README.md +++ b/src/api/v2/README.md @@ -67,7 +67,7 @@ Content-Type: application/json } ``` -## Topics +### Topics - **header-verified** - header finality is verified and header is available - **confidence-achieved** - confidence is achieved @@ -76,3 +76,57 @@ Content-Type: application/json ### Data fields Filters **data-verified** message. Optional parameter used when **raw** transaction data is needed. If omitted, only decoded **data** is present in the message. + +## GET `/v2/ws/{subscription-id}` + +Connects to Avail Light Client web socket. Multiple connections are currently allowed. + +## Client-to-server messages + +Every request should contain unique **request_id** field, used to correlate request with response. + +### Request version + +Request Avail Light Client version data. + +```json +{ + "type": "version", + "request_id": "{uuid}" +} +``` +## Server-to-client messages + +If response contains ******request_id****** field, it will be pushed to the client which initiated request. Those messages are not subject to a topic filtering at the moment. + +### Version + +Version response. + +```json +{ + "topic": "version", + "request_id": "{uuid}", + "message": { + "version": "{version-string}", + "network_version": "{version-string}" + } +} +``` + +### Errors + +In case of errors, descriptive error message is sent: + +```json +{ + "topic": "error", + "request_id": "{uuid}", // Optional + "code": "{error-code}", + "message": "{descriptive-error-message}" +} +``` + +Error codes: + +- **bad-request** - request sent via web socket message is not valid diff --git a/src/api/v2/handlers.rs b/src/api/v2/handlers.rs index b1a47719d..207b6fd86 100644 --- a/src/api/v2/handlers.rs +++ b/src/api/v2/handlers.rs @@ -1,13 +1,10 @@ -use super::types::{Client, Clients, Subscription, SubscriptionId, Version}; +use super::{ + types::{Client, Clients, Subscription, SubscriptionId, Version}, + ws, +}; use std::convert::Infallible; use uuid::Uuid; - -pub fn version(version: String, network_version: String) -> Version { - Version { - version, - network_version, - } -} +use warp::{ws::Ws, Rejection, Reply}; pub async fn subscriptions( subscription: Subscription, @@ -15,6 +12,19 @@ pub async fn subscriptions( ) -> Result { let subscription_id = Uuid::new_v4().to_string(); let mut clients = clients.write().await; - clients.insert(subscription_id.clone(), Client { subscription }); + clients.insert(subscription_id.clone(), Client::new(subscription)); Ok(SubscriptionId { subscription_id }) } + +pub async fn ws( + subscription_id: String, + ws: Ws, + clients: Clients, + version: Version, +) -> Result { + if !clients.read().await.contains_key(&subscription_id) { + return Err(warp::reject::not_found()); + } + // Multiple connections to the same client are currently allowed + Ok(ws.on_upgrade(move |web_socket| ws::connect(subscription_id, web_socket, clients, version))) +} diff --git a/src/api/v2/mod.rs b/src/api/v2/mod.rs index db89da1c3..1f57e759c 100644 --- a/src/api/v2/mod.rs +++ b/src/api/v2/mod.rs @@ -1,15 +1,24 @@ -use self::types::Clients; +use self::types::{Clients, Version}; use std::{collections::HashMap, convert::Infallible, sync::Arc}; use tokio::sync::RwLock; use warp::{Filter, Rejection, Reply}; mod handlers; mod types; +mod ws; fn with_clients(clients: Clients) -> impl Filter + Clone { warp::any().map(move || clients.clone()) } +fn version_route( + version: Version, +) -> impl Filter + Clone { + warp::path!("v2" / "version") + .and(warp::get()) + .map(move || version.clone()) +} + fn subscriptions_route( clients: Clients, ) -> impl Filter + Clone { @@ -20,15 +29,15 @@ fn subscriptions_route( .and_then(handlers::subscriptions) } -fn version_route( - version: String, - network_version: String, +fn ws_route( + clients: Clients, + version: Version, ) -> impl Filter + Clone { - warp::path!("v2" / "version") - .and(warp::get()) + warp::path!("v2" / "ws" / String) + .and(warp::ws()) + .and(with_clients(clients)) .and(warp::any().map(move || version.clone())) - .and(warp::any().map(move || network_version.clone())) - .map(handlers::version) + .and_then(handlers::ws) } pub fn routes( @@ -36,22 +45,40 @@ pub fn routes( network_version: String, ) -> impl Filter + Clone { let clients: Clients = Arc::new(RwLock::new(HashMap::new())); - version_route(version, network_version).or(subscriptions_route(clients)) + let version = Version { + version, + network_version, + }; + version_route(version.clone()) + .or(subscriptions_route(clients.clone())) + .or(ws_route(clients, version)) } #[cfg(test)] mod tests { - use crate::api::v2::types::{Clients, DataFields, Subscription, SubscriptionId, Topics}; + use crate::api::v2::types::{ + Clients, DataFields, Subscription, SubscriptionId, Topics, Version, + }; use std::{ collections::{HashMap, HashSet}, str::FromStr, sync::Arc, }; use tokio::sync::RwLock; + use uuid::Uuid; + + use super::types::Client; + + fn v1() -> Version { + Version { + version: "v1.0.0".to_string(), + network_version: "nv1.0.0".to_string(), + } + } #[tokio::test] async fn version_route() { - let route = super::version_route("v1.0.0".to_string(), "nv1.0.0".to_string()); + let route = super::version_route(v1()); let response = warp::test::request() .method("GET") .path("/v2/version") @@ -104,4 +131,56 @@ mod tests { }; assert!(client.subscription == expected); } + + async fn init_clients() -> (Uuid, Clients) { + let uuid = uuid::Uuid::new_v4(); + let clients: Clients = Arc::new(RwLock::new(HashMap::new())); + + let client = Client::new(Subscription { + topics: HashSet::new(), + data_fields: HashSet::new(), + }); + clients.write().await.insert(uuid.to_string(), client); + (uuid, clients) + } + + #[tokio::test] + async fn ws_route_version() { + let (uuid, clients) = init_clients().await; + + let route = super::ws_route(clients.clone(), v1()); + let mut client = warp::test::ws() + .path(&format!("/v2/ws/{uuid}")) + .handshake(route) + .await + .expect("handshake"); + + client + .send_text(r#"{"type":"version","request_id":"1"}"#) + .await; + + let expected = r#"{"topic":"version","request_id":"1","message":{"version":"v1.0.0","network_version":"nv1.0.0"}}"#; + let message = client.recv().await.unwrap(); + assert_eq!(expected, message.to_str().unwrap()); + } + + #[tokio::test] + async fn ws_route_bad_request() { + let (uuid, clients) = init_clients().await; + + let route = super::ws_route(clients.clone(), v1()); + let mut client = warp::test::ws() + .path(&format!("/v2/ws/{uuid}")) + .handshake(route) + .await + .expect("handshake"); + + client + .send_text(r#"{"type":"vers1on","request_id":"1"}"#) + .await; + + let expected = r#"{"error_code":"bad-request","message":"Error handling web socket message: Failed to parse request"}"#; + let message = client.recv().await.unwrap(); + assert_eq!(expected, message.to_str().unwrap()) + } } diff --git a/src/api/v2/types.rs b/src/api/v2/types.rs index 6c22c786e..58b639723 100644 --- a/src/api/v2/types.rs +++ b/src/api/v2/types.rs @@ -1,12 +1,13 @@ +use anyhow::anyhow; use serde::{Deserialize, Serialize}; use std::{ collections::{HashMap, HashSet}, sync::Arc, }; -use tokio::sync::RwLock; -use warp::Reply; +use tokio::sync::{mpsc::UnboundedSender, RwLock}; +use warp::{ws, Reply}; -#[derive(Serialize)] +#[derive(Serialize, Clone)] pub struct Version { pub version: String, pub network_version: String, @@ -39,8 +40,20 @@ pub struct Subscription { pub data_fields: HashSet, } +pub type Sender = UnboundedSender>; + pub struct Client { pub subscription: Subscription, + pub sender: Option, +} + +impl Client { + pub fn new(subscription: Subscription) -> Self { + Client { + subscription, + sender: None, + } + } } pub type Clients = Arc>>; @@ -55,3 +68,55 @@ impl Reply for SubscriptionId { warp::reply::json(&self).into_response() } } + +#[derive(Deserialize)] +#[serde(rename_all = "kebab-case")] +pub enum RequestType { + Version, +} + +#[derive(Deserialize)] +pub struct Request { + #[serde(rename = "type")] + pub request_type: RequestType, + pub request_id: String, +} + +#[derive(Serialize)] +#[serde(rename_all = "kebab-case")] +pub enum ResponseTopic { + Version, +} + +#[derive(Serialize)] +pub struct Response { + pub topic: ResponseTopic, + pub request_id: String, + pub message: T, +} + +impl TryFrom for Request { + type Error = anyhow::Error; + + fn try_from(value: ws::Message) -> Result { + serde_json::from_slice(value.as_bytes()).map_err(|error| anyhow!("{error}")) + } +} + +#[derive(Serialize)] +#[serde(rename_all = "kebab-case")] +pub enum ErrorCode { + BadRequest, +} + +#[derive(Serialize)] +pub struct Error { + pub error_code: ErrorCode, + pub message: String, +} + +impl From for String { + fn from(error: Error) -> Self { + serde_json::to_string(&error).expect("Error is serializable") + } +} diff --git a/src/api/v2/ws.rs b/src/api/v2/ws.rs new file mode 100644 index 000000000..da8baa366 --- /dev/null +++ b/src/api/v2/ws.rs @@ -0,0 +1,82 @@ +use crate::api::v2::types::{Error, ErrorCode}; + +use super::types::{Clients, Request, RequestType, Response, ResponseTopic, Version}; +use anyhow::Context; +use futures::{FutureExt, StreamExt}; +use tokio::sync::mpsc::{self, UnboundedSender}; +use tokio_stream::wrappers::UnboundedReceiverStream; +use tracing::{error, info}; +use warp::ws::{self, Message, WebSocket}; + +pub async fn connect( + subscription_id: String, + web_socket: WebSocket, + clients: Clients, + version: Version, +) { + let (web_socket_sender, mut web_socket_receiver) = web_socket.split(); + let (sender, receiver) = mpsc::unbounded_channel(); + let receiver_stream = UnboundedReceiverStream::new(receiver); + + let mut clients = clients.write().await; + let Some(client) = clients.get_mut(&subscription_id) else { + info!("Client is not subscribed"); + return; + }; + client.sender = Some(sender.clone()); + + tokio::task::spawn(receiver_stream.forward(web_socket_sender).map(|result| { + if let Err(error) = result { + error!("Error sending web socket message: {error}"); + } + })); + + while let Some(result) = web_socket_receiver.next().await { + match result { + Ok(message) if !message.is_text() => return, + Ok(message) => { + if let Err(error) = handle_websocket_message(message, sender.clone(), &version) { + let error = Error { + error_code: ErrorCode::BadRequest, + message: format!("Error handling web socket message: {error}"), + }; + error!(error.message); + if let Err(error) = sender.send(Ok(Message::text::(error.into()))) { + error!("{error}"); + }; + break; + }; + }, + Err(error) => { + error!("Error receiving client message: {error}"); + break; + }, + } + } +} + +fn handle_websocket_message( + message: ws::Message, + sender: UnboundedSender>, + version: &Version, +) -> anyhow::Result<()> { + let request: Request = message.try_into().context("Failed to parse request")?; + + match request.request_type { + RequestType::Version => { + let response = Response { + topic: ResponseTopic::Version, + request_id: request.request_id, + message: version.clone(), + }; + + let message = serde_json::to_string(&response) + .map(ws::Message::text) + .map(Ok) + .context("Failed to serialize version")?; + + sender.send(message).context("Failed to send message")?; + }, + } + Ok(()) +} diff --git a/src/main.rs b/src/main.rs index a90cfb1e1..2e504ef5e 100644 --- a/src/main.rs +++ b/src/main.rs @@ -163,7 +163,7 @@ async fn run(error_sender: Sender) -> Result<()> { db: db.clone(), cfg: cfg.clone(), counter: counter.clone(), - version: clap::crate_version!().to_string(), + version: format!("v{}", clap::crate_version!().to_string()), network_version: network_version.to_string(), };