Skip to content

Commit

Permalink
fmt
Browse files Browse the repository at this point in the history
  • Loading branch information
jsdw committed Jan 27, 2025
1 parent c4cdedb commit 467a5ec
Show file tree
Hide file tree
Showing 18 changed files with 131 additions and 107 deletions.
4 changes: 2 additions & 2 deletions rpcs/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,12 +58,12 @@

crate::macros::cfg_jsonrpsee! {
mod jsonrpsee_impl;
pub use jsonrpsee::core::client::Client as JsonrpseeRpcClient;
pub use jsonrpsee::core::client::Client as JsonrpseeRpcClient;
}

crate::macros::cfg_unstable_light_client! {
mod lightclient_impl;
pub use lightclient_impl::LightClientRpc as LightClientRpcClient;
pub use lightclient_impl::LightClientRpc as LightClientRpcClient;
}

crate::macros::cfg_reconnecting_rpc_client! {
Expand Down
8 changes: 4 additions & 4 deletions rpcs/src/client/rpc_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -165,8 +165,7 @@ impl RpcParams {
} else {
self.0.push(b',')
}
serde_json::to_writer(&mut self.0, &param)
.map_err(Error::Deserialization)?;
serde_json::to_writer(&mut self.0, &param).map_err(Error::Deserialization)?;
Ok(())
}
/// Build a [`RawValue`] from our params, returning `None` if no parameters
Expand Down Expand Up @@ -236,8 +235,9 @@ impl<Res: DeserializeOwned> Stream for RpcSubscription<Res> {
// Decode the inner RawValue to the type we're expecting and map
// any errors to the right shape:
let res = res.map(|r| {
r.map_err(|e| e.into())
.and_then(|raw_val| serde_json::from_str(raw_val.get()).map_err(Error::Deserialization))
r.map_err(|e| e.into()).and_then(|raw_val| {
serde_json::from_str(raw_val.get()).map_err(Error::Deserialization)
})
});

Poll::Ready(res)
Expand Down
2 changes: 1 addition & 1 deletion rpcs/src/client/rpc_client_t.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@
// This file is dual-licensed as Apache-2.0 or GPL-3.0.
// see LICENSE for license details.

use crate::Error;
use futures::Stream;
use std::{future::Future, pin::Pin};
use crate::Error;

// Re-exporting for simplicity since it's used a bunch in the trait definition.
pub use serde_json::value::RawValue;
Expand Down
20 changes: 11 additions & 9 deletions rpcs/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,19 +10,18 @@
))]
compile_error!("subxt-rpcs: exactly one of the 'web' and 'native' features should be used.");


mod macros;

pub mod utils;
pub mod client;
pub mod methods;
pub mod utils;

// Expose the most common things at the top level:
pub use client::{ RpcClient, RpcClientT };
pub use methods::{ ChainHeadRpcMethods, LegacyRpcMethods };
pub use client::{RpcClient, RpcClientT};
pub use methods::{ChainHeadRpcMethods, LegacyRpcMethods};

/// Configuration used by some of the RPC methods to determine the shape of
/// some of the inputs or responses.
/// some of the inputs or responses.
pub trait RpcConfig {
/// The block header type.
type Header: Header;
Expand All @@ -34,20 +33,23 @@ pub trait RpcConfig {

/// A trait which is applied to any type that is a valid block header.
pub trait Header: std::fmt::Debug + codec::Decode + serde::de::DeserializeOwned {}
impl <T> Header for T where T: std::fmt::Debug + codec::Decode + serde::de::DeserializeOwned {}
impl<T> Header for T where T: std::fmt::Debug + codec::Decode + serde::de::DeserializeOwned {}

/// A trait which is applied to any type that is a valid block hash.
pub trait BlockHash: serde::de::DeserializeOwned + serde::Serialize {}
impl <T> BlockHash for T where T: serde::de::DeserializeOwned + serde::Serialize {}
impl<T> BlockHash for T where T: serde::de::DeserializeOwned + serde::Serialize {}

/// A trait which is applied to any type that is a valid Account ID.
pub trait AccountId: serde::Serialize {}
impl <T> AccountId for T where T: serde::Serialize {}
impl<T> AccountId for T where T: serde::Serialize {}

#[cfg(feature = "subxt-core")]
mod impl_config {
use super::*;
impl <T> RpcConfig for T where T: subxt_core::Config {
impl<T> RpcConfig for T
where
T: subxt_core::Config,
{
type Header = T::Header;
type Hash = T::Hash;
type AccountId = T::AccountId;
Expand Down
2 changes: 1 addition & 1 deletion rpcs/src/methods/chain_head.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
use crate::client::{rpc_params, RpcClient, RpcSubscription};
use crate::BlockHash;
use crate::{RpcConfig, Error};
use crate::{Error, RpcConfig};
use derive_where::derive_where;
use futures::{Stream, StreamExt};
use serde::{Deserialize, Deserializer, Serialize};
Expand Down
15 changes: 10 additions & 5 deletions rpcs/src/methods/legacy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,10 @@
//! An interface to call the raw legacy RPC methods.
use crate::client::{rpc_params, RpcClient, RpcSubscription};
use crate::{RpcConfig, Error};
use crate::{Error, RpcConfig};
use codec::Decode;
use frame_metadata::RuntimeMetadataPrefixed;
use derive_where::derive_where;
use frame_metadata::RuntimeMetadataPrefixed;
use primitive_types::U256;
use serde::{Deserialize, Serialize};

Expand Down Expand Up @@ -101,7 +101,10 @@ impl<T: RpcConfig> LegacyRpcMethods<T> {
}

/// Fetch the metadata via the legacy `state_getMetadata` RPC method.
pub async fn state_get_metadata(&self, at: Option<T::Hash>) -> Result<StateGetMetadataResponse, Error> {
pub async fn state_get_metadata(
&self,
at: Option<T::Hash>,
) -> Result<StateGetMetadataResponse, Error> {
let bytes: Bytes = self
.client
.request("state_getMetadata", rpc_params![at])
Expand Down Expand Up @@ -398,12 +401,14 @@ impl<T: RpcConfig> LegacyRpcMethods<T> {
pub struct StateGetMetadataResponse(Vec<u8>);

impl StateGetMetadataResponse {
/// Return the raw SCALE encoded metadata bytes
/// Return the raw SCALE encoded metadata bytes
pub fn into_raw(self) -> Vec<u8> {
self.0
}
/// Decode and return [`frame_metadata::RuntimeMetadataPrefixed`].
pub fn to_frame_metadata(&self) -> Result<frame_metadata::RuntimeMetadataPrefixed, codec::Error> {
pub fn to_frame_metadata(
&self,
) -> Result<frame_metadata::RuntimeMetadataPrefixed, codec::Error> {
RuntimeMetadataPrefixed::decode(&mut &*self.0)
}
}
Expand Down
8 changes: 4 additions & 4 deletions rpcs/src/methods/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,12 @@
// see LICENSE for license details.

//! RPC methods are defined in this module. At the moment we have:
//!
//!
//! - [`ChainHeadRpcMethods`] (and the types in [`chain_head`]): these methods
//! implement the RPC spec at <https://paritytech.github.io/json-rpc-interface-spec/api/chainHead.html>
//!
//!
//! We also have (although their use is not advised):
//!
//!
//! - [`LegacyRpcMethods`] (and the types in [`legacy`]): a collection of legacy RPCs.
//! These are not well specified and may change in implementations without warning,
//! but for those methods we expose, we make a best effort to work against latest Substrate versions.
Expand All @@ -17,4 +17,4 @@ pub mod chain_head;
pub mod legacy;

pub use chain_head::ChainHeadRpcMethods;
pub use legacy::LegacyRpcMethods;
pub use legacy::LegacyRpcMethods;
4 changes: 2 additions & 2 deletions rpcs/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@

//! A couple of utility methods that we make use of.
use url::Url;
use crate::Error;
use url::Url;

/// A URL is considered secure if it uses a secure scheme ("https" or "wss") or is referring to localhost.
///
Expand All @@ -30,4 +30,4 @@ pub fn validate_url_is_secure(url: &str) -> Result<(), Error> {
} else {
Ok(())
}
}
}
6 changes: 2 additions & 4 deletions subxt/src/backend/chain_head/follow_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,11 @@

use crate::config::Config;
use crate::error::Error;
use subxt_rpcs::methods::chain_head::{ChainHeadRpcMethods, FollowEvent};
use futures::{FutureExt, Stream, StreamExt, TryStreamExt};
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use subxt_rpcs::methods::chain_head::{ChainHeadRpcMethods, FollowEvent};

/// A `Stream` whose goal is to remain subscribed to `chainHead_follow`. It will re-subscribe if the subscription
/// is ended for any reason, and it will return the current `subscription_id` as an event, along with the other
Expand Down Expand Up @@ -217,12 +217,10 @@ impl<Hash> Stream for FollowStream<Hash> {
#[cfg(test)]
pub(super) mod test_utils {
use super::*;
use subxt_rpcs::methods::chain_head::{
BestBlockChanged, Finalized, Initialized, NewBlock,
};
use crate::config::substrate::H256;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use subxt_rpcs::methods::chain_head::{BestBlockChanged, Finalized, Initialized, NewBlock};

/// Given some events, returns a follow stream getter that we can use in
/// place of the usual RPC method.
Expand Down
11 changes: 7 additions & 4 deletions subxt/src/backend/chain_head/follow_stream_driver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,13 @@
use super::follow_stream_unpin::{BlockRef, FollowStreamMsg, FollowStreamUnpin};
use crate::config::BlockHash;
use crate::error::{Error, RpcError};
use subxt_rpcs::methods::chain_head::{FollowEvent, Initialized, RuntimeEvent};
use futures::stream::{Stream, StreamExt};
use std::collections::{HashMap, HashSet, VecDeque};
use std::ops::DerefMut;
use std::pin::Pin;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Waker};
use subxt_rpcs::methods::chain_head::{FollowEvent, Initialized, RuntimeEvent};

/// A `Stream` which builds on `FollowStreamDriver`, and allows multiple subscribers to obtain events
/// from the single underlying subscription (each being provided an `Initialized` message and all new
Expand Down Expand Up @@ -454,9 +454,12 @@ where
.iter()
.position(|b| b.hash() == p.hash())
else {
return Poll::Ready(Some(Err(RpcError::ClientError(subxt_rpcs::Error::DisconnectedWillReconnect(
"Missed at least one block when the connection was lost".to_owned(),
))
return Poll::Ready(Some(Err(RpcError::ClientError(
subxt_rpcs::Error::DisconnectedWillReconnect(
"Missed at least one block when the connection was lost"
.to_owned(),
),
)
.into())));
};

Expand Down
2 changes: 1 addition & 1 deletion subxt/src/backend/chain_head/follow_stream_unpin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,10 @@ use super::follow_stream::FollowStream;
use super::ChainHeadRpcMethods;
use crate::config::{BlockHash, Config};
use crate::error::Error;
use futures::stream::{FuturesUnordered, Stream, StreamExt};
use subxt_rpcs::methods::chain_head::{
BestBlockChanged, Finalized, FollowEvent, Initialized, NewBlock,
};
use futures::stream::{FuturesUnordered, Stream, StreamExt};

use std::collections::{HashMap, HashSet};
use std::future::Future;
Expand Down
19 changes: 9 additions & 10 deletions subxt/src/backend/chain_head/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,10 @@ mod follow_stream_driver;
mod follow_stream_unpin;
mod storage_items;

use subxt_rpcs::RpcClient;
use subxt_rpcs::methods::chain_head::{
FollowEvent, MethodResponse, RuntimeEvent, StorageQuery, StorageQueryType, StorageResultType,
};
use self::follow_stream_driver::FollowStreamFinalizedHeads;
use crate::backend::{
utils::retry, Backend, BlockRef, BlockRefT, RuntimeVersion, StorageResponse,
StreamOf, StreamOfResults, TransactionStatus,
utils::retry, Backend, BlockRef, BlockRefT, RuntimeVersion, StorageResponse, StreamOf,
StreamOfResults, TransactionStatus,
};
use crate::config::BlockHash;
use crate::error::{Error, RpcError};
Expand All @@ -35,6 +31,10 @@ use futures::{Stream, StreamExt};
use std::collections::HashMap;
use std::task::Poll;
use storage_items::StorageItems;
use subxt_rpcs::methods::chain_head::{
FollowEvent, MethodResponse, RuntimeEvent, StorageQuery, StorageQueryType, StorageResultType,
};
use subxt_rpcs::RpcClient;

// Expose the RPC methods.
pub use subxt_rpcs::methods::chain_head::ChainHeadRpcMethods;
Expand Down Expand Up @@ -340,7 +340,8 @@ impl<T: Config + Send + Sync + 'static> Backend<T> for ChainHeadBackend<T> {
retry(|| async {
let genesis_hash = self.methods.chainspec_v1_genesis_hash().await?;
Ok(genesis_hash)
}).await
})
.await
}

async fn block_header(&self, at: T::Hash) -> Result<Option<T::Header>, Error> {
Expand Down Expand Up @@ -670,9 +671,7 @@ impl<T: Config + Send + Sync + 'static> Backend<T> for ChainHeadBackend<T> {
finalized_hash = Some(block.hash);
continue;
}
RpcTransactionStatus::BestChainBlockIncluded {
block: Some(block),
} => {
RpcTransactionStatus::BestChainBlockIncluded { block: Some(block) } => {
// Look up a pinned block ref if we can, else return a non-pinned
// block that likely isn't accessible. We have no guarantee that a best
// block on the node a tx was sent to will ever be known about on the
Expand Down
12 changes: 7 additions & 5 deletions subxt/src/backend/chain_head/storage_items.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,15 @@ use super::follow_stream_driver::FollowStreamDriverHandle;
use super::follow_stream_unpin::BlockRef;
use crate::config::Config;
use crate::error::{Error, RpcError};
use subxt_rpcs::methods::chain_head::{
ChainHeadRpcMethods, FollowEvent, MethodResponse, StorageQuery, StorageResult,
};
use futures::{FutureExt, Stream, StreamExt};
use std::collections::VecDeque;
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
use subxt_rpcs::methods::chain_head::{
ChainHeadRpcMethods, FollowEvent, MethodResponse, StorageQuery, StorageResult,
};

/// Obtain a stream of storage items given some query. this handles continuing
/// and stopping under the hood, and returns a stream of `StorageResult`s.
Expand Down Expand Up @@ -59,8 +59,10 @@ impl<T: Config> StorageItems<T> {
let operation_id = operation_id.clone();
let methods = methods.clone();

Box::pin(async move {
let cont = methods.chainhead_v1_continue(&sub_id, &operation_id).await?;
Box::pin(async move {
let cont = methods
.chainhead_v1_continue(&sub_id, &operation_id)
.await?;
Ok(cont)
})
})
Expand Down
Loading

0 comments on commit 467a5ec

Please sign in to comment.