Skip to content

Commit

Permalink
Use Arc for public parameters to avoid cloning.
Browse files Browse the repository at this point in the history
  • Loading branch information
aterentic-ethernal committed Aug 11, 2023
1 parent bc66e9a commit 4ff443f
Show file tree
Hide file tree
Showing 5 changed files with 29 additions and 28 deletions.
18 changes: 9 additions & 9 deletions src/app_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ use crate::{
trait AppClient {
async fn reconstruct_rows_from_dht(
&self,
pp: PublicParameters,
pp: Arc<PublicParameters>,
block_number: u32,
dimensions: Dimensions,
commitments: &[[u8; config::COMMITMENT_SIZE]],
Expand Down Expand Up @@ -86,7 +86,7 @@ struct AppClientImpl {
impl AppClient for AppClientImpl {
async fn reconstruct_rows_from_dht(
&self,
pp: PublicParameters,
pp: Arc<PublicParameters>,
block_number: u32,
dimensions: Dimensions,
commitments: &[[u8; config::COMMITMENT_SIZE]],
Expand All @@ -104,7 +104,7 @@ impl AppClient for AppClientImpl {
missing_cells.len()
);
let (fetched, unfetched) = fetch_verified(
&pp,
pp.clone(),
&self.network_client,
block_number,
dimensions,
Expand All @@ -124,7 +124,7 @@ impl AppClient for AppClientImpl {
columns_positions(dimensions, &unfetched, Percent::from_percent(66), &mut rng);

let (missing_fetched, _) = fetch_verified(
&pp,
pp,
&self.network_client,
block_number,
dimensions,
Expand Down Expand Up @@ -253,7 +253,7 @@ fn data_cell(
}

async fn fetch_verified(
pp: &PublicParameters,
pp: Arc<PublicParameters>,
network_client: &Client,
block_number: u32,
dimensions: Dimensions,
Expand All @@ -280,7 +280,7 @@ async fn process_block(
cfg: &AppClientConfig,
app_id: AppId,
block: &BlockVerified,
pp: PublicParameters,
pp: Arc<PublicParameters>,
) -> Result<()> {
let lookup = &block.lookup;
let block_number = block.block_num;
Expand Down Expand Up @@ -419,7 +419,7 @@ pub async fn run(
rpc_client: avail::Client,
app_id: AppId,
mut block_receive: Receiver<BlockVerified>,
pp: PublicParameters,
pp: Arc<PublicParameters>,
) {
info!("Starting for app {app_id}...");

Expand Down Expand Up @@ -463,7 +463,7 @@ mod tests {
async fn test_process_blocks_without_rpc() {
let mut cfg = AppClientConfig::from(&RuntimeConfig::default());
cfg.disable_rpc = true;
let pp = testnet::public_params(1024);
let pp = Arc::new(testnet::public_params(1024));
let dimensions: Dimensions = Dimensions::new(1, 128).unwrap();
let mut mock_client = MockAppClient::new();
let dht_fetched_rows: Vec<Option<Vec<u8>>> = [
Expand Down Expand Up @@ -517,7 +517,7 @@ mod tests {
#[tokio::test]
async fn test_process_block_with_rpc() {
let cfg = AppClientConfig::from(&RuntimeConfig::default());
let pp = testnet::public_params(1024);
let pp = Arc::new(testnet::public_params(1024));
let dimensions: Dimensions = Dimensions::new(1, 16).unwrap();
let mut mock_client = MockAppClient::new();
// let dht_missing_rows: Vec<u32> = vec![0];
Expand Down
14 changes: 7 additions & 7 deletions src/light_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ impl LightClient for LightClientImpl {
pub async fn process_block(
light_client: &impl LightClient,
cfg: &LightClientConfig,
pp: &PublicParameters,
pp: Arc<PublicParameters>,
header: &Header,
received_at: Instant,
metrics: &Metrics,
Expand Down Expand Up @@ -395,7 +395,7 @@ pub struct Channels {
pub async fn run(
light_client: impl LightClient,
cfg: LightClientConfig,
pp: PublicParameters,
pp: Arc<PublicParameters>,
metrics: Metrics,
counter: Arc<Mutex<u32>>,
mut channels: Channels,
Expand All @@ -411,7 +411,7 @@ pub async fn run(
if let Err(error) = process_block(
&light_client,
&cfg,
&pp,
pp.clone(),
&header,
received_at,
&metrics,
Expand Down Expand Up @@ -482,7 +482,7 @@ mod tests {
async fn test_process_block_with_rpc() {
let mut mock_client = MockLightClient::new();
let cfg = LightClientConfig::from(&RuntimeConfig::default());
let pp = testnet::public_params(1024);
let pp = Arc::new(testnet::public_params(1024));
let mut metric_registry = Registry::default();
let metrics = telemetry::metrics::Metrics::new(&mut metric_registry);
let cells_fetched: Vec<Cell> = vec![];
Expand Down Expand Up @@ -602,7 +602,7 @@ mod tests {
mock_client
.expect_network_stats()
.returning(|| Box::pin(async move { Ok(()) }));
process_block(&mock_client, &cfg, &pp, &header, recv, &metrics, counter)
process_block(&mock_client, &cfg, pp, &header, recv, &metrics, counter)
.await
.unwrap();
}
Expand All @@ -612,7 +612,7 @@ mod tests {
let mut mock_client = MockLightClient::new();
let mut cfg = LightClientConfig::from(&RuntimeConfig::default());
cfg.disable_rpc = true;
let pp = testnet::public_params(1024);
let pp = Arc::new(testnet::public_params(1024));
let mut metric_registry = Registry::default();
let metrics = telemetry::metrics::Metrics::new(&mut metric_registry);
let cells_unfetched: Vec<Position> = vec![];
Expand Down Expand Up @@ -722,7 +722,7 @@ mod tests {
mock_client
.expect_network_stats()
.returning(|| Box::pin(async move { Ok(()) }));
process_block(&mock_client, &cfg, &pp, &header, recv, &metrics, counter)
process_block(&mock_client, &cfg, pp, &header, recv, &metrics, counter)
.await
.unwrap();
}
Expand Down
2 changes: 1 addition & 1 deletion src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,7 @@ async fn run(error_sender: Sender<anyhow::Error>) -> Result<()> {
#[cfg(feature = "network-analysis")]
tokio::task::spawn(network_analyzer::start_traffic_analyzer(cfg.port, 10));

let pp = kate_recovery::testnet::public_params(1024);
let pp = Arc::new(kate_recovery::testnet::public_params(1024));
let raw_pp = pp.to_raw_var_bytes();
let public_params_hash = hex::encode(sp_core::blake2_128(&raw_pp));
let public_params_len = hex::encode(raw_pp).len();
Expand Down
4 changes: 2 additions & 2 deletions src/proof.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use kate_recovery::{
matrix::{Dimensions, Position},
proof,
};
use std::sync::mpsc::channel;
use std::sync::{mpsc::channel, Arc};
use tracing::error;

/// Verifies proofs for given block, cells and commitments
Expand All @@ -16,7 +16,7 @@ pub fn verify(
dimensions: Dimensions,
cells: &[Cell],
commitments: &[[u8; 48]],
public_parameters: &PublicParameters,
public_parameters: Arc<PublicParameters>,
) -> Result<(Vec<Position>, Vec<Position>), proof::Error> {
let cpus = num_cpus::get();
let pool = threadpool::ThreadPool::new(cpus);
Expand Down
19 changes: 10 additions & 9 deletions src/sync_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ async fn process_block(
sync_client: &impl SyncClient,
block_number: u32,
cfg: &SyncClientConfig,
pp: &PublicParameters,
pp: Arc<PublicParameters>,
block_verified_sender: Option<Sender<BlockVerified>>,
) -> Result<()> {
if sync_client
Expand Down Expand Up @@ -261,7 +261,7 @@ pub async fn run(
cfg: SyncClientConfig,
start_block: u32,
end_block: u32,
pp: PublicParameters,
pp: Arc<PublicParameters>,
block_verified_sender: Option<Sender<BlockVerified>>,
) {
let rpc_client = sync_client.get_client();
Expand Down Expand Up @@ -338,8 +338,9 @@ pub async fn run(

// TODO: Should we handle unprocessed blocks differently?
let block_verified_sender = block_verified_sender.clone();
let pp = pp.clone();
if let Err(error) =
process_block(&sync_client, block_number, &cfg, &pp, block_verified_sender).await
process_block(&sync_client, block_number, &cfg, pp, block_verified_sender).await
{
error!(block_number, "Cannot process block: {error:#}");
}
Expand Down Expand Up @@ -380,7 +381,7 @@ mod tests {
#[tokio::test]
pub async fn test_process_blocks_without_rpc() {
let (block_tx, _) = channel::<types::BlockVerified>(10);
let pp = testnet::public_params(1024);
let pp = Arc::new(testnet::public_params(1024));
let mut cfg = SyncClientConfig::from(&RuntimeConfig::default());
cfg.disable_rpc = true;
let mut mock_client = MockSyncClient::new();
Expand Down Expand Up @@ -505,15 +506,15 @@ mod tests {
.expect_insert_cells_into_dht()
.withf(move |x, _| *x == 42)
.returning(move |_, _| Box::pin(async move { 1f32 }));
process_block(&mock_client, 42, &cfg, &pp, Some(block_tx))
process_block(&mock_client, 42, &cfg, pp, Some(block_tx))
.await
.unwrap();
}

#[tokio::test]
pub async fn test_process_blocks_with_rpc() {
let (block_tx, _) = channel::<types::BlockVerified>(10);
let pp = testnet::public_params(1024);
let pp = Arc::new(testnet::public_params(1024));
let cfg = SyncClientConfig::from(&RuntimeConfig::default());
let mut mock_client = MockSyncClient::new();
let header: DaHeader = DaHeader {
Expand Down Expand Up @@ -640,14 +641,14 @@ mod tests {
.expect_insert_cells_into_dht()
.withf(move |x, _| *x == 42)
.returning(move |_, _| Box::pin(async move { 1f32 }));
process_block(&mock_client, 42, &cfg, &pp, Some(block_tx))
process_block(&mock_client, 42, &cfg, pp, Some(block_tx))
.await
.unwrap();
}
#[tokio::test]
pub async fn test_header_in_dbstore() {
let (block_tx, _) = channel::<types::BlockVerified>(10);
let pp = testnet::public_params(1024);
let pp = Arc::new(testnet::public_params(1024));
let cfg = SyncClientConfig::from(&RuntimeConfig::default());
let mut mock_client = MockSyncClient::new();
mock_client
Expand All @@ -656,7 +657,7 @@ mod tests {
.returning(|_| Ok(true));
mock_client.expect_get_header_by_block_number().never();
mock_client.block_header_in_db(42).unwrap();
process_block(&mock_client, 42, &cfg, &pp, Some(block_tx))
process_block(&mock_client, 42, &cfg, pp, Some(block_tx))
.await
.unwrap();
}
Expand Down

0 comments on commit 4ff443f

Please sign in to comment.