Skip to content

Commit

Permalink
Have one thread per chain on benchmark
Browse files Browse the repository at this point in the history
  • Loading branch information
ndr-ds committed Feb 20, 2025
1 parent fb96a12 commit 43ce4d1
Show file tree
Hide file tree
Showing 9 changed files with 376 additions and 322 deletions.
10 changes: 10 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ colored = "2.1.0"
comfy-table = "7.1.0"
convert_case = "0.6.0"
criterion = { version = "0.5.1", default-features = false }
crossbeam-channel = "0.5.14"
custom_debug_derive = "0.6.1"
dashmap = "5.5.3"
derive_more = "1.0.0"
Expand Down
8 changes: 7 additions & 1 deletion linera-client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,12 @@ version.workspace = true

[features]
test = ["linera-views/test", "linera-execution/test"]
benchmark = ["linera-base/test", "dep:linera-sdk", "dep:tokio-util"]
benchmark = [
"linera-base/test",
"dep:linera-sdk",
"dep:tokio-util",
"dep:crossbeam-channel",
]
wasmer = [
"linera-core/wasmer",
"linera-execution/wasmer",
Expand Down Expand Up @@ -55,6 +60,7 @@ bcs.workspace = true
cfg-if.workspace = true
chrono = { workspace = true, features = ["clock"] }
clap.workspace = true
crossbeam-channel = { workspace = true, optional = true }
derive_more = { workspace = true, features = ["deref", "deref_mut"] }
dirs.workspace = true
futures.workspace = true
Expand Down
259 changes: 259 additions & 0 deletions linera-client/src/benchmark.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,259 @@
// Copyright (c) Zefchain Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use std::collections::HashMap;

use linera_base::{
crypto::AccountSecretKey,
data_types::Timestamp,
hashed::Hashed,
identifiers::{ChainId, Owner},
listen_for_shutdown_signals,
time::Instant,
};
use linera_chain::{
data_types::{BlockProposal, ProposedBlock},
types::ConfirmedBlock,
};
use linera_core::{client::ChainClient, local_node::LocalNodeClient};
use linera_execution::{
committee::{Committee, Epoch},
system::SystemOperation,
Operation,
};
use linera_rpc::node_provider::NodeProvider;
use linera_storage::Storage;
use tokio::{runtime::Handle, task, time};
use tokio_util::sync::CancellationToken;
use tracing::{debug, info, warn, Instrument as _};

use crate::Error;

pub struct Benchmark<Storage>
where
Storage: linera_storage::Storage,
{
_phantom: std::marker::PhantomData<Storage>,
}

impl<S> Benchmark<S>
where
S: Storage + Clone + Send + Sync + 'static,
{
#[allow(clippy::too_many_arguments)]
pub async fn run_benchmark(
num_chains: usize,
transactions_per_block: usize,
bps: Option<usize>,
chain_clients: HashMap<ChainId, ChainClient<NodeProvider, S>>,
epoch: Epoch,
blocks_infos: Vec<(ChainId, Vec<Operation>, AccountSecretKey)>,
committee: Committee,
local_node: LocalNodeClient<S>,
) -> Result<(), Error> {
let shutdown_notifier = CancellationToken::new();
tokio::spawn(listen_for_shutdown_signals(shutdown_notifier.clone()));

// The bps control task will control the BPS from the threads. `crossbeam_channel` is used
// for two reasons:
// 1. it allows bounded channels with zero sized buffers.
// 2. it blocks the current thread if the message can't be sent, which is exactly
// what we want to happen here. `tokio::sync::mpsc` doesn't do that. `std::sync::mpsc`
// does, but it is slower than `crossbeam_channel`.
// Number 1 is the main reason. `tokio::sync::mpsc` doesn't allow 0 sized buffers.
// With a channel with a buffer of size 1 or larger, even if we have already reached
// the desired BPS, the tasks would continue sending block proposals until the channel's
// buffer is filled, which would cause us to not properly control the BPS rate.
let (sender, receiver) = crossbeam_channel::bounded(0);
let bps_control_task = tokio::spawn(async move {
let mut recv_count = 0;
let mut start = time::Instant::now();
while let Ok(()) = receiver.recv() {
recv_count += 1;
if recv_count == num_chains {
let elapsed = start.elapsed();
if let Some(bps) = bps {
if elapsed > time::Duration::from_secs(1) {
warn!(
"Failed to achieve {} BPS/{} TPS in {} ms",
bps,
bps * transactions_per_block,
elapsed.as_millis(),
);
} else {
time::sleep(time::Duration::from_secs(1) - elapsed).await;
info!(
"Achieved {} BPS/{} TPS in {} ms",
bps,
bps * transactions_per_block,
elapsed.as_millis(),
);
}
} else {
let achieved_bps = num_chains as f64 / elapsed.as_secs_f64();
info!(
"Achieved {} BPS/{} TPS in {} ms",
achieved_bps,
achieved_bps * transactions_per_block as f64,
elapsed.as_millis(),
);
}

recv_count = 0;
start = time::Instant::now();
}
}

info!("Exiting logging task...");
});

let mut bps_remainder = bps.unwrap_or_default() % num_chains;
let bps_share = bps.map(|bps| bps / num_chains);

let mut join_set = task::JoinSet::<Result<(), Error>>::new();
for (chain_id, operations, key_pair) in blocks_infos {
let bps_share = if bps_remainder > 0 {
bps_remainder -= 1;
Some(bps_share.unwrap() + 1)
} else {
bps_share
};

let shutdown_notifier = shutdown_notifier.clone();
let sender = sender.clone();
let handle = Handle::current();
let committee = committee.clone();
let local_node = local_node.clone();
let chain_client = chain_clients[&chain_id].clone();
let short_chain_id = format!("{:?}", chain_id);
join_set.spawn_blocking(move || {
handle.block_on(
async move {
Self::run_benchmark_internal(
bps_share,
operations,
key_pair,
epoch,
chain_client,
shutdown_notifier,
sender,
committee,
local_node,
)
.await?;

Ok(())
}
.instrument(tracing::info_span!(
"benchmark_chain_id",
chain_id = short_chain_id
)),
)
});
}

join_set
.join_all()
.await
.into_iter()
.collect::<Result<Vec<_>, _>>()?;
drop(sender);
info!("All benchmark tasks completed");
bps_control_task.await?;

Ok(())
}

#[allow(clippy::too_many_arguments)]
async fn run_benchmark_internal(
bps: Option<usize>,
operations: Vec<Operation>,
key_pair: AccountSecretKey,
epoch: Epoch,
chain_client: ChainClient<NodeProvider, S>,
shutdown_notifier: CancellationToken,
sender: crossbeam_channel::Sender<()>,
committee: Committee,
local_node: LocalNodeClient<S>,
) -> Result<(), Error> {
let chain_id = chain_client.chain_id();
info!(
"Starting benchmark at target BPS of {:?}, for chain {:?}",
bps, chain_id
);
let cross_chain_message_delivery = chain_client.options().cross_chain_message_delivery;
let mut num_sent_proposals = 0;
loop {
if shutdown_notifier.is_cancelled() {
info!("Shutdown signal received, stopping benchmark");
break;
}
let block = ProposedBlock {
epoch,
chain_id,
incoming_bundles: Vec::new(),
operations: operations.clone(),
previous_block_hash: chain_client.block_hash(),
height: chain_client.next_block_height(),
authenticated_signer: Some(Owner::from(key_pair.public())),
timestamp: chain_client.timestamp().max(Timestamp::now()),
};
let executed_block = local_node
.stage_block_execution(block.clone(), None)
.await?
.0;

let value = Hashed::new(ConfirmedBlock::new(executed_block));
let proposal =
BlockProposal::new_initial(linera_base::data_types::Round::Fast, block, &key_pair);

chain_client
.submit_block_proposal(&committee, Box::new(proposal), value)
.await?;
let next_block_height = chain_client.next_block_height();
// We assume the committee will not change during the benchmark.
chain_client
.communicate_chain_updates(
&committee,
chain_id,
next_block_height,
cross_chain_message_delivery,
)
.await?;

num_sent_proposals += 1;
if let Some(bps) = bps {
if num_sent_proposals == bps {
sender.send(())?;
num_sent_proposals = 0;
}
} else {
sender.send(())?;
break;
}
}

Self::close_benchmark_chain(chain_client).await?;
info!("Exiting task...");
Ok(())
}

/// Closes the chain that was created for the benchmark.
async fn close_benchmark_chain(
chain_client: ChainClient<NodeProvider, S>,
) -> Result<(), Error> {
let start = Instant::now();
chain_client
.execute_operation(Operation::System(SystemOperation::CloseChain))
.await?
.expect("Close chain operation should not fail!");

debug!(
"Closed chain {:?} in {} ms",
chain_client.chain_id(),
start.elapsed().as_millis()
);

Ok(())
}
}
Loading

0 comments on commit 43ce4d1

Please sign in to comment.