caches epoch-specific info which stays fixed throughout epoch

Gossip repeatedly locks bank-forks to read epoch stakes, epoch duration
or active features, all of which stay fixed throughout the epoch and
only change at epoch boundaries.
https://github.com/anza-xyz/agave/blob/4d0fc227d/gossip/src/cluster_info.rs#L1562
https://github.com/anza-xyz/agave/blob/4d0fc227d/gossip/src/cluster_info.rs#L2369
https://github.com/anza-xyz/agave/blob/4d0fc227d/gossip/src/cluster_info.rs#L2525

The commit instead implements a cache which refreshes only if the root
bank has moved to a new epoch.
behzadnouri committed Jan 26, 2025
1 parent 981c4c1 commit 25d4097
Showing 4 changed files with 161 additions and 67 deletions.
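
Before the diff itself, a minimal self-contained sketch of the pattern the commit applies. This is an illustration, not agave code: EpochCache, Stakes, the load closure, and the fixed-length epoch arithmetic are invented stand-ins; the real implementation, EpochSpecs in gossip/src/epoch_specs.rs below, derives the epoch from an EpochSchedule and reads from BankForks.

use std::{
    collections::HashMap,
    sync::{
        atomic::{AtomicU64, Ordering},
        Arc,
    },
    time::Duration,
};

type Epoch = u64;
// Stakes keyed by a 32-byte node id (stand-in for Pubkey).
type Stakes = Arc<HashMap<[u8; 32], /*stake:*/ u64>>;

const MS_PER_SLOT: u64 = 400;

// Caches values that stay fixed within an epoch; the expensive source is
// re-read only when the observed root slot crosses an epoch boundary.
struct EpochCache {
    epoch: Epoch,
    slots_per_epoch: u64, // stand-in for the real EpochSchedule (no warmup)
    root: Arc<AtomicU64>, // advanced elsewhere, like BankForks' atomic root
    staked_nodes: Stakes,
    epoch_duration: Duration,
}

impl EpochCache {
    // `load` stands in for taking the bank-forks read lock; it returns the
    // current staked nodes and the epoch length in slots.
    fn maybe_refresh(&mut self, load: impl FnOnce() -> (Stakes, u64)) {
        let epoch = self.root.load(Ordering::Relaxed) / self.slots_per_epoch;
        if epoch == self.epoch {
            return; // still the same epoch; skip the expensive read
        }
        let (staked_nodes, slots_per_epoch) = load();
        self.epoch = epoch;
        self.staked_nodes = staked_nodes;
        self.slots_per_epoch = slots_per_epoch;
        self.epoch_duration = Duration::from_millis(slots_per_epoch * MS_PER_SLOT);
    }

    fn staked_nodes(&mut self, load: impl FnOnce() -> (Stakes, u64)) -> &Stakes {
        self.maybe_refresh(load);
        &self.staked_nodes
    }
}

fn main() {
    let root = Arc::new(AtomicU64::new(0));
    let mut cache = EpochCache {
        epoch: 0,
        slots_per_epoch: 32,
        root: Arc::clone(&root),
        staked_nodes: Arc::default(),
        epoch_duration: Duration::from_millis(32 * MS_PER_SLOT),
    };
    // Root still in epoch 0: the accessor returns the cache without loading.
    assert!(cache.staked_nodes(|| unreachable!()).is_empty());
    // Root crosses into epoch 1: the next access refreshes exactly once.
    root.store(40, Ordering::Relaxed);
    cache.maybe_refresh(|| (Arc::default(), 32));
    assert_eq!(cache.epoch, 1);
}

The common path costs an atomic load and a comparison; the lock behind load is taken once per epoch instead of once per loop iteration, which is the change the hunks below make to the gossip and listen threads.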
89 changes: 28 additions & 61 deletions gossip/src/cluster_info.rs
@@ -33,6 +33,7 @@ use {
         crds_value::{CrdsValue, CrdsValueLabel},
         duplicate_shred::DuplicateShred,
         epoch_slots::EpochSlots,
+        epoch_specs::EpochSpecs,
         gossip_error::GossipError,
         ping_pong::Pong,
         protocol::{
@@ -49,7 +50,6 @@ use {
     itertools::Itertools,
     rand::{seq::SliceRandom, thread_rng, CryptoRng, Rng},
     rayon::{prelude::*, ThreadPool, ThreadPoolBuilder},
-    solana_feature_set::FeatureSet,
     solana_ledger::shred::Shred,
     solana_measure::measure::Measure,
     solana_net_utils::{
@@ -103,6 +103,8 @@ use {
     thiserror::Error,
 };

+const DEFAULT_EPOCH_DURATION: Duration =
+    Duration::from_millis(DEFAULT_SLOTS_PER_EPOCH * DEFAULT_MS_PER_SLOT);
 /// milliseconds we sleep for between gossip requests
 pub const GOSSIP_SLEEP_MILLIS: u64 = 100;
 /// A hard limit on incoming gossip messages
@@ -1456,11 +1458,10 @@ impl ClusterInfo {
     fn handle_purge(
         &self,
         thread_pool: &ThreadPool,
-        bank_forks: Option<&RwLock<BankForks>>,
+        epoch_duration: Duration,
         stakes: &HashMap<Pubkey, u64>,
     ) {
         let self_pubkey = self.id();
-        let epoch_duration = get_epoch_duration(bank_forks, &self.stats);
         let timeouts = self
             .gossip
             .make_timeouts(self_pubkey, stakes, epoch_duration);
Expand Down Expand Up @@ -1517,6 +1518,7 @@ impl ClusterInfo {
.thread_name(|i| format!("solRunGossip{i:02}"))
.build()
.unwrap();
let mut epoch_specs = bank_forks.map(EpochSpecs::from);
Builder::new()
.name("solGossip".to_string())
.spawn(move || {
@@ -1536,7 +1538,7 @@ impl ClusterInfo {
                     self.push_message(value);
                 }
                 let mut generate_pull_requests = true;
-                loop {
+                while !exit.load(Ordering::Relaxed) {
                     let start = timestamp();
                     if self.contact_debug_interval != 0
                         && start - last_contact_info_trace > self.contact_debug_interval
@@ -1556,17 +1558,11 @@ impl ClusterInfo {
                         self.save_contact_info();
                         last_contact_info_save = start;
                     }
-
-                    let (stakes, _feature_set) = match bank_forks {
-                        Some(ref bank_forks) => {
-                            let root_bank = bank_forks.read().unwrap().root_bank();
-                            (
-                                root_bank.current_epoch_staked_nodes(),
-                                Some(root_bank.feature_set.clone()),
-                            )
-                        }
-                        None => (Arc::default(), None),
-                    };
+                    let stakes = epoch_specs
+                        .as_mut()
+                        .map(EpochSpecs::current_epoch_staked_nodes)
+                        .cloned()
+                        .unwrap_or_default();
                     let _ = self.run_gossip(
                         &thread_pool,
                         gossip_validators.as_ref(),
@@ -1575,10 +1571,11 @@ impl ClusterInfo {
                         &sender,
                         generate_pull_requests,
                     );
-                    if exit.load(Ordering::Relaxed) {
-                        return;
-                    }
-                    self.handle_purge(&thread_pool, bank_forks.as_deref(), &stakes);
+                    let epoch_duration = epoch_specs
+                        .as_mut()
+                        .map(EpochSpecs::epoch_duration)
+                        .unwrap_or(DEFAULT_EPOCH_DURATION);
+                    self.handle_purge(&thread_pool, epoch_duration, &stakes);
                     entrypoints_processed = entrypoints_processed || self.process_entrypoints();
                     //TODO: possibly tune this parameter
                     //we saw a deadlock passing an self.read().unwrap().timeout into sleep
@@ -2120,7 +2117,6 @@ impl ClusterInfo {
         recycler: &PacketBatchRecycler,
         response_sender: &PacketBatchSender,
         stakes: &HashMap<Pubkey, u64>,
-        _feature_set: Option<&FeatureSet>,
         epoch_duration: Duration,
         should_check_duplicate_instance: bool,
     ) -> Result<(), GossipError> {
@@ -2339,7 +2335,7 @@ impl ClusterInfo {
     fn run_listen(
         &self,
         recycler: &PacketBatchRecycler,
-        bank_forks: Option<&RwLock<BankForks>>,
+        mut epoch_specs: Option<&mut EpochSpecs>,
         receiver: &Receiver<Vec<(/*from:*/ SocketAddr, Protocol)>>,
         response_sender: &PacketBatchSender,
         thread_pool: &ThreadPool,
@@ -2360,25 +2356,21 @@ impl ClusterInfo {
                     .add_relaxed(excess_count as u64);
             }
         }
-        // Using root_bank instead of working_bank here so that an enabled
-        // feature does not roll back (if the feature happens to get enabled in
-        // a minority fork).
-        let (feature_set, stakes) = match bank_forks {
-            None => (None, Arc::default()),
-            Some(bank_forks) => {
-                let bank = bank_forks.read().unwrap().root_bank();
-                let feature_set = bank.feature_set.clone();
-                (Some(feature_set), bank.current_epoch_staked_nodes())
-            }
-        };
+        let stakes = epoch_specs
+            .as_mut()
+            .map(|epoch_specs| epoch_specs.current_epoch_staked_nodes())
+            .cloned()
+            .unwrap_or_default();
+        let epoch_duration = epoch_specs
+            .map(EpochSpecs::epoch_duration)
+            .unwrap_or(DEFAULT_EPOCH_DURATION);
         self.process_packets(
             packets,
             thread_pool,
             recycler,
             response_sender,
             &stakes,
-            feature_set.as_deref(),
-            get_epoch_duration(bank_forks, &self.stats),
+            epoch_duration,
             should_check_duplicate_instance,
         )?;
         if last_print.elapsed() > SUBMIT_GOSSIP_STATS_INTERVAL {
@@ -2434,13 +2426,14 @@ impl ClusterInfo {
             .thread_name(|i| format!("solGossipWork{i:02}"))
             .build()
             .unwrap();
+        let mut epoch_specs = bank_forks.map(EpochSpecs::from);
         Builder::new()
             .name("solGossipListen".to_string())
             .spawn(move || {
                 while !exit.load(Ordering::Relaxed) {
                     if let Err(err) = self.run_listen(
                         &recycler,
-                        bank_forks.as_deref(),
+                        epoch_specs.as_mut(),
                         &requests_receiver,
                         &response_sender,
                         &thread_pool,
@@ -2512,23 +2505,6 @@ impl ClusterInfo {
     }
 }

-// Returns root bank's epoch duration. Falls back on
-// DEFAULT_SLOTS_PER_EPOCH * DEFAULT_MS_PER_SLOT
-// if there are no working banks.
-fn get_epoch_duration(bank_forks: Option<&RwLock<BankForks>>, stats: &GossipStats) -> Duration {
-    let num_slots = match bank_forks {
-        None => {
-            stats.get_epoch_duration_no_working_bank.add_relaxed(1);
-            DEFAULT_SLOTS_PER_EPOCH
-        }
-        Some(bank_forks) => {
-            let bank = bank_forks.read().unwrap().root_bank();
-            bank.get_slots_in_epoch(bank.epoch())
-        }
-    };
-    Duration::from_millis(num_slots * DEFAULT_MS_PER_SLOT)
-}
-
 #[derive(Debug)]
 pub struct Sockets {
     pub gossip: UdpSocket,
@@ -4186,15 +4162,6 @@ mod tests {
         );
     }

-    #[test]
-    fn test_get_epoch_millis_no_bank() {
-        let epoch_duration = get_epoch_duration(/*bank_forks:*/ None, &GossipStats::default());
-        assert_eq!(
-            epoch_duration.as_millis() as u64,
-            DEFAULT_SLOTS_PER_EPOCH * DEFAULT_MS_PER_SLOT // 48 hours
-        );
-    }
-
     #[test]
     fn test_get_duplicate_shreds() {
         let host1_key = Arc::new(Keypair::new());
6 changes: 0 additions & 6 deletions gossip/src/cluster_info_metrics.rs
@@ -100,7 +100,6 @@ pub struct GossipStats {
     pub(crate) filter_pull_response: Counter,
     pub(crate) generate_prune_messages: Counter,
     pub(crate) generate_pull_responses: Counter,
-    pub(crate) get_epoch_duration_no_working_bank: Counter,
     pub(crate) get_votes: Counter,
     pub(crate) get_votes_count: Counter,
     pub(crate) gossip_listen_loop_iterations_since_last_report: Counter,
@@ -515,11 +514,6 @@ pub(crate) fn submit_gossip_stats(
             stats.window_request_loopback.clear(),
             i64
         ),
-        (
-            "get_epoch_duration_no_working_bank",
-            stats.get_epoch_duration_no_working_bank.clear(),
-            i64
-        ),
         (
             "generate_prune_messages",
             stats.generate_prune_messages.clear(),
132 changes: 132 additions & 0 deletions gossip/src/epoch_specs.rs
@@ -0,0 +1,132 @@
use {
    solana_runtime::{
        bank::Bank,
        bank_forks::{BankForks, ReadOnlyAtomicSlot},
    },
    solana_sdk::{
        clock::{Epoch, DEFAULT_MS_PER_SLOT},
        epoch_schedule::EpochSchedule,
        pubkey::Pubkey,
    },
    std::{
        collections::HashMap,
        sync::{Arc, RwLock},
        time::Duration,
    },
};

// Caches epoch-specific information which stays fixed throughout the epoch.
// Refreshes only if the root bank has moved to a new epoch.
pub(crate) struct EpochSpecs {
    epoch: Epoch, // when fields were last updated.
    epoch_schedule: EpochSchedule,
    root: ReadOnlyAtomicSlot, // updated by bank-forks.
    bank_forks: Arc<RwLock<BankForks>>,
    current_epoch_staked_nodes: Arc<HashMap<Pubkey, /*stake:*/ u64>>,
    epoch_duration: Duration,
}

impl EpochSpecs {
    #[inline]
    pub(crate) fn current_epoch_staked_nodes(&mut self) -> &Arc<HashMap<Pubkey, /*stake:*/ u64>> {
        self.maybe_refresh();
        &self.current_epoch_staked_nodes
    }

    #[inline]
    pub(crate) fn epoch_duration(&mut self) -> Duration {
        self.maybe_refresh();
        self.epoch_duration
    }

    // Updates fields if root bank has moved to a new epoch.
    fn maybe_refresh(&mut self) {
        if self.epoch_schedule.get_epoch(self.root.get()) == self.epoch {
            return; // still same epoch. nothing to update.
        }
        let root_bank = self.bank_forks.read().unwrap().root_bank();
        debug_assert_eq!(
            self.epoch_schedule.get_epoch(root_bank.slot()),
            root_bank.epoch()
        );
        self.epoch = root_bank.epoch();
        self.epoch_schedule = root_bank.epoch_schedule().clone();
        self.current_epoch_staked_nodes = root_bank.current_epoch_staked_nodes();
        self.epoch_duration = get_epoch_duration(&root_bank);
    }
}

impl From<Arc<RwLock<BankForks>>> for EpochSpecs {
    fn from(bank_forks: Arc<RwLock<BankForks>>) -> Self {
        let (root, root_bank) = {
            let bank_forks = bank_forks.read().unwrap();
            (bank_forks.get_atomic_root(), bank_forks.root_bank())
        };
        Self {
            epoch: root_bank.epoch(),
            epoch_schedule: root_bank.epoch_schedule().clone(),
            root,
            bank_forks,
            current_epoch_staked_nodes: root_bank.current_epoch_staked_nodes(),
            epoch_duration: get_epoch_duration(&root_bank),
        }
    }
}

fn get_epoch_duration(bank: &Bank) -> Duration {
    let num_slots = bank.get_slots_in_epoch(bank.epoch());
    Duration::from_millis(num_slots * DEFAULT_MS_PER_SLOT)
}

#[cfg(test)]
mod tests {
    use {
        super::*,
        solana_runtime::genesis_utils::{create_genesis_config, GenesisConfigInfo},
    };

    #[test]
    fn test_get_epoch_duration() {
        let GenesisConfigInfo { genesis_config, .. } = create_genesis_config(10_000);
        let mut bank = Bank::new_for_tests(&genesis_config);
        let epoch = 0;
        let num_slots = 32;
        assert_eq!(bank.epoch(), epoch);
        assert_eq!(bank.get_slots_in_epoch(epoch), num_slots);
        assert_eq!(
            get_epoch_duration(&bank),
            Duration::from_millis(num_slots * 400)
        );
        for slot in 1..32 {
            bank = Bank::new_from_parent(Arc::new(bank), &Pubkey::new_unique(), slot);
            assert_eq!(bank.epoch(), epoch);
            assert_eq!(bank.get_slots_in_epoch(epoch), num_slots);
            assert_eq!(
                get_epoch_duration(&bank),
                Duration::from_millis(num_slots * 400)
            );
        }
        let epoch = 1;
        let num_slots = 64;
        for slot in 32..32 + num_slots {
            bank = Bank::new_from_parent(Arc::new(bank), &Pubkey::new_unique(), slot);
            assert_eq!(bank.epoch(), epoch);
            assert_eq!(bank.get_slots_in_epoch(epoch), num_slots);
            assert_eq!(
                get_epoch_duration(&bank),
                Duration::from_millis(num_slots * 400)
            );
        }
        let epoch = 2;
        let num_slots = 128;
        for slot in 96..96 + num_slots {
            bank = Bank::new_from_parent(Arc::new(bank), &Pubkey::new_unique(), slot);
            assert_eq!(bank.epoch(), epoch);
            assert_eq!(bank.get_slots_in_epoch(epoch), num_slots);
            assert_eq!(
                get_epoch_duration(&bank),
                Duration::from_millis(num_slots * 400)
            );
        }
    }
}
1 change: 1 addition & 0 deletions gossip/src/lib.rs
@@ -18,6 +18,7 @@ pub mod duplicate_shred
 pub mod duplicate_shred_handler;
 pub mod duplicate_shred_listener;
 pub mod epoch_slots;
+mod epoch_specs;
 pub mod gossip_error;
 pub mod gossip_service;
 #[macro_use]
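Read together, the two thread loops in cluster_info.rs now follow this shape (a sketch assembled from the hunks above, not a compilable excerpt on its own; it assumes bank_forks: Option<Arc<RwLock<BankForks>>> and the surrounding loop scaffolding):

// Construct once per thread, outside the hot loop:
let mut epoch_specs = bank_forks.map(EpochSpecs::from);

while !exit.load(Ordering::Relaxed) {
    // Cheap in the common case: bank-forks is locked only when the root
    // bank has crossed into a new epoch.
    let stakes = epoch_specs
        .as_mut()
        .map(EpochSpecs::current_epoch_staked_nodes)
        .cloned()
        .unwrap_or_default();
    let epoch_duration = epoch_specs
        .as_mut()
        .map(EpochSpecs::epoch_duration)
        .unwrap_or(DEFAULT_EPOCH_DURATION);
    // ... run_gossip / handle_purge / process_packets consume the values ...
}

When no bank forks are available, the accessors fall back to empty stakes and DEFAULT_EPOCH_DURATION, which replaces the old per-call fallback (and its get_epoch_duration_no_working_bank counter) removed in this commit.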
