Wire unified scheduler into banking experimentally
ryoqun committed Dec 6, 2024

Verified: this commit was created on GitHub.com and signed with GitHub’s verified signature (the key has since expired).
1 parent 744db01 · commit 16c8b91
Showing 48 changed files with 2,285 additions and 649 deletions.
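
At a high level: when the bench is run with the unified-scheduler block production method, it now builds a DefaultSchedulerPool that also supports block production, installs it into BankForks, and lets BankingTracer create all of its channels in one call with the pool in view. Below is a condensed, commented sketch of that wiring, assembled from the banking-bench/src/main.rs hunks further down; the comments are editorial and not part of the commit.

    // Build a scheduler pool only when block production should go through the
    // unified scheduler.
    let scheduler_pool = if matches!(
        block_production_method,
        BlockProductionMethod::UnifiedScheduler
    ) {
        let pool = DefaultSchedulerPool::new(
            // Opt the pool into block-production scheduling as well.
            SupportedSchedulingMode::Either(SchedulingMode::BlockProduction),
            None,
            None,
            None,
            Some(replay_vote_sender.clone()),
            prioritization_fee_cache.clone(),
            // A PoH recorder handle, newly passed to the constructor here.
            poh_recorder.read().unwrap().new_recorder(),
        );
        // Register the pool with BankForks so new banks get schedulers attached.
        bank_forks.write().unwrap().install_scheduler_pool(pool.clone());
        Some(pool)
    } else {
        None
    };

    // BankingTracer now hands back all six channel endpoints in a single call,
    // taking the optional pool into account.
    let Channels {
        non_vote_sender,
        non_vote_receiver,
        tpu_vote_sender,
        tpu_vote_receiver,
        gossip_vote_sender,
        gossip_vote_receiver,
    } = banking_tracer.create_channels(scheduler_pool.as_ref());

The pool (or None) is then passed as a new trailing argument to BankingStage::new_num_threads, together with the shared prioritization_fee_cache.
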
71 changes: 68 additions & 3 deletions Cargo.lock

Some generated files are not rendered by default.

3 changes: 3 additions & 0 deletions Cargo.toml
@@ -294,11 +294,13 @@ curve25519-dalek = { version = "4.1.3", features = ["digest", "rand_core"] }
dashmap = "5.5.3"
derivation-path = { version = "0.2.0", default-features = false }
derive-where = "1.2.7"
derive_more = { version = "1.0.0", features = ["full"] }
dialoguer = "0.10.4"
digest = "0.10.7"
dir-diff = "0.3.3"
dirs-next = "2.0.0"
dlopen2 = "0.5.0"
dyn-clone = "1.0.17"
eager = "0.1.0"
ed25519-dalek = "=1.0.1"
ed25519-dalek-bip32 = "0.2.0"
@@ -629,6 +631,7 @@ tokio-util = "0.7"
toml = "0.8.12"
tonic = "0.9.2"
tonic-build = "0.9.2"
trait-set = "0.3.0"
trees = "0.4.2"
tungstenite = "0.20.1"
uriparse = "0.6.4"
4 changes: 3 additions & 1 deletion banking-bench/Cargo.toml
@@ -9,13 +9,14 @@ license = { workspace = true }
edition = { workspace = true }

[dependencies]
assert_matches = { workspace = true }
clap = { version = "3.1.8", features = ["derive", "cargo"] }
crossbeam-channel = { workspace = true }
log = { workspace = true }
rand = { workspace = true }
rayon = { workspace = true }
solana-client = { workspace = true }
solana-core = { workspace = true }
solana-core = { workspace = true, features = ["dev-context-only-utils"] }
solana-gossip = { workspace = true }
solana-ledger = { workspace = true }
solana-logger = { workspace = true }
@@ -26,6 +27,7 @@ solana-runtime = { workspace = true, features = ["dev-context-only-utils"] }
solana-sdk = { workspace = true }
solana-streamer = { workspace = true }
solana-tpu-client = { workspace = true }
solana-unified-scheduler-pool = { workspace = true }
solana-version = { workspace = true }

[features]
89 changes: 66 additions & 23 deletions banking-bench/src/main.rs
@@ -1,14 +1,17 @@
#![allow(clippy::arithmetic_side_effects)]
use {
assert_matches::assert_matches,
clap::{crate_description, crate_name, Arg, ArgEnum, Command},
crossbeam_channel::{unbounded, Receiver},
log::*,
rand::{thread_rng, Rng},
rayon::prelude::*,
solana_client::connection_cache::ConnectionCache,
solana_core::{
banking_stage::BankingStage,
banking_trace::{BankingPacketBatch, BankingTracer, BANKING_TRACE_DIR_DEFAULT_BYTE_LIMIT},
banking_stage::{update_bank_forks_and_poh_recorder_for_new_tpu_bank, BankingStage},
banking_trace::{
BankingPacketBatch, BankingTracer, Channels, BANKING_TRACE_DIR_DEFAULT_BYTE_LIMIT,
},
validator::BlockProductionMethod,
},
solana_gossip::cluster_info::{ClusterInfo, Node},
@@ -29,13 +32,15 @@ use {
hash::Hash,
message::Message,
pubkey::{self, Pubkey},
scheduling::SchedulingMode,
signature::{Keypair, Signature, Signer},
system_instruction, system_transaction,
timing::timestamp,
transaction::Transaction,
},
solana_streamer::socket::SocketAddrSpace,
solana_tpu_client::tpu_client::DEFAULT_TPU_CONNECTION_POOL_SIZE,
solana_unified_scheduler_pool::{DefaultSchedulerPool, SupportedSchedulingMode},
std::{
sync::{atomic::Ordering, Arc, RwLock},
thread::sleep,
@@ -347,7 +352,7 @@ fn main() {
let (replay_vote_sender, _replay_vote_receiver) = unbounded();
let bank0 = Bank::new_for_benches(&genesis_config);
let bank_forks = BankForks::new_rw_arc(bank0);
let mut bank = bank_forks.read().unwrap().working_bank();
let mut bank = bank_forks.read().unwrap().working_bank_with_scheduler();

// set cost tracker limits to MAX so it will not filter out TXs
bank.write_cost_tracker()
@@ -440,9 +445,36 @@ fn main() {
BANKING_TRACE_DIR_DEFAULT_BYTE_LIMIT,
)))
.unwrap();
let (non_vote_sender, non_vote_receiver) = banking_tracer.create_channel_non_vote();
let (tpu_vote_sender, tpu_vote_receiver) = banking_tracer.create_channel_tpu_vote();
let (gossip_vote_sender, gossip_vote_receiver) = banking_tracer.create_channel_gossip_vote();
let prioritization_fee_cache = Arc::new(PrioritizationFeeCache::new(0u64));
let scheduler_pool = if matches!(
block_production_method,
BlockProductionMethod::UnifiedScheduler
) {
let pool = DefaultSchedulerPool::new(
SupportedSchedulingMode::Either(SchedulingMode::BlockProduction),
None,
None,
None,
Some(replay_vote_sender.clone()),
prioritization_fee_cache.clone(),
poh_recorder.read().unwrap().new_recorder(),
);
bank_forks
.write()
.unwrap()
.install_scheduler_pool(pool.clone());
Some(pool)
} else {
None
};
let Channels {
non_vote_sender,
non_vote_receiver,
tpu_vote_sender,
tpu_vote_receiver,
gossip_vote_sender,
gossip_vote_receiver,
} = banking_tracer.create_channels(scheduler_pool.as_ref());
let cluster_info = {
let keypair = Arc::new(Keypair::new());
let node = Node::new_localhost_with_pubkey(&keypair.pubkey());
@@ -461,7 +493,7 @@ fn main() {
),
};
let banking_stage = BankingStage::new_num_threads(
block_production_method,
block_production_method.clone(),
&cluster_info,
&poh_recorder,
non_vote_receiver,
@@ -473,10 +505,23 @@ fn main() {
None,
Arc::new(connection_cache),
bank_forks.clone(),
&Arc::new(PrioritizationFeeCache::new(0u64)),
&prioritization_fee_cache,
false,
scheduler_pool,
);

// This bench processes transactions, starting from the very first bank, so special-casing is
// needed for unified scheduler.
if matches!(
block_production_method,
BlockProductionMethod::UnifiedScheduler
) {
bank = bank_forks
.write()
.unwrap()
.reinstall_block_production_scheduler_into_working_genesis_bank();
}

// This is so that the signal_receiver does not go out of scope after the closure.
// If it is dropped before poh_service, then poh_service will error when
// calling send() on the channel.
@@ -537,33 +582,31 @@ fn main() {
tx_total_us += now.elapsed().as_micros() as u64;

let mut poh_time = Measure::start("poh_time");
poh_recorder
let cleared_bank = poh_recorder
.write()
.unwrap()
.reset(bank.clone(), Some((bank.slot(), bank.slot() + 1)));
assert_matches!(cleared_bank, None);
poh_time.stop();

let mut new_bank_time = Measure::start("new_bank");
if let Some((result, _timings)) = bank.wait_for_completed_scheduler() {
assert_matches!(result, Ok(_));
}
let new_slot = bank.slot() + 1;
let new_bank = Bank::new_from_parent(bank, &collector, new_slot);
let new_bank = Bank::new_from_parent(bank.clone(), &collector, new_slot);
new_bank_time.stop();

let mut insert_time = Measure::start("insert_time");
bank_forks.write().unwrap().insert(new_bank);
bank = bank_forks.read().unwrap().working_bank();
update_bank_forks_and_poh_recorder_for_new_tpu_bank(
&bank_forks,
&poh_recorder,
new_bank,
false,
);
bank = bank_forks.read().unwrap().working_bank_with_scheduler();
insert_time.stop();

// set cost tracker limits to MAX so it will not filter out TXs
bank.write_cost_tracker()
.unwrap()
.set_limits(u64::MAX, u64::MAX, u64::MAX);

assert!(poh_recorder.read().unwrap().bank().is_none());
poh_recorder
.write()
.unwrap()
.set_bank_for_test(bank.clone());
assert!(poh_recorder.read().unwrap().bank().is_some());
debug!(
"new_bank_time: {}us insert_time: {}us poh_time: {}us",
new_bank_time.as_us(),
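
Finally, the bench's per-slot rotation changes to account for a scheduler being attached to the working bank. A condensed, commented sketch of one iteration, assembled from the last hunk above (comments editorial, not part of the commit):

    // reset()'s return value is checked to confirm PoH was not still holding a bank.
    let cleared_bank = poh_recorder
        .write()
        .unwrap()
        .reset(bank.clone(), Some((bank.slot(), bank.slot() + 1)));
    assert_matches!(cleared_bank, None);

    // With the unified scheduler, the working bank carries a scheduler; drain it
    // and check the result before deriving the child bank.
    if let Some((result, _timings)) = bank.wait_for_completed_scheduler() {
        assert_matches!(result, Ok(_));
    }
    let new_slot = bank.slot() + 1;
    let new_bank = Bank::new_from_parent(bank.clone(), &collector, new_slot);

    // A single helper now updates both BankForks and the PoH recorder for the
    // new TPU bank, after which the bench re-reads the working bank together
    // with its scheduler.
    update_bank_forks_and_poh_recorder_for_new_tpu_bank(
        &bank_forks,
        &poh_recorder,
        new_bank,
        false,
    );
    bank = bank_forks.read().unwrap().working_bank_with_scheduler();

Because the bench processes transactions starting from the genesis bank itself, the diff also special-cases the unified scheduler right after BankingStage is constructed, reinstalling the block-production scheduler into the working genesis bank via reinstall_block_production_scheduler_into_working_genesis_bank.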