Skip to content

Commit

Permalink
Make sure the same cores are not used for plotting and replotting con…
Browse files Browse the repository at this point in the history
…currently
  • Loading branch information
nazar-pc committed Jan 5, 2024
1 parent 983310a commit 9182942
Show file tree
Hide file tree
Showing 5 changed files with 110 additions and 87 deletions.
16 changes: 6 additions & 10 deletions crates/subspace-farmer/src/bin/subspace-farmer/commands/farm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@ use subspace_farmer::utils::piece_validator::SegmentCommitmentPieceValidator;
use subspace_farmer::utils::readers_and_pieces::ReadersAndPieces;
use subspace_farmer::utils::ss58::parse_ss58_reward_address;
use subspace_farmer::utils::{
all_cpu_cores, create_tokio_thread_pool_manager_for_pinned_nodes,
run_future_in_dedicated_thread, thread_pool_core_indices, AsyncJoinOnDrop,
all_cpu_cores, create_plotting_thread_pool_manager, run_future_in_dedicated_thread,
thread_pool_core_indices, AsyncJoinOnDrop,
};
use subspace_farmer::{Identity, NodeClient, NodeRpcClient};
use subspace_farmer_components::plotting::PlottedSector;
Expand Down Expand Up @@ -454,13 +454,10 @@ where
));

let all_cpu_cores = all_cpu_cores();
let plotting_thread_pool_manager = create_tokio_thread_pool_manager_for_pinned_nodes(
"plotting",
plotting_thread_pool_core_indices,
)?;
let replotting_thread_pool_manager = create_tokio_thread_pool_manager_for_pinned_nodes(
"replotting",
replotting_thread_pool_core_indices,
let plotting_thread_pool_manager = create_plotting_thread_pool_manager(
plotting_thread_pool_core_indices
.into_iter()
.zip(replotting_thread_pool_core_indices),
)?;
let farming_thread_pool_size = farming_thread_pool_size
.map(|farming_thread_pool_size| farming_thread_pool_size.get())
Expand Down Expand Up @@ -519,7 +516,6 @@ where
farm_during_initial_plotting,
farming_thread_pool_size,
plotting_thread_pool_manager: plotting_thread_pool_manager.clone(),
replotting_thread_pool_manager: replotting_thread_pool_manager.clone(),
plotting_delay: Some(plotting_delay_receiver),
},
disk_farm_index,
Expand Down
9 changes: 2 additions & 7 deletions crates/subspace-farmer/src/single_disk_farm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ pub use crate::single_disk_farm::plotting::PlottingError;
use crate::single_disk_farm::plotting::{
plotting, plotting_scheduler, PlottingOptions, PlottingSchedulerOptions,
};
use crate::thread_pool_manager::ThreadPoolManager;
use crate::thread_pool_manager::PlottingThreadPoolManager;
use crate::utils::{tokio_rayon_spawn_handler, AsyncJoinOnDrop};
use crate::KNOWN_PEERS_CACHE_SIZE;
use async_lock::RwLock;
Expand Down Expand Up @@ -288,10 +288,7 @@ pub struct SingleDiskFarmOptions<NC, PG> {
/// compute-intensive operations during proving)
pub farming_thread_pool_size: usize,
/// Thread pool manager used for plotting
pub plotting_thread_pool_manager: ThreadPoolManager,
/// Thread pool manager used for replotting, typically smaller pool than for plotting to not
/// affect farming as much
pub replotting_thread_pool_manager: ThreadPoolManager,
pub plotting_thread_pool_manager: PlottingThreadPoolManager,
/// Notification for plotter to start, can be used to delay plotting until some initialization
/// has happened externally
pub plotting_delay: Option<oneshot::Receiver<()>>,
Expand Down Expand Up @@ -625,7 +622,6 @@ impl SingleDiskFarm {
downloading_semaphore,
farming_thread_pool_size,
plotting_thread_pool_manager,
replotting_thread_pool_manager,
plotting_delay,
farm_during_initial_plotting,
} = options;
Expand Down Expand Up @@ -929,7 +925,6 @@ impl SingleDiskFarm {
sectors_to_plot_receiver,
downloading_semaphore,
plotting_thread_pool_manager,
replotting_thread_pool_manager,
stop_receiver: &mut stop_receiver.resubscribe(),
};

Expand Down
11 changes: 5 additions & 6 deletions crates/subspace-farmer/src/single_disk_farm/plotting.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use crate::single_disk_farm::{
BackgroundTaskError, Handlers, PlotMetadataHeader, SectorPlottingDetails,
RESERVED_PLOT_METADATA,
};
use crate::thread_pool_manager::ThreadPoolManager;
use crate::thread_pool_manager::PlottingThreadPoolManager;
use crate::utils::AsyncJoinOnDrop;
use crate::{node_client, NodeClient};
use async_lock::RwLock;
Expand Down Expand Up @@ -116,8 +116,7 @@ pub(super) struct PlottingOptions<'a, NC, PG> {
/// Semaphore for part of the plotting when farmer downloads new sector, allows to limit memory
/// usage of the plotting process, permit will be held until the end of the plotting process
pub(crate) downloading_semaphore: Arc<Semaphore>,
pub(super) plotting_thread_pool_manager: ThreadPoolManager,
pub(super) replotting_thread_pool_manager: ThreadPoolManager,
pub(super) plotting_thread_pool_manager: PlottingThreadPoolManager,
pub(super) stop_receiver: &'a mut broadcast::Receiver<()>,
}

Expand Down Expand Up @@ -151,7 +150,6 @@ where
mut sectors_to_plot_receiver,
downloading_semaphore,
plotting_thread_pool_manager,
replotting_thread_pool_manager,
stop_receiver,
} = plotting_options;

Expand Down Expand Up @@ -324,10 +322,11 @@ where
})
};

let thread_pools = plotting_thread_pool_manager.get_thread_pools();
let thread_pool = if replotting {
replotting_thread_pool_manager.get_thread_pool()
&thread_pools.replotting
} else {
plotting_thread_pool_manager.get_thread_pool()
&thread_pools.plotting
};

// Give a chance to interrupt plotting if necessary
Expand Down
68 changes: 40 additions & 28 deletions crates/subspace-farmer/src/thread_pool_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,69 +4,81 @@ use std::num::NonZeroUsize;
use std::ops::Deref;
use std::sync::Arc;

/// A wrapper around thread pool pair for plotting purposes
#[derive(Debug)]
pub struct PlottingThreadPoolPair {
    /// Thread pool used for plotting
    pub plotting: ThreadPool,
    /// Thread pool used for replotting
    pub replotting: ThreadPool,
}

#[derive(Debug)]
struct Inner {
thread_pools: Vec<ThreadPool>,
thread_pool_pairs: Vec<PlottingThreadPoolPair>,
}

/// Wrapper around [`ThreadPool`] that on `Drop` will return thread pool back into corresponding
/// [`ThreadPoolManager`].
/// Wrapper around [`PlottingThreadPoolPair`] that on `Drop` will return thread pool back into corresponding
/// [`PlottingThreadPoolManager`].
#[derive(Debug)]
pub struct ThreadPoolGuard {
pub struct PlottingThreadPoolsGuard {
inner: Arc<(Mutex<Inner>, Condvar)>,
thread_pool: Option<ThreadPool>,
thread_pool_pair: Option<PlottingThreadPoolPair>,
}

impl Deref for ThreadPoolGuard {
type Target = ThreadPool;
impl Deref for PlottingThreadPoolsGuard {
type Target = PlottingThreadPoolPair;

fn deref(&self) -> &Self::Target {
self.thread_pool
self.thread_pool_pair
.as_ref()
.expect("Value exists until `Drop`; qed")
}
}

impl Drop for ThreadPoolGuard {
impl Drop for PlottingThreadPoolsGuard {
fn drop(&mut self) {
let (mutex, cvar) = &*self.inner;
let mut inner = mutex.lock();
inner.thread_pools.push(
self.thread_pool
inner.thread_pool_pairs.push(
self.thread_pool_pair
.take()
.expect("Happens only once in `Drop`; qed"),
);
cvar.notify_one();
}
}

/// Thread pool manager.
/// Plotting thread pool manager.
///
/// This abstraction wraps a set of thread pool pairs and allows to use them one at a time.
///
/// This abstraction wraps a set of thread pools and allows to use them one at a time.
/// Each pair contains one thread pool for plotting purposes and one for replotting, this is because
/// they'll share the same set of CPU cores in most cases and it would be inefficient to use them
/// concurrently.
///
/// For example on machine with 64 logical cores and 4 NUMA nodes it would be recommended to create
/// 4 thread pools with 16 threads each, which would mean work done within thread pool is tied to
/// that thread pool.
/// 4 thread pool pairs with 16 threads in each plotting thread pool and 8 threads in each replotting thread
/// pool, which would mean work done within thread pool is tied to CPU cores dedicated for that
/// thread pool.
#[derive(Debug, Clone)]
pub struct ThreadPoolManager {
pub struct PlottingThreadPoolManager {
inner: Arc<(Mutex<Inner>, Condvar)>,
}

impl ThreadPoolManager {
impl PlottingThreadPoolManager {
/// Create new thread pool manager by instantiating `thread_pools` thread pools using
/// `create_thread_pool`.
///
/// `create_thread_pool` takes one argument `thread_pool_index`.
pub fn new<C>(
create_thread_pool: C,
thread_pools: NonZeroUsize,
create_thread_pools: C,
thread_pool_pairs: NonZeroUsize,
) -> Result<Self, ThreadPoolBuildError>
where
C: FnMut(usize) -> Result<ThreadPool, ThreadPoolBuildError>,
C: FnMut(usize) -> Result<PlottingThreadPoolPair, ThreadPoolBuildError>,
{
let inner = Inner {
thread_pools: (0..thread_pools.get())
.map(create_thread_pool)
thread_pool_pairs: (0..thread_pool_pairs.get())
.map(create_thread_pools)
.collect::<Result<Vec<_>, _>>()?,
};

Expand All @@ -75,23 +87,23 @@ impl ThreadPoolManager {
})
}

/// Get one of inner thread pools, will block until one is available if needed
/// Get one of inner thread pool pairs, will block until one is available if needed
#[must_use]
pub fn get_thread_pool(&self) -> ThreadPoolGuard {
pub fn get_thread_pools(&self) -> PlottingThreadPoolsGuard {
let (mutex, cvar) = &*self.inner;
let mut inner = mutex.lock();

let thread_pool = inner.thread_pools.pop().unwrap_or_else(|| {
let thread_pool_pair = inner.thread_pool_pairs.pop().unwrap_or_else(|| {
cvar.wait(&mut inner);

inner.thread_pools.pop().expect(
inner.thread_pool_pairs.pop().expect(
"Guaranteed by parking_lot's API to happen when thread pool is inserted; qed",
)
});

ThreadPoolGuard {
PlottingThreadPoolsGuard {
inner: Arc::clone(&self.inner),
thread_pool: Some(thread_pool),
thread_pool_pair: Some(thread_pool_pair),
}
}
}
93 changes: 57 additions & 36 deletions crates/subspace-farmer/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,11 @@ pub mod ss58;
#[cfg(test)]
mod tests;

use crate::thread_pool_manager::ThreadPoolManager;
use crate::thread_pool_manager::{PlottingThreadPoolManager, PlottingThreadPoolPair};
use futures::channel::oneshot;
use futures::channel::oneshot::Canceled;
use futures::future::Either;
use rayon::{ThreadBuilder, ThreadPoolBuildError, ThreadPoolBuilder};
use rayon::{ThreadBuilder, ThreadPool, ThreadPoolBuildError, ThreadPoolBuilder};
use std::future::Future;
use std::num::NonZeroUsize;
use std::ops::Deref;
Expand Down Expand Up @@ -298,47 +298,68 @@ pub fn thread_pool_core_indices(
}
}

/// Creates a single thread pool with one thread per CPU core in `cpu_core_set`, pinning each
/// thread to all of those CPU cores at once (not one thread per core). Each thread also enters
/// the current Tokio runtime context so blocking rayon work can interoperate with async code
/// via `task::block_in_place`.
///
/// Threads are named `{thread_prefix}-{thread_pool_index}.{thread_index}`.
fn create_plotting_thread_pool_manager_thread_pool_pair(
    thread_prefix: &'static str,
    thread_pool_index: usize,
    cpu_core_set: CpuCoreSet,
) -> Result<ThreadPool, ThreadPoolBuildError> {
    ThreadPoolBuilder::new()
        .thread_name(move |thread_index| {
            format!("{thread_prefix}-{thread_pool_index}.{thread_index}")
        })
        // One thread per CPU core in the set
        .num_threads(cpu_core_set.cpu_cores().len())
        .spawn_handler({
            // Capture the Tokio runtime handle on the thread creating the pool so every pool
            // thread can enter the same runtime context below
            let handle = Handle::current();

            rayon_custom_spawn_handler(move |thread| {
                let cpu_core_set = cpu_core_set.clone();
                let handle = handle.clone();

                move || {
                    // Pin this thread to the whole core set; the clone is not needed afterwards
                    cpu_core_set.pin_current_thread();
                    drop(cpu_core_set);

                    // Keep the Tokio runtime context entered for the lifetime of the thread
                    let _guard = handle.enter();

                    task::block_in_place(|| thread.run())
                }
            })
        })
        .build()
}
/// Creates thread pool pairs for each of CPU core set pair with number of plotting and replotting threads corresponding
/// to number of cores in each set and pins threads to all of those CPU cores (each thread to all cores in a set, not
/// thread per core). Each thread will also have Tokio context available.
///
/// The easiest way to obtain CPUs is using [`all_cpu_cores`], but [`thread_pool_core_indices`] in case
/// support for user customizations is desired.
pub fn create_tokio_thread_pool_manager_for_pinned_nodes(
thread_prefix: &'static str,
mut cpu_core_sets: Vec<CpuCoreSet>,
) -> Result<ThreadPoolManager, ThreadPoolBuildError> {
/// support for user customizations is desired. They will then have to be composed into pairs for this function.
pub fn create_plotting_thread_pool_manager<I>(
mut cpu_core_sets: I,
) -> Result<PlottingThreadPoolManager, ThreadPoolBuildError>
where
I: ExactSizeIterator<Item = (CpuCoreSet, CpuCoreSet)>,
{
let total_thread_pools = cpu_core_sets.len();

ThreadPoolManager::new(
PlottingThreadPoolManager::new(
|thread_pool_index| {
let cpu_core_set = cpu_core_sets
.pop()
let (plotting_cpu_core_set, replotting_cpu_core_set) = cpu_core_sets
.next()
.expect("Number of thread pools is the same as cpu core sets; qed");

ThreadPoolBuilder::new()
.thread_name(move |thread_index| {
format!("{thread_prefix}-{thread_pool_index}.{thread_index}")
})
.num_threads(cpu_core_set.cpu_cores().len())
.spawn_handler({
let handle = Handle::current();

rayon_custom_spawn_handler(move |thread| {
let cpu_core_set = cpu_core_set.clone();
let handle = handle.clone();

move || {
cpu_core_set.pin_current_thread();
drop(cpu_core_set);

let _guard = handle.enter();

task::block_in_place(|| thread.run())
}
})
})
.build()
Ok(PlottingThreadPoolPair {
plotting: create_plotting_thread_pool_manager_thread_pool_pair(
"plotting",
thread_pool_index,
plotting_cpu_core_set,
)?,
replotting: create_plotting_thread_pool_manager_thread_pool_pair(
"replotting",
thread_pool_index,
replotting_cpu_core_set,
)?,
})
},
NonZeroUsize::new(total_thread_pools)
.expect("Thread pool is guaranteed to be non-empty; qed"),
Expand Down

0 comments on commit 9182942

Please sign in to comment.