Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add async support for subscription #612

Merged
merged 3 commits into from
Jul 10, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 1 addition & 3 deletions examples/examples/event_error_details.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,9 @@
limitations under the License.
*/

use codec::Decode;
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removed unused imports

use sp_keyring::AccountKeyring;
use sp_runtime::{AccountId32 as AccountId, MultiAddress};
use sp_runtime::MultiAddress;
use substrate_api_client::{
ac_node_api::StaticEvent,
ac_primitives::{AssetRuntimeConfig, ExtrinsicSigner},
extrinsic::BalancesExtrinsics,
rpc::JsonrpseeClient,
Expand Down
6 changes: 3 additions & 3 deletions examples/examples/get_blocks_async.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,11 +76,11 @@ async fn main() {
println!("Latest Header: \n {:?} \n", latest_header);
println!("Latest block: \n {:?} \n", latest_block);

// This part is still executed synchronously
println!("Subscribing to finalized heads");
let mut subscription = api.subscribe_finalized_heads().unwrap();
let mut subscription = api.subscribe_finalized_heads().await.unwrap();
for _ in 0..5 {
let head = subscription.next().unwrap().unwrap();
let head = subscription.next().await.unwrap().unwrap();
println!("Got new Block {:?}", head);
println!("This print should be printed before the one with \"Got new Block\"");
}
}
7 changes: 4 additions & 3 deletions src/api/rpc_api/author.rs
Original file line number Diff line number Diff line change
Expand Up @@ -287,6 +287,7 @@ where
rpc_params![encoded_extrinsic],
"author_unsubmitAndWatchExtrinsic",
)
.await
.map_err(|e| e.into())
}

Expand Down Expand Up @@ -360,12 +361,12 @@ where
let mut subscription: TransactionSubscriptionFor<Self::Client, Self::Hash> =
self.submit_and_watch_opaque_extrinsic(encoded_extrinsic).await?;

while let Some(transaction_status) = subscription.next() {
while let Some(transaction_status) = subscription.next().await {
let transaction_status = transaction_status?;
match transaction_status.is_expected() {
Ok(_) =>
if transaction_status.reached_status(watch_until) {
subscription.unsubscribe()?;
subscription.unsubscribe().await?;
let block_hash = transaction_status.get_maybe_block_hash();
return Ok(ExtrinsicReport::new(
tx_hash,
Expand All @@ -375,7 +376,7 @@ where
))
},
Err(e) => {
subscription.unsubscribe()?;
subscription.unsubscribe().await?;
return Err(e)
},
}
Expand Down
7 changes: 5 additions & 2 deletions src/api/rpc_api/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -156,15 +156,17 @@ where
Ok(blocks)
}
}
#[maybe_async::maybe_async(?Send)]
pub trait SubscribeChain {
type Client: Subscribe;
type Header: DeserializeOwned;

fn subscribe_finalized_heads(
async fn subscribe_finalized_heads(
&self,
) -> Result<<Self::Client as Subscribe>::Subscription<Self::Header>>;
}

#[maybe_async::maybe_async(?Send)]
impl<T, Client> SubscribeChain for Api<T, Client>
where
T: Config,
Expand All @@ -173,7 +175,7 @@ where
type Client = Client;
type Header = T::Header;

fn subscribe_finalized_heads(
async fn subscribe_finalized_heads(
&self,
) -> Result<<Self::Client as Subscribe>::Subscription<Self::Header>> {
debug!("subscribing to finalized heads");
Expand All @@ -183,6 +185,7 @@ where
rpc_params![],
"chain_unsubscribeFinalizedHeads",
)
.await
.map_err(|e| e.into())
}
}
22 changes: 14 additions & 8 deletions src/api/rpc_api/events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,10 +103,11 @@ where
{
/// Wait for the next value from the internal subscription.
/// Upon encounter, it retrieves and decodes the expected `EventRecord`.
pub fn next_events<RuntimeEvent: Decode, Topic: Decode>(
#[maybe_async::maybe_async(?Send)]
pub async fn next_events<RuntimeEvent: Decode, Topic: Decode>(
&mut self,
) -> Option<Result<Vec<EventRecord<RuntimeEvent, Topic>>>> {
let change_set = match self.subscription.next()? {
let change_set = match self.subscription.next().await? {
Ok(set) => set,
Err(e) => return Some(Err(Error::RpcClient(e))),
};
Expand All @@ -123,8 +124,9 @@ where
//
// On the contrary to `next_events` this function only needs up-to-date metadata
// and is therefore updateable during runtime.
pub fn next_events_from_metadata(&mut self) -> Option<Result<Events<Hash>>> {
let change_set = match self.subscription.next()? {
#[maybe_async::maybe_async(?Send)]
pub async fn next_events_from_metadata(&mut self) -> Option<Result<Events<Hash>>> {
let change_set = match self.subscription.next().await? {
Ok(set) => set,
Err(e) => return Some(Err(Error::RpcClient(e))),
};
Expand All @@ -140,19 +142,22 @@ where
}

/// Unsubscribe from the internal subscription.
pub fn unsubscribe(self) -> Result<()> {
self.subscription.unsubscribe().map_err(|e| e.into())
#[maybe_async::maybe_async(?Send)]
pub async fn unsubscribe(self) -> Result<()> {
self.subscription.unsubscribe().await.map_err(|e| e.into())
}
}

#[maybe_async::maybe_async(?Send)]
pub trait SubscribeEvents {
type Client: Subscribe;
type Hash: DeserializeOwned;

/// Subscribe to events.
fn subscribe_events(&self) -> Result<EventSubscriptionFor<Self::Client, Self::Hash>>;
async fn subscribe_events(&self) -> Result<EventSubscriptionFor<Self::Client, Self::Hash>>;
}

#[maybe_async::maybe_async(?Send)]
impl<T, Client> SubscribeEvents for Api<T, Client>
where
T: Config,
Expand All @@ -161,11 +166,12 @@ where
type Client = Client;
type Hash = T::Hash;

fn subscribe_events(&self) -> Result<EventSubscriptionFor<Self::Client, Self::Hash>> {
async fn subscribe_events(&self) -> Result<EventSubscriptionFor<Self::Client, Self::Hash>> {
let key = crate::storage_key("System", "Events");
let subscription = self
.client()
.subscribe("state_subscribeStorage", rpc_params![vec![key]], "state_unsubscribeStorage")
.await
.map(|sub| EventSubscription::new(sub, self.metadata().clone()))?;
Ok(subscription)
}
Expand Down
8 changes: 6 additions & 2 deletions src/api/rpc_api/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -409,17 +409,20 @@ where
Ok(Decode::decode(&mut c.value.as_slice())?)
}
}

#[maybe_async::maybe_async(?Send)]
pub trait SubscribeState {
type Client: Subscribe;
type Hash: DeserializeOwned;

fn subscribe_state(
async fn subscribe_state(
&self,
pallet: &str,
storage_key: &str,
) -> Result<StorageChangeSetSubscriptionFor<Self::Client, Self::Hash>>;
}

#[maybe_async::maybe_async(?Send)]
impl<T, Client> SubscribeState for Api<T, Client>
where
T: Config,
Expand All @@ -428,7 +431,7 @@ where
type Client = Client;
type Hash = T::Hash;

fn subscribe_state(
async fn subscribe_state(
&self,
pallet: &str,
storage_key_name: &str,
Expand All @@ -437,6 +440,7 @@ where
let key = crate::storage_key(pallet, storage_key_name);
self.client()
.subscribe("state_subscribeStorage", rpc_params![vec![key]], "state_unsubscribeStorage")
.await
.map_err(|e| e.into())
}
}
26 changes: 23 additions & 3 deletions src/rpc/jsonrpsee_client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,9 +58,10 @@ impl JsonrpseeClient {
#[maybe_async::async_impl(?Send)]
impl Request for JsonrpseeClient {
async fn request<R: DeserializeOwned>(&self, method: &str, params: RpcParams) -> Result<R> {
// Support async: #278
let future = self.inner.request(method, RpcParamsWrapper(params)).await;
future.map_err(|e| Error::Client(Box::new(e)))
self.inner
.request(method, RpcParamsWrapper(params))
.await
.map_err(|e| Error::Client(Box::new(e)))
}
}
#[maybe_async::sync_impl]
Expand All @@ -71,6 +72,25 @@ impl Request for JsonrpseeClient {
}
}

#[maybe_async::async_impl(?Send)]
impl Subscribe for JsonrpseeClient {
type Subscription<Notification> = SubscriptionWrapper<Notification> where Notification: DeserializeOwned;

async fn subscribe<Notification: DeserializeOwned>(
&self,
sub: &str,
params: RpcParams,
unsub: &str,
) -> Result<Self::Subscription<Notification>> {
self.inner
.subscribe(sub, RpcParamsWrapper(params), unsub)
.await
.map(|sub| sub.into())
.map_err(|e| Error::Client(Box::new(e)))
}
}

#[maybe_async::sync_impl(?Send)]
impl Subscribe for JsonrpseeClient {
type Subscription<Notification> = SubscriptionWrapper<Notification> where Notification: DeserializeOwned;

Expand Down
19 changes: 18 additions & 1 deletion src/rpc/jsonrpsee_client/subscription.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
*/

use crate::rpc::{Error, HandleSubscription, Result};
#[cfg(feature = "sync-api")]
use futures::executor::block_on;
use jsonrpsee::core::client::Subscription;
use serde::de::DeserializeOwned;
Expand All @@ -21,7 +22,7 @@ pub struct SubscriptionWrapper<Notification> {
inner: Subscription<Notification>,
}

// Support async: #278 (careful with no_std compatibility).
#[maybe_async::sync_impl(?Send)]
impl<Notification: DeserializeOwned> HandleSubscription<Notification>
for SubscriptionWrapper<Notification>
{
Expand All @@ -34,6 +35,22 @@ impl<Notification: DeserializeOwned> HandleSubscription<Notification>
}
}

#[maybe_async::async_impl(?Send)]
impl<Notification: DeserializeOwned> HandleSubscription<Notification>
for SubscriptionWrapper<Notification>
{
async fn next(&mut self) -> Option<Result<Notification>> {
self.inner
.next()
.await
.map(|result| result.map_err(|e| Error::Client(Box::new(e))))
}

async fn unsubscribe(self) -> Result<()> {
self.inner.unsubscribe().await.map_err(|e| Error::Client(Box::new(e)))
}
}

impl<Notification> From<Subscription<Notification>> for SubscriptionWrapper<Notification> {
fn from(inner: Subscription<Notification>) -> Self {
Self { inner }
Expand Down
8 changes: 5 additions & 3 deletions src/rpc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,12 +49,13 @@ pub trait Request {
}

/// Trait to be implemented by the ws-client for subscribing to the substrate node.
#[maybe_async::maybe_async(?Send)]
pub trait Subscribe {
type Subscription<Notification>: HandleSubscription<Notification>
where
Notification: DeserializeOwned;

fn subscribe<Notification: DeserializeOwned>(
async fn subscribe<Notification: DeserializeOwned>(
&self,
sub: &str,
params: RpcParams,
Expand All @@ -64,6 +65,7 @@ pub trait Subscribe {

/// Trait to use the full functionality of jsonrpseee Subscription type
/// without actually enforcing it.
#[maybe_async::maybe_async(?Send)]
pub trait HandleSubscription<Notification: DeserializeOwned> {
/// Returns the next notification from the stream.
/// This may return `None` if the subscription has been terminated,
Expand All @@ -72,10 +74,10 @@ pub trait HandleSubscription<Notification: DeserializeOwned> {
/// **Note:** This has an identical signature to the [`StreamExt::next`]
/// method (and delegates to that). Import [`StreamExt`] if you'd like
/// access to other stream combinator methods.
fn next(&mut self) -> Option<Result<Notification>>;
async fn next(&mut self) -> Option<Result<Notification>>;

/// Unsubscribe and consume the subscription.
fn unsubscribe(self) -> Result<()>;
async fn unsubscribe(self) -> Result<()>;
}

pub fn to_json_req(method: &str, params: RpcParams) -> Result<String> {
Expand Down
3 changes: 2 additions & 1 deletion src/rpc/tungstenite_client/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,10 +64,11 @@ impl Request for TungsteniteRpcClient {
}
}

#[maybe_async::maybe_async(?Send)]
impl Subscribe for TungsteniteRpcClient {
type Subscription<Notification> = TungsteniteSubscriptionWrapper<Notification> where Notification: DeserializeOwned;

fn subscribe<Notification: DeserializeOwned>(
async fn subscribe<Notification: DeserializeOwned>(
&self,
sub: &str,
params: RpcParams,
Expand Down
5 changes: 3 additions & 2 deletions src/rpc/tungstenite_client/subscription.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,11 @@ impl<Notification> TungsteniteSubscriptionWrapper<Notification> {
}
}

#[maybe_async::maybe_async(?Send)]
impl<Notification: DeserializeOwned> HandleSubscription<Notification>
for TungsteniteSubscriptionWrapper<Notification>
{
fn next(&mut self) -> Option<Result<Notification>> {
async fn next(&mut self) -> Option<Result<Notification>> {
let notification = match self.receiver.recv() {
Ok(notif) => notif,
// Sender was disconnected, therefore no further messages are to be expected.
Expand All @@ -44,7 +45,7 @@ impl<Notification: DeserializeOwned> HandleSubscription<Notification>
Some(serde_json::from_str(&notification).map_err(|e| e.into()))
}

fn unsubscribe(self) -> Result<()> {
async fn unsubscribe(self) -> Result<()> {
// TODO: Nicer unsubscription.
// We close ungracefully: Simply drop the receiver. This will turn
// into an error on the sender side, terminating the websocket polling
Expand Down
3 changes: 2 additions & 1 deletion src/rpc/ws_client/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,10 +56,11 @@ impl Request for WsRpcClient {
}
}

#[maybe_async::maybe_async(?Send)]
impl Subscribe for WsRpcClient {
type Subscription<Notification> = WsSubscriptionWrapper<Notification> where Notification: DeserializeOwned;

fn subscribe<Notification: DeserializeOwned>(
async fn subscribe<Notification: DeserializeOwned>(
&self,
sub: &str,
params: RpcParams,
Expand Down
6 changes: 3 additions & 3 deletions src/rpc/ws_client/subscription.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,11 @@ impl<Notification> WsSubscriptionWrapper<Notification> {
}
}

// Support async: #278 (careful with no_std compatibility).
#[maybe_async::maybe_async(?Send)]
impl<Notification: DeserializeOwned> HandleSubscription<Notification>
for WsSubscriptionWrapper<Notification>
{
fn next(&mut self) -> Option<Result<Notification>> {
async fn next(&mut self) -> Option<Result<Notification>> {
let notification = match self.receiver.recv() {
Ok(notif) => notif,
// Sender was disconnected, therefore no further messages are to be expected.
Expand All @@ -47,7 +47,7 @@ impl<Notification: DeserializeOwned> HandleSubscription<Notification>
Some(serde_json::from_str(&notification).map_err(|e| e.into()))
}

fn unsubscribe(self) -> Result<()> {
async fn unsubscribe(self) -> Result<()> {
self.ws_sender.clone().shutdown()?;
Ok(())
}
Expand Down