Skip to content

Commit

Permalink
Add web sockets handler.
Browse files Browse the repository at this point in the history
  • Loading branch information
aterentic-ethernal committed Jul 31, 2023
1 parent cf3c416 commit ac45f7f
Show file tree
Hide file tree
Showing 5 changed files with 313 additions and 23 deletions.
54 changes: 54 additions & 0 deletions src/api/v2/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -76,3 +76,57 @@ Content-Type: application/json
### Data fields

Filters **data-verified** message. Optional parameter used when **raw** transaction data is needed. If omitted, only decoded **data** is present in the message.

## GET `/v2/ws/{subscription-id}`

Connects to Avail Light Client web socket. Multiple connections are currently allowed.

## Client-to-server messages

Every request should contain unique **request_id** field, used to correlate request with response.

### Request version

Request Avail Light Client version data.

```json
{
"type": "version",
"request_id": "{uuid}"
}
```
## Server-to-client messages

If response contains ******request_id****** field, it will be pushed to the client which initiated request. Those messages are not subject to a topic filtering at the moment.

### Version

Version response.

```json
{
"topic": "version",
"request_id": "{uuid}",
"message": {
"version": "{version-string}",
"network_version": "{version-string}"
}
}
```

### Errors

In case of errors, descriptive error message is sent:

```json
{
"topic": "error",
"request_id": "{uuid}", // Optional
"code": "{error-code}",
"message": "{descriptive-error-message}"
}
```

Error codes:

- **bad-request** - request sent via web socket message is not valid
28 changes: 19 additions & 9 deletions src/api/v2/handlers.rs
Original file line number Diff line number Diff line change
@@ -1,20 +1,30 @@
use super::types::{Client, Clients, Subscription, SubscriptionId, Version};
use super::{
types::{Client, Clients, Subscription, SubscriptionId, Version},
ws,
};
use std::convert::Infallible;
use uuid::Uuid;

pub fn version(version: String, network_version: String) -> Version {
Version {
version,
network_version,
}
}
use warp::{ws::Ws, Rejection, Reply};

pub async fn subscriptions(
subscription: Subscription,
clients: Clients,
) -> Result<SubscriptionId, Infallible> {
let subscription_id = Uuid::new_v4().to_string();
let mut clients = clients.write().await;
clients.insert(subscription_id.clone(), Client { subscription });
clients.insert(subscription_id.clone(), Client::new(subscription));
Ok(SubscriptionId { subscription_id })
}

pub async fn ws(
subscription_id: String,
ws: Ws,
clients: Clients,
version: Version,
) -> Result<impl Reply, Rejection> {
if !clients.read().await.contains_key(&subscription_id) {
return Err(warp::reject::not_found());
}
// Multiple connections to the same client are currently allowed
Ok(ws.on_upgrade(move |web_socket| ws::connect(subscription_id, web_socket, clients, version)))
}
101 changes: 90 additions & 11 deletions src/api/v2/mod.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,24 @@
use self::types::Clients;
use self::types::{Clients, Version};
use std::{collections::HashMap, convert::Infallible, sync::Arc};
use tokio::sync::RwLock;
use warp::{Filter, Rejection, Reply};

mod handlers;
mod types;
mod ws;

fn with_clients(clients: Clients) -> impl Filter<Extract = (Clients,), Error = Infallible> + Clone {
warp::any().map(move || clients.clone())
}

fn version_route(
version: Version,
) -> impl Filter<Extract = (impl Reply,), Error = Rejection> + Clone {
warp::path!("v2" / "version")
.and(warp::get())
.map(move || version.clone())
}

fn subscriptions_route(
clients: Clients,
) -> impl Filter<Extract = (impl Reply,), Error = Rejection> + Clone {
Expand All @@ -20,38 +29,56 @@ fn subscriptions_route(
.and_then(handlers::subscriptions)
}

fn version_route(
version: String,
network_version: String,
fn ws_route(
clients: Clients,
version: Version,
) -> impl Filter<Extract = (impl Reply,), Error = Rejection> + Clone {
warp::path!("v2" / "version")
.and(warp::get())
warp::path!("v2" / "ws" / String)
.and(warp::ws())
.and(with_clients(clients))
.and(warp::any().map(move || version.clone()))
.and(warp::any().map(move || network_version.clone()))
.map(handlers::version)
.and_then(handlers::ws)
}

pub fn routes(
version: String,
network_version: String,
) -> impl Filter<Extract = (impl Reply,), Error = Rejection> + Clone {
let clients: Clients = Arc::new(RwLock::new(HashMap::new()));
version_route(version, network_version).or(subscriptions_route(clients))
let version = Version {
version,
network_version,
};
version_route(version.clone())
.or(subscriptions_route(clients.clone()))
.or(ws_route(clients, version))
}

#[cfg(test)]
mod tests {
use crate::api::v2::types::{Clients, DataFields, Subscription, SubscriptionId, Topics};
use crate::api::v2::types::{
Clients, DataFields, Subscription, SubscriptionId, Topics, Version,
};
use std::{
collections::{HashMap, HashSet},
str::FromStr,
sync::Arc,
};
use tokio::sync::RwLock;
use uuid::Uuid;

use super::types::Client;

fn v1() -> Version {
Version {
version: "v1.0.0".to_string(),
network_version: "nv1.0.0".to_string(),
}
}

#[tokio::test]
async fn version_route() {
let route = super::version_route("v1.0.0".to_string(), "nv1.0.0".to_string());
let route = super::version_route(v1());
let response = warp::test::request()
.method("GET")
.path("/v2/version")
Expand Down Expand Up @@ -104,4 +131,56 @@ mod tests {
};
assert!(client.subscription == expected);
}

async fn init_clients() -> (Uuid, Clients) {
let uuid = uuid::Uuid::new_v4();
let clients: Clients = Arc::new(RwLock::new(HashMap::new()));

let client = Client::new(Subscription {
topics: HashSet::new(),
data_fields: HashSet::new(),
});
clients.write().await.insert(uuid.to_string(), client);
(uuid, clients)
}

#[tokio::test]
async fn ws_route_version() {
let (uuid, clients) = init_clients().await;

let route = super::ws_route(clients.clone(), v1());
let mut client = warp::test::ws()
.path(&format!("/v2/ws/{uuid}"))
.handshake(route)
.await
.expect("handshake");

client
.send_text(r#"{"type":"version","request_id":"1"}"#)
.await;

let expected = r#"{"topic":"version","request_id":"1","message":{"version":"v1.0.0","network_version":"nv1.0.0"}}"#;
let message = client.recv().await.unwrap();
assert_eq!(expected, message.to_str().unwrap());
}

#[tokio::test]
async fn ws_route_bad_request() {
let (uuid, clients) = init_clients().await;

let route = super::ws_route(clients.clone(), v1());
let mut client = warp::test::ws()
.path(&format!("/v2/ws/{uuid}"))
.handshake(route)
.await
.expect("handshake");

client
.send_text(r#"{"type":"vers1on","request_id":"1"}"#)
.await;

let expected = r#"{"error_code":"bad-request","message":"Error handling web socket message: Failed to parse request"}"#;
let message = client.recv().await.unwrap();
assert_eq!(expected, message.to_str().unwrap())
}
}
71 changes: 68 additions & 3 deletions src/api/v2/types.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
use anyhow::anyhow;
use serde::{Deserialize, Serialize};
use std::{
collections::{HashMap, HashSet},
sync::Arc,
};
use tokio::sync::RwLock;
use warp::Reply;
use tokio::sync::{mpsc::UnboundedSender, RwLock};
use warp::{ws, Reply};

#[derive(Serialize)]
#[derive(Serialize, Clone)]
pub struct Version {
pub version: String,
pub network_version: String,
Expand Down Expand Up @@ -39,8 +40,20 @@ pub struct Subscription {
pub data_fields: HashSet<DataFields>,
}

pub type Sender = UnboundedSender<Result<ws::Message, warp::Error>>;

pub struct Client {
pub subscription: Subscription,
pub sender: Option<Sender>,
}

impl Client {
pub fn new(subscription: Subscription) -> Self {
Client {
subscription,
sender: None,
}
}
}

pub type Clients = Arc<RwLock<HashMap<String, Client>>>;
Expand All @@ -55,3 +68,55 @@ impl Reply for SubscriptionId {
warp::reply::json(&self).into_response()
}
}

#[derive(Deserialize)]
#[serde(rename_all = "kebab-case")]
pub enum RequestType {
Version,
}

#[derive(Deserialize)]
pub struct Request {
#[serde(rename = "type")]
pub request_type: RequestType,
pub request_id: String,
}

#[derive(Serialize)]
#[serde(rename_all = "kebab-case")]
pub enum ResponseTopic {
Version,
}

#[derive(Serialize)]
pub struct Response<T> {
pub topic: ResponseTopic,
pub request_id: String,
pub message: T,
}

impl TryFrom<ws::Message> for Request {
type Error = anyhow::Error;

fn try_from(value: ws::Message) -> Result<Self, Self::Error> {
serde_json::from_slice(value.as_bytes()).map_err(|error| anyhow!("{error}"))
}
}

#[derive(Serialize)]
#[serde(rename_all = "kebab-case")]
pub enum ErrorCode {
BadRequest,
}

#[derive(Serialize)]
pub struct Error {
pub error_code: ErrorCode,
pub message: String,
}

impl From<Error> for String {
fn from(error: Error) -> Self {
serde_json::to_string(&error).expect("Error is serializable")
}
}
Loading

0 comments on commit ac45f7f

Please sign in to comment.