Skip to content

Commit

Permalink
Remove timeout from subscription stream in wasm
Browse files Browse the repository at this point in the history
  • Loading branch information
aterentic-ethernal committed Dec 11, 2024
1 parent bf2d6d6 commit 5864374
Showing 1 changed file with 51 additions and 30 deletions.
81 changes: 51 additions & 30 deletions core/src/network/rpc/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,18 @@ use color_eyre::{
Report, Result,
};
use futures::{Stream, TryStreamExt};
use std::{iter::Iterator, pin::Pin, sync::Arc, time::Duration};
#[cfg(not(target_arch = "wasm32"))]
use std::time::Duration;
use std::{iter::Iterator, pin::Pin, sync::Arc};
#[cfg(not(target_arch = "wasm32"))]
use thiserror::Error;
#[cfg(target_arch = "wasm32")]
use thiserror_no_std::Error;
use tokio::sync::{broadcast::Sender, RwLock};
use tokio_retry::Retry;
use tokio_stream::{Elapsed, StreamExt, StreamMap};
#[cfg(not(target_arch = "wasm32"))]
use tokio_stream::Elapsed;
use tokio_stream::{StreamExt, StreamMap};
use tracing::{error, info, warn};

use super::{configuration::RetryConfig, Node, Nodes, Subscription, WrappedProof};
Expand Down Expand Up @@ -117,9 +121,14 @@ impl GenesisHash {
}
}

#[cfg(not(target_arch = "wasm32"))]
type SubscriptionStream =
Pin<Box<dyn Stream<Item = Result<Result<Subscription, subxt::error::Error>, Elapsed>> + Send>>;

#[cfg(target_arch = "wasm32")]
type SubscriptionStream =
Pin<Box<dyn Stream<Item = Result<Subscription, subxt::error::Error>> + Send>>;

#[derive(Clone)]
pub struct Client<T: Database> {
subxt_client: Arc<RwLock<SDK>>,
Expand Down Expand Up @@ -425,10 +434,17 @@ impl<D: Database> Client<D> {
Ok(mut stream) => {
loop {
match stream.next().await {
#[cfg(not(target_arch = "wasm32"))]
Some(Ok(Ok(item))) => {
yield Ok(item);
continue;
},
#[cfg(target_arch = "wasm32")]
Some(Ok(item)) => {
yield Ok(item);
continue;
},
#[cfg(not(target_arch = "wasm32"))]
Some(Ok(Err(error))) => warn!(%error, "Received error on RPC Subscription stream. Creating new connection."),
Some(Err(error)) => warn!(%error, "Received error on RPC Subscription stream. Creating new connection."),
None => warn!("RPC Subscription Stream exhausted. Creating new connection."),
Expand All @@ -452,39 +468,44 @@ impl<D: Database> Client<D> {
}

async fn create_rpc_subscriptions(client: SDK) -> Result<SubscriptionStream> {
// NOTE: current tokio stream implementation doesn't support timeouts on web
#[cfg(not(target_arch = "wasm32"))]
let timeout_in = Duration::from_secs(30);

let headers_stream = client
.api
.backend()
.stream_finalized_block_headers()
.await?
.map_ok(|(header, _)| Subscription::Header(header))
.inspect_ok(|_| info!("Received header on the stream"))
.inspect_err(|error| warn!(%error, "Received error on headers stream"));

// Create fused Avail Header subscription
let headers: SubscriptionStream = Box::pin(
client
.api
.backend()
.stream_finalized_block_headers()
.await?
.map_ok(|(header, _)| Subscription::Header(header))
.inspect_ok(|_| info!("Received header on the stream"))
.inspect_err(|error| warn!(%error, "Received error on headers stream"))
.timeout(timeout_in)
.fuse(),
);
#[cfg(not(target_arch = "wasm32"))]
let headers: SubscriptionStream = Box::pin(headers_stream.timeout(timeout_in).fuse());
#[cfg(target_arch = "wasm32")]
let headers: SubscriptionStream = Box::pin(headers_stream.fuse());

let justifications_stream = client
.rpc
.client
.subscribe(
"grandpa_subscribeJustifications",
rpc_params![],
"grandpa_unsubscribeJustifications",
)
.await?
.map_ok(Subscription::Justification)
.inspect_ok(|_| info!("Received justification on the stream"))
.inspect_err(|error| warn!(%error, "Received error on justifications stream"));

#[cfg(not(target_arch = "wasm32"))]
// Create fused GrandpaJustification subscription
let justifications: SubscriptionStream = Box::pin(
client
.rpc
.client
.subscribe(
"grandpa_subscribeJustifications",
rpc_params![],
"grandpa_unsubscribeJustifications",
)
.await?
.map_ok(Subscription::Justification)
.inspect_ok(|_| info!("Received justification on the stream"))
.inspect_err(|error| warn!(%error, "Received error on justifications stream"))
.timeout(timeout_in)
.fuse(),
);
let justifications: SubscriptionStream =
Box::pin(justifications_stream.timeout(timeout_in).fuse());
#[cfg(target_arch = "wasm32")]
let justifications: SubscriptionStream = Box::pin(justifications_stream.fuse());

let mut last_stream = 0;
let mut per_stream_count = 0;
Expand Down

0 comments on commit 5864374

Please sign in to comment.