Skip to content

Commit

Permalink
Merge pull request #2483 from subspace/improve-farmer-thread-allocation
Browse files Browse the repository at this point in the history
Improve farmer thread allocation
  • Loading branch information
nazar-pc authored Jan 31, 2024
2 parents 56ff653 + cc0aff4 commit 7dd004d
Show file tree
Hide file tree
Showing 4 changed files with 52 additions and 38 deletions.
7 changes: 0 additions & 7 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion crates/subspace-farmer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ hwlocality = { version = "1.0.0-alpha.1", features = ["vendored"], optional = tr
jsonrpsee = { version = "0.16.3", features = ["client"] }
lru = "0.12.1"
mimalloc = "0.1.39"
libmimalloc-sys = { version = "0.1.35", features = ["extended"] }
libmimalloc-sys = "0.1.35"
num_cpus = "1.16.0"
parity-scale-codec = "3.6.9"
parking_lot = "0.12.1"
Expand Down
52 changes: 24 additions & 28 deletions crates/subspace-farmer/src/bin/subspace-farmer/commands/farm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ use subspace_farmer::utils::ss58::parse_ss58_reward_address;
use subspace_farmer::utils::{
all_cpu_cores, create_plotting_thread_pool_manager, parse_cpu_cores_sets,
recommended_number_of_farming_threads, run_future_in_dedicated_thread,
thread_pool_core_indices, AsyncJoinOnDrop,
thread_pool_core_indices, AsyncJoinOnDrop, CpuCoreSet,
};
use subspace_farmer::{Identity, NodeClient, NodeRpcClient};
use subspace_farmer_components::plotting::PlottedSector;
Expand Down Expand Up @@ -132,7 +132,8 @@ pub(crate) struct FarmingArgs {
farm_during_initial_plotting: bool,
/// Size of PER FARM thread pool used for farming (mostly for blocking I/O, but also for some
/// compute-intensive operations during proving), defaults to number of logical CPUs
/// available on UMA system and number of logical CPUs in first NUMA node on NUMA system
/// available on UMA system and number of logical CPUs in first NUMA node on NUMA system, but
/// not more than 32 threads
#[arg(long)]
farming_thread_pool_size: Option<NonZeroUsize>,
/// Size of one thread pool used for plotting, defaults to number of logical CPUs available
Expand Down Expand Up @@ -469,8 +470,8 @@ where
None => farmer_app_info.protocol_info.max_pieces_in_sector,
};

let plotting_thread_pool_core_indices;
let replotting_thread_pool_core_indices;
let mut plotting_thread_pool_core_indices;
let mut replotting_thread_pool_core_indices;
if let Some(plotting_cpu_cores) = plotting_cpu_cores {
plotting_thread_pool_core_indices = parse_cpu_cores_sets(&plotting_cpu_cores)
.map_err(|error| anyhow::anyhow!("Failed to parse `--plotting-cpu-cores`: {error}"))?;
Expand Down Expand Up @@ -503,6 +504,25 @@ where
}
replotting_thread_pool_core_indices
};

if plotting_thread_pool_core_indices.len() > 1 {
info!(
l3_cache_groups = %plotting_thread_pool_core_indices.len(),
"Multiple L3 cache groups detected"
);

if plotting_thread_pool_core_indices.len() > disk_farms.len() {
plotting_thread_pool_core_indices =
CpuCoreSet::regroup(&plotting_thread_pool_core_indices, disk_farms.len());
replotting_thread_pool_core_indices =
CpuCoreSet::regroup(&replotting_thread_pool_core_indices, disk_farms.len());

info!(
farms_count = %disk_farms.len(),
"Regrouped CPU cores to match number of farms, more farms may leverage CPU more efficiently"
);
}
}
}

let downloading_semaphore = Arc::new(Semaphore::new(
Expand All @@ -520,30 +540,6 @@ where
.map(|farming_thread_pool_size| farming_thread_pool_size.get())
.unwrap_or_else(recommended_number_of_farming_threads);

let all_cpu_cores = all_cpu_cores();
if all_cpu_cores.len() > 1 {
info!(l3_cache_groups = %all_cpu_cores.len(), "Multiple L3 cache groups detected");

if all_cpu_cores.len() > disk_farms.len() {
warn!(
l3_cache_groups = %all_cpu_cores.len(),
farms_count = %disk_farms.len(),
"Too few disk farms, CPU will not be utilized fully during plotting, same number \
of farms as L3 cache groups or more is recommended"
);
}
}

// TODO: Remove code or environment variable once identified whether it helps or not
if std::env::var("NUMA_ALLOCATOR").is_ok() && all_cpu_cores.len() > 1 {
unsafe {
libmimalloc_sys::mi_option_set(
libmimalloc_sys::mi_option_use_numa_nodes,
all_cpu_cores.len() as std::ffi::c_long,
);
}
}

let mut plotting_delay_senders = Vec::with_capacity(disk_farms.len());

for (disk_farm_index, disk_farm) in disk_farms.into_iter().enumerate() {
Expand Down
29 changes: 27 additions & 2 deletions crates/subspace-farmer/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@ use tracing::debug;
#[cfg(feature = "numa")]
use tracing::warn;

/// It doesn't make a lot of sense to have a huge number of farming threads, 32 is plenty
const MAX_DEFAULT_FARMING_THREADS: usize = 32;

/// Joins async join handle on drop
pub struct AsyncJoinOnDrop<T> {
handle: Option<task::JoinHandle<T>>,
Expand Down Expand Up @@ -151,6 +154,27 @@ pub struct CpuCoreSet {
}

impl CpuCoreSet {
/// Regroup CPU core sets to contain at most `target_sets` sets, useful when there are many L3
/// cache groups and not as many farms
pub fn regroup(cpu_core_sets: &[Self], target_sets: usize) -> Vec<Self> {
cpu_core_sets
// Chunk CPU core sets
.chunks(cpu_core_sets.len().div_ceil(target_sets))
.map(|sets| Self {
// Combine CPU cores
cores: sets
.iter()
.flat_map(|set| set.cores.iter())
.copied()
.collect(),
// Preserve topology object
#[cfg(feature = "numa")]
topology: sets[0].topology.clone(),
})
.collect()
}

/// Get cpu core numbers in this set
pub fn cpu_cores(&self) -> &[usize] {
&self.cores
}
Expand Down Expand Up @@ -201,13 +225,14 @@ pub fn recommended_number_of_farming_threads() -> usize {
// Get number of CPU cores
.map(|cpuset| cpuset.iter_set().count())
.find(|&count| count > 0)
.unwrap_or_else(num_cpus::get);
.unwrap_or_else(num_cpus::get)
.min(MAX_DEFAULT_FARMING_THREADS);
}
Err(error) => {
warn!(%error, "Failed to get NUMA topology");
}
}
num_cpus::get()
num_cpus::get().min(MAX_DEFAULT_FARMING_THREADS)
}

/// Get all cpu cores, grouped into sets according to NUMA nodes or L3 cache groups on large CPUs.
Expand Down

0 comments on commit 7dd004d

Please sign in to comment.