From 154e49b81cd874e89a220d39b03ae9180d165cad Mon Sep 17 00:00:00 2001 From: haerdib Date: Fri, 3 Nov 2023 13:39:38 +0100 Subject: [PATCH 01/21] fix error --- .../transfer_with_tungstenite_client.rs | 20 ++++++++++-- examples/examples/transfer_with_ws_client.rs | 20 ++++++++++-- src/rpc/tungstenite_client/client.rs | 18 +++++------ src/rpc/ws_client/mod.rs | 31 ++++++++++--------- 4 files changed, 60 insertions(+), 29 deletions(-) diff --git a/examples/examples/transfer_with_tungstenite_client.rs b/examples/examples/transfer_with_tungstenite_client.rs index e60695190..2fefa986c 100755 --- a/examples/examples/transfer_with_tungstenite_client.rs +++ b/examples/examples/transfer_with_tungstenite_client.rs @@ -55,8 +55,22 @@ fn main() { let bob_balance = api.get_account_data(&bob.into()).unwrap().unwrap_or_default().free; println!("[+] Bob's Free Balance is {}\n", bob_balance); - // Generate extrinsic. - let xt = api.balance_transfer_allow_death(MultiAddress::Id(bob.into()), 1000000000000); + // We first generate an extrinsic that will fail to be executed due to missing funds. + let xt = api.balance_transfer_allow_death(MultiAddress::Id(bob.into()), bob_balance + 1); + println!( + "Sending an extrinsic from Alice (Key = {}),\n\nto Bob (Key = {})\n", + alice.public(), + bob + ); + println!("[+] Composed extrinsic: {:?}\n", xt); + + // Send and watch extrinsic until it fails onchain. + let result = api.submit_and_watch_extrinsic_until(xt, XtStatus::InBlock); + assert!(result.is_err()); + println!("[+] Extrinsic did not get included due to: {:?}\n", result); + + // This time, we generate an extrinsic that will succeed. + let xt = api.balance_transfer_allow_death(MultiAddress::Id(bob.into()), bob_balance / 2); println!( "Sending an extrinsic from Alice (Key = {}),\n\nto Bob (Key = {})\n", alice.public(), @@ -70,7 +84,7 @@ fn main() { .unwrap() .block_hash .unwrap(); - println!("[+] Extrinsic got included. Hash: {:?}\n", block_hash); + println!("[+] Extrinsic got included. Block Hash: {:?}\n", block_hash); // Verify that Bob's free Balance increased. let bob_new_balance = api.get_account_data(&bob.into()).unwrap().unwrap().free; diff --git a/examples/examples/transfer_with_ws_client.rs b/examples/examples/transfer_with_ws_client.rs index 14581bd07..2adb39bbf 100755 --- a/examples/examples/transfer_with_ws_client.rs +++ b/examples/examples/transfer_with_ws_client.rs @@ -54,8 +54,22 @@ fn main() { let bob_balance = api.get_account_data(&bob.into()).unwrap().unwrap_or_default().free; println!("[+] Bob's Free Balance is {}\n", bob_balance); - // Generate extrinsic. - let xt = api.balance_transfer_allow_death(MultiAddress::Id(bob.into()), 1000000000000); + // We first generate an extrinsic that will fail to be executed due to missing funds. + let xt = api.balance_transfer_allow_death(MultiAddress::Id(bob.into()), bob_balance + 1); + println!( + "Sending an extrinsic from Alice (Key = {}),\n\nto Bob (Key = {})\n", + alice.public(), + bob + ); + println!("[+] Composed extrinsic: {:?}\n", xt); + + // Send and watch extrinsic until it fails onchain. + let result = api.submit_and_watch_extrinsic_until(xt, XtStatus::InBlock); + assert!(result.is_err()); + println!("[+] Extrinsic did not get included due to: {:?}\n", result); + + // This time, we generate an extrinsic that will succeed. + let xt = api.balance_transfer_allow_death(MultiAddress::Id(bob.into()), bob_balance / 2); println!( "Sending an extrinsic from Alice (Key = {}),\n\nto Bob (Key = {})\n", alice.public(), @@ -69,7 +83,7 @@ fn main() { .unwrap() .block_hash .unwrap(); - println!("[+] Extrinsic got included. Hash: {:?}\n", block_hash); + println!("[+] Extrinsic got included. Block Hash: {:?}\n", block_hash); // Verify that Bob's free Balance increased. let bob_new_balance = api.get_account_data(&bob.into()).unwrap().unwrap().free; diff --git a/src/rpc/tungstenite_client/client.rs b/src/rpc/tungstenite_client/client.rs index 983d31743..fbecb3c11 100644 --- a/src/rpc/tungstenite_client/client.rs +++ b/src/rpc/tungstenite_client/client.rs @@ -151,15 +151,15 @@ fn send_message_to_client(result_in: ThreadOut, message: &str) -> Result debug!("got on_subscription_msg {}", message); let value: Value = serde_json::from_str(message)?; - match value["id"].as_str() { - Some(_idstr) => { - warn!("Expected subscription, but received an id response instead: {:?}", value); - }, - None => { - let message = serde_json::to_string(&value["params"]["result"])?; - result_in.send(message)?; - }, - }; + // We currently do not differentiate between different subscription Ids, we simply + // forward them all to the user. + if let Some(_subscription_id) = value["params"]["subscription"].as_str() { + let message = serde_json::to_string(&value["params"]["result"])?; + result_in.send(message)?; + } else if let Some(error_message) = value["error"]["message"].as_str() { + result_in.send(error_message.to_string())?; + } + Ok(()) } diff --git a/src/rpc/ws_client/mod.rs b/src/rpc/ws_client/mod.rs index 2ea1eac7a..e31dd2216 100644 --- a/src/rpc/ws_client/mod.rs +++ b/src/rpc/ws_client/mod.rs @@ -115,20 +115,23 @@ impl HandleMessage for SubscriptionHandler { info!("got on_subscription_msg {}", msg); let value: serde_json::Value = serde_json::from_str(msg.as_text()?).map_err(Box::new)?; - match value["id"].as_str() { - Some(_idstr) => { - warn!("Expected subscription, but received an id response instead: {:?}", value); - }, - None => { - let answer = serde_json::to_string(&value["params"]["result"]).map_err(Box::new)?; - - if let Err(e) = result.send(answer) { - // This may happen if the receiver has unsubscribed. - trace!("SendError: {}. will close ws", e); - out.close(CloseCode::Normal)?; - } - }, - }; + // We currently do not differentiate between different subscription Ids, we simply + // forward them all to the user. + if let Some(_subscription_id) = value["params"]["subscription"].as_str() { + let message = serde_json::to_string(&value["params"]["result"]).map_err(Box::new)?; + if let Err(e) = result.send(message) { + // This may happen if the receiver has unsubscribed. + trace!("SendError: {}. will close ws", e); + out.close(CloseCode::Normal)?; + } + } else if let Some(error_message) = value["error"]["message"].as_str() { + if let Err(e) = result.send(error_message.to_string()) { + // This may happen if the receiver has unsubscribed. + trace!("SendError: {}. will close ws", e); + out.close(CloseCode::Normal)?; + } + } + Ok(()) } } From f470b0a12276b82664656e01f51a47a08f2d0185 Mon Sep 17 00:00:00 2001 From: haerdib Date: Fri, 3 Nov 2023 14:14:14 +0100 Subject: [PATCH 02/21] fix clippy warning --- src/rpc/ws_client/client.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/rpc/ws_client/client.rs b/src/rpc/ws_client/client.rs index d5ba7ac03..bd7b53bcf 100644 --- a/src/rpc/ws_client/client.rs +++ b/src/rpc/ws_client/client.rs @@ -50,7 +50,7 @@ impl WsRpcClient { impl Request for WsRpcClient { async fn request(&self, method: &str, params: RpcParams) -> Result { let json_req = to_json_req(method, params)?; - let response = self.direct_rpc_request(json_req, RequestHandler::default())??; + let response = self.direct_rpc_request(json_req, RequestHandler)??; let deserialized_value: R = serde_json::from_str(&response)?; Ok(deserialized_value) } From da110784525fb1531246a744dfd6dcd58561c13d Mon Sep 17 00:00:00 2001 From: haerdib Date: Fri, 3 Nov 2023 17:11:39 +0100 Subject: [PATCH 03/21] seems to wrok --- src/rpc/error.rs | 1 + src/rpc/tungstenite_client/client.rs | 12 +++++--- src/rpc/tungstenite_client/subscription.rs | 7 +++-- src/rpc/ws_client/mod.rs | 36 ++++++++++++---------- src/rpc/ws_client/subscription.rs | 7 +++-- 5 files changed, 38 insertions(+), 25 deletions(-) diff --git a/src/rpc/error.rs b/src/rpc/error.rs index b459f23d8..2d3411afe 100644 --- a/src/rpc/error.rs +++ b/src/rpc/error.rs @@ -22,6 +22,7 @@ pub type Result = core::result::Result; #[derive(Debug)] pub enum Error { SerdeJson(serde_json::error::Error), + UnexpectedResponse(String), MpscSend(String), InvalidUrl(String), RecvError(String), diff --git a/src/rpc/tungstenite_client/client.rs b/src/rpc/tungstenite_client/client.rs index fbecb3c11..50a0b811c 100644 --- a/src/rpc/tungstenite_client/client.rs +++ b/src/rpc/tungstenite_client/client.rs @@ -148,16 +148,18 @@ pub fn do_reconnect(error: &RpcClientError) -> bool { } fn send_message_to_client(result_in: ThreadOut, message: &str) -> Result<()> { - debug!("got on_subscription_msg {}", message); + error!("got on_subscription_msg {}", message); let value: Value = serde_json::from_str(message)?; // We currently do not differentiate between different subscription Ids, we simply // forward them all to the user. if let Some(_subscription_id) = value["params"]["subscription"].as_str() { - let message = serde_json::to_string(&value["params"]["result"])?; - result_in.send(message)?; - } else if let Some(error_message) = value["error"]["message"].as_str() { - result_in.send(error_message.to_string())?; + result_in.send(serde_json::to_string(&value["params"]["result"])?)?; + } else if let Some(_error) = value["error"].as_str() { + result_in.send(serde_json::to_string(&value["error"]["message"])?)?; + } else { + // Id string is accepted, since it is the immediate response to a subscription message. + error!("Got subscription id {}", serde_json::to_string(&value["result"])?); } Ok(()) diff --git a/src/rpc/tungstenite_client/subscription.rs b/src/rpc/tungstenite_client/subscription.rs index f06abdd42..d39815d64 100644 --- a/src/rpc/tungstenite_client/subscription.rs +++ b/src/rpc/tungstenite_client/subscription.rs @@ -15,7 +15,7 @@ */ -use crate::rpc::{HandleSubscription, Result}; +use crate::rpc::{Error, HandleSubscription, Result}; use core::marker::PhantomData; use serde::de::DeserializeOwned; use std::sync::mpsc::Receiver; @@ -42,7 +42,10 @@ impl HandleSubscription // Sender was disconnected, therefore no further messages are to be expected. Err(_e) => return None, }; - Some(serde_json::from_str(¬ification).map_err(|e| e.into())) + Some( + serde_json::from_str(¬ification) + .map_err(|_| Error::UnexpectedResponse(notification)), + ) } async fn unsubscribe(self) -> Result<()> { diff --git a/src/rpc/ws_client/mod.rs b/src/rpc/ws_client/mod.rs index e31dd2216..1c05501af 100644 --- a/src/rpc/ws_client/mod.rs +++ b/src/rpc/ws_client/mod.rs @@ -112,26 +112,30 @@ impl HandleMessage for SubscriptionHandler { let out = &context.out; let msg = &context.msg; - info!("got on_subscription_msg {}", msg); + error!("got on_subscription_msg {}", msg); let value: serde_json::Value = serde_json::from_str(msg.as_text()?).map_err(Box::new)?; // We currently do not differentiate between different subscription Ids, we simply // forward them all to the user. - if let Some(_subscription_id) = value["params"]["subscription"].as_str() { - let message = serde_json::to_string(&value["params"]["result"]).map_err(Box::new)?; - if let Err(e) = result.send(message) { - // This may happen if the receiver has unsubscribed. - trace!("SendError: {}. will close ws", e); - out.close(CloseCode::Normal)?; - } - } else if let Some(error_message) = value["error"]["message"].as_str() { - if let Err(e) = result.send(error_message.to_string()) { - // This may happen if the receiver has unsubscribed. - trace!("SendError: {}. will close ws", e); - out.close(CloseCode::Normal)?; - } - } - + let result = if let Some(_subscription_id) = value["params"]["subscription"].as_str() { + result.send(serde_json::to_string(&value["params"]["result"]).map_err(Box::new)?) + } else if let Some(error) = value["error"].as_str() { + info!("Error {}", error); + result.send(serde_json::to_string(&value["error"]).map_err(Box::new)?) + } else { + // Id string is accepted, since it is the immediate response to a subscription message. + error!( + "Got subscription id {}", + serde_json::to_string(&value["result"]).map_err(Box::new)? + ); + Ok(()) + }; + + if let Err(e) = result { + // This may happen if the receiver has unsubscribed. + trace!("SendError: {}. will close ws", e); + out.close(CloseCode::Normal)?; + }; Ok(()) } } diff --git a/src/rpc/ws_client/subscription.rs b/src/rpc/ws_client/subscription.rs index 0358e7ad7..be75048c5 100644 --- a/src/rpc/ws_client/subscription.rs +++ b/src/rpc/ws_client/subscription.rs @@ -15,7 +15,7 @@ */ -use crate::rpc::{HandleSubscription, Result}; +use crate::rpc::{Error, HandleSubscription, Result}; use core::marker::PhantomData; use serde::de::DeserializeOwned; use std::sync::mpsc::Receiver; @@ -44,7 +44,10 @@ impl HandleSubscription // Sender was disconnected, therefore no further messages are to be expected. Err(_) => return None, }; - Some(serde_json::from_str(¬ification).map_err(|e| e.into())) + Some( + serde_json::from_str(¬ification) + .map_err(|_| Error::UnexpectedResponse(notification)), + ) } async fn unsubscribe(self) -> Result<()> { From fa54d27552abc2b58faabfc1baa0954b692ff6d4 Mon Sep 17 00:00:00 2001 From: haerdib Date: Fri, 3 Nov 2023 18:19:16 +0100 Subject: [PATCH 04/21] check for subscirption id --- src/rpc/error.rs | 2 +- src/rpc/tungstenite_client/client.rs | 39 ++++++++++---- src/rpc/tungstenite_client/subscription.rs | 5 +- src/rpc/ws_client/mod.rs | 63 +++++++++++++++------- src/rpc/ws_client/subscription.rs | 5 +- 5 files changed, 75 insertions(+), 39 deletions(-) diff --git a/src/rpc/error.rs b/src/rpc/error.rs index 2d3411afe..8f7e635f5 100644 --- a/src/rpc/error.rs +++ b/src/rpc/error.rs @@ -22,7 +22,7 @@ pub type Result = core::result::Result; #[derive(Debug)] pub enum Error { SerdeJson(serde_json::error::Error), - UnexpectedResponse(String), + ExtrinsicFailed(String), MpscSend(String), InvalidUrl(String), RecvError(String), diff --git a/src/rpc/tungstenite_client/client.rs b/src/rpc/tungstenite_client/client.rs index 50a0b811c..03c732665 100644 --- a/src/rpc/tungstenite_client/client.rs +++ b/src/rpc/tungstenite_client/client.rs @@ -134,9 +134,27 @@ fn subscribe_to_server( // Subscribe to server socket.send(Message::Text(json_req))?; + // Read the first message response - must be the subscription id. + let msg = read_until_text_message(&mut socket)?; + let value: Value = serde_json::from_str(&msg)?; + + let subcription_id = match value["result"].as_str() { + Some(id) => id, + None => { + error!("response: {:?} ", value["error"]); + + let message = match value["error"]["message"].is_string() { + true => serde_json::to_string(&value["error"])?, + false => format!("Received unexpected response: {}", msg), + }; + result_in.send(message)?; + return Ok(()) + }, + }; + loop { let msg = read_until_text_message(&mut socket)?; - send_message_to_client(result_in.clone(), msg.as_str())?; + send_message_to_client(result_in.clone(), &msg, subcription_id)?; } } @@ -147,19 +165,18 @@ pub fn do_reconnect(error: &RpcClientError) -> bool { ) } -fn send_message_to_client(result_in: ThreadOut, message: &str) -> Result<()> { +fn send_message_to_client( + result_in: ThreadOut, + message: &str, + subscription_id: &str, +) -> Result<()> { error!("got on_subscription_msg {}", message); let value: Value = serde_json::from_str(message)?; - // We currently do not differentiate between different subscription Ids, we simply - // forward them all to the user. - if let Some(_subscription_id) = value["params"]["subscription"].as_str() { - result_in.send(serde_json::to_string(&value["params"]["result"])?)?; - } else if let Some(_error) = value["error"].as_str() { - result_in.send(serde_json::to_string(&value["error"]["message"])?)?; - } else { - // Id string is accepted, since it is the immediate response to a subscription message. - error!("Got subscription id {}", serde_json::to_string(&value["result"])?); + if let Some(msg_subscription_id) = value["params"]["subscription"].as_str() { + if subscription_id == msg_subscription_id { + result_in.send(serde_json::to_string(&value["params"]["result"])?)?; + } } Ok(()) diff --git a/src/rpc/tungstenite_client/subscription.rs b/src/rpc/tungstenite_client/subscription.rs index d39815d64..75b84e265 100644 --- a/src/rpc/tungstenite_client/subscription.rs +++ b/src/rpc/tungstenite_client/subscription.rs @@ -42,10 +42,7 @@ impl HandleSubscription // Sender was disconnected, therefore no further messages are to be expected. Err(_e) => return None, }; - Some( - serde_json::from_str(¬ification) - .map_err(|_| Error::UnexpectedResponse(notification)), - ) + Some(serde_json::from_str(¬ification).map_err(|_| Error::ExtrinsicFailed(notification))) } async fn unsubscribe(self) -> Result<()> { diff --git a/src/rpc/ws_client/mod.rs b/src/rpc/ws_client/mod.rs index 1c05501af..ae2007b26 100644 --- a/src/rpc/ws_client/mod.rs +++ b/src/rpc/ws_client/mod.rs @@ -33,7 +33,7 @@ pub(crate) trait HandleMessage { type ThreadMessage; type Context; - fn handle_message(&self, context: &mut Self::Context) -> WsResult<()>; + fn handle_message(&mut self, context: &mut Self::Context) -> WsResult<()>; } // Clippy says request is never used, even though it is.. @@ -83,7 +83,7 @@ impl HandleMessage for RequestHandler { type ThreadMessage = RpcMessage; type Context = MessageContext; - fn handle_message(&self, context: &mut Self::Context) -> WsResult<()> { + fn handle_message(&mut self, context: &mut Self::Context) -> WsResult<()> { let result = &context.result; let out = &context.out; let msg = &context.msg; @@ -101,13 +101,15 @@ impl HandleMessage for RequestHandler { } #[derive(Default, Debug, PartialEq, Eq, Clone)] -pub(crate) struct SubscriptionHandler {} +pub(crate) struct SubscriptionHandler { + subscription_id: Option, +} impl HandleMessage for SubscriptionHandler { type ThreadMessage = String; type Context = MessageContext; - fn handle_message(&self, context: &mut Self::Context) -> WsResult<()> { + fn handle_message(&mut self, context: &mut Self::Context) -> WsResult<()> { let result = &context.result; let out = &context.out; let msg = &context.msg; @@ -115,23 +117,46 @@ impl HandleMessage for SubscriptionHandler { error!("got on_subscription_msg {}", msg); let value: serde_json::Value = serde_json::from_str(msg.as_text()?).map_err(Box::new)?; - // We currently do not differentiate between different subscription Ids, we simply - // forward them all to the user. - let result = if let Some(_subscription_id) = value["params"]["subscription"].as_str() { - result.send(serde_json::to_string(&value["params"]["result"]).map_err(Box::new)?) - } else if let Some(error) = value["error"].as_str() { - info!("Error {}", error); - result.send(serde_json::to_string(&value["error"]).map_err(Box::new)?) - } else { - // Id string is accepted, since it is the immediate response to a subscription message. - error!( - "Got subscription id {}", - serde_json::to_string(&value["result"]).map_err(Box::new)? - ); - Ok(()) + if value["error"]["message"].is_string() { + result.send(serde_json::to_string(&value["error"]).map_err(Box::new)?); + out.close(CloseCode::Normal)?; + return Ok(()) + } + + let mut send_result = Ok(()); + match self.subscription_id.clone() { + Some(id) => { + if let Some(msg_subscription_id) = value["params"]["subscription"].as_str() { + if id == msg_subscription_id { + send_result = result.send( + serde_json::to_string(&value["params"]["result"]).map_err(Box::new)?, + ); + } + } + }, + None => match value["result"].as_str() { + Some(id) => self.subscription_id = Some(id.to_string()), + None => { + send_result = result.send(format!("Received unexpected response: {}", msg)); + }, + }, }; - if let Err(e) = result { + // let result = if let Some(_subscription_id) = value["params"]["subscription"].as_str() { + // result.send(serde_json::to_string(&value["params"]["result"]).map_err(Box::new)?) + // } else if let Some(error) = value["error"].as_str() { + // info!("Error {}", error); + // result.send(serde_json::to_string(&value["error"]).map_err(Box::new)?) + // } else { + // // Id string is accepted, since it is the immediate response to a subscription message. + // error!( + // "Got subscription id {}", + // serde_json::to_string(&value["result"]).map_err(Box::new)? + // ); + // Ok(()) + // }; + + if let Err(e) = send_result { // This may happen if the receiver has unsubscribed. trace!("SendError: {}. will close ws", e); out.close(CloseCode::Normal)?; diff --git a/src/rpc/ws_client/subscription.rs b/src/rpc/ws_client/subscription.rs index be75048c5..0f86d9808 100644 --- a/src/rpc/ws_client/subscription.rs +++ b/src/rpc/ws_client/subscription.rs @@ -44,10 +44,7 @@ impl HandleSubscription // Sender was disconnected, therefore no further messages are to be expected. Err(_) => return None, }; - Some( - serde_json::from_str(¬ification) - .map_err(|_| Error::UnexpectedResponse(notification)), - ) + Some(serde_json::from_str(¬ification).map_err(|_| Error::ExtrinsicFailed(notification))) } async fn unsubscribe(self) -> Result<()> { From 66b36d3cdc91202c4cf5f49137f79d825f6d0449 Mon Sep 17 00:00:00 2001 From: haerdib Date: Fri, 3 Nov 2023 18:20:17 +0100 Subject: [PATCH 05/21] fix clippy --- src/rpc/ws_client/mod.rs | 16 +--------------- 1 file changed, 1 insertion(+), 15 deletions(-) diff --git a/src/rpc/ws_client/mod.rs b/src/rpc/ws_client/mod.rs index ae2007b26..ab332b851 100644 --- a/src/rpc/ws_client/mod.rs +++ b/src/rpc/ws_client/mod.rs @@ -118,7 +118,7 @@ impl HandleMessage for SubscriptionHandler { let value: serde_json::Value = serde_json::from_str(msg.as_text()?).map_err(Box::new)?; if value["error"]["message"].is_string() { - result.send(serde_json::to_string(&value["error"]).map_err(Box::new)?); + let _ result.send(serde_json::to_string(&value["error"]).map_err(Box::new)?); out.close(CloseCode::Normal)?; return Ok(()) } @@ -142,20 +142,6 @@ impl HandleMessage for SubscriptionHandler { }, }; - // let result = if let Some(_subscription_id) = value["params"]["subscription"].as_str() { - // result.send(serde_json::to_string(&value["params"]["result"]).map_err(Box::new)?) - // } else if let Some(error) = value["error"].as_str() { - // info!("Error {}", error); - // result.send(serde_json::to_string(&value["error"]).map_err(Box::new)?) - // } else { - // // Id string is accepted, since it is the immediate response to a subscription message. - // error!( - // "Got subscription id {}", - // serde_json::to_string(&value["result"]).map_err(Box::new)? - // ); - // Ok(()) - // }; - if let Err(e) = send_result { // This may happen if the receiver has unsubscribed. trace!("SendError: {}. will close ws", e); From b110bef40f01b187437c407eb6dc2ac9218f700c Mon Sep 17 00:00:00 2001 From: haerdib Date: Fri, 3 Nov 2023 18:21:00 +0100 Subject: [PATCH 06/21] fix typo --- src/rpc/ws_client/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/rpc/ws_client/mod.rs b/src/rpc/ws_client/mod.rs index ab332b851..0e633a531 100644 --- a/src/rpc/ws_client/mod.rs +++ b/src/rpc/ws_client/mod.rs @@ -118,7 +118,7 @@ impl HandleMessage for SubscriptionHandler { let value: serde_json::Value = serde_json::from_str(msg.as_text()?).map_err(Box::new)?; if value["error"]["message"].is_string() { - let _ result.send(serde_json::to_string(&value["error"]).map_err(Box::new)?); + let _ = result.send(serde_json::to_string(&value["error"]).map_err(Box::new)?); out.close(CloseCode::Normal)?; return Ok(()) } From 71ce17d6b7d8a6434f9b86468dd28a267bf29377 Mon Sep 17 00:00:00 2001 From: haerdib Date: Fri, 3 Nov 2023 18:23:35 +0100 Subject: [PATCH 07/21] remove error logs --- src/rpc/tungstenite_client/client.rs | 2 -- src/rpc/ws_client/mod.rs | 2 +- 2 files changed, 1 insertion(+), 3 deletions(-) diff --git a/src/rpc/tungstenite_client/client.rs b/src/rpc/tungstenite_client/client.rs index 03c732665..4081cf34d 100644 --- a/src/rpc/tungstenite_client/client.rs +++ b/src/rpc/tungstenite_client/client.rs @@ -141,8 +141,6 @@ fn subscribe_to_server( let subcription_id = match value["result"].as_str() { Some(id) => id, None => { - error!("response: {:?} ", value["error"]); - let message = match value["error"]["message"].is_string() { true => serde_json::to_string(&value["error"])?, false => format!("Received unexpected response: {}", msg), diff --git a/src/rpc/ws_client/mod.rs b/src/rpc/ws_client/mod.rs index 0e633a531..b944a2a99 100644 --- a/src/rpc/ws_client/mod.rs +++ b/src/rpc/ws_client/mod.rs @@ -114,7 +114,7 @@ impl HandleMessage for SubscriptionHandler { let out = &context.out; let msg = &context.msg; - error!("got on_subscription_msg {}", msg); + info!("got on_subscription_msg {}", msg); let value: serde_json::Value = serde_json::from_str(msg.as_text()?).map_err(Box::new)?; if value["error"]["message"].is_string() { From 524b196b528d61277535af3b73ccf6be9be51ae5 Mon Sep 17 00:00:00 2001 From: haerdib Date: Fri, 3 Nov 2023 18:25:15 +0100 Subject: [PATCH 08/21] add info log --- src/rpc/tungstenite_client/client.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/rpc/tungstenite_client/client.rs b/src/rpc/tungstenite_client/client.rs index 4081cf34d..e0e8765a1 100644 --- a/src/rpc/tungstenite_client/client.rs +++ b/src/rpc/tungstenite_client/client.rs @@ -168,7 +168,7 @@ fn send_message_to_client( message: &str, subscription_id: &str, ) -> Result<()> { - error!("got on_subscription_msg {}", message); + info!("got on_subscription_msg {}", message); let value: Value = serde_json::from_str(message)?; if let Some(msg_subscription_id) = value["params"]["subscription"].as_str() { From 877020637f37a42fc0b7b128bd7a29b4d8995a99 Mon Sep 17 00:00:00 2001 From: haerdib Date: Fri, 3 Nov 2023 18:30:30 +0100 Subject: [PATCH 09/21] we expect an immediate response --- src/rpc/ws_client/mod.rs | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/src/rpc/ws_client/mod.rs b/src/rpc/ws_client/mod.rs index b944a2a99..487d6017d 100644 --- a/src/rpc/ws_client/mod.rs +++ b/src/rpc/ws_client/mod.rs @@ -117,12 +117,6 @@ impl HandleMessage for SubscriptionHandler { info!("got on_subscription_msg {}", msg); let value: serde_json::Value = serde_json::from_str(msg.as_text()?).map_err(Box::new)?; - if value["error"]["message"].is_string() { - let _ = result.send(serde_json::to_string(&value["error"]).map_err(Box::new)?); - out.close(CloseCode::Normal)?; - return Ok(()) - } - let mut send_result = Ok(()); match self.subscription_id.clone() { Some(id) => { @@ -137,7 +131,13 @@ impl HandleMessage for SubscriptionHandler { None => match value["result"].as_str() { Some(id) => self.subscription_id = Some(id.to_string()), None => { - send_result = result.send(format!("Received unexpected response: {}", msg)); + let message = match value["error"]["message"].is_string() { + true => serde_json::to_string(&value["error"]).map_err(Box::new)?, + false => format!("Received unexpected response: {}", msg), + }; + let _ = result.send(message); + out.close(CloseCode::Normal)?; + return Ok(()) }, }, }; From e3c5206970903e2364b95ab4bcb0241aa1a9225f Mon Sep 17 00:00:00 2001 From: haerdib Date: Mon, 6 Nov 2023 09:53:53 +0100 Subject: [PATCH 10/21] add tests for rpc client --- .github/workflows/ci.yml | 4 +- testing/examples/tungstenite_client_test.rs | 63 +++++++++++++++++++++ testing/examples/ws_client_test.rs | 63 +++++++++++++++++++++ 3 files changed, 129 insertions(+), 1 deletion(-) create mode 100755 testing/examples/tungstenite_client_test.rs create mode 100755 testing/examples/ws_client_test.rs diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 03aab46fe..ecaa32273 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -153,8 +153,10 @@ jobs: pallet_balances_tests, pallet_transaction_payment_tests, state_tests, + tungstenite_client_test, + state_tests, runtime_update_sync, - runtime_update_async, + ws_client_test, ] steps: - uses: actions/checkout@v3 diff --git a/testing/examples/tungstenite_client_test.rs b/testing/examples/tungstenite_client_test.rs new file mode 100755 index 000000000..715404369 --- /dev/null +++ b/testing/examples/tungstenite_client_test.rs @@ -0,0 +1,63 @@ +/* + Copyright 2019 Supercomputing Systems AG + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +use sp_core::{ + crypto::{Pair, Ss58Codec}, + sr25519, +}; +use sp_runtime::MultiAddress; +use substrate_api_client::{ + ac_node_api::error::{DispatchError, TokenError}, + ac_primitives::{AssetRuntimeConfig, ExtrinsicSigner}, + extrinsic::BalancesExtrinsics, + rpc::TungsteniteRpcClient, + Api, GetAccountInformation, SubmitAndWatch, XtStatus, +}; + +fn main() { + // Setup + let alice: sr25519::Pair = Pair::from_string( + "0xe5be9a5092b81bca64be81d212e7f2f9eba183bb7a90954f7b76361f6edb5c0a", + None, + ) + .unwrap(); + let client = TungsteniteRpcClient::with_default_url(100); + let mut api = Api::::new(client).unwrap(); + api.set_signer(ExtrinsicSigner::::new(alice.clone())); + + let bob = sr25519::Public::from_ss58check("5FHneW46xGXgs5mUiveU4sbTyGBzmstUspZC92UhjJM694ty") + .unwrap(); + let bob_balance = api.get_account_data(&bob.into()).unwrap().unwrap_or_default().free; + + // Check for failed extrinsic failed onchain + let xt = api.balance_transfer_allow_death(MultiAddress::Id(bob.into()), bob_balance + 1); + let result = api.submit_and_watch_extrinsic_until(xt.clone(), XtStatus::InBlock); + assert_eq!(result, Err(DispatchError::Token(TokenError::FundsUnavailable))); + + // Check directly failed extrinsic (before actually submitted to a block) + let result = api.submit_and_watch_extrinsic_until(xt, XtStatus::InBlock); + assert!(result.is_err()); + assert!(format!("{:?}", result).contains("ExtrinsicFailed")); + + // Check for successful extrinisc + let xt = api.balance_transfer_allow_death(MultiAddress::Id(bob.into()), bob_balance / 2); + let block_hash = api + .submit_and_watch_extrinsic_until(xt, XtStatus::InBlock) + .unwrap() + .block_hash + .unwrap(); + let bob_new_balance = api.get_account_data(&bob.into()).unwrap().unwrap().free; + assert!(bob_new_balance > bob_balance); +} diff --git a/testing/examples/ws_client_test.rs b/testing/examples/ws_client_test.rs new file mode 100755 index 000000000..21ca1a3e6 --- /dev/null +++ b/testing/examples/ws_client_test.rs @@ -0,0 +1,63 @@ +/* + Copyright 2019 Supercomputing Systems AG + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +use sp_core::{ + crypto::{Pair, Ss58Codec}, + sr25519, +}; +use sp_runtime::MultiAddress; +use substrate_api_client::{ + ac_node_api::error::{DispatchError, TokenError}, + ac_primitives::{AssetRuntimeConfig, ExtrinsicSigner}, + extrinsic::BalancesExtrinsics, + rpc::WsRpcClient, + Api, GetAccountInformation, SubmitAndWatch, XtStatus, +}; + +fn main() { + // Setup + let alice: sr25519::Pair = Pair::from_string( + "0xe5be9a5092b81bca64be81d212e7f2f9eba183bb7a90954f7b76361f6edb5c0a", + None, + ) + .unwrap(); + let client = WsRpcClient::with_default_url(); + let mut api = Api::::new(client).unwrap(); + api.set_signer(ExtrinsicSigner::::new(alice.clone())); + + let bob = sr25519::Public::from_ss58check("5FHneW46xGXgs5mUiveU4sbTyGBzmstUspZC92UhjJM694ty") + .unwrap(); + let bob_balance = api.get_account_data(&bob.into()).unwrap().unwrap_or_default().free; + + // Check for failed extrinsic failed onchain + let xt = api.balance_transfer_allow_death(MultiAddress::Id(bob.into()), bob_balance + 1); + let result = api.submit_and_watch_extrinsic_until(xt.clone(), XtStatus::InBlock); + assert_eq!(result, Err(DispatchError::Token(TokenError::FundsUnavailable))); + + // Check directly failed extrinsic (before actually submitted to a block) + let result = api.submit_and_watch_extrinsic_until(xt, XtStatus::InBlock); + assert!(result.is_err()); + assert!(format!("{:?}", result).contains("ExtrinsicFailed")); + + // Check for successful extrinisc + let xt = api.balance_transfer_allow_death(MultiAddress::Id(bob.into()), bob_balance / 2); + let block_hash = api + .submit_and_watch_extrinsic_until(xt, XtStatus::InBlock) + .unwrap() + .block_hash + .unwrap(); + let bob_new_balance = api.get_account_data(&bob.into()).unwrap().unwrap().free; + assert!(bob_new_balance > bob_balance); +} From b9f1c38a1cb56a140c91be9f50575e07ffb4d444 Mon Sep 17 00:00:00 2001 From: haerdib Date: Mon, 6 Nov 2023 10:20:35 +0100 Subject: [PATCH 11/21] fix test and clean up ws_rpc client --- src/rpc/ws_client/mod.rs | 67 +++++++++++++-------- testing/examples/tungstenite_client_test.rs | 5 +- 2 files changed, 46 insertions(+), 26 deletions(-) diff --git a/src/rpc/ws_client/mod.rs b/src/rpc/ws_client/mod.rs index 487d6017d..2664abad0 100644 --- a/src/rpc/ws_client/mod.rs +++ b/src/rpc/ws_client/mod.rs @@ -112,41 +112,58 @@ impl HandleMessage for SubscriptionHandler { fn handle_message(&mut self, context: &mut Self::Context) -> WsResult<()> { let result = &context.result; let out = &context.out; - let msg = &context.msg; + let msg = &context.msg.as_text()?; info!("got on_subscription_msg {}", msg); - let value: serde_json::Value = serde_json::from_str(msg.as_text()?).map_err(Box::new)?; - - let mut send_result = Ok(()); - match self.subscription_id.clone() { - Some(id) => { - if let Some(msg_subscription_id) = value["params"]["subscription"].as_str() { - if id == msg_subscription_id { - send_result = result.send( - serde_json::to_string(&value["params"]["result"]).map_err(Box::new)?, - ); - } + let value: serde_json::Value = serde_json::from_str(msg).map_err(Box::new)?; + + let send_result = match self.subscription_id.as_ref() { + Some(id) => handle_subscription_message(result, &value, id), + None => { + self.subscription_id = get_subscription_id(&value); + if self.subscription_id.is_none() { + send_error_response(result, &value, msg) + } else { + Ok(()) } }, - None => match value["result"].as_str() { - Some(id) => self.subscription_id = Some(id.to_string()), - None => { - let message = match value["error"]["message"].is_string() { - true => serde_json::to_string(&value["error"]).map_err(Box::new)?, - false => format!("Received unexpected response: {}", msg), - }; - let _ = result.send(message); - out.close(CloseCode::Normal)?; - return Ok(()) - }, - }, }; if let Err(e) = send_result { // This may happen if the receiver has unsubscribed. - trace!("SendError: {}. will close ws", e); + trace!("SendError: {:?}. will close ws", e); out.close(CloseCode::Normal)?; }; Ok(()) } } + +fn handle_subscription_message( + result: &ThreadOut, + value: &serde_json::Value, + subscription_id: &str, +) -> Result<(), RpcClientError> { + if let Some(msg_subscription_id) = value["params"]["subscription"].as_str() { + if subscription_id == msg_subscription_id { + result.send(serde_json::to_string(&value["params"]["result"])?)?; + } + } + Ok(()) +} + +fn get_subscription_id(value: &serde_json::Value) -> Option { + value["result"].as_str().map(|id| id.to_string()) +} + +fn send_error_response( + result: &ThreadOut, + value: &serde_json::Value, + original_message: &str, +) -> Result<(), RpcClientError> { + let message = match value["error"]["message"].is_string() { + true => serde_json::to_string(&value["error"])?, + false => format!("Received unexpected response: {}", original_message), + }; + result.send(message)?; + Ok(()) +} diff --git a/testing/examples/tungstenite_client_test.rs b/testing/examples/tungstenite_client_test.rs index 715404369..34385d822 100755 --- a/testing/examples/tungstenite_client_test.rs +++ b/testing/examples/tungstenite_client_test.rs @@ -44,7 +44,10 @@ fn main() { // Check for failed extrinsic failed onchain let xt = api.balance_transfer_allow_death(MultiAddress::Id(bob.into()), bob_balance + 1); let result = api.submit_and_watch_extrinsic_until(xt.clone(), XtStatus::InBlock); - assert_eq!(result, Err(DispatchError::Token(TokenError::FundsUnavailable))); + match result { + Ok(_) => panic!("Expected an error"), + Err(e) => assert_eq!(e, DispatchError::Token(TokenError::FundsUnavailable)), + }; // Check directly failed extrinsic (before actually submitted to a block) let result = api.submit_and_watch_extrinsic_until(xt, XtStatus::InBlock); From bb5ee280fb7ee343364c6ba055988eceace4ff1d Mon Sep 17 00:00:00 2001 From: haerdib Date: Mon, 6 Nov 2023 10:22:14 +0100 Subject: [PATCH 12/21] fix test --- testing/examples/tungstenite_client_test.rs | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/testing/examples/tungstenite_client_test.rs b/testing/examples/tungstenite_client_test.rs index 34385d822..511573e22 100755 --- a/testing/examples/tungstenite_client_test.rs +++ b/testing/examples/tungstenite_client_test.rs @@ -44,10 +44,7 @@ fn main() { // Check for failed extrinsic failed onchain let xt = api.balance_transfer_allow_death(MultiAddress::Id(bob.into()), bob_balance + 1); let result = api.submit_and_watch_extrinsic_until(xt.clone(), XtStatus::InBlock); - match result { - Ok(_) => panic!("Expected an error"), - Err(e) => assert_eq!(e, DispatchError::Token(TokenError::FundsUnavailable)), - }; + assert!(format!("{:?}", result).contains("DispatchError::Token(TokenError::FundsUnavailable)")); // Check directly failed extrinsic (before actually submitted to a block) let result = api.submit_and_watch_extrinsic_until(xt, XtStatus::InBlock); From e03ef62dc83ba51b6e6043747aab553ff0a22ab2 Mon Sep 17 00:00:00 2001 From: haerdib Date: Mon, 6 Nov 2023 10:23:24 +0100 Subject: [PATCH 13/21] fix test --- testing/examples/ws_client_test.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/testing/examples/ws_client_test.rs b/testing/examples/ws_client_test.rs index 21ca1a3e6..c0e2926b6 100755 --- a/testing/examples/ws_client_test.rs +++ b/testing/examples/ws_client_test.rs @@ -44,7 +44,7 @@ fn main() { // Check for failed extrinsic failed onchain let xt = api.balance_transfer_allow_death(MultiAddress::Id(bob.into()), bob_balance + 1); let result = api.submit_and_watch_extrinsic_until(xt.clone(), XtStatus::InBlock); - assert_eq!(result, Err(DispatchError::Token(TokenError::FundsUnavailable))); + assert!(format!("{:?}", result).contains("DispatchError::Token(TokenError::FundsUnavailable)")); // Check directly failed extrinsic (before actually submitted to a block) let result = api.submit_and_watch_extrinsic_until(xt, XtStatus::InBlock); From 64601733aba07405c49b49fc575af1de38f55b7b Mon Sep 17 00:00:00 2001 From: haerdib Date: Mon, 6 Nov 2023 10:26:05 +0100 Subject: [PATCH 14/21] fix clippy --- testing/examples/tungstenite_client_test.rs | 3 +-- testing/examples/ws_client_test.rs | 3 +-- 2 files changed, 2 insertions(+), 4 deletions(-) diff --git a/testing/examples/tungstenite_client_test.rs b/testing/examples/tungstenite_client_test.rs index 511573e22..9f3f9f531 100755 --- a/testing/examples/tungstenite_client_test.rs +++ b/testing/examples/tungstenite_client_test.rs @@ -19,7 +19,6 @@ use sp_core::{ }; use sp_runtime::MultiAddress; use substrate_api_client::{ - ac_node_api::error::{DispatchError, TokenError}, ac_primitives::{AssetRuntimeConfig, ExtrinsicSigner}, extrinsic::BalancesExtrinsics, rpc::TungsteniteRpcClient, @@ -53,7 +52,7 @@ fn main() { // Check for successful extrinisc let xt = api.balance_transfer_allow_death(MultiAddress::Id(bob.into()), bob_balance / 2); - let block_hash = api + let _block_hash = api .submit_and_watch_extrinsic_until(xt, XtStatus::InBlock) .unwrap() .block_hash diff --git a/testing/examples/ws_client_test.rs b/testing/examples/ws_client_test.rs index c0e2926b6..a14452d4d 100755 --- a/testing/examples/ws_client_test.rs +++ b/testing/examples/ws_client_test.rs @@ -19,7 +19,6 @@ use sp_core::{ }; use sp_runtime::MultiAddress; use substrate_api_client::{ - ac_node_api::error::{DispatchError, TokenError}, ac_primitives::{AssetRuntimeConfig, ExtrinsicSigner}, extrinsic::BalancesExtrinsics, rpc::WsRpcClient, @@ -53,7 +52,7 @@ fn main() { // Check for successful extrinisc let xt = api.balance_transfer_allow_death(MultiAddress::Id(bob.into()), bob_balance / 2); - let block_hash = api + let _block_hash = api .submit_and_watch_extrinsic_until(xt, XtStatus::InBlock) .unwrap() .block_hash From 994e57ae00572b1da7b615eca8e203713932c9ae Mon Sep 17 00:00:00 2001 From: haerdib Date: Mon, 6 Nov 2023 10:57:24 +0100 Subject: [PATCH 15/21] fix tests --- testing/examples/tungstenite_client_test.rs | 2 +- testing/examples/ws_client_test.rs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/testing/examples/tungstenite_client_test.rs b/testing/examples/tungstenite_client_test.rs index 9f3f9f531..8d4694ee6 100755 --- a/testing/examples/tungstenite_client_test.rs +++ b/testing/examples/tungstenite_client_test.rs @@ -43,7 +43,7 @@ fn main() { // Check for failed extrinsic failed onchain let xt = api.balance_transfer_allow_death(MultiAddress::Id(bob.into()), bob_balance + 1); let result = api.submit_and_watch_extrinsic_until(xt.clone(), XtStatus::InBlock); - assert!(format!("{:?}", result).contains("DispatchError::Token(TokenError::FundsUnavailable)")); + assert!(format!("{:?}", result).contains("FundsUnavailable")); // Check directly failed extrinsic (before actually submitted to a block) let result = api.submit_and_watch_extrinsic_until(xt, XtStatus::InBlock); diff --git a/testing/examples/ws_client_test.rs b/testing/examples/ws_client_test.rs index a14452d4d..10dbd0866 100755 --- a/testing/examples/ws_client_test.rs +++ b/testing/examples/ws_client_test.rs @@ -43,7 +43,7 @@ fn main() { // Check for failed extrinsic failed onchain let xt = api.balance_transfer_allow_death(MultiAddress::Id(bob.into()), bob_balance + 1); let result = api.submit_and_watch_extrinsic_until(xt.clone(), XtStatus::InBlock); - assert!(format!("{:?}", result).contains("DispatchError::Token(TokenError::FundsUnavailable)")); + assert!(format!("{:?}", result).contains("FundsUnavailable")); // Check directly failed extrinsic (before actually submitted to a block) let result = api.submit_and_watch_extrinsic_until(xt, XtStatus::InBlock); From b8aea241270953547772f1b770a5eb033a67e16d Mon Sep 17 00:00:00 2001 From: haerdib Date: Mon, 6 Nov 2023 13:15:37 +0100 Subject: [PATCH 16/21] git commit --- .github/workflows/ci.yml | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index ecaa32273..4f08bd148 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -154,9 +154,10 @@ jobs: pallet_transaction_payment_tests, state_tests, tungstenite_client_test, + ws_client_test, state_tests, runtime_update_sync, - ws_client_test, + runtime_update_async ] steps: - uses: actions/checkout@v3 From 867845d87bf358f678e0d9d379abbc0d69164f71 Mon Sep 17 00:00:00 2001 From: haerdib Date: Wed, 8 Nov 2023 18:05:22 +0100 Subject: [PATCH 17/21] add missing comma --- .github/workflows/ci.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 4f08bd148..c974d8a86 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -157,7 +157,7 @@ jobs: ws_client_test, state_tests, runtime_update_sync, - runtime_update_async + runtime_update_async, ] steps: - uses: actions/checkout@v3 From 3ec0c30ba0e34a92696017ff68334dce26b8cf73 Mon Sep 17 00:00:00 2001 From: haerdib Date: Wed, 8 Nov 2023 18:05:46 +0100 Subject: [PATCH 18/21] extract helper functions --- src/rpc/mod.rs | 1 + src/rpc/tungstenite_client/client.rs | 17 ++++++----------- 2 files changed, 7 insertions(+), 11 deletions(-) diff --git a/src/rpc/mod.rs b/src/rpc/mod.rs index 73d07801f..7eb5e4c27 100644 --- a/src/rpc/mod.rs +++ b/src/rpc/mod.rs @@ -35,6 +35,7 @@ pub use jsonrpsee_client::JsonrpseeClient; pub mod jsonrpsee_client; pub mod error; +mod helpers; pub use error::{Error, Result}; diff --git a/src/rpc/tungstenite_client/client.rs b/src/rpc/tungstenite_client/client.rs index e0e8765a1..906f11ec5 100644 --- a/src/rpc/tungstenite_client/client.rs +++ b/src/rpc/tungstenite_client/client.rs @@ -15,7 +15,7 @@ */ use crate::rpc::{ - to_json_req, tungstenite_client::subscription::TungsteniteSubscriptionWrapper, + helpers, to_json_req, tungstenite_client::subscription::TungsteniteSubscriptionWrapper, Error as RpcClientError, Request, Result, Subscribe, }; use ac_primitives::RpcParams; @@ -138,13 +138,10 @@ fn subscribe_to_server( let msg = read_until_text_message(&mut socket)?; let value: Value = serde_json::from_str(&msg)?; - let subcription_id = match value["result"].as_str() { + let subcription_id = match helpers::read_subscription_id(&value) { Some(id) => id, None => { - let message = match value["error"]["message"].is_string() { - true => serde_json::to_string(&value["error"])?, - false => format!("Received unexpected response: {}", msg), - }; + let message = helpers::read_error_message(&value, &msg); result_in.send(message)?; return Ok(()) }, @@ -152,7 +149,7 @@ fn subscribe_to_server( loop { let msg = read_until_text_message(&mut socket)?; - send_message_to_client(result_in.clone(), &msg, subcription_id)?; + send_message_to_client(result_in.clone(), &msg, &subcription_id)?; } } @@ -171,10 +168,8 @@ fn send_message_to_client( info!("got on_subscription_msg {}", message); let value: Value = serde_json::from_str(message)?; - if let Some(msg_subscription_id) = value["params"]["subscription"].as_str() { - if subscription_id == msg_subscription_id { - result_in.send(serde_json::to_string(&value["params"]["result"])?)?; - } + if helpers::subscription_id_matches(&value, subscription_id) { + result_in.send(serde_json::to_string(&value["params"]["result"])?)?; } Ok(()) From 98df866430ff30adff22fdc5be5f2e459ba57493 Mon Sep 17 00:00:00 2001 From: haerdib Date: Wed, 8 Nov 2023 18:05:56 +0100 Subject: [PATCH 19/21] add missing helper file --- src/rpc/helpers.rs | 144 +++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 144 insertions(+) create mode 100644 src/rpc/helpers.rs diff --git a/src/rpc/helpers.rs b/src/rpc/helpers.rs new file mode 100644 index 000000000..67b413307 --- /dev/null +++ b/src/rpc/helpers.rs @@ -0,0 +1,144 @@ +/* + Copyright 2019 Supercomputing Systems AG + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + +*/ + +use serde_json::Value; + +pub fn read_subscription_id(value: &Value) -> Option { + value["result"].as_str().map(|str| str.to_string()) +} + +pub fn read_error_message(value: &Value, msg: &str) -> String { + match value["error"].as_str() { + Some(error_message) => error_message.to_string(), + None => format!("Unexpected Response: {}", msg), + } +} + +pub fn subscription_id_matches(value: &Value, subscription_id: &str) -> bool { + match value["params"]["subscription"].as_str() { + Some(retrieved_subscription_id) => subscription_id == retrieved_subscription_id, + None => false, + } +} + +#[cfg(test)] +mod tests { + use super::*; + use serde_json::json; + + #[test] + fn read_valid_subscription_response() { + let subcription_id = "tejkataa12124a"; + let value = json!({ + "result": subcription_id, + "id": 43, + "and_so_on": "test", + }); + + let maybe_subcription_id = read_subscription_id(&value); + assert_eq!(maybe_subcription_id, Some(subcription_id.to_string())); + } + + #[test] + fn read_invalid_subscription_response() { + let subcription_id = "tejkataa12124a"; + let value = json!({ + "error": subcription_id, + "id": 43, + "and_so_on": "test", + }); + + let maybe_subcription_id = read_subscription_id(&value); + assert!(maybe_subcription_id.is_none()); + } + + #[test] + fn read_error_message_returns_error_if_available() { + let error_message = "some_error_message"; + let value = json!({ + "error": error_message, + "id": 43, + "and_so_on": "test", + }); + + let msg = serde_json::to_string(&value).unwrap(); + + let message = read_error_message(&value, &msg); + assert!(message.contains(error_message)); + assert!(message.contains("error")); + } + + #[test] + fn read_error_message_returns_full_msg_if_error_is_not_available() { + let error_message = "some_error_message"; + let value = json!({ + "result": error_message, + "id": 43, + "and_so_on": "test", + }); + + let msg = serde_json::to_string(&value).unwrap(); + + let message = read_error_message(&value, &msg); + assert!(message.contains(&msg)); + } + + #[test] + fn subscription_id_matches_returns_true_for_equal_id() { + let subcription_id = "tejkataa12124a"; + let value = json!({ + "params": { + "subscription": subcription_id, + "message": "Test" + }, + "id": 43, + "and_so_on": "test", + }); + + assert!(subscription_id_matches(&value, subcription_id)); + } + + #[test] + fn subscription_id_matches_returns_false_for_not_equal_id() { + let subcription_id = "tejkataa12124a"; + let value = json!({ + "params": { + "subscription": "something else", + "message": "Test" + }, + "id": 43, + "and_so_on": "test", + }); + + assert!(!subscription_id_matches(&value, subcription_id)); + } + + #[test] + fn subscription_id_matches_returns_false_for_missing_subscription() { + let subcription_id = "tejkataa12124a"; + let value = json!({ + "params": { + "result": subcription_id, + "message": "Test" + }, + "id": 43, + "and_so_on": "test", + }); + + assert!(!subscription_id_matches(&value, subcription_id)); + } +} From 4243aee19f80e55ece8c1c80ac086c2a315577f8 Mon Sep 17 00:00:00 2001 From: haerdib Date: Wed, 8 Nov 2023 18:13:04 +0100 Subject: [PATCH 20/21] remove code from ws-client and use helpers --- src/rpc/mod.rs | 1 + src/rpc/ws_client/mod.rs | 22 ++++++---------------- 2 files changed, 7 insertions(+), 16 deletions(-) diff --git a/src/rpc/mod.rs b/src/rpc/mod.rs index 7eb5e4c27..ff79acdea 100644 --- a/src/rpc/mod.rs +++ b/src/rpc/mod.rs @@ -35,6 +35,7 @@ pub use jsonrpsee_client::JsonrpseeClient; pub mod jsonrpsee_client; pub mod error; +#[cfg(any(feature = "ws-client", feature = "tungstenite-client"))] mod helpers; pub use error::{Error, Result}; diff --git a/src/rpc/ws_client/mod.rs b/src/rpc/ws_client/mod.rs index 2664abad0..714658043 100644 --- a/src/rpc/ws_client/mod.rs +++ b/src/rpc/ws_client/mod.rs @@ -15,7 +15,7 @@ */ -use crate::rpc::Error as RpcClientError; +use crate::rpc::{helpers, Error as RpcClientError}; use log::*; use std::{fmt::Debug, sync::mpsc::Sender as ThreadOut}; use ws::{CloseCode, Handler, Handshake, Message, Result as WsResult, Sender}; @@ -120,7 +120,7 @@ impl HandleMessage for SubscriptionHandler { let send_result = match self.subscription_id.as_ref() { Some(id) => handle_subscription_message(result, &value, id), None => { - self.subscription_id = get_subscription_id(&value); + self.subscription_id = helpers::read_subscription_id(&value); if self.subscription_id.is_none() { send_error_response(result, &value, msg) } else { @@ -143,27 +143,17 @@ fn handle_subscription_message( value: &serde_json::Value, subscription_id: &str, ) -> Result<(), RpcClientError> { - if let Some(msg_subscription_id) = value["params"]["subscription"].as_str() { - if subscription_id == msg_subscription_id { - result.send(serde_json::to_string(&value["params"]["result"])?)?; - } + if helpers::subscription_id_matches(value, subscription_id) { + result.send(serde_json::to_string(&value["params"]["result"])?)?; } Ok(()) } -fn get_subscription_id(value: &serde_json::Value) -> Option { - value["result"].as_str().map(|id| id.to_string()) -} - fn send_error_response( result: &ThreadOut, value: &serde_json::Value, - original_message: &str, + msg: &str, ) -> Result<(), RpcClientError> { - let message = match value["error"]["message"].is_string() { - true => serde_json::to_string(&value["error"])?, - false => format!("Received unexpected response: {}", original_message), - }; - result.send(message)?; + result.send(helpers::read_error_message(value, msg))?; Ok(()) } From 86dfc27dce12751d8ca57729f50936d31882906c Mon Sep 17 00:00:00 2001 From: haerdib Date: Wed, 8 Nov 2023 18:17:56 +0100 Subject: [PATCH 21/21] fix compile errors --- src/rpc/helpers.rs | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/src/rpc/helpers.rs b/src/rpc/helpers.rs index 67b413307..5dcd5524a 100644 --- a/src/rpc/helpers.rs +++ b/src/rpc/helpers.rs @@ -15,6 +15,10 @@ */ +use alloc::{ + format, + string::{String, ToString}, +}; use serde_json::Value; pub fn read_subscription_id(value: &Value) -> Option {