From 476255dd6f0e09c89f27c58f47119c4117476b9e Mon Sep 17 00:00:00 2001 From: magine Date: Wed, 3 Jan 2024 15:45:18 +0800 Subject: [PATCH] fix: rings jsonrpc client (#513) * Fix rings cmd helper * Fix rings jsonrpc client * Fix CI * connect_peer_via_http in provider should use request * Update CI of ffi * The offer is already encoded * Update comment * Bump version --- .github/workflows/qaci.yml | 2 +- Cargo.lock | 18 +- Cargo.toml | 3 +- node/Cargo.toml | 1 + node/bin/rings.rs | 8 +- node/src/backend/native/service/mod.rs | 4 +- node/src/backend/native/service/tcp_proxy.rs | 8 +- node/src/backend/types.rs | 8 +- node/src/error.rs | 5 +- node/src/native/cli.rs | 141 ++++++------ node/src/native/endpoint/mod.rs | 20 +- node/src/prelude.rs | 3 - node/src/processor.rs | 62 +----- node/src/provider/browser/provider.rs | 15 +- node/src/rpc_impl.rs | 67 +++--- node/src/seed.rs | 19 +- package.json | 2 +- rpc/Cargo.toml | 3 +- rpc/src/client.rs | 176 --------------- rpc/src/error.rs | 2 +- rpc/src/jsonrpc.rs | 214 +++++++++++++++++++ rpc/src/jsonrpc_client/client.rs | 190 ---------------- rpc/src/jsonrpc_client/mod.rs | 5 - rpc/src/jsonrpc_client/request.rs | 184 ---------------- rpc/src/lib.rs | 4 +- rpc/src/method.rs | 8 +- rpc/src/prelude.rs | 4 - rpc/src/protos/build_config.yaml | 4 +- rpc/src/protos/rings_node.proto | 18 +- rpc/src/protos/rings_node.rs | 26 +-- rpc/src/protos/rings_node_handler.rs | 12 +- rpc/src/response.rs | 50 ----- 32 files changed, 432 insertions(+), 854 deletions(-) delete mode 100644 rpc/src/client.rs create mode 100644 rpc/src/jsonrpc.rs delete mode 100644 rpc/src/jsonrpc_client/client.rs delete mode 100644 rpc/src/jsonrpc_client/mod.rs delete mode 100644 rpc/src/jsonrpc_client/request.rs delete mode 100644 rpc/src/response.rs diff --git a/.github/workflows/qaci.yml b/.github/workflows/qaci.yml index 7ff052719..e84e42337 100644 --- a/.github/workflows/qaci.yml +++ b/.github/workflows/qaci.yml @@ -118,7 +118,7 @@ jobs: run: cargo clippy -p rings-node --features ffi - name: Build - run: cargo build -p rings-node --features ffi && ls target/include && ls target/debug + run: cargo build -p rings-node --features ffi - uses: actions/setup-python@v4 with: diff --git a/Cargo.lock b/Cargo.lock index d246ff9ca..9fb8e46bf 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1693,7 +1693,6 @@ dependencies = [ "futures-core", "futures-task", "futures-util", - "num_cpus", ] [[package]] @@ -2325,21 +2324,6 @@ dependencies = [ "serde_json", ] -[[package]] -name = "jsonrpc-pubsub" -version = "18.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "240f87695e6c6f62fb37f05c02c04953cf68d6408b8c1c89de85c7a0125b1011" -dependencies = [ - "futures", - "jsonrpc-core", - "lazy_static", - "log", - "parking_lot 0.11.2", - "rand 0.7.3", - "serde", -] - [[package]] name = "lazy_static" version = "1.4.0" @@ -3643,6 +3627,7 @@ dependencies = [ "http", "hyper", "js-sys", + "jsonrpc-core", "lazy_static", "log", "opentelemetry", @@ -3685,7 +3670,6 @@ dependencies = [ "bytes", "http", "jsonrpc-core", - "jsonrpc-pubsub", "prost 0.12.3", "prost-build-config", "reqwest", diff --git a/Cargo.toml b/Cargo.toml index 2176b0b18..769a90265 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -3,7 +3,7 @@ resolver = "2" members = ["core", "transport", "node", "rpc", "derive"] [workspace.package] -version = "0.5.1" +version = "0.5.2" edition = "2021" license = "GPL-3.0" authors = ["RND "] @@ -11,6 +11,7 @@ repository = "https://github.com/RingsNetwork/rings-node" [workspace.dependencies] js-sys = "0.3.64" +jsonrpc-core = "18.0.0" rings-core = { path = "core", default-features = false } rings-derive = { path = "derive", default-features = false } rings-rpc = { path = "rpc", default-features = false } diff --git a/node/Cargo.toml b/node/Cargo.toml index 6a96dee4d..cbe9581aa 100644 --- a/node/Cargo.toml +++ b/node/Cargo.toml @@ -77,6 +77,7 @@ dotenv = "0.15.0" futures = { version = "0.3.21", features = ["alloc"] } futures-timer = "3.0.2" http = "0.2.6" +jsonrpc-core = { workspace = true } log = { version = "0.4", features = ["std"] } rings-core = { workspace = true, optional = true } rings-derive = { workspace = true, optional = true } diff --git a/node/bin/rings.rs b/node/bin/rings.rs index d923be372..c9376f253 100644 --- a/node/bin/rings.rs +++ b/node/bin/rings.rs @@ -105,7 +105,7 @@ struct NewSessionCommand { struct RunCommand { #[arg( long, - help = "Rings node external api listen address. If not provided, use bind_addr in config file or 127.0.0.1:50001", + help = "Rings node external api listen address. If not provided, use external_api_addr in config file or 127.0.0.1:50001", env )] pub external_api_addr: Option, @@ -184,11 +184,9 @@ struct ClientArgs { impl ClientArgs { async fn new_client(&self) -> anyhow::Result { - let c = config::Config::read_fs(self.config_args.config.as_str())?; - let process_config: ProcessorConfig = c.clone().try_into()?; + let c = config::Config::read_fs(&self.config_args.config)?; let endpoint_url = self.endpoint_url.as_ref().unwrap_or(&c.endpoint_url); - let session_sk = process_config.session_sk(); - Client::new(endpoint_url.as_str(), session_sk) + Client::new(endpoint_url) } } diff --git a/node/src/backend/native/service/mod.rs b/node/src/backend/native/service/mod.rs index 3a4374711..70ca62d55 100644 --- a/node/src/backend/native/service/mod.rs +++ b/node/src/backend/native/service/mod.rs @@ -97,7 +97,7 @@ impl MessageHandler for ServiceProvider { reason: e, }; let backend_message: BackendMessage = msg.into(); - let params = backend_message.to_request_params(peer_did)?; + let params = backend_message.into_send_backend_message_request(peer_did)?; provider.request(Method::SendBackendMessage, params).await?; Err(Error::TunnelError(e)) } @@ -128,7 +128,7 @@ impl MessageHandler for ServiceProvider { let service = self.service(&req.service).ok_or(Error::InvalidService)?; let resp = handle_http_request(service.addr, req).await?; let backend_message: BackendMessage = ServiceMessage::HttpResponse(resp).into(); - let params = backend_message.to_request_params(peer_did)?; + let params = backend_message.into_send_backend_message_request(peer_did)?; let resp = provider.request(Method::SendBackendMessage, params).await?; tracing::info!("done calling provider {:?}", resp); Ok(()) diff --git a/node/src/backend/native/service/tcp_proxy.rs b/node/src/backend/native/service/tcp_proxy.rs index ee0719734..a478671d3 100644 --- a/node/src/backend/native/service/tcp_proxy.rs +++ b/node/src/backend/native/service/tcp_proxy.rs @@ -142,7 +142,9 @@ impl TunnelListener { }; let backend_message: BackendMessage = msg.into(); - let params = backend_message.to_request_params(self.peer_did).unwrap(); + let params = backend_message + .into_send_backend_message_request(self.peer_did) + .unwrap(); if let Err(e) = provider.request(Method::SendBackendMessage, params).await { tracing::error!("Send TcpPackage message failed: {e:?}"); break TunnelDefeat::WebrtcDatachannelSendFailed; @@ -176,7 +178,7 @@ impl TunnelListener { }; let backend_message: BackendMessage = msg.into(); - let params = backend_message.to_request_params(self.peer_did).unwrap(); + let params = backend_message.into_send_backend_message_request(self.peer_did).unwrap(); if let Err(e) = provider.request(Method::SendBackendMessage, params).await { tracing::error!("Send TcpClose message failed: {e:?}"); } @@ -189,7 +191,7 @@ impl TunnelListener { }; let backend_message: BackendMessage = msg.into(); - let params = backend_message.to_request_params(self.peer_did).unwrap(); + let params = backend_message.into_send_backend_message_request(self.peer_did).unwrap(); let _ = provider.request(Method::SendBackendMessage, params).await; } } diff --git a/node/src/backend/types.rs b/node/src/backend/types.rs index 680fcfc80..08e688f30 100644 --- a/node/src/backend/types.rs +++ b/node/src/backend/types.rs @@ -5,7 +5,6 @@ use std::io::ErrorKind as IOErrorKind; use std::sync::Arc; use bytes::Bytes; -use rings_core::dht::Did; use rings_core::message::MessagePayload; use rings_rpc::protos::rings_node::SendBackendMessageRequest; use serde::Deserialize; @@ -145,10 +144,13 @@ impl From for TunnelDefeat { impl BackendMessage { /// Convert to SendBackendMessageRequest - pub fn to_request_params(&self, destination_did: Did) -> Result { + pub fn into_send_backend_message_request( + self, + destination_did: impl ToString, + ) -> Result { Ok(SendBackendMessageRequest { destination_did: destination_did.to_string(), - data: serde_json::to_string(self)?, + data: serde_json::to_string(&self)?, }) } } diff --git a/node/src/error.rs b/node/src/error.rs index 4712f2057..a0a124d06 100644 --- a/node/src/error.rs +++ b/node/src/error.rs @@ -1,6 +1,5 @@ //! A bunch of wrap errors. use crate::backend::types::TunnelDefeat; -use crate::prelude::jsonrpc_core; use crate::prelude::rings_core; /// A wrap `Result` contains custom errors. @@ -152,8 +151,8 @@ impl From for jsonrpc_core::Error { } } -impl From for Error { - fn from(e: crate::prelude::rings_rpc::error::Error) -> Self { +impl From for Error { + fn from(e: rings_rpc::error::Error) -> Self { match e { rings_rpc::error::Error::DecodeError => Error::DecodeError, rings_rpc::error::Error::EncodeError => Error::EncodeError, diff --git a/node/src/native/cli.rs b/node/src/native/cli.rs index cca55721b..e58a0a1df 100644 --- a/node/src/native/cli.rs +++ b/node/src/native/cli.rs @@ -24,13 +24,12 @@ use futures::select; use futures::FutureExt; use futures::Stream; use futures_timer::Delay; +use rings_rpc::jsonrpc::Client as RpcClient; +use rings_rpc::protos::rings_node::*; use crate::backend::types::BackendMessage; use crate::backend::types::HttpRequest; use crate::backend::types::ServiceMessage; -use crate::prelude::rings_core::inspect::SwarmInspect; -use crate::prelude::rings_core::session::SessionSk; -use crate::prelude::rings_rpc::client::Client as RpcClient; use crate::seed::Seed; use crate::util::loader::ResourceLoader; @@ -51,8 +50,8 @@ pub struct ClientOutput { impl Client { /// Creates a new Client instance with the specified endpoint URL and signature. - pub fn new(endpoint_url: &str, session_sk: SessionSk) -> anyhow::Result { - let rpc_client = RpcClient::new(endpoint_url, Some(session_sk)); + pub fn new(endpoint_url: &str) -> anyhow::Result { + let rpc_client = RpcClient::new(endpoint_url); Ok(Self { client: rpc_client }) } @@ -64,12 +63,17 @@ impl Client { /// /// Takes a URL for an HTTP server that will be used as the signaling channel to exchange ICE candidates and SDP with the remote peer. /// Returns a Did that can be used to refer to this connection in subsequent WebRTC operations. - pub async fn connect_peer_via_http(&mut self, http_url: &str) -> Output { + pub async fn connect_peer_via_http(&mut self, url: &str) -> Output { let peer_did = self .client - .connect_peer_via_http(http_url) + .connect_peer_via_http(&ConnectPeerViaHttpRequest { + url: url.to_string(), + }) .await - .map_err(|e| anyhow::anyhow!("{}", e))?; + .map_err(|e| anyhow::anyhow!("{}", e))? + .peer + .ok_or_else(|| anyhow::anyhow!("peer did not return"))? + .did; ClientOutput::ok(format!("Remote did: {}", peer_did), peer_did) } @@ -77,10 +81,10 @@ impl Client { /// Attempts to connect to a peer using a seed file located at the specified source path. pub async fn connect_with_seed(&mut self, source: &str) -> Output<()> { let seed = Seed::load(source).await?; - let seed_v = serde_json::to_value(seed).map_err(|_| anyhow::anyhow!("serialize failed"))?; + let req = seed.into_connect_with_seed_request(); self.client - .connect_with_seed(&[seed_v]) + .connect_with_seed(&req) .await .map_err(|e| anyhow::anyhow!("{}", e))?; @@ -90,7 +94,9 @@ impl Client { /// Attempts to connect to a peer using a DID stored in a Distributed Hash Table (DHT). pub async fn connect_with_did(&mut self, did: &str) -> Output<()> { self.client - .connect_with_did(did) + .connect_with_did(&ConnectWithDidRequest { + did: did.to_string(), + }) .await .map_err(|e| anyhow::anyhow!("{}", e))?; ClientOutput::ok("Successful!".to_owned(), ()) @@ -102,9 +108,10 @@ impl Client { pub async fn list_peers(&mut self) -> Output<()> { let peers = self .client - .list_peers() + .list_peers(&ListPeersRequest {}) .await - .map_err(|e| anyhow::anyhow!("{}", e))?; + .map_err(|e| anyhow::anyhow!("{}", e))? + .peers; let mut display = String::new(); display.push_str("Did, TransportId, Status\n"); @@ -123,7 +130,9 @@ impl Client { /// Disconnects from the peer with the specified DID. pub async fn disconnect(&mut self, did: &str) -> Output<()> { self.client - .disconnect(did) + .disconnect(&DisconnectRequest { + did: did.to_string(), + }) .await .map_err(|e| anyhow::anyhow!("{}", e))?; @@ -133,7 +142,10 @@ impl Client { /// Sends a custom message to the specified peer. pub async fn send_custom_message(&self, did: &str, data: &str) -> Output<()> { self.client - .send_custom_message(did, data) + .send_custom_message(&SendCustomMessageRequest { + destination_did: did.to_string(), + data: data.to_string(), + }) .await .map_err(|e| anyhow::anyhow!("{}", e))?; ClientOutput::ok("Done.".into(), ()) @@ -160,15 +172,13 @@ impl Client { rid, }; - let msg = BackendMessage::ServiceMessage(ServiceMessage::HttpRequest(req)); - - let data = bincode::serialize(&msg).map_err(|e| { - anyhow::anyhow!("Failed to serialize HttpRequest message to binary format: {e}",) - })?; - let data_b64 = base64::encode(&data); + let backend_msg = BackendMessage::from(ServiceMessage::HttpRequest(req)); + let rpc_req = backend_msg + .into_send_backend_message_request(did) + .map_err(|e| anyhow::anyhow!("{}", e))?; self.client - .send_custom_message(did, &data_b64) + .send_backend_message(&rpc_req) .await .map_err(|e| anyhow::anyhow!("{}", e))?; @@ -177,15 +187,13 @@ impl Client { /// Sends a plain text message to the specified peer. pub async fn send_plain_text_message(&self, did: &str, text: &str) -> Output<()> { - let msg = BackendMessage::PlainText(text.to_string()); - - let data = bincode::serialize(&msg).map_err(|e| { - anyhow::anyhow!("Failed to serialize PlainText message to binary format: {e}",) - })?; - let data_b64 = base64::encode(&data); + let backend_msg = BackendMessage::PlainText(text.to_string()); + let rpc_req = backend_msg + .into_send_backend_message_request(did) + .map_err(|e| anyhow::anyhow!("{}", e))?; self.client - .send_custom_message(did, &data_b64) + .send_backend_message(&rpc_req) .await .map_err(|e| anyhow::anyhow!("{}", e))?; @@ -195,7 +203,9 @@ impl Client { /// Registers a new service with the given name. pub async fn register_service(&self, name: &str) -> Output<()> { self.client - .register_service(name) + .register_service(&RegisterServiceRequest { + name: name.to_string(), + }) .await .map_err(|e| anyhow::anyhow!("{}", e))?; ClientOutput::ok("Done.".into(), ()) @@ -205,9 +215,12 @@ impl Client { pub async fn lookup_service(&self, name: &str) -> Output<()> { let dids = self .client - .lookup_service(name) + .lookup_service(&LookupServiceRequest { + name: name.to_string(), + }) .await - .map_err(|e| anyhow::anyhow!("{}", e))?; + .map_err(|e| anyhow::anyhow!("{}", e))? + .dids; ClientOutput::ok(dids.join("\n"), ()) } @@ -215,7 +228,10 @@ impl Client { /// Publishes a message to the specified topic. pub async fn publish_message_to_topic(&self, topic: &str, data: &str) -> Output<()> { self.client - .publish_message_to_topic(topic, data) + .publish_message_to_topic(&PublishMessageToTopicRequest { + topic: topic.to_string(), + data: data.to_string(), + }) .await .map_err(|e| anyhow::anyhow!("{}", e))?; ClientOutput::ok("Done.".into(), ()) @@ -229,29 +245,32 @@ impl Client { where 'a: 'b, { - let mut index = 0; + let mut skip = 0usize; stream! { - loop { - let timeout = Delay::new(Duration::from_secs(5)).fuse(); - pin_mut!(timeout); - - select! { - _ = timeout => { - let result = self - .client - .fetch_topic_messages(topic.as_str(), index) - .await; - - if let Err(e) = result { - tracing::error!("Failed to fetch messages of topic: {}, {}", topic, e); - continue; - } - let messages = result.unwrap(); - for msg in messages.iter().cloned() { - yield msg - } - index += messages.len(); + loop { + let timeout = Delay::new(Duration::from_secs(5)).fuse(); + pin_mut!(timeout); + + select! { + _ = timeout => { + let result = self + .client + .fetch_topic_messages(&FetchTopicMessagesRequest { + topic: topic.clone(), + skip: skip as i64, + }) + .await; + + if let Err(e) = result { + tracing::error!("Failed to fetch messages of topic: {}, {}", topic, e); + continue; + } + let messages = result.unwrap().data; + for msg in messages.iter().cloned() { + yield msg + } + skip += messages.len(); } } } @@ -259,17 +278,19 @@ impl Client { } /// Query for swarm inspect info. - pub async fn inspect(&self) -> Output { - let info = self + pub async fn inspect(&self) -> Output { + let swarm_info = self .client - .inspect() + .node_info(&NodeInfoRequest {}) .await - .map_err(|e| anyhow::anyhow!("{}", e))?; + .map_err(|e| anyhow::anyhow!("{}", e))? + .swarm + .unwrap(); let display = - serde_json::to_string_pretty(&info.swarm).map_err(|e| anyhow::anyhow!("{}", e))?; + serde_json::to_string_pretty(&swarm_info).map_err(|e| anyhow::anyhow!("{}", e))?; - ClientOutput::ok(display, info.swarm) + ClientOutput::ok(display, swarm_info) } } diff --git a/node/src/native/endpoint/mod.rs b/node/src/native/endpoint/mod.rs index 114cb66d2..1ba29e0d3 100644 --- a/node/src/native/endpoint/mod.rs +++ b/node/src/native/endpoint/mod.rs @@ -13,17 +13,17 @@ use axum::response::IntoResponse; use axum::routing::get; use axum::routing::post; use axum::Router; +use jsonrpc_core::MetaIoHandler; +use rings_rpc::protos::rings_node::NodeInfoResponse; use tower_http::cors::CorsLayer; use self::http_error::HttpError; -use crate::prelude::jsonrpc_core::MetaIoHandler; -use crate::prelude::rings_rpc::protos::rings_node::NodeInfoResponse; use crate::processor::Processor; /// JSON-RPC state #[derive(Clone)] pub struct JsonRpcState -where M: crate::prelude::jsonrpc_core::Middleware> +where M: jsonrpc_core::Middleware> { processor: Arc, io_handler: MetaIoHandler, M>, @@ -114,7 +114,7 @@ async fn jsonrpc_io_handler( body: String, ) -> Result where - M: crate::prelude::jsonrpc_core::Middleware>, + M: jsonrpc_core::Middleware>, { let r = state .io_handler @@ -170,16 +170,16 @@ async fn ws_handler( mod jsonrpc_middleware_impl { use std::future::Future; + use jsonrpc_core::futures_util::future; + use jsonrpc_core::futures_util::future::Either; + use jsonrpc_core::futures_util::FutureExt; + use jsonrpc_core::middleware::NoopCallFuture; + use jsonrpc_core::middleware::NoopFuture; + use jsonrpc_core::*; use rings_rpc::protos::rings_node_handler::ExternalRpcHandler; use rings_rpc::protos::rings_node_handler::InternalRpcHandler; use super::*; - use crate::native::endpoint::jsonrpc_middleware_impl::middleware::NoopCallFuture; - use crate::native::endpoint::jsonrpc_middleware_impl::middleware::NoopFuture; - use crate::prelude::jsonrpc_core::futures_util::future; - use crate::prelude::jsonrpc_core::futures_util::future::Either; - use crate::prelude::jsonrpc_core::futures_util::FutureExt; - use crate::prelude::jsonrpc_core::*; impl Middleware> for InternalRpcMiddleware { type Future = NoopFuture; diff --git a/node/src/prelude.rs b/node/src/prelude.rs index c160d4e60..ba747e081 100644 --- a/node/src/prelude.rs +++ b/node/src/prelude.rs @@ -2,9 +2,6 @@ /// Use this when you want to quickly bootstrap a new project. pub use rings_core; pub use rings_derive::wasm_export; -pub use rings_rpc; -pub use rings_rpc::jsonrpc_client; -pub use rings_rpc::prelude::jsonrpc_core; pub use self::rings_core::chunk; pub use self::rings_core::dht::PeerRing; diff --git a/node/src/processor.rs b/node/src/processor.rs index 3abf9d572..9cf12b914 100644 --- a/node/src/processor.rs +++ b/node/src/processor.rs @@ -7,8 +7,7 @@ use std::sync::Arc; use futures::future::Join; use futures::Future; -use rings_core::message::MessagePayload; -use rings_core::swarm::impls::ConnectionHandshake; +use rings_rpc::protos::rings_node::*; use rings_transport::core::transport::ConnectionInterface; use serde::Deserialize; use serde::Serialize; @@ -18,12 +17,9 @@ use crate::consts::DATA_REDUNDANT; use crate::error::Error; use crate::error::Result; use crate::measure::PeriodicMeasure; -use crate::prelude::jsonrpc_client::SimpleClient; -use crate::prelude::jsonrpc_core; use crate::prelude::rings_core::dht::Did; use crate::prelude::rings_core::dht::Stabilization; use crate::prelude::rings_core::dht::TStabilize; -use crate::prelude::rings_core::message::Decoder; use crate::prelude::rings_core::message::Encoded; use crate::prelude::rings_core::message::Encoder; use crate::prelude::rings_core::message::Message; @@ -33,8 +29,6 @@ use crate::prelude::rings_core::storage::PersistenceStorage; use crate::prelude::rings_core::swarm::MeasureImpl; use crate::prelude::rings_core::swarm::Swarm; use crate::prelude::rings_core::swarm::SwarmBuilder; -use crate::prelude::rings_rpc::method; -use crate::prelude::rings_rpc::protos::rings_node::NodeInfoResponse; use crate::prelude::vnode; use crate::prelude::wasm_export; use crate::prelude::ChordStorageInterface; @@ -291,59 +285,6 @@ impl Processor { self.swarm.did() } - /// Connect peer with remote rings-node jsonrpc server. - /// * peer_url: the remote rings-node jsonrpc server url. - pub async fn connect_peer_via_http(&self, peer_url: &str) -> Result { - // request remote offer and sand answer to remote - tracing::debug!("connect_peer_via_http: {}", peer_url); - - let client = SimpleClient::new(peer_url, None); - - let did_resp = client - .call_method(method::Method::NodeDid.as_str(), jsonrpc_core::Params::None) - .await - .map_err(|e| Error::RemoteRpcError(e.to_string()))?; - let did = serde_json::from_value::(did_resp.clone()) - .map_err(|_| Error::InvalidDid(did_resp.clone().to_string()))? - .parse() - .map_err(|_| Error::InvalidDid(did_resp.clone().to_string()))?; - - let (_, offer) = self - .swarm - .create_offer(did) - .await - .map_err(Error::CreateOffer)?; - let encoded_offer = offer.encode().map_err(|_| Error::EncodeError)?; - tracing::debug!("sending encoded offer {:?} to {}", encoded_offer, peer_url); - let req: serde_json::Value = serde_json::to_value(encoded_offer) - .map_err(Error::SerdeJsonError) - .map_err(Error::from)?; - - let resp = client - .call_method( - method::Method::AnswerOffer.as_str(), - jsonrpc_core::Params::Array(vec![req]), - ) - .await - .map_err(|e| Error::RemoteRpcError(e.to_string()))?; - - let answer_payload_str: String = - serde_json::from_value(resp).map_err(|_| Error::EncodeError)?; - - let encoded_answer: Encoded = >::from(&answer_payload_str); - - let answer_payload = - MessagePayload::from_encoded(&encoded_answer).map_err(|_| Error::DecodeError)?; - - let (did, _) = self - .swarm - .accept_answer(answer_payload) - .await - .map_err(Error::AcceptAnswer)?; - - Ok(did) - } - /// Connect peer with web3 did. /// There are 3 peers: PeerA, PeerB, PeerC. /// 1. PeerA has a connection with PeerB. @@ -466,6 +407,7 @@ impl Processor { mod test { use futures::lock::Mutex; use rings_core::swarm::callback::SwarmCallback; + use rings_core::swarm::impls::ConnectionHandshake; use rings_transport::core::transport::WebrtcConnectionState; use super::*; diff --git a/node/src/provider/browser/provider.rs b/node/src/provider/browser/provider.rs index 1a22a3663..f2cfd367b 100644 --- a/node/src/provider/browser/provider.rs +++ b/node/src/provider/browser/provider.rs @@ -16,6 +16,7 @@ use rings_core::prelude::vnode::VirtualNode; use rings_core::utils::js_utils; use rings_core::utils::js_value; use rings_derive::wasm_export; +use rings_rpc::protos::rings_node::*; use rings_transport::core::transport::ConnectionInterface; use rings_transport::core::transport::WebrtcConnectionState; use serde::Deserialize; @@ -195,17 +196,13 @@ impl Provider { }) } - /// connect peer with remote jsonrpc-server url + /// connect peer with remote jsonrpc server url pub fn connect_peer_via_http(&self, remote_url: String) -> js_sys::Promise { log::debug!("remote_url: {}", remote_url); - let p = self.processor.clone(); - future_to_promise(async move { - let did = p - .connect_peer_via_http(remote_url.as_str()) - .await - .map_err(JsError::from)?; - Ok(JsValue::from_str(&did.to_string())) - }) + self.request( + "ConnectPeerViaHttp".to_string(), + js_value::serialize(&ConnectPeerViaHttpRequest { url: remote_url }).unwrap(), + ) } /// connect peer with web3 address, without waiting for connection channel connected diff --git a/node/src/rpc_impl.rs b/node/src/rpc_impl.rs index 02a7673e1..f5743fd7f 100644 --- a/node/src/rpc_impl.rs +++ b/node/src/rpc_impl.rs @@ -1,28 +1,30 @@ #![warn(missing_docs)] -//! JSON-RPC handler for both feature=browser and feature=node. -//! We support running the JSON-RPC server in either native or browser environment. + +//! RPC handler for both feature=browser and feature=node. +//! We support handling the RPC request in either native or browser environment by `InternalRpcHandler` and `ExternalRpcHandler` from rings_rpc crate. //! For the native environment, we use jsonrpc_core to handle requests. -//! For the browser environment, we utilize a Simple MessageHandler to process the requests. +//! For the browser environment, we use `InternalRpcHandler` to process the requests. + use std::collections::HashSet; use std::str::FromStr; use async_trait::async_trait; use futures::future::join_all; +use jsonrpc_core::types::error::Error; +use jsonrpc_core::types::error::ErrorCode; +use jsonrpc_core::Result; +use rings_core::dht::Did; +use rings_core::message::Decoder; +use rings_core::message::Encoded; +use rings_core::message::Encoder; +use rings_core::message::MessagePayload; +use rings_core::prelude::vnode::VirtualNode; use rings_core::swarm::impls::ConnectionHandshake; use rings_rpc::protos::rings_node::*; use rings_rpc::protos::rings_node_handler::HandleRpc; use rings_transport::core::transport::ConnectionInterface; use crate::error::Error as ServerError; -use crate::prelude::jsonrpc_core::types::error::Error; -use crate::prelude::jsonrpc_core::types::error::ErrorCode; -use crate::prelude::jsonrpc_core::Result; -use crate::prelude::rings_core::dht::Did; -use crate::prelude::rings_core::message::Decoder; -use crate::prelude::rings_core::message::Encoded; -use crate::prelude::rings_core::message::Encoder; -use crate::prelude::rings_core::message::MessagePayload; -use crate::prelude::rings_core::prelude::vnode::VirtualNode; use crate::processor::Processor; use crate::seed::Seed; @@ -33,14 +35,25 @@ impl HandleRpc for Proces &self, req: ConnectPeerViaHttpRequest, ) -> Result { - let did = self - .connect_peer_via_http(&req.url) + let client = rings_rpc::jsonrpc::Client::new(&req.url); + + let did = client + .node_did(&NodeDidRequest {}) .await - .map_err(Error::from)?; + .map_err(|e| ServerError::RemoteRpcError(e.to_string()))? + .did; - Ok(ConnectPeerViaHttpResponse { - did: did.to_string(), - }) + let offer = self.handle_rpc(CreateOfferRequest { did }).await?.offer; + + let answer = client + .answer_offer(&AnswerOfferRequest { offer }) + .await + .map_err(|e| ServerError::RemoteRpcError(e.to_string()))? + .answer; + + let peer = self.handle_rpc(AcceptAnswerRequest { answer }).await?.peer; + + Ok(ConnectPeerViaHttpResponse { peer }) } } @@ -70,7 +83,11 @@ impl HandleRpc for Processor { .peers .iter() .filter(|&x| !connected.contains(&x.did)) - .map(|x| self.connect_peer_via_http(&x.url)); + .map(|x| { + self.handle_rpc(ConnectPeerViaHttpRequest { + url: x.url.to_string(), + }) + }); let results = join_all(tasks).await; @@ -151,7 +168,7 @@ impl HandleRpc for Processor { if req.answer.is_empty() { return Err(Error::invalid_params("Answer is empty")); } - let encoded: Encoded = >::from(req.answer); + let encoded = Encoded::from(req.answer); let answer_payload = MessagePayload::from_encoded(&encoded).map_err(|_| ServerError::DecodeError)?; @@ -223,11 +240,11 @@ impl HandleRpc for #[cfg_attr(feature = "browser", async_trait(?Send))] #[cfg_attr(not(feature = "browser"), async_trait)] -impl HandleRpc for Processor { +impl HandleRpc for Processor { async fn handle_rpc( &self, - req: FetchMessagesOfTopicRequest, - ) -> Result { + req: FetchTopicMessagesRequest, + ) -> Result { let vid = VirtualNode::gen_did(&req.topic) .map_err(|_| Error::invalid_params("Failed to get id of topic"))?; @@ -235,7 +252,7 @@ impl HandleRpc for Pr let result = self.storage_check_cache(vid).await; let Some(vnode) = result else { - return Ok(FetchMessagesOfTopicResponse { data: vec![] }); + return Ok(FetchTopicMessagesResponse { data: vec![] }); }; let data = vnode @@ -246,7 +263,7 @@ impl HandleRpc for Pr .filter_map(|v| v.ok()) .collect::>(); - Ok(FetchMessagesOfTopicResponse { data }) + Ok(FetchTopicMessagesResponse { data }) } } diff --git a/node/src/seed.rs b/node/src/seed.rs index f69bcd848..e16cc0795 100644 --- a/node/src/seed.rs +++ b/node/src/seed.rs @@ -1,12 +1,12 @@ //! Seed and SeedLoader use for getting peers from endpoint. use std::str::FromStr; +use rings_core::dht::Did; +use rings_rpc::protos::rings_node::ConnectWithSeedRequest; use serde::Deserialize; use serde::Serialize; use crate::error::Error; -use crate::prelude::rings_core::dht::Did; -use crate::prelude::rings_rpc::protos::rings_node::ConnectWithSeedRequest; /// A list contains SeedPeer. #[derive(Deserialize, Serialize, Debug)] @@ -37,3 +37,18 @@ impl TryFrom for Seed { Ok(Seed { peers }) } } + +impl Seed { + pub fn into_connect_with_seed_request(self) -> ConnectWithSeedRequest { + let mut peers = Vec::new(); + + for peer in self.peers { + peers.push(rings_rpc::protos::rings_node::SeedPeer { + did: peer.did.to_string(), + url: peer.url, + }); + } + + ConnectWithSeedRequest { peers } + } +} diff --git a/package.json b/package.json index 88dbf426a..432e3a50a 100644 --- a/package.json +++ b/package.json @@ -4,7 +4,7 @@ "RND " ], "description": "Rings is a structured peer-to-peer network implementation using WebRTC, Chord algorithm, and full WebAssembly (WASM) support.\n", - "version": "0.5.1", + "version": "0.5.2", "license": "GPL-3.0", "repository": { "type": "git", diff --git a/rpc/Cargo.toml b/rpc/Cargo.toml index 90431c591..be4880565 100644 --- a/rpc/Cargo.toml +++ b/rpc/Cargo.toml @@ -25,8 +25,7 @@ async-trait = "0.1.73" base64 = { version = "0.13.0" } bytes = "1.5.0" http = "0.2.6" -jsonrpc-core = { version = "18.0.0" } -jsonrpc-pubsub = { version = "18.0.0" } +jsonrpc-core = { workspace = true } prost = "0.12.3" reqwest = { version = "0.11", features = ["json", "rustls-tls"], optional = true, default-features = false } reqwest-wasm = { version = "0.11", features = ["json", "rustls-tls"], optional = true, default-features = false } diff --git a/rpc/src/client.rs b/rpc/src/client.rs deleted file mode 100644 index 11fbd6f12..000000000 --- a/rpc/src/client.rs +++ /dev/null @@ -1,176 +0,0 @@ -//! rings-rpc client - -use rings_core::session::SessionSk; -use serde_json::json; -use serde_json::Value; - -use crate::error::Error; -use crate::error::Result; -use crate::jsonrpc_client::SimpleClient; -use crate::method::Method; -use crate::prelude::jsonrpc_core::Params; -use crate::prelude::*; -use crate::response; -use crate::response::Peer; - -/// Wrap json_client send request between nodes or browsers. -pub struct Client { - client: SimpleClient, -} - -impl Client { - /// Creates a new Client instance with the specified endpoint URL - pub fn new(endpoint_url: &str, delegated_sk: Option) -> Self { - Self { - client: SimpleClient::new(endpoint_url, delegated_sk), - } - } - - /// Establishes a WebRTC connection with a remote peer using HTTP as the signaling channel. - /// - /// This function allows two peers to establish a WebRTC connection using HTTP, - /// which can be useful in scenarios where a direct peer-to-peer connection is not possible due to firewall restrictions or other network issues. - /// The function sends ICE candidates and Session Description Protocol (SDP) messages over HTTP as a form of signaling to establish the connection. - /// - /// Takes a URL for an HTTP server that will be used as the signaling channel to exchange ICE candidates and SDP with the remote peer. - /// Returns a Did that can be used to refer to this connection in subsequent WebRTC operations. - pub async fn connect_peer_via_http(&mut self, http_url: &str) -> Result { - let resp = self - .client - .call_method( - Method::ConnectPeerViaHttp.as_str(), - Params::Array(vec![Value::String(http_url.to_owned())]), - ) - .await - .map_err(Error::RpcError)?; - - let did = resp.as_str().ok_or(Error::DecodeError)?; - Ok(did.to_string()) - } - - /// Attempts to connect to a peer using a seed file located at the specified source path. - pub async fn connect_with_seed(&mut self, seeds: &[serde_json::Value]) -> Result<()> { - self.client - .call_method( - Method::ConnectWithSeed.as_str(), - Params::Array(seeds.to_vec()), - ) - .await - .map_err(Error::RpcError)?; - Ok(()) - } - - /// Attempts to connect to a peer using a DID stored in a Distributed Hash Table (DHT). - pub async fn connect_with_did(&mut self, did: &str) -> Result<()> { - self.client - .call_method( - Method::ConnectWithDid.as_str(), - Params::Array(vec![Value::String(did.to_owned())]), - ) - .await - .map_err(Error::RpcError)?; - Ok(()) - } - - /// Lists all connected peers and their status. - /// - /// Returns an Output containing a formatted string representation of the list of peers if successful, or an anyhow::Error if an error occurred. - pub async fn list_peers(&mut self) -> Result> { - let resp = self - .client - .call_method(Method::ListPeers.as_str(), Params::Array(vec![])) - .await - .map_err(Error::RpcError)?; - - let peers: Vec = serde_json::from_value(resp).map_err(|_| Error::DecodeError)?; - Ok(peers) - } - - /// Disconnects from the peer with the specified DID. - pub async fn disconnect(&mut self, did: &str) -> Result<()> { - self.client - .call_method(Method::Disconnect.as_str(), Params::Array(vec![json!(did)])) - .await - .map_err(Error::RpcError)?; - - Ok(()) - } - - /// Sends a custom message to the specified peer. - pub async fn send_custom_message( - &self, - did: &str, - data_b64: &str, - ) -> Result { - let result = self - .client - .call_method( - Method::SendCustomMessage.as_str(), - Params::Array(vec![json!(did), json!(data_b64)]), - ) - .await - .map_err(Error::RpcError)?; - serde_json::from_value(result).map_err(|_| Error::DecodeError) - } - - /// Registers a new service with the given name. - pub async fn register_service(&self, name: &str) -> Result<()> { - self.client - .call_method( - Method::RegisterService.as_str(), - Params::Array(vec![json!(name)]), - ) - .await - .map_err(Error::RpcError)?; - Ok(()) - } - - /// Looks up the DIDs of services registered with the given name. - pub async fn lookup_service(&self, name: &str) -> Result> { - let resp = self - .client - .call_method( - Method::LookupService.as_str(), - Params::Array(vec![json!(name)]), - ) - .await - .map_err(Error::RpcError)?; - - serde_json::from_value(resp).map_err(|_| Error::DecodeError) - } - - /// Publishes a message to the specified topic. - pub async fn publish_message_to_topic(&self, topic: &str, data: &str) -> Result<()> { - self.client - .call_method( - Method::PublishMessageToTopic.as_str(), - Params::Array(vec![json!(topic), json!(data)]), - ) - .await - .map_err(Error::RpcError)?; - Ok(()) - } - - pub async fn fetch_topic_messages(&self, topic: &str, index: usize) -> Result> { - let resp = self - .client - .call_method( - Method::FetchMessagesOfTopic.as_str(), - Params::Array(vec![json!(topic), json!(index)]), - ) - .await - .map_err(Error::RpcError)?; - - serde_json::from_value(resp).map_err(|_| Error::DecodeError) - } - - /// Query for swarm inspect info. - pub async fn inspect(&self) -> Result { - let resp = self - .client - .call_method(Method::NodeInfo.as_str(), Params::None) - .await - .map_err(Error::RpcError)?; - serde_json::from_value(resp).map_err(|_| Error::DecodeError) - } -} diff --git a/rpc/src/error.rs b/rpc/src/error.rs index e8e0069d9..5947846e7 100644 --- a/rpc/src/error.rs +++ b/rpc/src/error.rs @@ -12,7 +12,7 @@ pub enum Error { #[error("Invalid method.")] InvalidMethod, #[error("Rpc error: {0}")] - RpcError(crate::jsonrpc_client::client::RpcError), + RpcError(crate::jsonrpc::RpcError), #[error("Invalid signature.")] InvalidSignature, #[error("Invalid headers.")] diff --git a/rpc/src/jsonrpc.rs b/rpc/src/jsonrpc.rs new file mode 100644 index 000000000..351f63dad --- /dev/null +++ b/rpc/src/jsonrpc.rs @@ -0,0 +1,214 @@ +//! rings-rpc client + +use serde::de::DeserializeOwned; +use serde::Serialize; + +use crate::method::Method; +use crate::prelude::reqwest::Client as HttpClient; +use crate::protos::rings_node::*; + +/// Wrap json_client send request between nodes or browsers. +pub struct Client { + client: HttpClient, + endpoint_url: String, +} + +/// The errors returned by the client. +#[derive(Debug, thiserror::Error)] +pub enum RpcError { + /// An error returned by the server. + #[error("Server returned rpc error {0}")] + JsonClientError(jsonrpc_core::Error), + /// Failure to parse server response. + #[error("Failed to parse server response as {0}: {1}")] + ParseError(String, Box), + /// Request timed out. + #[error("Request timed out")] + Timeout, + /// A general client error. + #[error("Client error: {0}")] + Client(String), + /// Not rpc specific errors. + #[error("{0}")] + Other(Box), +} + +/// A wrap `Result` contains ClientError. +type Result = std::result::Result; + +impl Client { + /// Creates a new Client instance with the specified endpoint URL + pub fn new(endpoint_url: &str) -> Self { + Self { + client: HttpClient::default(), + endpoint_url: endpoint_url.to_string(), + } + } + + pub async fn call_method(&self, method: Method, req: &impl Serialize) -> Result + where T: DeserializeOwned { + use jsonrpc_core::*; + + let params = serde_json::to_value(req) + .map_err(|e| RpcError::Client(e.to_string()))? + .as_object() + .ok_or(RpcError::Client("params should be an object".to_string()))? + .clone(); + + let jsonrpc_request = Request::Single(Call::MethodCall(MethodCall { + jsonrpc: Some(Version::V2), + method: method.to_string(), + params: Params::Map(params), + id: Id::Num(1), + })); + + let result = self.do_jsonrpc_request(&jsonrpc_request).await?; + serde_json::from_value(result).map_err(|e| RpcError::ParseError(e.to_string(), Box::new(e))) + } + + async fn do_jsonrpc_request(&self, req: &jsonrpc_core::Request) -> Result { + let body = serde_json::to_string(req).map_err(|e| RpcError::Client(e.to_string()))?; + + let req = self + .client + .post(self.endpoint_url.as_str()) + .header("content-type", "application/json") + .header("accept", "application/json") + .body(body); + + let resp = req + .send() + .await + .map_err(|e| RpcError::Client(e.to_string()))? + .error_for_status() + .map_err(|e| RpcError::Client(e.to_string()))? + .bytes() + .await + .map_err(|e| RpcError::ParseError(e.to_string(), Box::new(e)))?; + + let jsonrpc_resp = jsonrpc_core::Response::from_json(&String::from_utf8_lossy(&resp)) + .map_err(|e| RpcError::ParseError(e.to_string(), Box::new(e)))?; + + match jsonrpc_resp { + jsonrpc_core::Response::Single(resp) => match resp { + jsonrpc_core::Output::Success(success) => Ok(success.result), + jsonrpc_core::Output::Failure(failure) => { + Err(RpcError::JsonClientError(failure.error)) + } + }, + jsonrpc_core::Response::Batch(_) => Err(RpcError::Client( + "Batch response is not supported".to_string(), + )), + } + } + + /// Establishes a WebRTC connection with a remote peer using HTTP as the signaling channel. + /// + /// This function allows two peers to establish a WebRTC connection using HTTP, + /// which can be useful in scenarios where a direct peer-to-peer connection is not possible due to firewall restrictions or other network issues. + /// The function sends ICE candidates and Session Description Protocol (SDP) messages over HTTP as a form of signaling to establish the connection. + /// + /// Takes a URL for an HTTP server that will be used as the signaling channel to exchange ICE candidates and SDP with the remote peer. + /// Returns a Did that can be used to refer to this connection in subsequent WebRTC operations. + pub async fn connect_peer_via_http( + &self, + req: &ConnectPeerViaHttpRequest, + ) -> Result { + self.call_method(Method::ConnectPeerViaHttp, req).await + } + + /// Attempts to connect to a peer using a DID stored in a Distributed Hash Table (DHT). + pub async fn connect_with_did( + &self, + req: &ConnectWithDidRequest, + ) -> Result { + self.call_method(Method::ConnectWithDid, req).await + } + + /// Attempts to connect to a peer using a seed file located at the specified source path. + pub async fn connect_with_seed( + &self, + req: &ConnectWithSeedRequest, + ) -> Result { + self.call_method(Method::ConnectWithSeed, req).await + } + + /// Lists all connected peers and their status. + /// + /// Returns an Output containing a formatted string representation of the list of peers if successful, or an anyhow::Error if an error occurred. + pub async fn list_peers(&self, req: &ListPeersRequest) -> Result { + self.call_method(Method::ListPeers, req).await + } + + pub async fn create_offer(&self, req: &CreateOfferRequest) -> Result { + self.call_method(Method::CreateOffer, req).await + } + + pub async fn answer_offer(&self, req: &AnswerOfferRequest) -> Result { + self.call_method(Method::AnswerOffer, req).await + } + + pub async fn accept_answer(&self, req: &AcceptAnswerRequest) -> Result { + self.call_method(Method::AcceptAnswer, req).await + } + + /// Disconnects from the peer with the specified DID. + pub async fn disconnect(&self, req: &DisconnectRequest) -> Result { + self.call_method(Method::Disconnect, req).await + } + + /// Sends a custom message to the specified peer. + pub async fn send_custom_message( + &self, + req: &SendCustomMessageRequest, + ) -> Result { + self.call_method(Method::SendCustomMessage, req).await + } + + pub async fn send_backend_message( + &self, + req: &SendBackendMessageRequest, + ) -> Result { + self.call_method(Method::SendBackendMessage, req).await + } + + /// Publishes a message to the specified topic. + pub async fn publish_message_to_topic( + &self, + req: &PublishMessageToTopicRequest, + ) -> Result { + self.call_method(Method::PublishMessageToTopic, req).await + } + + pub async fn fetch_topic_messages( + &self, + req: &FetchTopicMessagesRequest, + ) -> Result { + self.call_method(Method::FetchTopicMessages, req).await + } + + /// Registers a new service with the given name. + pub async fn register_service( + &self, + req: &RegisterServiceRequest, + ) -> Result { + self.call_method(Method::RegisterService, req).await + } + + /// Looks up the DIDs of services registered with the given name. + pub async fn lookup_service( + &self, + req: &LookupServiceRequest, + ) -> Result { + self.call_method(Method::LookupService, req).await + } + + /// Query for swarm inspect info. + pub async fn node_info(&self, req: &NodeInfoRequest) -> Result { + self.call_method(Method::NodeInfo, req).await + } + + pub async fn node_did(&self, req: &NodeDidRequest) -> Result { + self.call_method(Method::NodeDid, req).await + } +} diff --git a/rpc/src/jsonrpc_client/client.rs b/rpc/src/jsonrpc_client/client.rs deleted file mode 100644 index e3645fb24..000000000 --- a/rpc/src/jsonrpc_client/client.rs +++ /dev/null @@ -1,190 +0,0 @@ -#![warn(missing_docs)] -//! SimpleClient for jsonrpc request use reqwest::Client. -/// -/// Sample: -/// let client = Simpleclient::new("http://localhost:5000", delegated_sk); -/// client.call_method("test", params); -use jsonrpc_core::Error; -use jsonrpc_core::Params; -use jsonrpc_core::Value; -use rings_core::session::SessionSk; - -use super::request::parse_response; -use super::request::RequestBuilder; -use crate::prelude::reqwest::Client as HttpClient; - -/// Create a new SimpleClient -/// * client: a instance of reqwest::Client -/// * url: remote jsonrpc_server url -pub struct SimpleClient { - client: HttpClient, - url: String, - delegated_sk: Option, -} - -impl SimpleClient { - /// * client: reqwest::Client handle http request. - /// * url: remote json_server url. - /// * session_key: session_key for sign request. - pub fn new(url: &str, delegated_sk: Option) -> Self { - Self { - client: HttpClient::default(), - url: url.to_string(), - delegated_sk, - } - } - - /// JSONRpc call_method - pub async fn call_method(&self, method: &str, params: Params) -> RpcResult { - let msg = CallMessage { - method: method.into(), - params, - }; - self.do_request(&RpcMessage::Call(msg)).await - } - - /// JSONRpc notify request - pub async fn notify(&self, method: &str, params: Params) -> RpcResult<()> { - let msg = NotifyMessage { - method: method.into(), - params, - }; - self.do_request(&RpcMessage::Notify(msg)).await?; - Ok(()) - } - - async fn do_request(&self, msg: &RpcMessage) -> RpcResult { - let mut request_builder = RequestBuilder::new(); - let request = match msg { - RpcMessage::Call(call) => request_builder.call_request(call).1, - RpcMessage::Notify(notify) => request_builder.notification(notify), - RpcMessage::Subscribe(_) => { - return Err(RpcError::Client( - "Unsupported `RpcMessage` type `Subscribe`.".to_owned(), - )); - } - }; - - let mut req = self - .client - .post(self.url.as_str()) - .header("content-type", "application/json") - .header("accept", "application/json") - .body(request.clone()); - - if let Some(delegated_sk) = &self.delegated_sk { - let sig = delegated_sk - .sign(request.clone().as_bytes()) - .map_err(|e| RpcError::Client(format!("Failed to sign request: {}", e)))?; - let encoded_sig = base64::encode(sig); - req = req.header("X-SIGNATURE", encoded_sig); - } - - let resp = req - .send() - .await - .map_err(|e| RpcError::Client(e.to_string()))?; - let resp = resp - .error_for_status() - .map_err(|e| RpcError::Client(e.to_string()))?; - let resp = resp - .bytes() - .await - .map_err(|e| RpcError::ParseError(e.to_string(), Box::new(e)))?; - let resp_str = String::from_utf8_lossy(&resp).into_owned(); - parse_response(&resp_str) - .map_err(|e| RpcError::ParseError(e.to_string(), Box::new(e)))? - .1 - } -} - -/// The errors returned by the client. -#[derive(Debug, thiserror::Error)] -pub enum RpcError { - /// An error returned by the server. - #[error("Server returned rpc error {0}")] - JsonRpcError(Error), - /// Failure to parse server response. - #[error("Failed to parse server response as {0}: {1}")] - ParseError(String, Box), - /// Request timed out. - #[error("Request timed out")] - Timeout, - /// A general client error. - #[error("Client error: {0}")] - Client(String), - /// Not rpc specific errors. - #[error("{0}")] - Other(Box), -} - -impl From for RpcError { - fn from(error: Error) -> Self { - RpcError::JsonRpcError(error) - } -} - -/// A result returned by the client. -pub type RpcResult = Result; - -/// An RPC call message. -pub struct CallMessage { - /// The RPC method name. - pub method: String, - /// The RPC method parameters. - pub params: Params, -} - -/// An RPC notification. -pub struct NotifyMessage { - /// The RPC method name. - pub method: String, - /// The RPC method parameters. - pub params: Params, -} - -/// An RPC subscription. -pub struct Subscription { - /// The subscribe method name. - pub subscribe: String, - /// The subscribe method parameters. - pub subscribe_params: Params, - /// The name of the notification. - pub notification: String, - /// The unsubscribe method name. - pub unsubscribe: String, -} - -/// An RPC subscribe message. -pub struct SubscribeMessage { - /// The subscription to subscribe to. - pub subscription: Subscription, -} - -/// A message sent to the `RpcClient`. -pub enum RpcMessage { - /// Make an RPC call. - Call(CallMessage), - /// Send a notification. - Notify(NotifyMessage), - /// Subscribe to a notification. - Subscribe(SubscribeMessage), -} - -impl From for RpcMessage { - fn from(msg: CallMessage) -> Self { - RpcMessage::Call(msg) - } -} - -impl From for RpcMessage { - fn from(msg: NotifyMessage) -> Self { - RpcMessage::Notify(msg) - } -} - -impl From for RpcMessage { - fn from(msg: SubscribeMessage) -> Self { - RpcMessage::Subscribe(msg) - } -} diff --git a/rpc/src/jsonrpc_client/mod.rs b/rpc/src/jsonrpc_client/mod.rs deleted file mode 100644 index 54c186e39..000000000 --- a/rpc/src/jsonrpc_client/mod.rs +++ /dev/null @@ -1,5 +0,0 @@ -//! A jsonrpc-client. -pub mod client; -pub mod request; - -pub use self::client::SimpleClient; diff --git a/rpc/src/jsonrpc_client/request.rs b/rpc/src/jsonrpc_client/request.rs deleted file mode 100644 index 6eece1990..000000000 --- a/rpc/src/jsonrpc_client/request.rs +++ /dev/null @@ -1,184 +0,0 @@ -#![warn(missing_docs)] -//! Basic Request/Response structures used internally. -use jsonrpc_core::Call; -use jsonrpc_core::Error; -use jsonrpc_core::Id; -use jsonrpc_core::MethodCall; -use jsonrpc_core::Notification; -use jsonrpc_core::Params; -use jsonrpc_core::Version; -use jsonrpc_pubsub::SubscriptionId; -use serde::Deserialize; -use serde::Serialize; -use serde_json::Value; - -use super::client::CallMessage; -use super::client::NotifyMessage; -use super::client::RpcError; - -/// Creates JSON-RPC requests -pub struct RequestBuilder { - id: u64, -} - -impl RequestBuilder { - /// Create a new RequestBuilder - #[allow(clippy::new_without_default)] - pub fn new() -> Self { - RequestBuilder { id: 0 } - } - - fn next_id(&mut self) -> Id { - let id = self.id; - self.id = id + 1; - Id::Num(id) - } - - /// Build a single request with the next available id - pub fn single_request(&mut self, method: String, params: Params) -> (Id, String) { - let id = self.next_id(); - let request = jsonrpc_core::Request::Single(Call::MethodCall(MethodCall { - jsonrpc: Some(Version::V2), - method, - params, - id: id.clone(), - })); - ( - id, - serde_json::to_string(&request).expect("Request serialization is infallible; qed"), - ) - } - - /// call single_request - pub fn call_request(&mut self, msg: &CallMessage) -> (Id, String) { - self.single_request(msg.method.clone(), msg.params.clone()) - } - - /// subscribe request - pub fn subscribe_request( - &mut self, - subscribe: String, - subscribe_params: Params, - ) -> (Id, String) { - self.single_request(subscribe, subscribe_params) - } - - /// unsubscribe request - pub fn unsubscribe_request( - &mut self, - unsubscribe: String, - sid: SubscriptionId, - ) -> (Id, String) { - self.single_request(unsubscribe, Params::Array(vec![Value::from(sid)])) - } - - /// notification request - pub fn notification(&mut self, msg: &NotifyMessage) -> String { - let request = jsonrpc_core::Request::Single(Call::Notification(Notification { - jsonrpc: Some(Version::V2), - method: msg.method.clone(), - params: msg.params.clone(), - })); - serde_json::to_string(&request).expect("Request serialization is infallible; qed") - } -} - -/// Parse raw string into a single JSON value, together with the request Id. -/// -/// This method will attempt to parse a JSON-RPC response object (either `Failure` or `Success`) -/// and a `Notification` (for Subscriptions). -/// Note that if you have more specific expectations about the returned type and don't want -/// to handle all of them it might be best to deserialize on your own. -#[allow(clippy::type_complexity)] -pub fn parse_response( - response: &str, -) -> Result< - ( - Id, - Result, - Option, - Option, - ), - RpcError, -> { - jsonrpc_core::serde_from_str::(response) - .map_err(|e| RpcError::ParseError(e.to_string(), Box::new(e))) - .map(|response| { - let id = response.id().unwrap_or(Id::Null); - let sid = response.subscription_id(); - let method = response.method(); - let value: Result = response.into(); - let result = value.map_err(RpcError::JsonRpcError); - (id, result, method, sid) - }) -} - -/// A type representing all possible values sent from the server to the client. -#[derive(Debug, PartialEq, Clone, Deserialize, Serialize)] -#[serde(deny_unknown_fields)] -#[serde(untagged)] -pub enum ClientResponse { - /// A regular JSON-RPC request output (single response). - Output(jsonrpc_core::Output), - /// A notification. - Notification(jsonrpc_core::Notification), -} - -impl ClientResponse { - /// Get the id of the response (if any). - pub fn id(&self) -> Option { - match *self { - ClientResponse::Output(ref output) => Some(output.id().clone()), - ClientResponse::Notification(_) => None, - } - } - - /// Get the method name if the output is a notification. - pub fn method(&self) -> Option { - match *self { - ClientResponse::Notification(ref n) => Some(n.method.to_owned()), - ClientResponse::Output(_) => None, - } - } - - /// Parses the response into a subscription id. - pub fn subscription_id(&self) -> Option { - match *self { - ClientResponse::Notification(ref n) => match &n.params { - jsonrpc_core::Params::Map(map) => match map.get("subscription") { - Some(value) => SubscriptionId::parse_value(value), - None => None, - }, - _ => None, - }, - _ => None, - } - } -} - -impl From for Result { - fn from(res: ClientResponse) -> Self { - match res { - ClientResponse::Output(output) => output.into(), - ClientResponse::Notification(n) => match &n.params { - Params::Map(map) => { - let subscription = map.get("subscription"); - let result = map.get("result"); - let error = map.get("error"); - - match (subscription, result, error) { - (Some(_), Some(result), _) => Ok(result.to_owned()), - (Some(_), _, Some(error)) => { - let error = serde_json::from_value::(error.to_owned()) - .ok() - .unwrap_or_else(Error::parse_error); - Err(error) - } - _ => Ok(n.params.into()), - } - } - _ => Ok(n.params.into()), - }, - } - } -} diff --git a/rpc/src/lib.rs b/rpc/src/lib.rs index aeeb5cd51..cf95cdc66 100644 --- a/rpc/src/lib.rs +++ b/rpc/src/lib.rs @@ -1,9 +1,7 @@ //! rings rpc library -pub mod client; pub mod error; -pub mod jsonrpc_client; +pub mod jsonrpc; pub mod method; pub mod prelude; pub mod protos; -pub mod response; pub mod types; diff --git a/rpc/src/method.rs b/rpc/src/method.rs index fef35a68f..0ab7d3765 100644 --- a/rpc/src/method.rs +++ b/rpc/src/method.rs @@ -1,4 +1,4 @@ -//! A JSONRPC `method` enum. +//! Rpc methods. #![warn(missing_docs)] use super::error::Error; @@ -31,7 +31,7 @@ pub enum Method { /// Append data to topic PublishMessageToTopic, /// Fetch data of topic - FetchMessagesOfTopic, + FetchTopicMessages, /// Register service RegisterService, /// Lookup service @@ -57,7 +57,7 @@ impl Method { Method::SendCustomMessage => "sendCustomMessage", Method::SendBackendMessage => "sendBackendMessage", Method::PublishMessageToTopic => "publishMessageToTopic", - Method::FetchMessagesOfTopic => "fetchMessagesOfTopic", + Method::FetchTopicMessages => "fetchTopicMessages", Method::RegisterService => "registerService", Method::LookupService => "lookupService", Method::NodeInfo => "nodeInfo", @@ -88,7 +88,7 @@ impl TryFrom<&str> for Method { "sendBackendMessage" => Self::SendBackendMessage, "sendCustomMessage" => Self::SendCustomMessage, "publishMessageToTopic" => Method::PublishMessageToTopic, - "fetchMessagesOfTopic" => Method::FetchMessagesOfTopic, + "fetchTopicMessages" => Method::FetchTopicMessages, "registerService" => Method::RegisterService, "lookupService" => Method::LookupService, "nodeInfo" => Method::NodeInfo, diff --git a/rpc/src/prelude.rs b/rpc/src/prelude.rs index ef86a4ff1..10eb07ee9 100644 --- a/rpc/src/prelude.rs +++ b/rpc/src/prelude.rs @@ -1,8 +1,4 @@ -pub use http; -pub use jsonrpc_core; -pub use jsonrpc_pubsub; #[cfg(feature = "std")] pub use reqwest; #[cfg(feature = "wasm")] pub use reqwest_wasm as reqwest; -pub use rings_core; diff --git a/rpc/src/protos/build_config.yaml b/rpc/src/protos/build_config.yaml index bcf48908f..3c6ef1221 100644 --- a/rpc/src/protos/build_config.yaml +++ b/rpc/src/protos/build_config.yaml @@ -33,8 +33,8 @@ messages: - rings_node.SendBackendMessageResponse - rings_node.PublishMessageToTopicRequest - rings_node.PublishMessageToTopicResponse - - rings_node.FetchMessagesOfTopicRequest - - rings_node.FetchMessagesOfTopicResponse + - rings_node.FetchTopicMessagesRequest + - rings_node.FetchTopicMessagesResponse - rings_node.RegisterServiceRequest - rings_node.RegisterServiceResponse - rings_node.LookupServiceRequest diff --git a/rpc/src/protos/rings_node.proto b/rpc/src/protos/rings_node.proto index 4e846bc1d..6c87796a9 100644 --- a/rpc/src/protos/rings_node.proto +++ b/rpc/src/protos/rings_node.proto @@ -1,12 +1,17 @@ syntax = "proto3"; package rings_node; +message PeerInfo { + string did = 1; + string state = 2; +} + message ConnectPeerViaHttpRequest { string url = 1; } message ConnectPeerViaHttpResponse { - string did = 1; + PeerInfo peer = 1; } message ConnectWithDidRequest { @@ -26,11 +31,6 @@ message ConnectWithSeedRequest { message ConnectWithSeedResponse {} -message PeerInfo { - string did = 1; - string state = 2; -} - message ListPeersRequest {} message ListPeersResponse { @@ -88,12 +88,12 @@ message PublishMessageToTopicRequest { message PublishMessageToTopicResponse {} -message FetchMessagesOfTopicRequest { +message FetchTopicMessagesRequest { string topic = 1; int64 skip = 2; } -message FetchMessagesOfTopicResponse { +message FetchTopicMessagesResponse { repeated string data = 1; } @@ -184,7 +184,7 @@ service InternalService { // Append data to topic rpc PublishMessageToTopic(PublishMessageToTopicRequest) returns (PublishMessageToTopicResponse); // Fetch data of topic - rpc FetchMessagesOfTopic(FetchMessagesOfTopicRequest) returns (FetchMessagesOfTopicResponse); + rpc FetchTopicMessages(FetchTopicMessagesRequest) returns (FetchTopicMessagesResponse); // Register service rpc RegisterService(RegisterServiceRequest) returns (RegisterServiceResponse); // Lookup service diff --git a/rpc/src/protos/rings_node.rs b/rpc/src/protos/rings_node.rs index 6c0367daf..33ffa6fb4 100644 --- a/rpc/src/protos/rings_node.rs +++ b/rpc/src/protos/rings_node.rs @@ -1,6 +1,15 @@ #[derive(serde::Serialize, serde::Deserialize)] #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] +pub struct PeerInfo { + #[prost(string, tag = "1")] + pub did: ::prost::alloc::string::String, + #[prost(string, tag = "2")] + pub state: ::prost::alloc::string::String, +} +#[derive(serde::Serialize, serde::Deserialize)] +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] pub struct ConnectPeerViaHttpRequest { #[prost(string, tag = "1")] pub url: ::prost::alloc::string::String, @@ -9,8 +18,8 @@ pub struct ConnectPeerViaHttpRequest { #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] pub struct ConnectPeerViaHttpResponse { - #[prost(string, tag = "1")] - pub did: ::prost::alloc::string::String, + #[prost(message, optional, tag = "1")] + pub peer: ::core::option::Option, } #[derive(serde::Serialize, serde::Deserialize)] #[allow(clippy::derive_partial_eq_without_eq)] @@ -46,15 +55,6 @@ pub struct ConnectWithSeedResponse {} #[derive(serde::Serialize, serde::Deserialize)] #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] -pub struct PeerInfo { - #[prost(string, tag = "1")] - pub did: ::prost::alloc::string::String, - #[prost(string, tag = "2")] - pub state: ::prost::alloc::string::String, -} -#[derive(serde::Serialize, serde::Deserialize)] -#[allow(clippy::derive_partial_eq_without_eq)] -#[derive(Clone, PartialEq, ::prost::Message)] pub struct ListPeersRequest {} #[derive(serde::Serialize, serde::Deserialize)] #[allow(clippy::derive_partial_eq_without_eq)] @@ -158,7 +158,7 @@ pub struct PublishMessageToTopicResponse {} #[derive(serde::Serialize, serde::Deserialize)] #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] -pub struct FetchMessagesOfTopicRequest { +pub struct FetchTopicMessagesRequest { #[prost(string, tag = "1")] pub topic: ::prost::alloc::string::String, #[prost(int64, tag = "2")] @@ -167,7 +167,7 @@ pub struct FetchMessagesOfTopicRequest { #[derive(serde::Serialize, serde::Deserialize)] #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] -pub struct FetchMessagesOfTopicResponse { +pub struct FetchTopicMessagesResponse { #[prost(string, repeated, tag = "1")] pub data: ::prost::alloc::vec::Vec<::prost::alloc::string::String>, } diff --git a/rpc/src/protos/rings_node_handler.rs b/rpc/src/protos/rings_node_handler.rs index 6eed434f4..8c8ec9925 100644 --- a/rpc/src/protos/rings_node_handler.rs +++ b/rpc/src/protos/rings_node_handler.rs @@ -1,12 +1,12 @@ use std::sync::Arc; use async_trait::async_trait; +use jsonrpc_core::types::error::Error; +use jsonrpc_core::types::error::ErrorCode; +use jsonrpc_core::Result; use super::rings_node::*; use crate::method::Method; -use crate::prelude::jsonrpc_core::types::error::Error; -use crate::prelude::jsonrpc_core::types::error::ErrorCode; -use crate::prelude::jsonrpc_core::Result; /// Used for processor to match rpc request and response. #[cfg_attr(feature = "wasm", async_trait(?Send))] @@ -44,7 +44,7 @@ impl InternalRpcHandler { + HandleRpc + HandleRpc + HandleRpc - + HandleRpc + + HandleRpc + HandleRpc + HandleRpc + HandleRpc @@ -123,8 +123,8 @@ impl InternalRpcHandler { let resp = processor.handle_rpc(req).await?; serde_json::to_value(resp).map_err(|_| Error::new(ErrorCode::ParseError)) } - Method::FetchMessagesOfTopic => { - let req = serde_json::from_value::(params) + Method::FetchTopicMessages => { + let req = serde_json::from_value::(params) .map_err(|e| Error::invalid_params(e.to_string()))?; let resp = processor.handle_rpc(req).await?; serde_json::to_value(resp).map_err(|_| Error::new(ErrorCode::ParseError)) diff --git a/rpc/src/response.rs b/rpc/src/response.rs deleted file mode 100644 index f60664be3..000000000 --- a/rpc/src/response.rs +++ /dev/null @@ -1,50 +0,0 @@ -//! A JSONRPC response. - -use serde::Deserialize; -use serde::Serialize; -use serde_json::Value as JsonValue; - -use crate::error::Error; -use crate::error::Result; -use crate::prelude::rings_core::inspect::SwarmInspect; - -/// Peer contains transport address and state information. -#[derive(Deserialize, Serialize, Clone, Debug)] -pub struct Peer { - /// a processor' address - pub did: String, - /// a transport protocol using in swarm instance - pub cid: String, - /// transport ice connection state - pub state: String, -} - -impl Peer { - pub fn to_json_vec(&self) -> Result> { - serde_json::to_vec(self).map_err(|_| Error::EncodeError) - } - - pub fn to_json_obj(&self) -> Result { - serde_json::to_value(self).map_err(|_| Error::EncodeError) - } -} - -#[derive(Debug, Clone, Serialize, Deserialize)] -pub struct SendMessageResponse { - pub tx_id: String, -} - -impl From for SendMessageResponse { - fn from(v: String) -> Self { - Self { tx_id: v } - } -} - -/// NodeInfo struct -#[derive(Clone, Debug, Serialize, Deserialize)] -pub struct NodeInfo { - /// node version - pub version: String, - /// swarm inspect info - pub swarm: SwarmInspect, -}