Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

rpc: stabilize chainhead backend #1802

Merged
merged 14 commits into from
Oct 3, 2024
Merged
4 changes: 2 additions & 2 deletions .github/workflows/rust.yml
Original file line number Diff line number Diff line change
Expand Up @@ -301,7 +301,7 @@ jobs:
uses: "andymckay/cancel-action@a955d435292c0d409d104b57d8e78435a93a6ef1" # v0.5

unstable_backend_tests:
name: "Test (Unstable Backend)"
name: "Test chainhead backend"
runs-on: ubuntu-latest-16-cores
needs: [clippy, wasm_clippy, check, wasm_check, docs]
timeout-minutes: 30
Expand Down Expand Up @@ -329,7 +329,7 @@ jobs:
uses: actions-rs/[email protected]
with:
command: nextest
args: run --workspace --features unstable-backend-client
args: run --workspace --features chainhead-backend

- if: "failure()"
uses: "andymckay/cancel-action@a955d435292c0d409d104b57d8e78435a93a6ef1" # v0.5
Expand Down
11 changes: 10 additions & 1 deletion subxt/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -47,12 +47,21 @@ web = [
"finito?/wasm-bindgen",
]

# Feature flags for enabling default future executor runtimes.
niklasad1 marked this conversation as resolved.
Show resolved Hide resolved
# Technically it's a hack enable to both but simplifies the conditional compilation
# and subxt is selecting executor based on the used platform.
#
# For instance `wasm-bindgen-futures` panics if the platform isn't wasm32 and
# similar for tokio that requires a tokio runtime to be initialized.
runtime = ["tokio/rt", "wasm-bindgen-futures"]

# Enable this to use the reconnecting rpc client
unstable-reconnecting-rpc-client = ["dep:finito", "dep:tokio", "jsonrpsee", "wasm-bindgen-futures"]
unstable-reconnecting-rpc-client = ["dep:finito", "jsonrpsee"]

# Enable this to use jsonrpsee (allowing for example `OnlineClient::from_url`).
jsonrpsee = [
"dep:jsonrpsee",
"runtime",
]

# Enable this to pull in extra Substrate dependencies which make it possible to
Expand Down
13 changes: 3 additions & 10 deletions subxt/examples/setup_reconnecting_rpc_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,20 +34,13 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
.build("ws://localhost:9944".to_string())
.await?;

// If you want to use the unstable backend with the reconnecting RPC client, you can do so like this:
// If you want to use the chainhead backend with the reconnecting RPC client, you can do so like this:
//
// ```
// use subxt::backend::unstable::UnstableBackend;
// use subxt::backend::chain_head:ChainHeadBackend;
// use subxt::OnlineClient;
//
// let (backend, mut driver) = UnstableBackend::builder().build(RpcClient::new(rpc.clone()));
// tokio::spawn(async move {
// while let Some(val) = driver.next().await {
// if let Err(e) = val {
// eprintln!("Error driving unstable backend: {e}; terminating client");
// }
// }
// });
// let backend = ChainHeadBackend::builder().build_with_background_task(RpcClient::new(rpc.clone()));
// let api: OnlineClient<PolkadotConfig> = OnlineClient::from_backend(Arc::new(backend)).await?;
// ```

Expand Down
35 changes: 35 additions & 0 deletions subxt/examples/setup_rpc_chainhead_backend.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
//! Example to utilize the ChainHeadBackend rpc backend to subscribe to finalized blocks.

#![allow(missing_docs)]

use futures::StreamExt;
use subxt::backend::chain_head::{ChainHeadBackend, ChainHeadBackendBuilder};
use subxt::backend::rpc::RpcClient;
use subxt::{OnlineClient, PolkadotConfig};

// Generate an interface that we can use from the node's metadata.
#[subxt::subxt(runtime_metadata_path = "../artifacts/polkadot_metadata_small.scale")]
pub mod polkadot {}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
tracing_subscriber::fmt::init();

let rpc = RpcClient::from_url("ws://localhost:9944".to_string()).await?;
let backend: ChainHeadBackend<PolkadotConfig> =
ChainHeadBackendBuilder::default().build_with_background_driver(rpc.clone());
let api = OnlineClient::from_backend(std::sync::Arc::new(backend)).await?;

let mut blocks_sub = api.blocks().subscribe_finalized().await?.take(100);

while let Some(block) = blocks_sub.next().await {
let block = block?;

let block_number = block.number();
let block_hash = block.hash();

println!("Block #{block_number} ({block_hash})");
}

Ok(())
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
// This file is dual-licensed as Apache-2.0 or GPL-3.0.
// see LICENSE for license details.

use super::rpc_methods::{FollowEvent, UnstableRpcMethods};
use super::rpc_methods::{ChainHeadRpcMethods, FollowEvent};
use crate::config::Config;
use crate::error::Error;
use futures::{FutureExt, Stream, StreamExt};
Expand Down Expand Up @@ -99,7 +99,7 @@ impl<Hash> FollowStream<Hash> {
}

/// Create a new [`FollowStream`] given the RPC methods.
pub fn from_methods<T: Config>(methods: UnstableRpcMethods<T>) -> FollowStream<T::Hash> {
pub fn from_methods<T: Config>(methods: ChainHeadRpcMethods<T>) -> FollowStream<T::Hash> {
FollowStream {
stream_getter: Box::new(move || {
let methods = methods.clone();
Expand Down Expand Up @@ -215,7 +215,7 @@ impl<Hash> Stream for FollowStream<Hash> {
#[cfg(test)]
pub(super) mod test_utils {
use super::*;
use crate::backend::unstable::rpc_methods::{
use crate::backend::chain_head::rpc_methods::{
BestBlockChanged, Finalized, Initialized, NewBlock,
};
use crate::config::substrate::H256;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
// see LICENSE for license details.

use super::follow_stream_unpin::{BlockRef, FollowStreamMsg, FollowStreamUnpin};
use crate::backend::unstable::rpc_methods::{FollowEvent, Initialized, RuntimeEvent};
use crate::backend::chain_head::rpc_methods::{FollowEvent, Initialized, RuntimeEvent};
use crate::config::BlockHash;
use crate::error::{Error, RpcError};
use futures::stream::{Stream, StreamExt};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@
// see LICENSE for license details.

use super::follow_stream::FollowStream;
use super::UnstableRpcMethods;
use crate::backend::unstable::rpc_methods::{
use super::ChainHeadRpcMethods;
use crate::backend::chain_head::rpc_methods::{
BestBlockChanged, Finalized, FollowEvent, Initialized, NewBlock,
};
use crate::config::{BlockHash, Config};
Expand Down Expand Up @@ -275,7 +275,7 @@ impl<Hash: BlockHash> FollowStreamUnpin<Hash> {
/// Create a new [`FollowStreamUnpin`] given the RPC methods.
pub fn from_methods<T: Config>(
follow_stream: FollowStream<T::Hash>,
methods: UnstableRpcMethods<T>,
methods: ChainHeadRpcMethods<T>,
max_block_life: usize,
) -> FollowStreamUnpin<T::Hash> {
let unpin_method = Box::new(move |hash: T::Hash, sub_id: Arc<str>| {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,22 +38,22 @@ use std::task::Poll;
use storage_items::StorageItems;

// Expose the RPC methods.
pub use rpc_methods::UnstableRpcMethods;
pub use rpc_methods::ChainHeadRpcMethods;

/// Configure and build an [`UnstableBackend`].
pub struct UnstableBackendBuilder<T> {
/// Configure and build an [`ChainHeadBackend`].
pub struct ChainHeadBackendBuilder<T> {
max_block_life: usize,
_marker: std::marker::PhantomData<T>,
}

impl<T: Config> Default for UnstableBackendBuilder<T> {
impl<T: Config> Default for ChainHeadBackendBuilder<T> {
fn default() -> Self {
Self::new()
}
}

impl<T: Config> UnstableBackendBuilder<T> {
/// Create a new [`UnstableBackendBuilder`].
impl<T: Config> ChainHeadBackendBuilder<T> {
/// Create a new [`ChainHeadBackendBuilder`].
pub fn new() -> Self {
Self {
max_block_life: usize::MAX,
Expand All @@ -73,15 +73,20 @@ impl<T: Config> UnstableBackendBuilder<T> {
self
}

/// Given an [`RpcClient`] to use to make requests, this returns a tuple of an [`UnstableBackend`],
/// which implements the [`Backend`] trait, and an [`UnstableBackendDriver`] which must be polled in
/// order for the backend to make progress.
/// A low-level API to build the backend and driver which requires polling the driver for the backend
/// to make progress.
///
/// This is useful if you want to manage the driver yourself, for example if you want to run it in on
/// a specific runtime.
///
/// If you just want to run the driver in the background until completion in on the default runtime,
/// use [`ChainHeadBackendBuilder::build_with_background_driver`] instead.
pub fn build(
self,
client: impl Into<RpcClient>,
) -> (UnstableBackend<T>, UnstableBackendDriver<T>) {
) -> (ChainHeadBackend<T>, ChainHeadBackendDriver<T>) {
// Construct the underlying follow_stream layers:
let rpc_methods = UnstableRpcMethods::new(client.into());
let rpc_methods = ChainHeadRpcMethods::new(client.into());
let follow_stream =
follow_stream::FollowStream::<T::Hash>::from_methods(rpc_methods.clone());
let follow_stream_unpin = follow_stream_unpin::FollowStreamUnpin::<T::Hash>::from_methods(
Expand All @@ -92,26 +97,60 @@ impl<T: Config> UnstableBackendBuilder<T> {
let follow_stream_driver = FollowStreamDriver::new(follow_stream_unpin);

// Wrap these into the backend and driver that we'll expose.
let backend = UnstableBackend {
let backend = ChainHeadBackend {
methods: rpc_methods,
follow_handle: follow_stream_driver.handle(),
};
let driver = UnstableBackendDriver {
let driver = ChainHeadBackendDriver {
driver: follow_stream_driver,
};

(backend, driver)
}

/// An API to build the backend and driver which will run in the background until completion
/// on the default runtime.
///
/// - On non-wasm targets, this will spawn a tokio task to poll the driver.
/// - On wasm targets, this will spawn a wasm-bindgen task to poll the driver.
#[cfg(feature = "runtime")]
pub fn build_with_background_driver(self, client: impl Into<RpcClient>) -> ChainHeadBackend<T> {
fn spawn<F: std::future::Future + Send + 'static>(future: F) {
#[cfg(not(target_family = "wasm"))]
tokio::spawn(async move {
future.await;
});
#[cfg(all(target_arch = "wasm32", target_os = "unknown"))]
wasm_bindgen_futures::spawn_local(async move {
future.await;
});
}

let (backend, mut driver) = self.build(client);

spawn(async move {
while let Some(res) = driver.next().await {
if let Err(e) = res {
if !e.is_disconnected_will_reconnect() {
tracing::debug!(target: "subxt", "chainHead driver was closed: {e}");
break;
}
}
}
});

backend
}
}

/// Driver for the [`UnstableBackend`]. This must be polled in order for the
/// Driver for the [`ChainHeadBackend`]. This must be polled in order for the
/// backend to make progress.
#[derive(Debug)]
pub struct UnstableBackendDriver<T: Config> {
pub struct ChainHeadBackendDriver<T: Config> {
driver: FollowStreamDriver<T::Hash>,
}

impl<T: Config> Stream for UnstableBackendDriver<T> {
impl<T: Config> Stream for ChainHeadBackendDriver<T> {
type Item = <FollowStreamDriver<T::Hash> as Stream>::Item;
fn poll_next(
mut self: std::pin::Pin<&mut Self>,
Expand All @@ -121,19 +160,19 @@ impl<T: Config> Stream for UnstableBackendDriver<T> {
}
}

/// The unstable backend.
/// The chainHead backend.
#[derive(Debug, Clone)]
pub struct UnstableBackend<T: Config> {
pub struct ChainHeadBackend<T: Config> {
// RPC methods we'll want to call:
methods: UnstableRpcMethods<T>,
methods: ChainHeadRpcMethods<T>,
// A handle to the chainHead_follow subscription:
follow_handle: FollowStreamDriverHandle<T::Hash>,
}

impl<T: Config> UnstableBackend<T> {
/// Configure and construct an [`UnstableBackend`] and the associated [`UnstableBackendDriver`].
pub fn builder() -> UnstableBackendBuilder<T> {
UnstableBackendBuilder::new()
impl<T: Config> ChainHeadBackend<T> {
/// Configure and construct an [`ChainHeadBackend`] and the associated [`ChainHeadBackendDriver`].
pub fn builder() -> ChainHeadBackendBuilder<T> {
ChainHeadBackendBuilder::new()
}

/// Stream block headers based on the provided filter fn
Expand Down Expand Up @@ -193,10 +232,10 @@ impl<Hash: BlockHash + 'static> From<follow_stream_unpin::BlockRef<Hash>> for Bl
}
}

impl<T: Config> super::sealed::Sealed for UnstableBackend<T> {}
impl<T: Config> super::sealed::Sealed for ChainHeadBackend<T> {}

#[async_trait]
impl<T: Config + Send + Sync + 'static> Backend<T> for UnstableBackend<T> {
impl<T: Config + Send + Sync + 'static> Backend<T> for ChainHeadBackend<T> {
async fn storage_fetch_values(
&self,
keys: Vec<Vec<u8>>,
Expand Down
Loading