Skip to content

Commit

Permalink
Add ws_client types and functions (#24)
Browse files Browse the repository at this point in the history
  • Loading branch information
willbach committed Jan 10, 2024
1 parent 412fbfe commit a2d3e9e
Showing 1 changed file with 202 additions and 40 deletions.
242 changes: 202 additions & 40 deletions src/http.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
use crate::vfs::{FileType, VfsAction, VfsRequest, VfsResponse};
use crate::{
get_blob, Address, LazyLoadBlob as uqBlob, Message, ProcessId, Request as uqRequest,
Response as uqResponse,
Response as uqResponse, SendError,
};
pub use http::*;
use serde::{Deserialize, Serialize};
use std::collections::{HashMap, VecDeque};
use std::path::Path;
use std::str::FromStr;
use thiserror::Error;

//
Expand Down Expand Up @@ -47,18 +48,6 @@ pub struct IncomingHttpRequest {
// BODY is stored in the lazy_load_blob, as bytes
}

/// HTTP Request type that can be shared over WASM boundary to apps.
/// This is the one you send to the `http_client:sys:nectar` service.
#[derive(Debug, Serialize, Deserialize)]
pub struct OutgoingHttpRequest {
pub method: String, // must parse to http::Method
pub version: Option<String>, // must parse to http::Version
pub url: String, // must parse to url::Url
pub headers: HashMap<String, String>,
// BODY is stored in the lazy_load_blob, as bytes
// TIMEOUT is stored in the message expect_response
}

/// HTTP Response type that can be shared over WASM boundary to apps.
/// Respond to [`IncomingHttpRequest`] with this type.
#[derive(Debug, Serialize, Deserialize)]
Expand All @@ -68,20 +57,6 @@ pub struct HttpResponse {
// BODY is stored in the lazy_load_blob, as bytes
}

#[derive(Error, Debug, Serialize, Deserialize)]
pub enum HttpClientError {
#[error("http_client: request could not be parsed to HttpRequest: {}.", req)]
BadRequest { req: String },
#[error("http_client: http method not supported: {}", method)]
BadMethod { method: String },
#[error("http_client: url could not be parsed: {}", url)]
BadUrl { url: String },
#[error("http_client: http version not supported: {}", version)]
BadVersion { version: String },
#[error("http_client: failed to execute request {}", error)]
RequestFailed { error: String },
}

/// Request type sent to `http_server:sys:nectar` in order to configure it.
/// You can also send [`type@HttpServerAction::WebSocketPush`], which
/// allows you to push messages across an existing open WebSocket connection.
Expand Down Expand Up @@ -234,6 +209,81 @@ impl IncomingHttpRequest {
}
}

/// Request type that can be shared over WASM boundary to apps.
/// This is the one you send to the `http_client:sys:nectar` service.
#[derive(Debug, Serialize, Deserialize)]
pub enum HttpClientAction {
Http(OutgoingHttpRequest),
WebSocketOpen {
url: String,
headers: HashMap<String, String>,
channel_id: u32,
},
WebSocketPush {
channel_id: u32,
message_type: WsMessageType,
},
WebSocketClose {
channel_id: u32,
},
}

/// HTTP Request type that can be shared over WASM boundary to apps.
/// This is the one you send to the `http_client:sys:nectar` service.
#[derive(Debug, Serialize, Deserialize)]
pub struct OutgoingHttpRequest {
pub method: String, // must parse to http::Method
pub version: Option<String>, // must parse to http::Version
pub url: String, // must parse to url::Url
pub headers: HashMap<String, String>,
// BODY is stored in the lazy_load_blob, as bytes
// TIMEOUT is stored in the message expect_response
}

/// WebSocket Client Request type that can be shared over WASM boundary to apps.
/// This comes from an open websocket client connection in the `http_client:sys:nectar` service.
#[derive(Debug, Serialize, Deserialize)]
pub enum HttpClientRequest {
WebSocketPush {
channel_id: u32,
message_type: WsMessageType,
},
WebSocketClose {
channel_id: u32,
},
}

/// HTTP Client Response type that can be shared over WASM boundary to apps.
/// This is the one you receive from the `http_client:sys:nectar` service.
#[derive(Debug, Serialize, Deserialize)]
pub enum HttpClientResponse {
Http(HttpResponse),
WebSocketAck,
}

#[derive(Error, Debug, Serialize, Deserialize)]
pub enum HttpClientError {
// HTTP errors, may also be applicable to OutgoingWebSocketClientRequest::Open
#[error("http_client: request is not valid HttpClientRequest: {}.", req)]
BadRequest { req: String },
#[error("http_client: http method not supported: {}", method)]
BadMethod { method: String },
#[error("http_client: url could not be parsed: {}", url)]
BadUrl { url: String },
#[error("http_client: http version not supported: {}", version)]
BadVersion { version: String },
#[error("http_client: failed to execute request {}", error)]
RequestFailed { error: String },

// WebSocket errors
#[error("websocket_client: failed to open connection {}", url)]
WsOpenFailed { url: String },
#[error("websocket_client: failed to send message {}", req)]
WsPushFailed { req: String },
#[error("websocket_client: failed to close connection {}", channel_id)]
WsCloseFailed { channel_id: u32 },
}

/// Register a new path with the HTTP server. This will cause the HTTP server to
/// forward any requests on this path to the calling process. Requests will be
/// given in the form of `Result<(), HttpServerError>`
Expand Down Expand Up @@ -342,12 +392,14 @@ pub fn send_request(
) -> anyhow::Result<()> {
let req = uqRequest::new()
.target(("our", "http_client", "sys", "nectar"))
.body(serde_json::to_vec(&OutgoingHttpRequest {
method: method.to_string(),
version: None,
url: url.to_string(),
headers: headers.unwrap_or_default(),
})?)
.body(serde_json::to_vec(&HttpClientAction::Http(
OutgoingHttpRequest {
method: method.to_string(),
version: None,
url: url.to_string(),
headers: headers.unwrap_or_default(),
},
))?)
.blob_bytes(body);
if let Some(timeout) = timeout {
req.expects_response(timeout).send()
Expand All @@ -363,16 +415,16 @@ pub fn send_request_await_response(
headers: Option<HashMap<String, String>>,
timeout: u64,
body: Vec<u8>,
) -> std::result::Result<HttpResponse, HttpClientError> {
) -> std::result::Result<HttpClientResponse, HttpClientError> {
let res = uqRequest::new()
.target(("our", "http_client", "sys", "nectar"))
.body(
serde_json::to_vec(&OutgoingHttpRequest {
serde_json::to_vec(&HttpClientAction::Http(OutgoingHttpRequest {
method: method.to_string(),
version: None,
url: url.to_string(),
headers: headers.unwrap_or_default(),
})
}))
.map_err(|e| HttpClientError::BadRequest {
req: format!("{e:?}"),
})?,
Expand All @@ -383,11 +435,12 @@ pub fn send_request_await_response(
error: e.to_string(),
})?;
match res {
Ok(Message::Response { body, .. }) => {
serde_json::from_slice(&body).map_err(|e| HttpClientError::RequestFailed {
Ok(Message::Response { body, .. }) => match serde_json::from_slice(&body) {
Ok(resp) => resp,
Err(e) => Err(HttpClientError::RequestFailed {
error: format!("http_client gave unparsable response: {e}"),
})
}
}),
},
_ => Err(HttpClientError::RequestFailed {
error: "http_client timed out".to_string(),
}),
Expand Down Expand Up @@ -580,3 +633,112 @@ pub fn send_ws_push(

Ok(())
}

pub fn open_ws_connection(
node: String,
url: String,
headers: Option<HashMap<String, String>>,
channel_id: u32,
) -> anyhow::Result<()> {
uqRequest::new()
.target(Address::new(
node,
ProcessId::from_str("http_client:sys:nectar").unwrap(),
))
.body(
serde_json::json!(HttpClientAction::WebSocketOpen {
url,
headers: headers.unwrap_or(HashMap::new()),
channel_id,
})
.to_string()
.as_bytes()
.to_vec(),
)
.send()?;

Ok(())
}

pub fn open_ws_connection_and_await(
node: String,
url: String,
headers: Option<HashMap<String, String>>,
channel_id: u32,
) -> std::result::Result<std::result::Result<Message, SendError>, anyhow::Error> {
uqRequest::new()
.target(Address::new(
node,
ProcessId::from_str("http_client:sys:nectar").unwrap(),
))
.body(
serde_json::json!(HttpClientAction::WebSocketOpen {
url,
headers: headers.unwrap_or(HashMap::new()),
channel_id,
})
.to_string()
.as_bytes()
.to_vec(),
)
.send_and_await_response(5)
}

pub fn send_ws_client_push(
node: String,
channel_id: u32,
message_type: WsMessageType,
blob: uqBlob,
) -> std::result::Result<(), anyhow::Error> {
uqRequest::new()
.target(Address::new(
node,
ProcessId::from_str("http_client:sys:nectar").unwrap(),
))
.body(
serde_json::json!(HttpClientAction::WebSocketPush {
channel_id,
message_type,
})
.to_string()
.as_bytes()
.to_vec(),
)
.blob(blob)
.send()
}

pub fn close_ws_connection(node: String, channel_id: u32) -> anyhow::Result<()> {
uqRequest::new()
.target(Address::new(
node,
ProcessId::from_str("http_client:sys:nectar").unwrap(),
))
.body(
serde_json::json!(HttpClientAction::WebSocketClose { channel_id })
.to_string()
.as_bytes()
.to_vec(),
)
.send()?;

Ok(())
}

pub fn close_ws_connection_and_await(
node: String,
channel_id: u32,
) -> std::result::Result<std::result::Result<Message, SendError>, anyhow::Error> {
uqRequest::new()
.target(Address::new(
node,
ProcessId::from_str("http_client:sys:nectar").unwrap(),
))
.body(
serde_json::json!(HttpClientAction::WebSocketClose { channel_id })
.to_string()
.as_bytes()
.to_vec(),
)
.send_and_await_response(5)
}

0 comments on commit a2d3e9e

Please sign in to comment.