From b866f1c85670c322b008ec6a562b3369ba1087c1 Mon Sep 17 00:00:00 2001 From: Elder Ryan Date: Sat, 9 Dec 2023 23:47:38 +0800 Subject: [PATCH] refactor!: backend and MessageEndpoint (#500) Rename client to provider Rename ServerMessage to ServiceMessage Add abstract layer of Backend Add macro utils to handle js_sys::Function Backend should not hold a processor Backend should only call APIs via provider which holding function request Added extension back --- Cargo.lock | 10 +- Cargo.toml | 2 +- core/src/consts.rs | 4 +- core/src/dht/did.rs | 8 +- core/src/error.rs | 14 +- core/src/message/protocols/relay.rs | 3 +- core/src/message/types.rs | 10 +- core/src/storage/persistence/idb.rs | 4 +- core/src/swarm/mod.rs | 14 +- core/src/tests/mod.rs | 2 +- core/src/tests/wasm/mod.rs | 1 + core/src/tests/wasm/test_fn_macro.rs | 46 +++ core/src/utils.rs | 95 ++++- examples/ffi/rings.py | 10 +- node/bin/rings.rs | 20 +- node/src/backend/browser.rs | 111 +++--- node/src/backend/mod.rs | 65 ++++ node/src/backend/native/extension.rs | 354 ++++++++++++++++-- node/src/backend/native/mod.rs | 85 +++-- .../backend/native/{server => service}/mod.rs | 93 +++-- .../native/{server => service}/tcp_proxy.rs | 66 ++-- node/src/backend/types.rs | 52 ++- node/src/jsonrpc/handler.rs | 4 + node/src/jsonrpc/server.rs | 63 ++++ node/src/lib.rs | 2 +- node/src/native/cli.rs | 6 +- node/src/native/config.rs | 2 +- node/src/processor.rs | 12 +- node/src/{client => provider}/browser/mod.rs | 4 +- .../browser/provider.rs} | 79 ++-- .../src/{client => provider}/browser/utils.rs | 0 node/src/{client => provider}/ffi.rs | 110 +++--- node/src/{client => provider}/mod.rs | 62 ++- node/src/tests/wasm/browser.rs | 78 ++-- rpc/src/method.rs | 4 + .../src/connections/native_webrtc/mod.rs | 5 +- 36 files changed, 1111 insertions(+), 389 deletions(-) create mode 100644 core/src/tests/wasm/test_fn_macro.rs rename node/src/backend/native/{server => service}/mod.rs (52%) rename node/src/backend/native/{server => service}/tcp_proxy.rs (68%) rename node/src/{client => provider}/browser/mod.rs (94%) rename node/src/{client/browser/client.rs => provider/browser/provider.rs} (89%) rename node/src/{client => provider}/browser/utils.rs (100%) rename node/src/{client => provider}/ffi.rs (75%) rename node/src/{client => provider}/mod.rs (72%) diff --git a/Cargo.lock b/Cargo.lock index 7e572b427..5d517654c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3410,7 +3410,7 @@ dependencies = [ [[package]] name = "rings-core" -version = "0.4.1" +version = "0.4.2" dependencies = [ "arrayref", "async-channel", @@ -3472,7 +3472,7 @@ dependencies = [ [[package]] name = "rings-derive" -version = "0.4.1" +version = "0.4.2" dependencies = [ "proc-macro2", "quote", @@ -3483,7 +3483,7 @@ dependencies = [ [[package]] name = "rings-node" -version = "0.4.1" +version = "0.4.2" dependencies = [ "anyhow", "arrayref", @@ -3542,7 +3542,7 @@ dependencies = [ [[package]] name = "rings-rpc" -version = "0.4.1" +version = "0.4.2" dependencies = [ "base64 0.13.1", "http", @@ -3558,7 +3558,7 @@ dependencies = [ [[package]] name = "rings-transport" -version = "0.4.1" +version = "0.4.2" dependencies = [ "async-trait", "bincode", diff --git a/Cargo.toml b/Cargo.toml index c08e71ad9..35d0773ef 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -3,7 +3,7 @@ resolver = "2" members = ["core", "transport", "node", "rpc", "derive"] [workspace.package] -version = "0.4.1" +version = "0.4.2" edition = "2021" license = "GPL-3.0" authors = ["RND "] diff --git a/core/src/consts.rs b/core/src/consts.rs index bf052a676..f38cb1a86 100644 --- a/core/src/consts.rs +++ b/core/src/consts.rs @@ -5,6 +5,8 @@ pub const DEFAULT_TTL_MS: u64 = 300 * 1000; pub const MAX_TTL_MS: u64 = DEFAULT_TTL_MS * 10; pub const TS_OFFSET_TOLERANCE_MS: u128 = 3000; pub const DEFAULT_SESSION_TTL_MS: u64 = 30 * 24 * 3600 * 1000; +/// 60k pub const TRANSPORT_MTU: usize = 60000; -pub const TRANSPORT_MAX_SIZE: usize = TRANSPORT_MTU * 16; +/// 60M +pub const TRANSPORT_MAX_SIZE: usize = TRANSPORT_MTU * 1000; pub const VNODE_DATA_MAX_LEN: usize = 1024; diff --git a/core/src/dht/did.rs b/core/src/dht/did.rs index 501df41e4..ebc8c8bc6 100644 --- a/core/src/dht/did.rs +++ b/core/src/dht/did.rs @@ -112,13 +112,7 @@ impl BiasId { impl PartialOrd for BiasId { fn partial_cmp(&self, other: &Self) -> Option { - if other.bias != self.bias { - let did: Did = other.into(); - let bid = BiasId::new(self.bias, did); - self.did.partial_cmp(&bid.did) - } else { - self.did.partial_cmp(&other.did) - } + Some(self.cmp(other)) } } diff --git a/core/src/error.rs b/core/src/error.rs index 4cbb9da11..a6dd55770 100644 --- a/core/src/error.rs +++ b/core/src/error.rs @@ -338,8 +338,8 @@ pub enum Error { #[error("Message decryption failed")] MessageDecryptionFailed(ecies::SecpError), - #[error("message too large, consider use ChunkList")] - MessageTooLarge, + #[error("Message has {0} bytes which is too large")] + MessageTooLarge(usize), #[cfg(feature = "wasm")] #[error("Cannot get property {0} from JsValue")] @@ -362,6 +362,9 @@ pub enum Error { #[error("Transport error: {0}")] Transport(#[from] rings_transport::error::Error), + + #[error("External Javascript error: {0}")] + JsError(String), } #[cfg(feature = "wasm")] @@ -370,3 +373,10 @@ impl From for wasm_bindgen::JsValue { wasm_bindgen::JsValue::from_str(&err.to_string()) } } + +#[cfg(feature = "wasm")] +impl From for Error { + fn from(err: js_sys::Error) -> Self { + Error::JsError(err.to_string().into()) + } +} diff --git a/core/src/message/protocols/relay.rs b/core/src/message/protocols/relay.rs index e6ccb0438..dae3d6b19 100644 --- a/core/src/message/protocols/relay.rs +++ b/core/src/message/protocols/relay.rs @@ -95,6 +95,7 @@ impl MessageRelay { // Prevent infinite loop if has_infinite_loop(&self.path) { + tracing::error!("Infinite path detected {:?}", self.path); return Err(Error::InfiniteRelayPath); } @@ -136,7 +137,7 @@ where T: PartialEq + std::fmt::Debug { let p2 = path.iter().rev().skip(indexes[1]); let p3 = path.iter().rev().skip(indexes[2]); - let lens = vec![ + let lens = [ indexes[1] - indexes[0], indexes[2] - indexes[1], path.len() - indexes[2], diff --git a/core/src/message/types.rs b/core/src/message/types.rs index ba58db8bb..65f3d2d06 100644 --- a/core/src/message/types.rs +++ b/core/src/message/types.rs @@ -167,7 +167,7 @@ pub struct SyncVNodeWithSuccessor { } /// MessageType use to customize message, will be handle by `custom_message` method. -#[derive(Debug, Deserialize, Serialize, Clone)] +#[derive(Deserialize, Serialize, Clone)] pub struct CustomMessage(pub Vec); /// MessageType enum Report contain FindSuccessorSend. @@ -242,3 +242,11 @@ impl Message { Ok(Message::CustomMessage(CustomMessage(msg.to_vec()))) } } + +impl std::fmt::Debug for CustomMessage { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("CustomMessage") + .field("size", &self.0.len()) + .finish() + } +} diff --git a/core/src/storage/persistence/idb.rs b/core/src/storage/persistence/idb.rs index 87c6293ef..682131195 100644 --- a/core/src/storage/persistence/idb.rs +++ b/core/src/storage/persistence/idb.rs @@ -169,7 +169,7 @@ where .filter_map(|(k, v)| { Some(( K::from_str(k.as_string().unwrap().as_str()).ok()?, - js_value::deserialize::>(&v).unwrap().data, + js_value::deserialize::>(v).unwrap().data, )) }) .collect::>()) @@ -260,7 +260,7 @@ impl PersistenceStorageOperation for IDBStorage { tracing::debug!("entries: {:?}", entries); if let Some((_k, value)) = entries.first() { - let data_entry: DataStruct = js_value::deserialize(&value)?; + let data_entry: DataStruct = js_value::deserialize(value)?; store .delete(&JsValue::from(&data_entry.key)) .await diff --git a/core/src/swarm/mod.rs b/core/src/swarm/mod.rs index 05432e978..e88e9cd3f 100644 --- a/core/src/swarm/mod.rs +++ b/core/src/swarm/mod.rs @@ -346,10 +346,10 @@ impl PayloadSender for Swarm { let data = payload.to_bincode()?; if data.len() > TRANSPORT_MAX_SIZE { tracing::error!("Message is too large: {:?}", payload); - return Err(Error::MessageTooLarge); + return Err(Error::MessageTooLarge(data.len())); } - if data.len() > TRANSPORT_MTU { + let result = if data.len() > TRANSPORT_MTU { let chunks = ChunkList::::from(&data); for chunk in chunks { let data = @@ -358,11 +358,11 @@ impl PayloadSender for Swarm { conn.send_message(TransportMessage::Custom(data.to_vec())) .await?; } - } - - let result = conn - .send_message(TransportMessage::Custom(data.to_vec())) - .await; + Ok(()) + } else { + conn.send_message(TransportMessage::Custom(data.to_vec())) + .await + }; tracing::debug!( "Sent {:?}, to node {:?}", diff --git a/core/src/tests/mod.rs b/core/src/tests/mod.rs index 173749f4a..45915ad72 100644 --- a/core/src/tests/mod.rs +++ b/core/src/tests/mod.rs @@ -5,7 +5,7 @@ use crate::swarm::Swarm; #[cfg(feature = "wasm")] pub mod wasm; -#[cfg(all(not(feature = "wasm")))] +#[cfg(not(feature = "wasm"))] pub mod default; #[allow(dead_code)] diff --git a/core/src/tests/wasm/mod.rs b/core/src/tests/wasm/mod.rs index 48b848d2b..abd496c13 100644 --- a/core/src/tests/wasm/mod.rs +++ b/core/src/tests/wasm/mod.rs @@ -9,6 +9,7 @@ use crate::swarm::Swarm; use crate::swarm::SwarmBuilder; mod test_channel; +mod test_fn_macro; mod test_ice_servers; mod test_idb_storage; mod test_utils; diff --git a/core/src/tests/wasm/test_fn_macro.rs b/core/src/tests/wasm/test_fn_macro.rs new file mode 100644 index 000000000..d1a735d1f --- /dev/null +++ b/core/src/tests/wasm/test_fn_macro.rs @@ -0,0 +1,46 @@ +use js_sys::Array; +use js_sys::Function; +use wasm_bindgen::JsValue; +use wasm_bindgen_test::wasm_bindgen_test; + +use crate::utils::js_func; + +#[wasm_bindgen_test] +async fn test_fn_generator() { + let js_code_args = "a, b, c, d"; + let js_code_body = r#" +try { + return new Promise((resolve, reject) => { + const ret = a + b + c + d + if (ret !== "hello world") { + reject(`a: ${a}, b: ${b}, c: ${c}, d: ${d} -> ret: ${ret}`) + } else { + resolve(ret) + } + }) +} catch(e) { + return e +} +"#; + let func = Function::new_with_args(js_code_args, js_code_body); + let native_func = js_func::of4::(&func); + let a = "hello".to_string(); + let b = " ".to_string(); + let c = "world".to_string(); + let d = "".to_string(); + native_func(a, b, c, d).await.unwrap(); +} + +#[wasm_bindgen_test] +async fn test_try_into() { + let a = "hello".to_string(); + let b = " ".to_string(); + let c = "world".to_string(); + let p: Vec = vec![ + a.try_into().unwrap(), + b.try_into().unwrap(), + c.try_into().unwrap(), + ]; + let array = Array::from_iter(p.into_iter()); + assert_eq!(array.to_vec().len(), 3, "{:?}", array.to_vec()); +} diff --git a/core/src/utils.rs b/core/src/utils.rs index 5bd6582a9..4e6ef9b36 100644 --- a/core/src/utils.rs +++ b/core/src/utils.rs @@ -41,10 +41,99 @@ pub mod js_value { } /// From JsValue to serde - pub fn deserialize(obj: &(impl Into + Clone)) -> Result { - let value: JsValue = (*obj).clone().into(); - serde_wasm_bindgen::from_value(value).map_err(Error::SerdeWasmBindgenError) + pub fn deserialize(obj: impl Into) -> Result { + serde_wasm_bindgen::from_value(obj.into()).map_err(Error::SerdeWasmBindgenError) + } +} + +#[cfg(feature = "wasm")] +pub mod js_func { + /// This macro will generate a wrapper for mapping a js_sys::Function with type fn(T, T, T, T) -> Promise<()> + /// to native function + /// # Example: + /// For macro calling: of!(of2, a: T0, b: T1); + /// Will generate code: + /// ```rust + /// pub fn of2<'a, 'b: 'a, T0: TryInto + Clone, T1: TryInto + Clone>( + /// func: &Function, + /// ) -> Box Pin> + 'b>>> + /// where + /// T0: 'b, + /// T1: 'b, + /// T0::Error: Debug, + /// T1::Error: Debug, + /// { + /// let func = func.clone(); + /// Box::new( + /// move |a: T0, b: T1| -> Pin>>> { + /// let func = func.clone(); + /// Box::pin(async move { + /// let func = func.clone(); + /// let params = js_sys::Array::new(); + /// let a: JsValue = a + /// .clone() + /// .try_into() + /// .map_err(|_| Error::JsError(format!("{:?}", e))); + /// params.push(&a); + /// let b: JsValue = b + /// .clone() + /// .try_into() + /// .map_err(|_| Error::JsError(format!("{:?}", e))); + /// params.push(&b); + /// JsFuture::from(js_sys::Promise::from( + /// func.apply(&JsValue::NULL, ¶ms).map_err(|e| { + /// Error::JsError(js_sys::Error::from(e).to_string().into()) + /// })?, + /// )) + /// .await + /// .map_err(|e| Error::JsError(js_sys::Error::from(e).to_string().into()))?; + /// Ok(()) + /// }) + /// }, + /// ) + /// } + /// ``` + #[macro_export] + macro_rules! of { + ($func: ident, $($name:ident: $type: ident),+$(,)?) => { + pub fn $func<'a, 'b: 'a, $($type: TryInto + Clone),+>( + func: &js_sys::Function, + ) -> Box std::pin::Pin> + 'b>>> + where $($type::Error: std::fmt::Debug),+, + $($type: 'b),+ + { + let func = func.clone(); + Box::new( + move |$($name: $type,)+| -> std::pin::Pin>>> { + let func = func.clone(); + Box::pin(async move { + let func = func.clone(); + let params = js_sys::Array::new(); + $( + let $name: wasm_bindgen::JsValue = $name.clone().try_into().map_err(|e| $crate::error::Error::JsError(format!("{:?}", e)))?; + params.push(&$name); + )+ + wasm_bindgen_futures::JsFuture::from(js_sys::Promise::from( + func.apply( + &wasm_bindgen::JsValue::NULL, + ¶ms + ) + .map_err(|e| $crate::error::Error::from(js_sys::Error::from(e)))?, + )) + .await + .map_err(|e| $crate::error::Error::from(js_sys::Error::from(e)))?; + Ok(()) + }) + }, + ) + } + } } + + of!(of1, a: T0); + of!(of2, a: T0, b: T1); + of!(of3, a: T0, b: T1, c: T2); + of!(of4, a: T0, b: T1, c: T2, d: T3); } #[cfg(feature = "wasm")] diff --git a/examples/ffi/rings.py b/examples/ffi/rings.py index 9869edccb..4bbf7ed51 100644 --- a/examples/ffi/rings.py +++ b/examples/ffi/rings.py @@ -40,9 +40,9 @@ def on_inbound(payload): return -def create_client(rings_node, acc): +def create_provider(rings_node, acc): callback = rings_node.new_callback(on_inbound) - client = rings_node.new_client_with_callback( + provider = rings_node.new_provider_with_callback( "stun://stun.l.google.com".encode(), 10, acc.address.encode(), @@ -50,12 +50,12 @@ def create_client(rings_node, acc): signer, ffi.addressof(callback), ) - return client + return provider if __name__ == "__main__": rings_node = ffi.dlopen(f"./target/debug/librings_node.{extension}") rings_node.init_logging(rings_node.Debug) - client = create_client(rings_node, acc) - print(client) + provider = create_provider(rings_node, acc) + print(provider) diff --git a/node/bin/rings.rs b/node/bin/rings.rs index c39365228..1aa21135e 100644 --- a/node/bin/rings.rs +++ b/node/bin/rings.rs @@ -12,21 +12,23 @@ use futures::pin_mut; use futures::select; use futures::StreamExt; use futures_timer::Delay; -use rings_core::dht::Did; -use rings_core::session::SessionSkBuilder; -use rings_node::backend::native::Backend; use rings_node::backend::native::BackendConfig; +use rings_node::backend::native::BackendContext; +use rings_node::backend::Backend; use rings_node::logging::init_logging; use rings_node::logging::LogLevel; use rings_node::measure::PeriodicMeasure; use rings_node::native::cli::Client; use rings_node::native::config; use rings_node::native::endpoint::run_http_api; +use rings_node::prelude::rings_core::dht::Did; use rings_node::prelude::rings_core::ecc::SecretKey; use rings_node::prelude::PersistenceStorage; +use rings_node::prelude::SessionSkBuilder; use rings_node::processor::Processor; use rings_node::processor::ProcessorBuilder; use rings_node::processor::ProcessorConfig; +use rings_node::provider::Provider; use rings_node::util::ensure_parent_dir; use rings_node::util::expand_home; use tokio::io; @@ -341,6 +343,9 @@ struct SendHttpCommand { #[arg(long, default_value = "30000")] timeout: u64, + + #[arg(long = "request_id", short = 'i', help = "set request id")] + rid: Option, } #[derive(Args, Debug)] @@ -446,10 +451,10 @@ async fn daemon_run(args: RunCommand) -> anyhow::Result<()> { .build()?, ); println!("Did: {}", processor.swarm.did()); - - let backend = Arc::new(Backend::new(bc, processor.clone()).await?); - let backend_service_names = backend.service_names(); - + let backend_context = BackendContext::new(bc).await?; + let backend_service_names = backend_context.service_names(); + let provider = Arc::new(Provider::from_processor(processor.clone())); + let backend = Arc::new(Backend::new(provider, Box::new(backend_context))); processor.swarm.set_callback(backend).unwrap(); let processor_clone = processor.clone(); @@ -571,6 +576,7 @@ async fn main() -> anyhow::Result<()> { }) .collect::>(), args.body.map(|x| x.as_bytes().to_vec()), + args.rid, ) .await? .display(); diff --git a/node/src/backend/browser.rs b/node/src/backend/browser.rs index e7365f20c..328275e07 100644 --- a/node/src/backend/browser.rs +++ b/node/src/backend/browser.rs @@ -1,84 +1,81 @@ +#![warn(missing_docs)] +//! BackendContext implementation for browser +use std::sync::Arc; + use async_trait::async_trait; -use rings_core::message::CustomMessage; -use rings_core::message::Message; +use js_sys::Function; use rings_core::message::MessagePayload; -use rings_core::swarm::callback::SwarmCallback; +use rings_core::utils::js_func; use rings_core::utils::js_value; use rings_derive::wasm_export; use wasm_bindgen::JsValue; use crate::backend::types::BackendMessage; -use crate::error::Error; +use crate::backend::types::MessageEndpoint; use crate::error::Result; +use crate::provider::Provider; -/// MessageCallback instance for Browser +/// BackendContext is a context instance for handling backend message for browser #[wasm_export] -pub struct Backend { - backend_message_handler: js_sys::Function, +#[derive(Clone)] +pub struct BackendContext { + service_message_handler: Option, + plain_text_message_handler: Option, + extension_message_handler: Option, } #[wasm_export] -impl Backend { +impl BackendContext { /// Create a new instance of message callback, this function accept one argument: /// - /// * backend_message_handler: function(payload: string, message: string) -> (); + /// * backend_message_handler: `function(provider: Arc, payload: string, message: string) -> Promise<()>`; #[wasm_bindgen(constructor)] - pub fn new(backend_message_handler: js_sys::Function) -> Backend { - Backend { - backend_message_handler, + pub fn new( + service_message_handler: Option, + plain_text_message_handler: Option, + extension_message_handler: Option, + ) -> BackendContext { + BackendContext { + service_message_handler, + plain_text_message_handler, + extension_message_handler, } } } -#[async_trait(?Send)] -impl SwarmCallback for Backend { - async fn on_inbound( - &self, - payload: &MessagePayload, - ) -> std::result::Result<(), Box> { - let data: Message = payload.transaction.data()?; - - let Message::CustomMessage(CustomMessage(msg)) = data else { - return Ok(()); - }; - - let backend_msg = bincode::deserialize(&msg)?; - tracing::debug!("backend_message received: {backend_msg:?}"); - - self.handle_backend_message(payload, &backend_msg).await?; - - Ok(()) - } -} - -impl Backend { - async fn handle_backend_message( +#[cfg_attr(feature = "browser", async_trait(?Send))] +#[cfg_attr(not(feature = "browser"), async_trait)] +impl MessageEndpoint for BackendContext { + async fn on_message( &self, + provider: Arc, payload: &MessagePayload, msg: &BackendMessage, ) -> Result<()> { - let this = JsValue::null(); - let r = self - .backend_message_handler - .call2( - &this, - &js_value::serialize(&payload)?, - &js_value::serialize(&msg)?, - ) - .map_err(|e| Error::JsError(format!("call backend_message_handler failed: {e:?}")))?; - - let p = js_sys::Promise::try_from(r).map_err(|e| { - Error::JsError(format!( - "convert backend_message_handler promise failed: {e:?}" - )) - })?; - - wasm_bindgen_futures::JsFuture::from(p).await.map_err(|e| { - Error::JsError(format!( - "invoke backend_message_handler promise failed: {e:?}" - )) - })?; - + let provider = provider.clone().as_ref().clone(); + let ctx = js_value::serialize(&payload)?; + match msg { + BackendMessage::ServiceMessage(m) => { + if let Some(func) = &self.service_message_handler { + let m = js_value::serialize(m)?; + let cb = js_func::of4::(func); + cb(self.clone(), provider, ctx, m).await?; + } + } + BackendMessage::Extension(m) => { + if let Some(func) = &self.extension_message_handler { + let m = js_value::serialize(m)?; + let cb = js_func::of4::(func); + cb(self.clone(), provider, ctx, m).await?; + } + } + BackendMessage::PlainText(m) => { + if let Some(func) = &self.plain_text_message_handler { + let cb = js_func::of4::(func); + cb(self.clone(), provider, ctx, m.to_string()).await?; + } + } + } Ok(()) } } diff --git a/node/src/backend/mod.rs b/node/src/backend/mod.rs index 834d2c9bc..c5b2f7e78 100644 --- a/node/src/backend/mod.rs +++ b/node/src/backend/mod.rs @@ -1,7 +1,72 @@ +#![warn(missing_docs)] +//! This module provide basic mechanism. + pub mod types; +use std::sync::Arc; + +use async_trait::async_trait; +use rings_core::message::CustomMessage; +use rings_core::message::Message; +use rings_core::message::MessagePayload; +use rings_core::swarm::callback::SwarmCallback; + +use crate::backend::types::BackendMessage; +use crate::backend::types::MessageEndpoint; +use crate::error::Result; +use crate::provider::Provider; #[cfg(feature = "browser")] pub mod browser; #[cfg(feature = "node")] pub mod native; + +#[cfg(feature = "node")] +type HandlerTrait = dyn MessageEndpoint + Send + Sync; +#[cfg(feature = "browser")] +type HandlerTrait = dyn MessageEndpoint; + +/// Backend handle custom messages from Swarm +pub struct Backend { + provider: Arc, + handler: Box, +} + +impl Backend { + /// Create a new backend instance with Provider and Handler functions + pub fn new(provider: Arc, handler: Box) -> Self { + Self { provider, handler } + } + + async fn on_backend_message( + &self, + payload: &MessagePayload, + msg: &BackendMessage, + ) -> Result<()> { + let provider = self.provider.clone(); + self.handler.on_message(provider, payload, msg).await + } +} + +#[cfg_attr(feature = "browser", async_trait(?Send))] +#[cfg_attr(not(feature = "browser"), async_trait)] +impl SwarmCallback for Backend { + async fn on_inbound( + &self, + + payload: &MessagePayload, + ) -> std::result::Result<(), Box> { + let data: Message = payload.transaction.data()?; + + let Message::CustomMessage(CustomMessage(msg)) = data else { + return Ok(()); + }; + + let backend_msg = bincode::deserialize(&msg)?; + tracing::debug!("backend_message received: {backend_msg:?}"); + + self.on_backend_message(payload, &backend_msg).await?; + + Ok(()) + } +} diff --git a/node/src/backend/native/extension.rs b/node/src/backend/native/extension.rs index 18311006a..a95633d01 100644 --- a/node/src/backend/native/extension.rs +++ b/node/src/backend/native/extension.rs @@ -1,5 +1,4 @@ #![warn(missing_docs)] - //! This module supports a user-defined message handler based on WebAssembly (Wasm). //! The Rings network allows loading a Wasm or Wat file from a remote or local file and transforming it into a message handler. //! The Wasm module should satisfy the following requirements: @@ -12,11 +11,7 @@ //! //! ```text //! "message_abi" => { -//! "message_type" => msg_type, -//! "extra" => extra, -//! "data" => data, -//! "read_at" => read_at, -//! "write_at" => write_at +//! "request" => request, //! } //! ``` //! A basic wasm extension may looks like: @@ -24,8 +19,8 @@ //! ```wat //! (module //! ;; Let's import message_type from message_abi -//! (type $ty_message_type (func (param externref) (result i32))) -//! (import "message_abi" "message_type" (func $message_type (type $ty_message_type))) +//! (type $ty_request_type (func (param externref i32 i32) (result i32))) +//! (import "request" "request_type" (func request_type (type $ty_message_type))) //! ;; fn handler(param: ExternRef) -> ExternRef //! (func $handler (param externref) (result externref) //! (return (local.get 0)) @@ -34,19 +29,20 @@ //! ) //! //! You can see that this wat/wasm extension defines a handler function and -//! imports the message_type ABI. - +//! imports the request ABI. use std::sync::Arc; +use std::sync::RwLock; -use async_trait::async_trait; -use bytes::Bytes; -use rings_core::message::MessagePayload; +use loader::Handler; +use reqwest; use serde::Deserialize; use serde::Serialize; -use crate::backend::types::MessageEndpoint; +use super::MessageEndpoint; +use crate::error::Error; use crate::error::Result; -use crate::processor::Processor; +use crate::prelude::*; +use crate::provider::Provider; /// Path of a wasm extension #[derive(Deserialize, Serialize, Debug, Clone, PartialEq, Eq)] @@ -66,28 +62,340 @@ pub struct ExtensionConfig { /// Manager of Extension pub struct Extension { - #[allow(dead_code)] - processor: Arc, + /// Extension list + handlers: Vec, } /// Calls the extension handler with the given message and returns the response. pub trait ExtensionHandlerCaller { /// Call extension handler - fn call(&self, msg: Bytes) -> Result<()>; + fn call(&self, msg: bytes::Bytes, provider: Arc) -> Result<()>; +} + +/// Wrapper for BackendMessage and Provider that can be converted from and to the native WebAssembly type. +#[derive(Clone, Default)] +pub struct WasmABIContainer { + msg: Arc>>>, + provider: Arc>>>, +} + +impl WasmABIContainer { + /// Ask the instance to setup a new backend message to memory; + pub fn set_message(&self, msg: bytes::Bytes) -> Result<()> { + let mut guard = self + .msg + .write() + .map_err(|_| Error::WasmBackendMessageRwLockError)?; + *guard = Some(Box::new(msg)); + Ok(()) + } + + /// Ask the instance to set up provider for message calling + pub fn set_provider(&self, provider: Arc) -> Result<()> { + let mut guard = self + .provider + .write() + .map_err(|_| Error::WasmBackendMessageRwLockError)?; + *guard = Some(provider); + Ok(()) + } + + /// Create a new WasmAbiContainer instance + pub fn new(msg: Option, provider: Arc) -> Self { + Self { + msg: Arc::new(RwLock::new(msg.map(Box::new))), + provider: Arc::new(RwLock::new(Some(provider))), + } + } } impl Extension { + /// Loads a wasm module from the specified path, the path can be remote or local + async fn load(path: &Path) -> Result { + match path { + Path::Local(path) => loader::load_from_fs(path.to_string()).await, + Path::Remote(path) => { + let data: String = reqwest::get(path) + .await + .map_err(|e| Error::HttpRequestError(e.to_string()))? + .text() + .await + .map_err(|e| Error::HttpRequestError(e.to_string()))?; + loader::load(data).await + } + } + } + /// Creates a new Extension instance with the specified configuration. - pub async fn new(_config: &ExtensionConfig, processor: Arc) -> Result { - Ok(Self { processor }) + pub async fn new(config: &ExtensionConfig) -> Result { + let mut handlers = vec![]; + for p in &config.paths { + if let Ok(h) = Self::load(p).await { + handlers.push(h) + } else { + log::error!("Failed on loading extension {:?}", p) + } + } + Ok(Self { handlers }) } } #[cfg_attr(target_arch = "wasm32", async_trait(?Send))] #[cfg_attr(not(target_arch = "wasm32"), async_trait)] -impl MessageEndpoint for Extension { - /// Handles the incoming message by passing it to the extension handlers. - async fn handle_message(&self, _ctx: &MessagePayload, _data: &Bytes) -> Result<()> { +impl MessageEndpoint for Extension { + /// Handles the incoming message by passing it to the extension handlers and returning the resulting events. + async fn on_message( + &self, + provider: Arc, + _ctx: &MessagePayload, + data: &bytes::Bytes, + ) -> Result<()> { + for h in &self.handlers { + h.call(data.clone(), provider.clone())?; + } Ok(()) } } + +/// Loader of wasm, including ABI generator +pub mod loader { + //! Wasm Loader module + use core::any::Any; + use std::ffi::CStr; + use std::fs; + use std::os::raw::c_char; + use std::sync::Arc; + use std::sync::RwLock; + + use lazy_static::lazy_static; + use wasmer::imports; + use wasmer::AsStoreMut; + use wasmer::ExternRef; + use wasmer::FromToNativeWasmType; + use wasmer::FunctionEnv; + use wasmer::FunctionEnvMut; + use wasmer::FunctionType; + use wasmer::Type; + use wasmer::TypedFunction; + use wasmer::Value; + + use super::WasmABIContainer; + use crate::error::Error; + use crate::error::Result; + use crate::provider::Provider; + + lazy_static! { + static ref WASM_MEM: Arc> = + Arc::new(RwLock::new(wasmer::Store::default())); + } + + impl TryFrom for FunctionEnv { + type Error = Error; + fn try_from(data: WasmABIContainer) -> Result { + let mut mem = WASM_MEM + .write() + .map_err(|_| Error::WasmGlobalMemoryLockError)?; + Ok(FunctionEnv::new(&mut mem, data)) + } + } + + /// The "WasmABILander" defines how a Rust native struct generates the corresponding Wasm ABI for its getter functions. + pub trait WasmABILander: Sized + Any + Send + 'static { + /// The land_abi function needs to return an ImportObject. + /// read more: + fn land_abi(env: &FunctionEnv, store: &mut impl AsStoreMut) -> wasmer::Imports; + } + + impl WasmABILander for WasmABIContainer { + fn land_abi(env: &FunctionEnv, store: &mut impl AsStoreMut) -> wasmer::Imports { + let request = wasmer::Function::new_with_env( + store, + env, + FunctionType::new(vec![Type::ExternRef, Type::I32, Type::I32], vec![]), + WasmABIContainer::request, + ); + + #[rustfmt::skip] + imports! { + "message_abi" => { + "request" => request + } + } + } + } + + impl WasmABIContainer { + /// wasm function type `Fn (Option, i32, i32) -> \[0\]`, external ref is always pointed to Env + pub fn request( + env: FunctionEnvMut, + params: &[Value], + ) -> core::result::Result, wasmer::RuntimeError> { + match params { + [Value::ExternRef(_), Value::I32(method_ptr), Value::I32(params_ptr)] => { + let container: WasmABIContainer = env.data().clone(); + let method_cstr = unsafe { CStr::from_ptr((*method_ptr) as *const c_char) }; + let method = method_cstr.to_str().unwrap(); + let params_cstr = unsafe { CStr::from_ptr((*params_ptr) as *const c_char) }; + let params = params_cstr.to_str().unwrap(); + let guard = container.provider.read().map_err(|_| { + wasmer::RuntimeError::new("Failed on lock memory of external ref") + })?; + if let Some(provider) = guard.clone() { + let params = serde_json::from_str(params) + .map_err(|_| wasmer::RuntimeError::new("Failed to serialize params"))?; + futures::executor::block_on(provider.request_internal( + method.to_string(), + params, + None, + )) + .map_err(|_| { + wasmer::RuntimeError::new("Failed to call async request function") + })?; + } else { + return Err(wasmer::RuntimeError::new( + "Failed on call func `write_at`:: ExternalRef is NULL", + )); + } + Ok(vec![]) + } + x => Err(wasmer::RuntimeError::new(format!( + "Expect [Externef, i32], got {:?}", + x + ))), + } + } + } + + unsafe impl FromToNativeWasmType for WasmABIContainer { + type Native = Option; + + fn from_native(native: Self::Native) -> Self { + if native.is_none() { + return Self::default(); + } + match WASM_MEM + .read() + .map_err(|_| Error::WasmGlobalMemoryLockError) + { + Ok(mem) => { + if let Some(m) = native.unwrap().downcast::(&mem) { + m.clone() + } else { + Self::default() + } + } + Err(e) => { + log::error!("{:?}", e); + Self::default() + } + } + } + + fn to_native(self) -> Self::Native { + // Convert BackendMessage to the native representation + match WASM_MEM + .write() + .map_err(|_| Error::WasmGlobalMemoryLockError) + { + Ok(mut mem) => { + let ext_ref = ExternRef::new::(&mut mem, self); + // Checks whether this ExternRef can be used with the given context. + if ext_ref.is_from_store(&mem) { + Some(ext_ref) + } else { + None + } + } + Err(e) => { + log::error!("{:?}", e); + None + } + } + } + } + + /// Type of message handler that the Wasm/Wat should implement + type TyHandler = TypedFunction, Option>; + + /// Externref type handler, this is a wrapper of handler function + pub struct Handler { + /// The native function get from wasm. + pub func: TyHandler, + /// By default, when resolving an ExternRef, it points to the function environment. + pub container: WasmABIContainer, + } + + impl super::ExtensionHandlerCaller for Handler { + fn call(&self, msg: bytes::Bytes, provider: Arc) -> Result<()> { + self.container.set_message(msg)?; + self.container.set_provider(provider)?; + let native_container = self.container.clone().to_native(); + { + let mut mem = WASM_MEM + .write() + .map_err(|_| Error::WasmGlobalMemoryLockError)?; + self.func + .call(&mut mem, native_container) + .map_err(|e| Error::WasmRuntimeError(e.to_string()))? + }; + Ok(()) + } + } + + /// wasm loarder, bytes can be WAT of *.wasm binary + pub async fn load(bytes: impl AsRef<[u8]>) -> Result { + let container = WasmABIContainer::default(); + let env: FunctionEnv = container.clone().try_into()?; + let mut store = WASM_MEM + .write() + .map_err(|_| Error::WasmGlobalMemoryLockError)?; + let module = wasmer::Module::new(&store, &bytes) + .map_err(|e| Error::WasmCompileError(e.to_string()))?; + // The module doesn't import anything, so we create an empty import object. + let import_object = WasmABIContainer::land_abi(&env, &mut store); + let ins = wasmer::Instance::new(&mut store, &module, &import_object) + .map_err(|_| Error::WasmInstantiationError)?; + let exports: wasmer::Exports = ins.exports; + let handler: TyHandler = exports + .get_function("handler") + .map_err(|_| Error::WasmExportError)? + .typed(&store) + .map_err(|_| Error::WasmExportError)?; + + Ok(Handler { + func: handler, + container, + }) + } + + /// Load wasm from filesystem + pub async fn load_from_fs(path: String) -> Result { + if let Ok(wat) = fs::read_to_string(path) { + load(wat).await + } else { + Err(Error::WasmFailedToLoadFile) + } + } +} + +#[cfg(not(feature = "browser"))] +#[cfg(test)] +mod test { + use crate::backend::native::extension::loader::load; + + #[tokio::test] + async fn test_load_wasm() { + // about wat: https://developer.mozilla.org/en-US/docs/WebAssembly/Understanding_the_text_format + let wasm = r#" +(module + ;; fn handler(param: ExternRef) -> ExternRef + (func $handler (param externref) (result externref) + (return (local.get 0)) + ) + + (export "handler" (func $handler)) +) +"#; + let _handler = load(wasm.to_string()).await.unwrap(); + } +} diff --git a/node/src/backend/native/mod.rs b/node/src/backend/native/mod.rs index f7dc50c20..bd7eb98ca 100644 --- a/node/src/backend/native/mod.rs +++ b/node/src/backend/native/mod.rs @@ -1,42 +1,71 @@ +#![warn(missing_docs)] +//! This module provides the implementation of a native Backend, +//! which includes BackendConfig and BackendContext. +//! +//! This module has two submodules: extension and service. +//! +//! The submodule [service] aims to provide an implementation of Rings Network based TCP services. +//! It can forward a TCP request from the Rings Network to a local request. +//! +//! The submodule extension aims to provide an implementation of Rings extensions. +//! These extensions are based on WebAssembly (WASM), allowing downloaded WASM code to be executed +//! as an external extension of the backend. + pub mod extension; -pub mod server; +pub mod service; use std::sync::Arc; use async_trait::async_trait; -use rings_core::message::CustomMessage; -use rings_core::message::Message; use rings_core::message::MessagePayload; use rings_core::message::MessageVerificationExt; -use rings_core::swarm::callback::SwarmCallback; use crate::backend::native::extension::Extension; use crate::backend::native::extension::ExtensionConfig; -use crate::backend::native::server::Server; -use crate::backend::native::server::ServiceConfig; +use crate::backend::native::service::ServiceConfig; +use crate::backend::native::service::ServiceProvider; use crate::backend::types::BackendMessage; use crate::backend::types::MessageEndpoint; use crate::error::Result; -use crate::processor::Processor; +use crate::provider::Provider; +/// BackendConfig including services config and extension config pub struct BackendConfig { + /// Config of services pub services: Vec, + /// Config of extensions pub extensions: ExtensionConfig, } -pub struct Backend { - server: Server, +/// BackendContext is a Context holder of backend message handler +pub struct BackendContext { + server: ServiceProvider, extension: Extension, } -impl Backend { - pub async fn new(config: BackendConfig, processor: Arc) -> Result { +#[cfg_attr(feature = "browser", async_trait(?Send))] +#[cfg_attr(not(feature = "browser"), async_trait)] +impl MessageEndpoint for BackendContext { + async fn on_message( + &self, + provider: Arc, + payload: &MessagePayload, + msg: &BackendMessage, + ) -> Result<()> { + self.handle_backend_message(provider, payload, msg).await + } +} + +impl BackendContext { + /// Create a new BackendContext instance with config + pub async fn new(config: BackendConfig) -> Result { Ok(Self { - server: Server::new(config.services, processor.clone()), - extension: Extension::new(&config.extensions, processor.clone()).await?, + server: ServiceProvider::new(config.services), + extension: Extension::new(&config.extensions).await?, }) } + /// List service names pub fn service_names(&self) -> Vec { self.server .services @@ -47,12 +76,17 @@ impl Backend { async fn handle_backend_message( &self, + provider: Arc, payload: &MessagePayload, msg: &BackendMessage, ) -> Result<()> { match msg { - BackendMessage::Extension(data) => self.extension.handle_message(payload, data).await, - BackendMessage::ServerMessage(data) => self.server.handle_message(payload, data).await, + BackendMessage::Extension(data) => { + self.extension.on_message(provider, payload, data).await + } + BackendMessage::ServiceMessage(data) => { + self.server.on_message(provider, payload, data).await + } BackendMessage::PlainText(text) => { let peer_did = payload.transaction.signer(); tracing::info!("BackendMessage from {peer_did:?} PlainText: {text:?}"); @@ -61,24 +95,3 @@ impl Backend { } } } - -#[async_trait] -impl SwarmCallback for Backend { - async fn on_inbound( - &self, - payload: &MessagePayload, - ) -> std::result::Result<(), Box> { - let data: Message = payload.transaction.data()?; - - let Message::CustomMessage(CustomMessage(msg)) = data else { - return Ok(()); - }; - - let backend_msg = bincode::deserialize(&msg)?; - tracing::debug!("backend_message received: {backend_msg:?}"); - - self.handle_backend_message(payload, &backend_msg).await?; - - Ok(()) - } -} diff --git a/node/src/backend/native/server/mod.rs b/node/src/backend/native/service/mod.rs similarity index 52% rename from node/src/backend/native/server/mod.rs rename to node/src/backend/native/service/mod.rs index 889a7c83a..bada19c20 100644 --- a/node/src/backend/native/server/mod.rs +++ b/node/src/backend/native/service/mod.rs @@ -1,5 +1,20 @@ +#![warn(missing_docs)] +//! This module provides the implementation of the Server of Rings Service. +//! Rings Service is a TCP/IP handler that can forward messages from a DHT (Distributed Hash Table) to +//! a local path. This module is essential for applications requiring decentralized network communication. +//! +//! # Service Config +//! +//! A Service Config is used to create a server instance. It contains the required parameters of +//! the services, describing how to forward messages to a local TCP socket. This configuration allows for +//! flexible and customized message routing based on specific application needs. +//! +//! # Service Provider +//! +//! A Rings Service Provider is a structure that serves Rings Service. Sometimes referred to as +//! "hidden-services," the Rings Service Provider exclusively handles the ServiceMessage type +//! of BackendMessage. This component is crucial for managing the flow of messages within decentralized networks. mod tcp_proxy; - use std::collections::HashMap; use std::net::SocketAddr; use std::str::FromStr; @@ -12,18 +27,22 @@ use rings_core::message::MessageVerificationExt; use serde::Deserialize; use serde::Serialize; -use crate::backend::native::server::tcp_proxy::tcp_connect_with_timeout; -use crate::backend::native::server::tcp_proxy::Tunnel; +use crate::backend::native::service::tcp_proxy::tcp_connect_with_timeout; +use crate::backend::native::service::tcp_proxy::Tunnel; use crate::backend::native::MessageEndpoint; +use crate::backend::types::BackendMessage; use crate::backend::types::HttpRequest; use crate::backend::types::HttpResponse; -use crate::backend::types::ServerMessage; +use crate::backend::types::ServiceMessage; use crate::backend::types::TunnelId; use crate::consts::TCP_SERVER_TIMEOUT; use crate::error::Error; use crate::error::Result; -use crate::processor::Processor; +use crate::jsonrpc::server::BackendMessageParams; +use crate::prelude::jsonrpc_core::Params; +use crate::provider::Provider; +/// Service Config for creating a Server instance #[derive(Deserialize, Serialize, Debug, Clone, PartialEq, Eq)] pub struct ServiceConfig { /// service name @@ -36,16 +55,18 @@ pub struct ServiceConfig { pub addr: SocketAddr, } -pub struct Server { - processor: Arc, +/// Service Provider, which hold tunnel and a list of service +pub struct ServiceProvider { + /// Service configs pub services: Vec, + /// Services tunnel, which is a HashMap of tunnel Id and Tunnel instance pub tunnels: DashMap, } -impl Server { - pub fn new(services: Vec, processor: Arc) -> Self { +impl ServiceProvider { + /// Create a new ServiceProvider with a config list + pub fn new(services: Vec) -> Self { Self { - processor, services, tunnels: DashMap::new(), } @@ -58,38 +79,51 @@ impl Server { } #[async_trait::async_trait] -impl MessageEndpoint for Server { - async fn handle_message(&self, ctx: &MessagePayload, msg: &ServerMessage) -> Result<()> { +impl MessageEndpoint for ServiceProvider { + async fn on_message( + &self, + provider: Arc, + ctx: &MessagePayload, + msg: &ServiceMessage, + ) -> Result<()> { let peer_did = ctx.transaction.signer(); match msg { - ServerMessage::TcpDial { tid, service } => { + ServiceMessage::TcpDial { tid, service } => { let service = self.service(service).ok_or(Error::InvalidService)?; match tcp_connect_with_timeout(service.addr, TCP_SERVER_TIMEOUT).await { Err(e) => { - let msg = ServerMessage::TcpClose { + let msg = ServiceMessage::TcpClose { tid: *tid, reason: e, }; - self.processor.send_backend_message(peer_did, msg).await?; + let backend_message: BackendMessage = msg.into(); + let params: Params = BackendMessageParams { + did: peer_did, + data: backend_message, + } + .try_into()?; + provider + .request("sendBackendMessage".to_string(), params, None) + .await?; Err(Error::TunnelError(e)) } Ok(local_stream) => { let mut tunnel = Tunnel::new(*tid); tunnel - .listen(local_stream, self.processor.clone(), peer_did) + .listen(provider.clone(), local_stream, peer_did) .await; self.tunnels.insert(*tid, tunnel); Ok(()) } } } - ServerMessage::TcpClose { tid, .. } => { + ServiceMessage::TcpClose { tid, .. } => { self.tunnels.remove(tid); Ok(()) } - ServerMessage::TcpPackage { tid, body } => { + ServiceMessage::TcpPackage { tid, body } => { self.tunnels .get(tid) .ok_or(Error::TunnelNotFound)? @@ -97,16 +131,23 @@ impl MessageEndpoint for Server { .await; Ok(()) } - ServerMessage::HttpRequest(req) => { + ServiceMessage::HttpRequest(req) => { let service = self.service(&req.service).ok_or(Error::InvalidService)?; let resp = handle_http_request(service.addr, req).await?; - self.processor - .send_backend_message(peer_did, ServerMessage::HttpResponse(resp)) + let msg: BackendMessage = ServiceMessage::HttpResponse(resp).into(); + let params: Params = BackendMessageParams { + did: peer_did, + data: msg, + } + .try_into()?; + let resp = provider + .request("sendBackendMessage".to_string(), params, None) .await?; + tracing::info!("done calling provider {:?}", resp); Ok(()) } - ServerMessage::HttpResponse(resp) => { - tracing::info!("ServerMessage from {peer_did:?} HttpResponse: {resp:?}"); + ServiceMessage::HttpResponse(resp) => { + tracing::info!("ServiceMessage from {peer_did:?} HttpResponse: {resp:?}"); Ok(()) } } @@ -114,7 +155,8 @@ impl MessageEndpoint for Server { } async fn handle_http_request(addr: SocketAddr, req: &HttpRequest) -> Result { - let url = format!("{}/{}", addr, req.path.trim_start_matches('/')); + let url = format!("http://{}/{}", addr, req.path.trim_start_matches('/')); + tracing::info!("Handle http request on url: {:?} start", url); let method = http::Method::from_str(req.method.as_str()).map_err(|_| Error::InvalidMethod)?; let headers_map: HashMap = req.headers.iter().cloned().collect(); @@ -152,10 +194,11 @@ async fn handle_http_request(addr: SocketAddr, req: &HttpRequest) -> Result>, @@ -23,12 +29,12 @@ pub struct Tunnel { listener: Option>, } +/// Listener of Tcp Tunnel, contains a mpsc channel pub struct TunnelListener { tid: TunnelId, local_stream: TcpStream, remote_stream_tx: mpsc::Sender, remote_stream_rx: mpsc::Receiver, - processor: Arc, peer_did: Did, cancel_token: CancellationToken, } @@ -51,6 +57,7 @@ impl Drop for Tunnel { } impl Tunnel { + /// Create a new tunnel with a given tunnel Id pub fn new(tid: TunnelId) -> Self { Self { tid, @@ -60,6 +67,7 @@ impl Tunnel { } } + /// Send bytes to tunnel via channel pub async fn send(&self, bytes: Bytes) { if let Some(ref tx) = self.remote_stream_tx { let _ = tx.send(bytes).await; @@ -68,20 +76,23 @@ impl Tunnel { } } + /// Start listen a local stream, this function will spawn a thread which + /// listening the inbound messages pub async fn listen( &mut self, + provider: Arc, local_stream: TcpStream, - processor: Arc, peer_did: Did, ) { if self.listener.is_some() { return; } - - let mut listener = TunnelListener::new(self.tid, local_stream, processor, peer_did).await; + let provider = provider.clone(); + let mut listener = TunnelListener::new(self.tid, local_stream, peer_did).await; let listener_cancel_token = listener.cancel_token(); let remote_stream_tx = listener.remote_stream_tx.clone(); - let listener_handler = tokio::spawn(Box::pin(async move { listener.listen().await })); + let listener_handler = + tokio::spawn(Box::pin(async move { listener.listen(provider).await })); self.remote_stream_tx = Some(remote_stream_tx); self.listener = Some(listener_handler); @@ -90,19 +101,14 @@ impl Tunnel { } impl TunnelListener { - async fn new( - tid: TunnelId, - local_stream: TcpStream, - processor: Arc, - peer_did: Did, - ) -> Self { + /// Create a new listener instance with TcpStream, tunnel id, and did of a target peer + async fn new(tid: TunnelId, local_stream: TcpStream, peer_did: Did) -> Self { let (remote_stream_tx, remote_stream_rx) = mpsc::channel(1024); Self { tid, local_stream, remote_stream_tx, remote_stream_rx, - processor, peer_did, cancel_token: CancellationToken::new(), } @@ -112,7 +118,7 @@ impl TunnelListener { self.cancel_token.clone() } - async fn listen(&mut self) { + async fn listen(&mut self, provider: Arc) { let (mut local_read, mut local_write) = self.local_stream.split(); let listen_local = async { @@ -126,18 +132,25 @@ impl TunnelListener { Err(e) => { break e.kind().into(); } - Ok(n) if n == 0 => { + Ok(0) => { break TunnelDefeat::ConnectionClosed; } Ok(n) => { let body = Bytes::copy_from_slice(&buf[..n]); - let msg = ServerMessage::TcpPackage { + let msg = ServiceMessage::TcpPackage { tid: self.tid, body, }; - if let Err(e) = self - .processor - .send_backend_message(self.peer_did, msg) + + let backend_message: BackendMessage = msg.into(); + let params: Params = BackendMessageParams { + did: self.peer_did, + data: backend_message, + } + .try_into() + .expect("Failed on cover backend message to rpc Params"); + if let Err(e) = provider + .request("sendBackendMessage".to_string(), params, None) .await { tracing::error!("Send TcpPackage message failed: {e:?}"); @@ -166,26 +179,31 @@ impl TunnelListener { tokio::select! { defeat = listen_local => { tracing::info!("Local stream closed: {defeat:?}"); - let msg = ServerMessage::TcpClose { + let msg = ServiceMessage::TcpClose { tid: self.tid, reason: defeat, }; - if let Err(e) =self.processor.send_backend_message(self.peer_did, msg).await { + let backend_message: BackendMessage = msg.into(); + let params: Params = BackendMessageParams{did: self.peer_did, data: backend_message}.try_into().expect("Failed to cover backend message to rpc params"); + if let Err(e) = provider.request("sendBackendMessage".to_string(), params, None).await { tracing::error!("Send TcpClose message failed: {e:?}"); } }, defeat = listen_remote => { tracing::info!("Remote stream closed: {defeat:?}"); - let msg = ServerMessage::TcpClose { + let msg = ServiceMessage::TcpClose { tid: self.tid, reason: defeat, }; - let _ = self.processor.send_backend_message(self.peer_did, msg).await; + let backend_message: BackendMessage = msg.into(); + let params: Params = BackendMessageParams{did: self.peer_did, data: backend_message}.try_into().expect("Failed to cover backend message to rpc params"); + let _ = provider.request("sendBackendMessage".to_string(), params, None).await; } } } } +/// This function handle a tcp request with timeout pub async fn tcp_connect_with_timeout( addr: SocketAddr, request_timeout_s: u64, diff --git a/node/src/backend/types.rs b/node/src/backend/types.rs index 4167490af..84aac24a3 100644 --- a/node/src/backend/types.rs +++ b/node/src/backend/types.rs @@ -1,8 +1,8 @@ #![warn(missing_docs)] //! Backend Message Types. - use std::io::ErrorKind as IOErrorKind; +use std::sync::Arc; use bytes::Bytes; use rings_core::message::MessagePayload; @@ -10,6 +10,7 @@ use serde::Deserialize; use serde::Serialize; use crate::error::Result; +use crate::provider::Provider; /// TunnelId type, use uuid. pub type TunnelId = uuid::Uuid; @@ -21,14 +22,14 @@ pub enum BackendMessage { /// extension message Extension(Bytes), /// server message - ServerMessage(ServerMessage), + ServiceMessage(ServiceMessage), /// Plain text PlainText(String), } -/// ServerMessage +/// ServiceMessage #[derive(Debug, Clone, Deserialize, Serialize)] -pub enum ServerMessage { +pub enum ServiceMessage { /// Tunnel Open TcpDial { /// Tunnel Id @@ -82,6 +83,8 @@ pub enum TunnelDefeat { /// HttpRequest #[derive(Debug, Clone, Deserialize, Serialize)] pub struct HttpRequest { + /// Request Id + pub rid: Option, /// Service name pub service: String, /// Method @@ -97,6 +100,8 @@ pub struct HttpRequest { /// HttpResponse #[derive(Debug, Clone, Deserialize, Serialize)] pub struct HttpResponse { + /// Request Id + pub rid: Option, /// Status pub status: u16, /// Headers @@ -105,29 +110,42 @@ pub struct HttpResponse { pub body: Option, } -/// IntoBackendMessage trait -pub trait IntoBackendMessage { - /// into_backend_message - fn into_backend_message(self) -> BackendMessage; -} - /// MessageEndpoint trait #[cfg_attr(target_arch = "wasm32", async_trait::async_trait(?Send))] #[cfg_attr(not(target_arch = "wasm32"), async_trait::async_trait)] pub trait MessageEndpoint { /// handle_message - async fn handle_message(&self, ctx: &MessagePayload, data: &T) -> Result<()>; + async fn on_message( + &self, + provider: Arc, + ctx: &MessagePayload, + data: &T, + ) -> Result<()>; } -impl IntoBackendMessage for BackendMessage { - fn into_backend_message(self) -> BackendMessage { - self +#[cfg_attr(target_arch = "wasm32", async_trait::async_trait(?Send))] +#[cfg_attr(not(target_arch = "wasm32"), async_trait::async_trait)] +impl MessageEndpoint + for Vec + Send + Sync>> +{ + async fn on_message( + &self, + provider: Arc, + ctx: &MessagePayload, + data: &BackendMessage, + ) -> Result<()> { + for endpoint in self { + if let Err(e) = endpoint.on_message(provider.clone(), ctx, data).await { + tracing::error!("Failed to handle message, {:?}", e) + } + } + Ok(()) } } -impl IntoBackendMessage for ServerMessage { - fn into_backend_message(self) -> BackendMessage { - BackendMessage::ServerMessage(self) +impl From for BackendMessage { + fn from(val: ServiceMessage) -> Self { + BackendMessage::ServiceMessage(val) } } diff --git a/node/src/jsonrpc/handler.rs b/node/src/jsonrpc/handler.rs index c15393c59..167e3035c 100644 --- a/node/src/jsonrpc/handler.rs +++ b/node/src/jsonrpc/handler.rs @@ -143,6 +143,10 @@ pub fn methods() -> Vec<(Method, MethodFnBox)> { (Method::Disconnect, pin!(server::close_connection)), (Method::SendTo, pin!(server::send_raw_message)), (Method::SendCustomMessage, pin!(server::send_custom_message)), + ( + Method::SendBackendMessage, + pin!(server::send_backend_message), + ), ( Method::PublishMessageToTopic, pin!(server::publish_message_to_topic), diff --git a/node/src/jsonrpc/server.rs b/node/src/jsonrpc/server.rs index 73226c855..a37e90ab6 100644 --- a/node/src/jsonrpc/server.rs +++ b/node/src/jsonrpc/server.rs @@ -10,6 +10,7 @@ use rings_core::swarm::impls::ConnectionHandshake; use rings_transport::core::transport::ConnectionInterface; use serde_json::Value; +use crate::backend::types::BackendMessage; use crate::error::Error as ServerError; use crate::prelude::jsonrpc_core::Error; use crate::prelude::jsonrpc_core::ErrorCode; @@ -61,6 +62,48 @@ impl From> for RpcMeta { } } +/// Params for method `BackendMessage` +pub struct BackendMessageParams { + /// destination did + pub did: Did, + /// data of backend message + pub data: BackendMessage, +} + +impl TryFrom for BackendMessageParams { + type Error = Error; + fn try_from(params: Params) -> Result { + let params: Vec = params.parse()?; + let did = params + .get(0) + .ok_or_else(|| Error::new(ErrorCode::InvalidParams))? + .as_str() + .ok_or_else(|| Error::new(ErrorCode::InvalidParams))?; + let did = Did::from_str(did).map_err(|_| Error::new(ErrorCode::InvalidParams))?; + + let data = params + .get(1) + .ok_or_else(|| Error::new(ErrorCode::InvalidParams))? + .as_str() + .ok_or_else(|| Error::new(ErrorCode::InvalidParams))?; + let data: BackendMessage = + serde_json::from_str(data).map_err(|_| Error::new(ErrorCode::InvalidParams))?; + Ok(Self { did, data }) + } +} + +impl TryInto for BackendMessageParams { + type Error = Error; + fn try_into(self) -> Result { + let data: String = + serde_json::to_string(&self.data).map_err(|_| Error::new(ErrorCode::InvalidParams))?; + Ok(Params::Array(vec![ + serde_json::Value::String(self.did.to_string()), + serde_json::Value::String(data), + ])) + } +} + pub(crate) async fn node_info(_: Params, meta: RpcMeta) -> Result { let node_info = meta .processor @@ -290,6 +333,26 @@ pub(crate) async fn send_custom_message(params: Params, meta: RpcMeta) -> Result ) } +/// send custom message to specifice destination +/// * Params +/// - destination: destination did +/// - data: base64 of [u8] +pub(crate) async fn send_backend_message(params: Params, meta: RpcMeta) -> Result { + meta.require_authed()?; + let bm_params: BackendMessageParams = params.try_into()?; + let tx_id = meta + .processor + .send_backend_message(bm_params.did, bm_params.data) + .await?; + tracing::info!("Send Response message"); + Ok( + serde_json::to_value(rings_rpc::response::SendMessageResponse::from( + tx_id.to_string(), + )) + .unwrap(), + ) +} + pub(crate) async fn publish_message_to_topic(params: Params, meta: RpcMeta) -> Result { meta.require_authed()?; let params: Vec = params.parse()?; diff --git a/node/src/lib.rs b/node/src/lib.rs index 95bab5f52..cfa13fe20 100644 --- a/node/src/lib.rs +++ b/node/src/lib.rs @@ -1,7 +1,6 @@ #![doc = include_str!("../README.md")] pub mod backend; -pub mod client; pub mod consts; pub mod error; pub mod jsonrpc; @@ -11,6 +10,7 @@ pub mod measure; pub mod native; pub mod prelude; pub mod processor; +pub mod provider; pub mod seed; #[cfg(test)] mod tests; diff --git a/node/src/native/cli.rs b/node/src/native/cli.rs index d3c32d7d6..502f9bdd4 100644 --- a/node/src/native/cli.rs +++ b/node/src/native/cli.rs @@ -28,7 +28,7 @@ use serde_json::json; use crate::backend::types::BackendMessage; use crate::backend::types::HttpRequest; -use crate::backend::types::ServerMessage; +use crate::backend::types::ServiceMessage; use crate::prelude::rings_core::inspect::SwarmInspect; use crate::prelude::rings_core::session::SessionSk; use crate::prelude::rings_rpc::client::Client as RpcClient; @@ -162,6 +162,7 @@ impl Client { path: &str, headers: Vec<(String, String)>, body: Option>, + rid: Option, ) -> Output<()> { let req = HttpRequest { service: service.to_string(), @@ -169,9 +170,10 @@ impl Client { path: path.to_string(), headers, body, + rid, }; - let msg = BackendMessage::ServerMessage(ServerMessage::HttpRequest(req)); + let msg = BackendMessage::ServiceMessage(ServiceMessage::HttpRequest(req)); let data = bincode::serialize(&msg).map_err(|e| { anyhow::anyhow!("Failed to serialize HttpRequest message to binary format: {e}",) diff --git a/node/src/native/config.rs b/node/src/native/config.rs index a51d0ac31..e029c13f7 100644 --- a/node/src/native/config.rs +++ b/node/src/native/config.rs @@ -7,7 +7,7 @@ use serde::Deserialize; use serde::Serialize; use crate::backend::native::extension::ExtensionConfig; -use crate::backend::native::server::ServiceConfig; +use crate::backend::native::service::ServiceConfig; use crate::backend::native::BackendConfig; use crate::error::Error; use crate::error::Result; diff --git a/node/src/processor.rs b/node/src/processor.rs index 0f4aa2ca7..272b50d96 100644 --- a/node/src/processor.rs +++ b/node/src/processor.rs @@ -13,7 +13,7 @@ use rings_transport::core::transport::ConnectionInterface; use serde::Deserialize; use serde::Serialize; -use crate::backend::types::IntoBackendMessage; +use crate::backend::types::BackendMessage; use crate::consts::DATA_REDUNDANT; use crate::error::Error; use crate::error::Result; @@ -43,7 +43,7 @@ use crate::prelude::SessionSk; /// ProcessorConfig is usually serialized as json or yaml. /// There is a `from_config` method in [ProcessorBuilder] used to initialize the Builder with a serialized ProcessorConfig. -#[derive(Clone)] +#[derive(Clone, Debug)] #[wasm_export] pub struct ProcessorConfig { /// ICE servers for webrtc @@ -262,7 +262,6 @@ impl ProcessorBuilder { if let Some(measure) = self.measure { swarm_builder = swarm_builder.measure(measure); } - let swarm = Arc::new(swarm_builder.build()); let stabilization = Arc::new(Stabilization::new(swarm.clone(), self.stabilize_timeout)); @@ -384,9 +383,9 @@ impl Processor { /// Send custom message to a did. pub async fn send_message(&self, destination: &str, msg: &[u8]) -> Result { tracing::info!( - "send_message, destination: {}, text: {:?}", + "send_message, destination: {}, message size: {:?}", destination, - msg, + msg.len(), ); let destination = Did::from_str(destination).map_err(|_| Error::InvalidDid)?; @@ -402,9 +401,8 @@ impl Processor { pub async fn send_backend_message( &self, destination: Did, - msg: impl IntoBackendMessage, + backend_msg: BackendMessage, ) -> Result { - let backend_msg = msg.into_backend_message(); let msg_bytes = bincode::serialize(&backend_msg).map_err(|_| Error::EncodeError)?; self.send_message(&destination.to_string(), &msg_bytes) .await diff --git a/node/src/client/browser/mod.rs b/node/src/provider/browser/mod.rs similarity index 94% rename from node/src/client/browser/mod.rs rename to node/src/provider/browser/mod.rs index 386d93e88..917e3fedb 100644 --- a/node/src/client/browser/mod.rs +++ b/node/src/provider/browser/mod.rs @@ -2,13 +2,13 @@ #![allow(clippy::unused_unit)] #![allow(non_snake_case, non_upper_case_globals)] //! rings-node browser support. -pub mod client; +pub mod provider; pub mod utils; use std::str::FromStr; use wasm_bindgen; -pub use self::client::*; +pub use self::provider::*; use crate::logging::browser::init_logging; use crate::logging::set_panic_hook; use crate::logging::LogLevel; diff --git a/node/src/client/browser/client.rs b/node/src/provider/browser/provider.rs similarity index 89% rename from node/src/client/browser/client.rs rename to node/src/provider/browser/provider.rs index 6d90ea86d..a45269192 100644 --- a/node/src/client/browser/client.rs +++ b/node/src/provider/browser/provider.rs @@ -1,5 +1,5 @@ #![warn(missing_docs)] -//! Browser Client implementation +//! Browser Provider implementation #![allow(non_snake_case, non_upper_case_globals, clippy::ptr_offset_with_cast)] use std::convert::TryFrom; use std::future::Future; @@ -26,14 +26,15 @@ use wasm_bindgen_futures; use wasm_bindgen_futures::future_to_promise; use wasm_bindgen_futures::JsFuture; -use crate::backend::browser::Backend; +use crate::backend::browser::BackendContext; use crate::backend::types::BackendMessage; use crate::backend::types::HttpRequest; -use crate::backend::types::ServerMessage; -use crate::client::AsyncSigner; -use crate::client::Client; -use crate::client::Signer; +use crate::backend::types::ServiceMessage; +use crate::backend::Backend; use crate::processor::ProcessorConfig; +use crate::provider::AsyncSigner; +use crate::provider::Provider; +use crate::provider::Signer; /// AddressType enum contains `DEFAULT` and `ED25519`. #[wasm_export] @@ -68,8 +69,8 @@ impl TryFrom<&Peer> for JsValue { } #[wasm_export] -impl Client { - /// Create new instance of Client, return Promise +impl Provider { + /// Create new instance of Provider, return Promise /// Ice_servers should obey forrmat: "[turn|strun]://
:;..." /// Account is hex string /// Account should format as same as account_type declared @@ -84,7 +85,7 @@ impl Client { account: String, account_type: String, signer: js_sys::Function, - backend: Option, + backend_context: Option, ) -> js_sys::Promise { fn wrapped_signer(signer: js_sys::Function) -> AsyncSigner { Box::new( @@ -109,7 +110,7 @@ impl Client { future_to_promise(async move { let signer = wrapped_signer(signer); - let client = Client::new_client_internal( + let provider = Provider::new_provider_internal( ice_servers, stabilize_timeout, account, @@ -117,30 +118,31 @@ impl Client { Signer::Async(Box::new(signer)), ) .await?; - if let Some(cb) = backend { - client - .set_swarm_callback(Arc::new(cb)) + if let Some(cb) = backend_context { + let backend: Backend = Backend::new(Arc::new(provider.clone()), Box::new(cb)); + provider + .set_swarm_callback(Arc::new(backend)) .expect("Failed on set swarm callback"); } - Ok(JsValue::from(client)) + Ok(JsValue::from(provider)) }) } - /// Create new client instance with serialized config (yaml/json) - pub fn new_client_with_serialized_config( + /// Create new provider instance with serialized config (yaml/json) + pub fn new_provider_with_serialized_config( config: String, - backend: Option, + backend: Option, ) -> js_sys::Promise { let cfg: ProcessorConfig = serde_yaml::from_str(&config).unwrap(); - Self::new_client_with_config(cfg, backend) + Self::new_provider_with_config(cfg, backend) } - /// Create a new client instance. - pub fn new_client_with_config( + /// Create a new provider instance. + pub fn new_provider_with_config( config: ProcessorConfig, - backend: Option, + backend: Option, ) -> js_sys::Promise { - Self::new_client_with_storage(config, backend, "rings-node".to_string()) + Self::new_provider_with_storage(config, backend, "rings-node".to_string()) } /// get self web3 address @@ -149,22 +151,23 @@ impl Client { self.processor.did().to_string() } - /// create new unsigned Client - pub fn new_client_with_storage( + /// create new unsigned Provider + pub fn new_provider_with_storage( config: ProcessorConfig, - backend: Option, + backend_context: Option, storage_name: String, ) -> js_sys::Promise { future_to_promise(async move { - let client = Self::new_client_with_storage_internal(config, storage_name) + let provider = Self::new_provider_with_storage_internal(config, storage_name) .await .map_err(JsError::from)?; - if let Some(cb) = backend { - client - .set_swarm_callback(Arc::new(cb)) + if let Some(cb) = backend_context { + let backend: Backend = Backend::new(Arc::new(provider.clone()), Box::new(cb)); + provider + .set_swarm_callback(Arc::new(backend)) .expect("Failed on set swarm callback"); } - Ok(JsValue::from(client)) + Ok(JsValue::from(provider)) }) } @@ -229,12 +232,12 @@ impl Client { /// connect peer with web3 address, and wait for connection channel connected /// example: /// ```typescript - /// const client1 = new Client() - /// const client2 = new Client() - /// const client3 = new Client() - /// await create_connection(client1, client2); - /// await create_connection(client2, client3); - /// await client1.connect_with_did(client3.address()) + /// const provider1 = new Provider() + /// const provider2 = new Provider() + /// const provider3 = new Provider() + /// await create_connection(provider1, provider2); + /// await create_connection(provider2, provider3); + /// await provider1.connect_with_did(provider3.address()) /// ``` pub fn connect_with_address( &self, @@ -416,6 +419,7 @@ impl Client { path: String, headers: JsValue, body: Option, + rid: Option, ) -> js_sys::Promise { let p = self.processor.clone(); @@ -459,10 +463,11 @@ impl Client { path, headers, body, + rid, }; let tx_id = p - .send_backend_message(destination, ServerMessage::HttpRequest(req)) + .send_backend_message(destination, ServiceMessage::HttpRequest(req).into()) .await .map_err(JsError::from)?; diff --git a/node/src/client/browser/utils.rs b/node/src/provider/browser/utils.rs similarity index 100% rename from node/src/client/browser/utils.rs rename to node/src/provider/browser/utils.rs diff --git a/node/src/client/ffi.rs b/node/src/provider/ffi.rs similarity index 75% rename from node/src/client/ffi.rs rename to node/src/provider/ffi.rs index 630c5f9d0..cf1c400d0 100644 --- a/node/src/client/ffi.rs +++ b/node/src/provider/ffi.rs @@ -1,7 +1,7 @@ #![warn(missing_docs)] -//! ffi Client implementation +//! ffi Provider implementation //! ======================= -//! This module allows developers to integrate the client with various programming languages, +//! This module allows developers to integrate the provider with various programming languages, //! such as C, C++, Golang, Python, and Node.js. //! //! The module provides functionality for integrating Rust-based systems with external @@ -10,8 +10,8 @@ //! this Rust module. //! //! Primary Features: -//! 1. **Client Representation for FFI**: The module defines `ClientPtr`, a struct that -//! serves as a C-compatible representation of the `Client` type, allowing for interaction +//! 1. **Provider Representation for FFI**: The module defines `ProviderPtr`, a struct that +//! serves as a C-compatible representation of the `Provider` type, allowing for interaction //! with other languages through raw pointers. It abstracts the reference counting of //! internal `Arc` components, ensuring memory safety across the boundary. //! @@ -19,8 +19,8 @@ //! for message callback functionalities between Rust and other languages. It can hold //! function pointers to C-compatible functions that handle custom and built-in messages. //! -//! 3. **Functions for Client Interaction**: Several extern "C" functions, such as `new_client_with_callback`, -//! `listen`, and `async_listen`, facilitate the creation of clients, listening to messages, +//! 3. **Functions for Provider Interaction**: Several extern "C" functions, such as `new_provider_with_callback`, +//! `listen`, and `async_listen`, facilitate the creation of providers, listening to messages, //! and making internal requests. They make the module's core functionalities accessible from C //! or other languages supporting FFI. //! @@ -72,8 +72,8 @@ //! rings_node.init_logging(rings_node.Debug) //! callback = rings_node.new_callback(custom_msg_callback, builtin_msg_callback) //! -//! def create_client(signer, acc): -//! client = rings_node.new_client_with_callback( +//! def create_provider(signer, acc): +//! provider = rings_node.new_provider_with_callback( //! "stun://stun.l.google.com".encode(), //! 10, //! acc.address.encode(), @@ -81,12 +81,12 @@ //! signer, //! ffi.addressof(callback) //! ) -//! return client +//! return provider //! //! if __name__ == "__main__": -//! client = create_client(signer, acc) -//! rings_node.listen(ffi.addressof(client)) -//! print(client) +//! provider = create_provider(signer, acc) +//! rings_node.listen(ffi.addressof(provider)) +//! print(provider) //! ``` //! //! Note: Since the above code is executed in a single-process environment of Python, @@ -103,22 +103,22 @@ use rings_core::async_trait; use rings_core::message::MessagePayload; use rings_core::swarm::callback::SwarmCallback; -use super::Client; +use super::Provider; use super::Signer; use crate::error::Error; use crate::error::Result; use crate::jsonrpc::HandlerType; use crate::processor::Processor; -/// A structure to represent the Client in a C-compatible format. +/// A structure to represent the Provider in a C-compatible format. /// This is necessary as using Arc directly in FFI can be unsafe. #[repr(C)] -pub struct ClientPtr { +pub struct ProviderPtr { processor: *const Processor, handler: *const HandlerType, } -impl ClientPtr { +impl ProviderPtr { /// Increases the reference count for the associated Arcs. /// # Safety /// This function unsafely increments the reference count of the Arcs. @@ -136,7 +136,7 @@ impl ClientPtr { } } -impl std::ops::Drop for ClientPtr { +impl std::ops::Drop for ProviderPtr { fn drop(&mut self) { tracing::debug!("Ptr dropped!"); unsafe { @@ -145,44 +145,44 @@ impl std::ops::Drop for ClientPtr { } } -impl Client { - /// Converts a raw ClientPtr pointer to a Rust Client type. +impl Provider { + /// Converts a raw ProviderPtr pointer to a Rust Provider type. /// # Safety /// Unsafe due to the dereferencing of the raw pointer. - fn from_raw(ptr: *const ClientPtr) -> Result { + fn from_raw(ptr: *const ProviderPtr) -> Result { // Check point here. if ptr.is_null() { return Err(Error::FFINulPtrError); } - let client_ptr: &ClientPtr = unsafe { &*ptr }; - let client: Client = client_ptr.into(); - Ok(client) + let provider_ptr: &ProviderPtr = unsafe { &*ptr }; + let provider: Provider = provider_ptr.into(); + Ok(provider) } } -impl From<&ClientPtr> for Client { - /// Converts a reference to a ClientPtr to a Client type. +impl From<&ProviderPtr> for Provider { + /// Converts a reference to a ProviderPtr to a Provider type. /// Note that the conversion from raw pointers to Arcs does not modify the reference count. /// # Safety /// Unsafe due to the conversion from raw pointers to Arcs. - fn from(ptr: &ClientPtr) -> Client { - tracing::debug!("FFI: Client from Ptr!"); + fn from(ptr: &ProviderPtr) -> Provider { + tracing::debug!("FFI: Provider from Ptr!"); let processor = unsafe { Arc::::from_raw(ptr.processor as *const Processor) }; let handler = unsafe { Arc::::from_raw(ptr.handler as *const HandlerType) }; Self { processor, handler } } } -impl From<&Client> for ClientPtr { - /// Cast a Client into ClientPtr - fn from(client: &Client) -> ClientPtr { - tracing::debug!("FFI: Client into Ptr!"); +impl From<&Provider> for ProviderPtr { + /// Cast a Provider into ProviderPtr + fn from(provider: &Provider) -> ProviderPtr { + tracing::debug!("FFI: Provider into Ptr!"); // Clone the Arcs, which increases the ref count, // then turn them into raw pointers. - let processor_ptr = Arc::into_raw(client.processor.clone()); - let handler_ptr = Arc::into_raw(client.handler.clone()); - ClientPtr { + let processor_ptr = Arc::into_raw(provider.processor.clone()); + let handler_ptr = Arc::into_raw(provider.handler.clone()); + ProviderPtr { processor: processor_ptr, handler: handler_ptr, } @@ -223,43 +223,43 @@ pub extern "C" fn new_callback( /// Start message listening and stabilization /// # Safety -/// Listen function accept a ClientPtr and will unsafety cast it into Arc based Client +/// Listen function accept a ProviderPtr and will unsafety cast it into Arc based Provider #[no_mangle] -pub extern "C" fn listen(client_ptr: *const ClientPtr) { - let client: Client = Client::from_raw(client_ptr).expect("Client ptr is invalid"); - executor::block_on(client.processor.listen()); +pub extern "C" fn listen(provider_ptr: *const ProviderPtr) { + let provider: Provider = Provider::from_raw(provider_ptr).expect("Provider ptr is invalid"); + executor::block_on(provider.processor.listen()); } /// Start message listening and stabilization /// This function will launch listener in a new thread /// # Safety -/// Listen function accept a ClientPtr and will unsafety cast it into Arc based Client +/// Listen function accept a ProviderPtr and will unsafety cast it into Arc based Provider #[no_mangle] -pub extern "C" fn async_listen(client_ptr: *const ClientPtr) { - let client: Client = Client::from_raw(client_ptr).expect("Client ptr is invalid"); +pub extern "C" fn async_listen(provider_ptr: *const ProviderPtr) { + let provider: Provider = Provider::from_raw(provider_ptr).expect("Provider ptr is invalid"); std::thread::spawn(move || { - executor::block_on(client.processor.listen()); + executor::block_on(provider.processor.listen()); }); } /// Request internal rpc api /// # Safety /// -/// * This function accept a ClientPtr and will unsafety cast it into Arc based Client +/// * This function accept a ProviderPtr and will unsafety cast it into Arc based Provider /// * This function cast CStr into Str #[no_mangle] pub extern "C" fn request( - client_ptr: *const ClientPtr, + provider_ptr: *const ProviderPtr, method: *const c_char, params: *const c_char, ) -> *const c_char { match (|| -> Result<*const c_char> { - let client: Client = Client::from_raw(client_ptr)?; + let provider: Provider = Provider::from_raw(provider_ptr)?; let method = c_char_to_string(method)?; let params = c_char_to_string(params)?; let params = serde_json::from_str(¶ms)?; let ret: String = serde_json::to_string(&executor::block_on( - client.request_internal(method, params, None), + provider.request_internal(method, params, None), )?)?; let c_ret = CString::new(ret)?; Ok(c_ret.as_ptr()) @@ -271,19 +271,19 @@ pub extern "C" fn request( } } -/// Craft a new Client with signer and callback ptr +/// Craft a new Provider with signer and callback ptr /// # Safety /// /// * This function cast CStr into Str #[no_mangle] -pub unsafe extern "C" fn new_client_with_callback( +pub unsafe extern "C" fn new_provider_with_callback( ice_server: *const c_char, stabilize_timeout: u32, account: *const c_char, account_type: *const c_char, signer: extern "C" fn(*const c_char, *mut c_char) -> (), callback_ptr: *const SwarmCallbackInstanceFFI, -) -> ClientPtr { +) -> ProviderPtr { fn wrapped_signer( signer: extern "C" fn(*const c_char, *mut c_char) -> (), ) -> impl Fn(String) -> Vec { @@ -304,12 +304,12 @@ pub unsafe extern "C" fn new_client_with_callback( } } - let client: Client = match (|| -> Result { + let provider: Provider = match (|| -> Result { let ice: String = c_char_to_string(ice_server)?; let acc: String = c_char_to_string(account)?; let acc_ty: String = c_char_to_string(account_type)?; - executor::block_on(Client::new_client_internal( + executor::block_on(Provider::new_provider_internal( ice, stabilize_timeout as usize, acc, @@ -319,17 +319,17 @@ pub unsafe extern "C" fn new_client_with_callback( })() { Ok(r) => r, Err(e) => { - panic!("Failed on create new client {:#}", e) + panic!("Failed on create new provider {:#}", e) } }; let callback: &SwarmCallbackInstanceFFI = unsafe { &*callback_ptr }; - client + provider .set_swarm_callback(Arc::new(callback.clone())) .expect("Failed to set callback"); - let ret: ClientPtr = (&client).into(); - // When leaving the closure, the origin Client ref will be release, + let ret: ProviderPtr = (&provider).into(); + // When leaving the closure, the origin Provider ref will be release, // So we manually increase the count here unsafe { ret.increase_strong_count(); diff --git a/node/src/client/mod.rs b/node/src/provider/mod.rs similarity index 72% rename from node/src/client/mod.rs rename to node/src/provider/mod.rs index d3cebe6a7..8850148de 100644 --- a/node/src/client/mod.rs +++ b/node/src/provider/mod.rs @@ -1,5 +1,5 @@ #![warn(missing_docs)] -//! General Client, this module provide Client implementation for FFI and WASM +//! General Provider, this module provide Provider implementation for FFI and WASM use std::future::Future; use std::pin::Pin; @@ -28,14 +28,14 @@ pub mod browser; #[cfg(feature = "ffi")] pub mod ffi; -/// General Client, which holding reference of Processor -/// Client should be obey memory layout of CLang -/// Client should be export for wasm-bindgen +/// General Provider, which holding reference of Processor +/// Provider should be obey memory layout of CLang +/// Provider should be export for wasm-bindgen #[derive(Clone)] #[allow(dead_code)] #[repr(C)] #[wasm_export] -pub struct Client { +pub struct Provider { processor: Arc, handler: Arc, } @@ -58,12 +58,21 @@ pub enum Signer { } #[allow(dead_code)] -impl Client { - /// Create a client instance with storage name - pub(crate) async fn new_client_with_storage_internal( +impl Provider { + /// Create provider from processor directly + pub fn from_processor(processor: Arc) -> Self { + let mut handler: HandlerType = processor.clone().into(); + handler.build(); + Self { + processor, + handler: handler.into(), + } + } + /// Create a provider instance with storage name + pub(crate) async fn new_provider_with_storage_internal( config: ProcessorConfig, storage_name: String, - ) -> Result { + ) -> Result { let storage_path = storage_name.as_str(); let measure_path = [storage_path, "measure"].join("/"); @@ -85,23 +94,23 @@ impl Client { let mut handler: HandlerType = processor.clone().into(); handler.build(); - Ok(Client { + Ok(Provider { processor, handler: handler.into(), }) } - /// Create a client instance with storage name and serialized config string - /// This function is useful for creating a client with config file (yaml and json). - pub(crate) async fn new_client_with_storage_and_serialized_config_internal( + /// Create a provider instance with storage name and serialized config string + /// This function is useful for creating a provider with config file (yaml and json). + pub(crate) async fn new_provider_with_storage_and_serialized_config_internal( config: String, storage_name: String, - ) -> Result { + ) -> Result { let config: ProcessorConfig = serde_yaml::from_str(&config)?; - Self::new_client_with_storage_internal(config, storage_name).await + Self::new_provider_with_storage_internal(config, storage_name).await } - /// Create a new client instanice with everything in detail + /// Create a new provider instanice with everything in detail /// Ice_servers should obey forrmat: "[turn|strun]://
:;..." /// Account is hex string /// Account should format as same as account_type declared @@ -109,13 +118,13 @@ impl Client { /// please check [rings_core::ecc] /// Signer should accept a String and returns bytes. /// Signer should function as same as account_type declared, Eg: eip191 or secp256k1 or ed25519. - pub(crate) async fn new_client_internal( + pub(crate) async fn new_provider_internal( ice_servers: String, stabilize_timeout: usize, account: String, account_type: String, signer: Signer, - ) -> Result { + ) -> Result { let mut sk_builder = SessionSkBuilder::new(account, account_type); let proof = sk_builder.unsigned_proof(); let sig = match signer { @@ -125,7 +134,7 @@ impl Client { sk_builder = sk_builder.set_session_sig(sig.to_vec()); let session_sk = sk_builder.build().map_err(Error::InternalError)?; let config = ProcessorConfig::new(ice_servers, session_sk, stabilize_timeout); - Self::new_client_with_storage_internal(config, "rings-node".to_string()).await + Self::new_provider_with_storage_internal(config, "rings-node".to_string()).await } pub(crate) fn set_swarm_callback(&self, callback: SharedSwarmCallback) -> Result<()> { @@ -137,7 +146,7 @@ impl Client { /// Request local rpc interface /// the internal rpc interface is provide by rings_rpc - pub(crate) async fn request_internal( + pub async fn request_internal( &self, method: String, params: Params, @@ -161,3 +170,16 @@ impl Client { .map_err(Error::InternalRpcError) } } + +#[cfg(feature = "node")] +impl Provider { + /// A request function implementation for native provider + pub async fn request( + &self, + method: String, + params: Params, + opt_id: Option, + ) -> Result { + self.request_internal(method, params, opt_id).await + } +} diff --git a/node/src/tests/wasm/browser.rs b/node/src/tests/wasm/browser.rs index c796451d8..104ebf3ba 100644 --- a/node/src/tests/wasm/browser.rs +++ b/node/src/tests/wasm/browser.rs @@ -2,9 +2,6 @@ use wasm_bindgen::JsValue; use wasm_bindgen_futures::JsFuture; use wasm_bindgen_test::*; -use crate::client::browser; -use crate::client::browser::Peer; -use crate::client::Client; use crate::prelude::jsonrpc_core::types::response::Output; use crate::prelude::jsonrpc_core::types::Value; use crate::prelude::rings_core::prelude::uuid; @@ -12,10 +9,13 @@ use crate::prelude::rings_core::utils; use crate::prelude::rings_core::utils::js_value; use crate::prelude::*; use crate::processor::ProcessorConfig; +use crate::provider::browser; +use crate::provider::browser::Peer; +use crate::provider::Provider; wasm_bindgen_test_configure!(run_in_browser); -async fn new_client() -> (Client, String) { +async fn new_provider() -> (Provider, String) { let key = SecretKey::random(); let sm = SessionSk::new_with_seckey(&key).unwrap(); @@ -27,17 +27,17 @@ async fn new_client() -> (Client, String) { .unwrap(); let storage_name = uuid::Uuid::new_v4().to_string(); - let client: Client = Client::new_client_with_storage_and_serialized_config_internal( + let provider: Provider = Provider::new_provider_with_storage_and_serialized_config_internal( config, storage_name.clone(), ) .await .unwrap(); - (client, storage_name) + (provider, storage_name) } -async fn get_peers(client: &Client) -> Vec { - let peers = JsFuture::from(client.list_peers()).await.ok().unwrap(); +async fn get_peers(provider: &Provider) -> Vec { + let peers = JsFuture::from(provider.list_peers()).await.ok().unwrap(); let peers: js_sys::Array = peers.into(); let peers: Vec = peers .iter() @@ -46,16 +46,16 @@ async fn get_peers(client: &Client) -> Vec { peers } -async fn create_connection(client1: &Client, client2: &Client) { +async fn create_connection(provider1: &Provider, provider2: &Provider) { futures::try_join!( - JsFuture::from(client1.listen()), - JsFuture::from(client2.listen()), + JsFuture::from(provider1.listen()), + JsFuture::from(provider2.listen()), ) .unwrap(); - let address = JsValue::from_str(&client2.address()); + let address = JsValue::from_str(&provider2.address()); let req0 = js_sys::Array::of1(&address); - let offer_fut = JsFuture::from(client1.request("createOffer".to_string(), req0.into(), None)) + let offer_fut = JsFuture::from(provider1.request("createOffer".to_string(), req0.into(), None)) .await .unwrap(); @@ -72,9 +72,10 @@ async fn create_connection(client1: &Client, client2: &Client) { let js_offer = JsValue::from_str(&offer); let req1 = js_sys::Array::of1(&js_offer); - let answer_fut = JsFuture::from(client2.request("answerOffer".to_string(), req1.into(), None)) - .await - .unwrap(); + let answer_fut = + JsFuture::from(provider2.request("answerOffer".to_string(), req1.into(), None)) + .await + .unwrap(); let answer: String = match js_value::deserialize::(&answer_fut).unwrap() { Output::Success(ret) => { @@ -92,34 +93,34 @@ async fn create_connection(client1: &Client, client2: &Client) { let js_answer = JsValue::from_str(&answer); let req2 = js_sys::Array::of1(&js_answer); - let _ret = JsFuture::from(client1.request("acceptAnswer".to_string(), req2.into(), None)) + let _ret = JsFuture::from(provider1.request("acceptAnswer".to_string(), req2.into(), None)) .await .unwrap(); } #[wasm_bindgen_test] -async fn test_two_client_connect_and_list() { +async fn test_two_provider_connect_and_list() { // super::setup_log(); - let (client1, _storage1) = new_client().await; - let (client2, _storage2) = new_client().await; + let (provider1, _storage1) = new_provider().await; + let (provider2, _storage2) = new_provider().await; futures::try_join!( - JsFuture::from(client1.listen()), - JsFuture::from(client2.listen()), + JsFuture::from(provider1.listen()), + JsFuture::from(provider2.listen()), ) .unwrap(); - create_connection(&client1, &client2).await; + create_connection(&provider1, &provider2).await; console_log!("wait for register"); utils::js_utils::window_sleep(1000).await.unwrap(); - let peers = get_peers(&client1).await; + let peers = get_peers(&provider1).await; assert!(peers.len() == 1, "peers len should be 1"); let peer2 = peers.get(0).unwrap(); console_log!("get peer"); let peer2: Peer = js_value::deserialize( - &JsFuture::from(client1.get_peer(peer2.did.clone(), None)) + &JsFuture::from(provider1.get_peer(peer2.did.clone(), None)) .await .unwrap(), ) @@ -130,15 +131,15 @@ async fn test_two_client_connect_and_list() { peer2.state, ); - JsFuture::from(client1.disconnect(peer2.did.clone(), None)) + JsFuture::from(provider1.disconnect(peer2.did.clone(), None)) .await .unwrap(); - let peers = get_peers(&client1).await; + let peers = get_peers(&provider1).await; assert_eq!(peers.len(), 0); } #[wasm_bindgen_test] -async fn test_client_parse_params() { +async fn test_provider_parse_params() { let null_value = browser::utils::parse_params(JsValue::null()); assert!(null_value.is_ok(), "null_value is error"); match null_value { @@ -205,18 +206,18 @@ async fn test_get_address() { #[wasm_bindgen_test] async fn test_create_connection_via_local_rpc() { // super::setup_log(); - let (client1, _storage1) = new_client().await; - let (client2, _storage2) = new_client().await; + let (provider1, _storage1) = new_provider().await; + let (provider2, _storage2) = new_provider().await; futures::try_join!( - JsFuture::from(client1.listen()), - JsFuture::from(client2.listen()), + JsFuture::from(provider1.listen()), + JsFuture::from(provider2.listen()), ) .unwrap(); - let address = JsValue::from_str(&client2.address()); + let address = JsValue::from_str(&provider2.address()); let req0 = js_sys::Array::of1(&address); - let offer_fut = JsFuture::from(client1.request("createOffer".to_string(), req0.into(), None)) + let offer_fut = JsFuture::from(provider1.request("createOffer".to_string(), req0.into(), None)) .await .unwrap(); @@ -233,9 +234,10 @@ async fn test_create_connection_via_local_rpc() { let js_offer = JsValue::from_str(&offer); let req1 = js_sys::Array::of1(&js_offer); - let answer_fut = JsFuture::from(client2.request("answerOffer".to_string(), req1.into(), None)) - .await - .unwrap(); + let answer_fut = + JsFuture::from(provider2.request("answerOffer".to_string(), req1.into(), None)) + .await + .unwrap(); let answer: String = match js_value::deserialize::(&answer_fut).unwrap() { Output::Success(ret) => { @@ -253,7 +255,7 @@ async fn test_create_connection_via_local_rpc() { let js_answer = JsValue::from_str(&answer); let req2 = js_sys::Array::of1(&js_answer); - let _ret = JsFuture::from(client1.request("acceptAnswer".to_string(), req2.into(), None)) + let _ret = JsFuture::from(provider1.request("acceptAnswer".to_string(), req2.into(), None)) .await .unwrap(); } diff --git a/rpc/src/method.rs b/rpc/src/method.rs index 6e94cce18..45437c4e9 100644 --- a/rpc/src/method.rs +++ b/rpc/src/method.rs @@ -26,6 +26,8 @@ pub enum Method { Disconnect, /// SendCustomMessage, SendCustomMessage, + /// SendBackendMessage + SendBackendMessage, /// Append data to topic PublishMessageToTopic, /// Fetch data of topic @@ -54,6 +56,7 @@ impl Method { Method::Disconnect => "disconnect", Method::AcceptAnswer => "acceptAnswer", Method::SendCustomMessage => "sendCustomMessage", + Method::SendBackendMessage => "sendBackendMessage", Method::PublishMessageToTopic => "publishMessageToTopic", Method::FetchMessagesOfTopic => "fetchMessagesOfTopic", Method::RegisterService => "registerService", @@ -84,6 +87,7 @@ impl TryFrom<&str> for Method { "sendTo" => Self::SendTo, "disconnect" => Self::Disconnect, "acceptAnswer" => Self::AcceptAnswer, + "sendBackendMessage" => Self::SendBackendMessage, "sendCustomMessage" => Self::SendCustomMessage, "publishMessageToTopic" => Method::PublishMessageToTopic, "fetchMessagesOfTopic" => Method::FetchMessagesOfTopic, diff --git a/transport/src/connections/native_webrtc/mod.rs b/transport/src/connections/native_webrtc/mod.rs index 25be06572..bcbd904ae 100644 --- a/transport/src/connections/native_webrtc/mod.rs +++ b/transport/src/connections/native_webrtc/mod.rs @@ -94,7 +94,10 @@ impl ConnectionInterface for WebrtcConnection { async fn send_message(&self, msg: TransportMessage) -> Result<()> { self.webrtc_wait_for_data_channel_open().await?; let data = bincode::serialize(&msg).map(Bytes::from)?; - self.webrtc_data_channel.send(&data).await?; + if let Err(e) = self.webrtc_data_channel.send(&data).await { + tracing::error!("{:?}, Data size: {:?}", e, data.len()); + return Err(e.into()); + } Ok(()) }