diff --git a/examples/examples/event_error_details.rs b/examples/examples/event_error_details.rs index 557c15460..062520d1f 100644 --- a/examples/examples/event_error_details.rs +++ b/examples/examples/event_error_details.rs @@ -13,11 +13,9 @@ limitations under the License. */ -use codec::Decode; use sp_keyring::AccountKeyring; -use sp_runtime::{AccountId32 as AccountId, MultiAddress}; +use sp_runtime::MultiAddress; use substrate_api_client::{ - ac_node_api::StaticEvent, ac_primitives::{AssetRuntimeConfig, ExtrinsicSigner}, extrinsic::BalancesExtrinsics, rpc::JsonrpseeClient, diff --git a/examples/examples/get_blocks_async.rs b/examples/examples/get_blocks_async.rs index 3a011d0c8..a2ab63d09 100644 --- a/examples/examples/get_blocks_async.rs +++ b/examples/examples/get_blocks_async.rs @@ -76,11 +76,11 @@ async fn main() { println!("Latest Header: \n {:?} \n", latest_header); println!("Latest block: \n {:?} \n", latest_block); - // This part is still executed synchronously println!("Subscribing to finalized heads"); - let mut subscription = api.subscribe_finalized_heads().unwrap(); + let mut subscription = api.subscribe_finalized_heads().await.unwrap(); for _ in 0..5 { - let head = subscription.next().unwrap().unwrap(); + let head = subscription.next().await.unwrap().unwrap(); println!("Got new Block {:?}", head); + println!("This print should be printed before the one with \"Got new Block\""); } } diff --git a/src/api/rpc_api/author.rs b/src/api/rpc_api/author.rs index 19187fa1b..1a2a654df 100644 --- a/src/api/rpc_api/author.rs +++ b/src/api/rpc_api/author.rs @@ -287,6 +287,7 @@ where rpc_params![encoded_extrinsic], "author_unsubmitAndWatchExtrinsic", ) + .await .map_err(|e| e.into()) } @@ -360,12 +361,12 @@ where let mut subscription: TransactionSubscriptionFor = self.submit_and_watch_opaque_extrinsic(encoded_extrinsic).await?; - while let Some(transaction_status) = subscription.next() { + while let Some(transaction_status) = subscription.next().await { let transaction_status = transaction_status?; match transaction_status.is_expected() { Ok(_) => if transaction_status.reached_status(watch_until) { - subscription.unsubscribe()?; + subscription.unsubscribe().await?; let block_hash = transaction_status.get_maybe_block_hash(); return Ok(ExtrinsicReport::new( tx_hash, @@ -375,7 +376,7 @@ where )) }, Err(e) => { - subscription.unsubscribe()?; + subscription.unsubscribe().await?; return Err(e) }, } diff --git a/src/api/rpc_api/chain.rs b/src/api/rpc_api/chain.rs index ca79d90eb..4452af5bd 100644 --- a/src/api/rpc_api/chain.rs +++ b/src/api/rpc_api/chain.rs @@ -156,15 +156,17 @@ where Ok(blocks) } } +#[maybe_async::maybe_async(?Send)] pub trait SubscribeChain { type Client: Subscribe; type Header: DeserializeOwned; - fn subscribe_finalized_heads( + async fn subscribe_finalized_heads( &self, ) -> Result<::Subscription>; } +#[maybe_async::maybe_async(?Send)] impl SubscribeChain for Api where T: Config, @@ -173,7 +175,7 @@ where type Client = Client; type Header = T::Header; - fn subscribe_finalized_heads( + async fn subscribe_finalized_heads( &self, ) -> Result<::Subscription> { debug!("subscribing to finalized heads"); @@ -183,6 +185,7 @@ where rpc_params![], "chain_unsubscribeFinalizedHeads", ) + .await .map_err(|e| e.into()) } } diff --git a/src/api/rpc_api/events.rs b/src/api/rpc_api/events.rs index d2babc7a8..9aaed1ba6 100644 --- a/src/api/rpc_api/events.rs +++ b/src/api/rpc_api/events.rs @@ -103,10 +103,11 @@ where { /// Wait for the next value from the internal subscription. /// Upon encounter, it retrieves and decodes the expected `EventRecord`. - pub fn next_events( + #[maybe_async::maybe_async(?Send)] + pub async fn next_events( &mut self, ) -> Option>>> { - let change_set = match self.subscription.next()? { + let change_set = match self.subscription.next().await? { Ok(set) => set, Err(e) => return Some(Err(Error::RpcClient(e))), }; @@ -123,8 +124,9 @@ where // // On the contrary to `next_events` this function only needs up-to-date metadata // and is therefore updateable during runtime. - pub fn next_events_from_metadata(&mut self) -> Option>> { - let change_set = match self.subscription.next()? { + #[maybe_async::maybe_async(?Send)] + pub async fn next_events_from_metadata(&mut self) -> Option>> { + let change_set = match self.subscription.next().await? { Ok(set) => set, Err(e) => return Some(Err(Error::RpcClient(e))), }; @@ -140,19 +142,22 @@ where } /// Unsubscribe from the internal subscription. - pub fn unsubscribe(self) -> Result<()> { - self.subscription.unsubscribe().map_err(|e| e.into()) + #[maybe_async::maybe_async(?Send)] + pub async fn unsubscribe(self) -> Result<()> { + self.subscription.unsubscribe().await.map_err(|e| e.into()) } } +#[maybe_async::maybe_async(?Send)] pub trait SubscribeEvents { type Client: Subscribe; type Hash: DeserializeOwned; /// Subscribe to events. - fn subscribe_events(&self) -> Result>; + async fn subscribe_events(&self) -> Result>; } +#[maybe_async::maybe_async(?Send)] impl SubscribeEvents for Api where T: Config, @@ -161,11 +166,12 @@ where type Client = Client; type Hash = T::Hash; - fn subscribe_events(&self) -> Result> { + async fn subscribe_events(&self) -> Result> { let key = crate::storage_key("System", "Events"); let subscription = self .client() .subscribe("state_subscribeStorage", rpc_params![vec![key]], "state_unsubscribeStorage") + .await .map(|sub| EventSubscription::new(sub, self.metadata().clone()))?; Ok(subscription) } diff --git a/src/api/rpc_api/state.rs b/src/api/rpc_api/state.rs index 5915bf6f8..224e4d636 100644 --- a/src/api/rpc_api/state.rs +++ b/src/api/rpc_api/state.rs @@ -409,17 +409,20 @@ where Ok(Decode::decode(&mut c.value.as_slice())?) } } + +#[maybe_async::maybe_async(?Send)] pub trait SubscribeState { type Client: Subscribe; type Hash: DeserializeOwned; - fn subscribe_state( + async fn subscribe_state( &self, pallet: &str, storage_key: &str, ) -> Result>; } +#[maybe_async::maybe_async(?Send)] impl SubscribeState for Api where T: Config, @@ -428,7 +431,7 @@ where type Client = Client; type Hash = T::Hash; - fn subscribe_state( + async fn subscribe_state( &self, pallet: &str, storage_key_name: &str, @@ -437,6 +440,7 @@ where let key = crate::storage_key(pallet, storage_key_name); self.client() .subscribe("state_subscribeStorage", rpc_params![vec![key]], "state_unsubscribeStorage") + .await .map_err(|e| e.into()) } } diff --git a/src/rpc/jsonrpsee_client/mod.rs b/src/rpc/jsonrpsee_client/mod.rs index 979fc220c..987a5dfb3 100644 --- a/src/rpc/jsonrpsee_client/mod.rs +++ b/src/rpc/jsonrpsee_client/mod.rs @@ -58,9 +58,10 @@ impl JsonrpseeClient { #[maybe_async::async_impl(?Send)] impl Request for JsonrpseeClient { async fn request(&self, method: &str, params: RpcParams) -> Result { - // Support async: #278 - let future = self.inner.request(method, RpcParamsWrapper(params)).await; - future.map_err(|e| Error::Client(Box::new(e))) + self.inner + .request(method, RpcParamsWrapper(params)) + .await + .map_err(|e| Error::Client(Box::new(e))) } } #[maybe_async::sync_impl] @@ -71,6 +72,25 @@ impl Request for JsonrpseeClient { } } +#[maybe_async::async_impl(?Send)] +impl Subscribe for JsonrpseeClient { + type Subscription = SubscriptionWrapper where Notification: DeserializeOwned; + + async fn subscribe( + &self, + sub: &str, + params: RpcParams, + unsub: &str, + ) -> Result> { + self.inner + .subscribe(sub, RpcParamsWrapper(params), unsub) + .await + .map(|sub| sub.into()) + .map_err(|e| Error::Client(Box::new(e))) + } +} + +#[maybe_async::sync_impl(?Send)] impl Subscribe for JsonrpseeClient { type Subscription = SubscriptionWrapper where Notification: DeserializeOwned; diff --git a/src/rpc/jsonrpsee_client/subscription.rs b/src/rpc/jsonrpsee_client/subscription.rs index 113642553..d42591b13 100644 --- a/src/rpc/jsonrpsee_client/subscription.rs +++ b/src/rpc/jsonrpsee_client/subscription.rs @@ -12,6 +12,7 @@ */ use crate::rpc::{Error, HandleSubscription, Result}; +#[cfg(feature = "sync-api")] use futures::executor::block_on; use jsonrpsee::core::client::Subscription; use serde::de::DeserializeOwned; @@ -21,7 +22,7 @@ pub struct SubscriptionWrapper { inner: Subscription, } -// Support async: #278 (careful with no_std compatibility). +#[maybe_async::sync_impl(?Send)] impl HandleSubscription for SubscriptionWrapper { @@ -34,6 +35,22 @@ impl HandleSubscription } } +#[maybe_async::async_impl(?Send)] +impl HandleSubscription + for SubscriptionWrapper +{ + async fn next(&mut self) -> Option> { + self.inner + .next() + .await + .map(|result| result.map_err(|e| Error::Client(Box::new(e)))) + } + + async fn unsubscribe(self) -> Result<()> { + self.inner.unsubscribe().await.map_err(|e| Error::Client(Box::new(e))) + } +} + impl From> for SubscriptionWrapper { fn from(inner: Subscription) -> Self { Self { inner } diff --git a/src/rpc/mod.rs b/src/rpc/mod.rs index 575bef48a..73d07801f 100644 --- a/src/rpc/mod.rs +++ b/src/rpc/mod.rs @@ -49,12 +49,13 @@ pub trait Request { } /// Trait to be implemented by the ws-client for subscribing to the substrate node. +#[maybe_async::maybe_async(?Send)] pub trait Subscribe { type Subscription: HandleSubscription where Notification: DeserializeOwned; - fn subscribe( + async fn subscribe( &self, sub: &str, params: RpcParams, @@ -64,6 +65,7 @@ pub trait Subscribe { /// Trait to use the full functionality of jsonrpseee Subscription type /// without actually enforcing it. +#[maybe_async::maybe_async(?Send)] pub trait HandleSubscription { /// Returns the next notification from the stream. /// This may return `None` if the subscription has been terminated, @@ -72,10 +74,10 @@ pub trait HandleSubscription { /// **Note:** This has an identical signature to the [`StreamExt::next`] /// method (and delegates to that). Import [`StreamExt`] if you'd like /// access to other stream combinator methods. - fn next(&mut self) -> Option>; + async fn next(&mut self) -> Option>; /// Unsubscribe and consume the subscription. - fn unsubscribe(self) -> Result<()>; + async fn unsubscribe(self) -> Result<()>; } pub fn to_json_req(method: &str, params: RpcParams) -> Result { diff --git a/src/rpc/tungstenite_client/client.rs b/src/rpc/tungstenite_client/client.rs index 4b4e037e5..efb4ea68c 100644 --- a/src/rpc/tungstenite_client/client.rs +++ b/src/rpc/tungstenite_client/client.rs @@ -64,10 +64,11 @@ impl Request for TungsteniteRpcClient { } } +#[maybe_async::maybe_async(?Send)] impl Subscribe for TungsteniteRpcClient { type Subscription = TungsteniteSubscriptionWrapper where Notification: DeserializeOwned; - fn subscribe( + async fn subscribe( &self, sub: &str, params: RpcParams, diff --git a/src/rpc/tungstenite_client/subscription.rs b/src/rpc/tungstenite_client/subscription.rs index 2f9fe2bce..f06abdd42 100644 --- a/src/rpc/tungstenite_client/subscription.rs +++ b/src/rpc/tungstenite_client/subscription.rs @@ -32,10 +32,11 @@ impl TungsteniteSubscriptionWrapper { } } +#[maybe_async::maybe_async(?Send)] impl HandleSubscription for TungsteniteSubscriptionWrapper { - fn next(&mut self) -> Option> { + async fn next(&mut self) -> Option> { let notification = match self.receiver.recv() { Ok(notif) => notif, // Sender was disconnected, therefore no further messages are to be expected. @@ -44,7 +45,7 @@ impl HandleSubscription Some(serde_json::from_str(¬ification).map_err(|e| e.into())) } - fn unsubscribe(self) -> Result<()> { + async fn unsubscribe(self) -> Result<()> { // TODO: Nicer unsubscription. // We close ungracefully: Simply drop the receiver. This will turn // into an error on the sender side, terminating the websocket polling diff --git a/src/rpc/ws_client/client.rs b/src/rpc/ws_client/client.rs index 5f9826d6f..d5ba7ac03 100644 --- a/src/rpc/ws_client/client.rs +++ b/src/rpc/ws_client/client.rs @@ -56,10 +56,11 @@ impl Request for WsRpcClient { } } +#[maybe_async::maybe_async(?Send)] impl Subscribe for WsRpcClient { type Subscription = WsSubscriptionWrapper where Notification: DeserializeOwned; - fn subscribe( + async fn subscribe( &self, sub: &str, params: RpcParams, diff --git a/src/rpc/ws_client/subscription.rs b/src/rpc/ws_client/subscription.rs index b1b54f1bc..0358e7ad7 100644 --- a/src/rpc/ws_client/subscription.rs +++ b/src/rpc/ws_client/subscription.rs @@ -34,11 +34,11 @@ impl WsSubscriptionWrapper { } } -// Support async: #278 (careful with no_std compatibility). +#[maybe_async::maybe_async(?Send)] impl HandleSubscription for WsSubscriptionWrapper { - fn next(&mut self) -> Option> { + async fn next(&mut self) -> Option> { let notification = match self.receiver.recv() { Ok(notif) => notif, // Sender was disconnected, therefore no further messages are to be expected. @@ -47,7 +47,7 @@ impl HandleSubscription Some(serde_json::from_str(¬ification).map_err(|e| e.into())) } - fn unsubscribe(self) -> Result<()> { + async fn unsubscribe(self) -> Result<()> { self.ws_sender.clone().shutdown()?; Ok(()) }