Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Optimisation: Prevent creating multiple ws connections on client #654

Merged
merged 5 commits into from
Oct 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions avail-rust/src/sdk.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,9 @@ pub struct SDK {

impl SDK {
pub async fn new(endpoint: &str) -> Result<Self, Box<dyn std::error::Error>> {
let api = Api::from_url(endpoint).await?;
let rpc = Rpc::new(endpoint, true).await?;
// Cloning RpcClient is cheaper and doesn't create a new WS connection.
let api = Api::from_rpc_client(rpc.client.clone()).await?;

Ok(SDK {
tx: Transactions::new(api.clone(), rpc.clone()),
Expand All @@ -22,8 +23,8 @@ impl SDK {
}

pub async fn new_insecure(endpoint: &str) -> Result<Self, Box<dyn std::error::Error>> {
let api = Api::from_insecure_url(endpoint).await?;
let rpc = Rpc::new(endpoint, false).await?;
let api = Api::from_rpc_client(rpc.client.clone()).await?;

Ok(SDK {
tx: Transactions::new(api.clone(), rpc.clone()),
Expand Down
54 changes: 2 additions & 52 deletions avail-subxt/src/avail_client.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
use crate::AvailConfig;

use core::ops::Deref;
pub use jsonrpsee::core::client::Client as RpcMethods;
use subxt::{
backend::{legacy::LegacyRpcMethods, rpc::RpcClient},
utils::validate_url_is_secure,
Expand All @@ -10,41 +9,24 @@ use subxt::{

#[derive(Debug)]
pub struct AvailClient {
rpc_methods: RpcMethods,
online: OnlineClient<AvailConfig>,
rpc: RpcClient,
}

impl AvailClient {
pub async fn new<U: AsRef<str>>(ws_uri: U) -> Result<Self, Error> {
validate_url_is_secure(ws_uri.as_ref())?;
let rpc_methods = jsonrpsee_helpers::client(ws_uri.as_ref())
.await
.map_err(|e| Error::Other(format!("Client cannot be created: {e:?}")))?;

let rpc = RpcClient::from_url(ws_uri).await?;
let online = OnlineClient::<AvailConfig>::from_rpc_client(rpc.clone()).await?;

Ok(AvailClient {
rpc_methods,
online,
rpc,
})
Ok(AvailClient { online, rpc })
}

pub async fn new_insecure<U: AsRef<str>>(ws_uri: U) -> Result<Self, Error> {
let rpc_methods = jsonrpsee_helpers::client(ws_uri.as_ref())
.await
.map_err(|e| Error::Other(format!("Client cannot be created: {e:?}")))?;

let rpc = RpcClient::from_insecure_url(ws_uri).await?;
let online = OnlineClient::<AvailConfig>::from_rpc_client(rpc.clone()).await?;

Ok(AvailClient {
rpc_methods,
online,
rpc,
})
Ok(AvailClient { online, rpc })
}

pub fn legacy_rpc(&self) -> LegacyRpcMethods<AvailConfig> {
Expand All @@ -58,10 +40,6 @@ impl AvailClient {
pub fn online(&self) -> &OnlineClient<AvailConfig> {
&self.online
}

pub fn rpc_methods(&self) -> &RpcMethods {
&self.rpc_methods
}
}

impl Deref for AvailClient {
Expand All @@ -70,31 +48,3 @@ impl Deref for AvailClient {
&self.online
}
}

// #[cfg(feature = "native")]
mod jsonrpsee_helpers {
pub use jsonrpsee::{
client_transport::ws::{self, EitherStream, Url, WsTransportClientBuilder},
core::client::{Client, Error},
};
use tokio_util::compat::Compat;

pub type Sender = ws::Sender<Compat<EitherStream>>;
pub type Receiver = ws::Receiver<Compat<EitherStream>>;

/// Build WS RPC client from URL
pub async fn client(url: &str) -> Result<Client, Error> {
let (sender, receiver) = ws_transport(url).await?;
Ok(Client::builder()
.max_buffer_capacity_per_subscription(4096)
.build_with_tokio(sender, receiver))
}

async fn ws_transport(url: &str) -> Result<(Sender, Receiver), Error> {
let url = Url::parse(url).map_err(|e| Error::Transport(e.into()))?;
WsTransportClientBuilder::default()
.build(url)
.await
.map_err(|e| Error::Transport(e.into()))
}
}
58 changes: 39 additions & 19 deletions e2e/src/tests/rpc_queries.rs
Original file line number Diff line number Diff line change
@@ -1,25 +1,28 @@
use super::{alice_nonce, local_connection, no_concurrency};

use avail_core::data_proof::ProofResponse;
use avail_core::{AppExtrinsic, AppId, Keccak256};
use avail_subxt::{
api::{
self,
runtime_types::avail_core::{
header::extension::HeaderExtension, BlockLengthColumns, BlockLengthRows,
runtime_types::{
avail_core::{header::extension::HeaderExtension, BlockLengthColumns, BlockLengthRows},
frame_system::limits::BlockLength,
},
},
avail::{Cells, GDataProof, GRawScalar, Rows},
rpc::{GProof, KateRpcClient as _},
avail::{Cells, GDataProof, GRawScalar, GRow, Rows},
rpc::GProof,
submit::submit_data_with_nonce as submit_data,
tx,
utils::H256,
AvailClient, Cell,
AvailClient, Cell, RpcParams,
};
use kate::{
com::Cell as KateCell,
gridgen::{AsBytes as _, EvaluationGrid},
};
use kate_recovery::{matrix::Dimensions, proof::verify};
use subxt::Error;
use subxt_signer::sr25519::dev;

use anyhow::{anyhow, Result};
Expand Down Expand Up @@ -51,7 +54,9 @@ async fn eval_grid_from_block(client: &AvailClient, block_hash: H256) -> Result<
app_extrinsics.push(app_extrinsic);
}

let block_len = client.rpc_methods().query_block_length(block_hash).await?;
let mut params = RpcParams::new();
params.push(block_hash)?;
let block_len: BlockLength = client.rpc().request("kate_blockLength", params).await?;
let max_width = block_len.cols.0 as usize;
let max_height = block_len.rows.0 as usize;
let seed = [0u8; 32];
Expand Down Expand Up @@ -112,10 +117,10 @@ pub async fn rpc_query_proof_test() -> Result<()> {
.collect();

// RPC call
let actual_proofs: Vec<GDataProof> = client
.rpc_methods()
.query_proof(Cells::try_from(cells).unwrap(), block_hash)
.await?;
let mut params = RpcParams::new();
params.push(Cells::try_from(cells).unwrap())?;
params.push(block_hash)?;
let actual_proofs: Vec<GDataProof> = client.rpc().request("kate_queryProof", params).await?;

let len = actual_proofs.len();
assert_eq!(actual_proofs.len(), expected_proofs.len());
Expand Down Expand Up @@ -147,7 +152,10 @@ pub async fn rpc_query_proof_test_2() -> Result<()> {
let cells = Cells::try_from(vec![cell.clone()]).unwrap();

// RPC call
let actual_proof: Vec<GDataProof> = client.rpc_methods().query_proof(cells, block_hash).await?;
let mut params = RpcParams::new();
params.push(cells)?;
params.push(block_hash)?;
let actual_proof: Vec<GDataProof> = client.rpc().request("kate_queryProof", params).await?;
let actual_proof: Vec<u8> = actual_proof
.iter()
.flat_map(|(raw_scalar, g_proof)| {
Expand Down Expand Up @@ -191,7 +199,10 @@ pub async fn rpc_query_proof_test_2() -> Result<()> {
let cells = Cells::try_from(vec![cell.clone()]).unwrap();

// RPC call
let actual_proof: Vec<GDataProof> = client.rpc_methods().query_proof(cells, block_hash).await?;
let mut params = RpcParams::new();
params.push(cells)?;
params.push(block_hash)?;
let actual_proof: Vec<GDataProof> = client.rpc().request("kate_queryProof", params).await?;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As side-effect, this also remove the capability of use the RPC with type safety.
I think it is not a good a Rusty-approach, because any future change in RPC on future release will be hardly detected on Rust clients.

let actual_proof: Vec<u8> = actual_proof
.iter()
.flat_map(|(raw_scalar, g_proof)| {
Expand Down Expand Up @@ -242,17 +253,21 @@ pub async fn empty_commitments_test() -> Result<()> {

// query_rows should fail for block with empty commitments
let row_indexes = Rows::truncate_from(vec![0]);
let rows = client
.rpc_methods()
.query_rows(row_indexes, block_hash)
.await;
let mut params = RpcParams::new();
params.push(row_indexes)?;
params.push(block_hash)?;
let rows: Result<Vec<GRow>, Error> = client.rpc().request("kate_queryRows", params).await;
assert!(rows.is_err());

// query_proof should fail for block with empty commitments
let cell = Cell { row: 0, col: 0 };
let cells = Cells::try_from(vec![cell.clone()]).unwrap();

let proof = client.rpc_methods().query_proof(cells, block_hash).await;
let mut params = RpcParams::new();
params.push(cells)?;
params.push(block_hash)?;
let proof: Result<Vec<GDataProof>, Error> =
client.rpc().request("kate_queryProof", params).await;
assert!(proof.is_err());
Ok(())
}
Expand All @@ -270,7 +285,9 @@ pub async fn rpc_query_block_length_test() -> Result<()> {
.block_hash();

// RPC call
let length = client.rpc_methods().query_block_length(block_hash).await?;
let mut params = RpcParams::new();
params.push(block_hash)?;
let length: BlockLength = client.rpc().request("kate_blockLength", params).await?;
assert_eq!(length.cols, BlockLengthColumns(256));
assert_eq!(length.rows, BlockLengthRows(256));
assert_eq!(length.chunk_size, 32);
Expand All @@ -292,7 +309,10 @@ pub async fn rpc_query_data_proof_test() -> Result<()> {

let expected_proof_root = merkle_proof::<Keccak256, _, _>(vec![keccak_256(DATA)], 0);

let actual_proof = client.rpc_methods().query_data_proof(1, block_hash).await?;
let mut params = RpcParams::new();
params.push(1)?;
params.push(block_hash)?;
let actual_proof: ProofResponse = client.rpc().request("kate_queryDataProof", params).await?;
// root is calculated keccak256(blob_root, bridge_root)
let mut root_data = vec![];
root_data.extend(expected_proof_root.root.as_bytes());
Expand Down
21 changes: 10 additions & 11 deletions e2e/src/tests/submit_data.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,10 @@ use super::{alice_nonce, allow_concurrency, local_connection};

use avail_core::AppId;
use avail_subxt::{
avail::{Cells, Rows, TxInBlock, TxProgress},
avail::{GDataProof, GRow, Rows, TxInBlock, TxProgress},
primitives::Cell,
rpc::KateRpcClient as _,
submit::submit_data_with_nonce,
tx,
tx, RpcParams,
};
use subxt_signer::sr25519::dev;

Expand Down Expand Up @@ -94,10 +93,10 @@ async fn main() -> anyhow::Result<()> {
// Note: Ideal way to get the rows for specific appData, we should use the app_specific_rows from kate recovery, which is out scope for this example
// 1. Check query rows.
let row_indexes = Rows::truncate_from(vec![0]);
let query_rows = client
.rpc_methods()
.query_rows(Rows::truncate_from(row_indexes.to_vec()), block_hash)
.await?;
let mut params = RpcParams::new();
params.push(row_indexes)?;
params.push(block_hash)?;
let query_rows: Vec<GRow> = client.rpc().request("kate_queryRows", params).await?;
trace!("Query rows RPC: {query_rows:?}");

// 3. Check proof.
Expand All @@ -108,10 +107,10 @@ async fn main() -> anyhow::Result<()> {
Cell::new(0, col)
})
.collect::<Vec<_>>();
let proof = client
.rpc_methods()
.query_proof(Cells::truncate_from(cells), block_hash)
.await?;
let mut params = RpcParams::new();
params.push(cells)?;
params.push(block_hash)?;
let proof: Vec<GDataProof> = client.rpc().request("kate_queryProof", params).await?;
trace!("Query proof RPC: {proof:?}");

Ok(())
Expand Down
26 changes: 15 additions & 11 deletions e2e/src/tests/vector_send_msg.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
use super::{alice_nonce, allow_concurrency, local_connection};

use avail_core::{
data_proof::{tx_uid, AddressedMessage, BoundedData, Message, SubTrie},
data_proof::{tx_uid, AddressedMessage, BoundedData, Message, ProofResponse, SubTrie},
AppId, Keccak256,
};
use avail_subxt::{
api, avail_client::RpcMethods, rpc::KateRpcClient as _, tx, AccountId, AvailClient,
api, api::runtime_types::frame_system::limits::BlockLength, tx, AccountId, AvailClient,
RpcParams,
};
use subxt::{backend::BlockRef, error::RpcError, utils::H256, Error};
use subxt_signer::sr25519::dev;
Expand Down Expand Up @@ -116,11 +117,8 @@ fn messages_to_leaves(block_number: u32, tx_indexes: Vec<u32>) -> Vec<Leaf> {
.collect::<Vec<_>>()
}

async fn check_query_data_proof_rpc(
rpc_methods: &RpcMethods,
block_hash: H256,
leaves: &[Leaf],
) -> Result<(), Error> {
async fn check_query_data_proof_rpc(block_hash: H256, leaves: &[Leaf]) -> Result<(), Error> {
let client = local_connection().await.unwrap();
let indexed_leafs_len = leaves.len();
for indexed_leaf in leaves {
let Leaf {
Expand All @@ -129,8 +127,12 @@ async fn check_query_data_proof_rpc(
leaf,
} = indexed_leaf;

let rpc_proof = rpc_methods
.query_data_proof(*tx_idx, block_hash)
let mut params = RpcParams::new();
params.push(*tx_idx)?;
params.push(block_hash)?;
let rpc_proof: ProofResponse = client
.rpc()
.request("kate_queryDataProof", params)
.await
.map_err(|je| RpcError::ClientError(Box::new(je)))?;
let bridge_root = rpc_proof.data_proof.roots.bridge_root;
Expand Down Expand Up @@ -167,10 +169,12 @@ async fn vector_send_msg() -> anyhow::Result<()> {
let indexed_leaves = messages_to_leaves(block_number, tx_indexes);

// 2. Use Kate to get the proof and double-check it.
check_query_data_proof_rpc(client.rpc_methods(), block_hash, &indexed_leaves).await?;
check_query_data_proof_rpc(block_hash, &indexed_leaves).await?;

// 3. Test query_block len RPC.
let block_len = client.rpc_methods().query_block_length(block_hash).await?;
let mut params = RpcParams::new();
params.push(block_hash)?;
let block_len: BlockLength = client.rpc().request("kate_blockLength", params).await?;
trace!(
"Test query_block_length RPC: cols={}, rows={}",
block_len.cols.0,
Expand Down
Loading