
Add default feature for pinning TaskPool threads to CPU cores #6471

Closed · wants to merge 6 commits

3 changes: 3 additions & 0 deletions Cargo.toml
@@ -1613,3 +1613,6 @@ inherits = "release"
 opt-level = "z"
 lto = "fat"
 codegen-units = 1
+
+[profile.release]
+lto = "fat"
45 changes: 35 additions & 10 deletions crates/bevy_core/src/task_pool_options.rs
@@ -1,6 +1,9 @@
 use bevy_ecs::prelude::Resource;
-use bevy_tasks::{AsyncComputeTaskPool, ComputeTaskPool, IoTaskPool, TaskPoolBuilder};
+use bevy_tasks::{
+    core_affinity, AsyncComputeTaskPool, ComputeTaskPool, IoTaskPool, TaskPoolBuilder,
+};
 use bevy_utils::tracing::trace;
+use std::sync::{Arc, Mutex};
 
 /// Defines a simple way to determine how many threads to use given the number of remaining cores
 /// and number of total cores
@@ -13,6 +16,15 @@ pub struct TaskPoolThreadAssignmentPolicy {
     /// Target using this percentage of total cores, clamped by min_threads and max_threads. It is
     /// permitted to use 1.0 to try to use all remaining threads
     pub percent: f32,
+    /// If set to true, this will use [processor affinity] to forcibly pin each thread to a
+    /// different physical CPU core.
+    ///
+    /// Only has an effect on Windows, macOS, Linux, and Android; this is a no-op on other platforms.
+    ///
+    /// Defaults to true.
+    ///
+    /// [processor affinity]: https://en.wikipedia.org/wiki/Processor_affinity
+    pub use_core_affinity: bool,
 }
 
 impl TaskPoolThreadAssignmentPolicy {
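
For context, a minimal sketch (not part of this diff) of how an app could opt a single pool out of pinning through the new field. It assumes `TaskPoolOptions` and `TaskPoolThreadAssignmentPolicy` are re-exported from the bevy_core crate root, as in upstream Bevy:

// Sketch only: disable pinning for the compute pool, keep defaults elsewhere.
use bevy_core::{TaskPoolOptions, TaskPoolThreadAssignmentPolicy};

fn options_without_compute_pinning() -> TaskPoolOptions {
    TaskPoolOptions {
        compute: TaskPoolThreadAssignmentPolicy {
            min_threads: 1,
            max_threads: usize::MAX,
            percent: 1.0,
            use_core_affinity: false, // opt this pool out of core pinning
        },
        ..Default::default()
    }
}
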
@@ -62,20 +74,23 @@ impl Default for TaskPoolOptions {
                 min_threads: 1,
                 max_threads: 4,
                 percent: 0.25,
+                use_core_affinity: true,
             },
 
             // Use 25% of cores for async compute, at least 1, no more than 4
             async_compute: TaskPoolThreadAssignmentPolicy {
                 min_threads: 1,
                 max_threads: 4,
                 percent: 0.25,
+                use_core_affinity: true,
             },
 
             // Use all remaining cores for compute (at least 1)
             compute: TaskPoolThreadAssignmentPolicy {
                 min_threads: 1,
                 max_threads: std::usize::MAX,
                 percent: 1.0, // This 1.0 here means "whatever is left over"
+                use_core_affinity: true,
             },
         }
     }
@@ -98,6 +113,7 @@ impl TaskPoolOptions {
         trace!("Assigning {} cores to default task pools", total_threads);
 
         let mut remaining_threads = total_threads;
+        let core_ids = core_affinity::get_core_ids().map(|core_ids| Arc::new(Mutex::new(core_ids)));
 
         {
             // Determine the number of IO threads we will use
@@ -109,10 +125,13 @@
             remaining_threads = remaining_threads.saturating_sub(io_threads);
 
             IoTaskPool::init(|| {
-                TaskPoolBuilder::default()
+                let mut builder = TaskPoolBuilder::new()
                     .num_threads(io_threads)
-                    .thread_name("IO Task Pool".to_string())
-                    .build()
+                    .thread_name("IO Task Pool".to_string());
+                if let Some(core_ids) = core_ids.clone() {
+                    builder = builder.core_id_fn(move || core_ids.lock().ok()?.pop());
+                }
+                builder.build()
             });
         }
 
@@ -126,10 +145,13 @@
             remaining_threads = remaining_threads.saturating_sub(async_compute_threads);
 
             AsyncComputeTaskPool::init(|| {
-                TaskPoolBuilder::default()
+                let mut builder = TaskPoolBuilder::new()
                     .num_threads(async_compute_threads)
-                    .thread_name("Async Compute Task Pool".to_string())
-                    .build()
+                    .thread_name("Async Compute Task Pool".to_string());
+                if let Some(core_ids) = core_ids.clone() {
+                    builder = builder.core_id_fn(move || core_ids.lock().ok()?.pop());
+                }
+                builder.build()
             });
         }
 
@@ -143,10 +165,13 @@
             trace!("Compute Threads: {}", compute_threads);
 
             ComputeTaskPool::init(|| {
-                TaskPoolBuilder::default()
+                let mut builder = TaskPoolBuilder::new()
                     .num_threads(compute_threads)
-                    .thread_name("Compute Task Pool".to_string())
-                    .build()
+                    .thread_name("Compute Task Pool".to_string());
+                if let Some(core_ids) = core_ids.clone() {
+                    builder = builder.core_id_fn(move || core_ids.lock().ok()?.pop());
+                }
+                builder.build()
             });
         }
     }
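
The wiring above repeats one pattern three times: a single shared `Arc<Mutex<Vec<CoreId>>>` that each pool's startup callback pops from, so every spawned thread claims a distinct core, and `.ok()?` turns a poisoned lock or an exhausted list into "run unpinned". A self-contained sketch of the same pattern with plain `std::thread` (the thread count of 4 is illustrative):

use std::sync::{Arc, Mutex};

fn main() {
    // One shared list of core IDs; each thread pops a distinct entry.
    let core_ids = core_affinity::get_core_ids()
        .map(|ids| Arc::new(Mutex::new(ids)))
        .expect("could not query core IDs");

    let handles: Vec<_> = (0..4)
        .map(|i| {
            let core_ids = Arc::clone(&core_ids);
            std::thread::spawn(move || {
                // As in the diff: a failed lock or empty list means unpinned.
                if let Some(id) = (|| core_ids.lock().ok()?.pop())() {
                    core_affinity::set_for_current(id);
                }
                println!("worker {i} running");
            })
        })
        .collect();

    for handle in handles {
        handle.join().unwrap();
    }
}
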
1 change: 1 addition & 0 deletions crates/bevy_tasks/Cargo.toml
@@ -15,6 +15,7 @@ async-channel = "1.4.2"
 async-task = "4.2.0"
 once_cell = "1.7"
 concurrent-queue = "1.2.2"
+core_affinity = "0.7"
 
 [target.'cfg(target_arch = "wasm32")'.dependencies]
 wasm-bindgen-futures = "0.4"
1 change: 1 addition & 0 deletions crates/bevy_tasks/src/lib.rs
@@ -23,6 +23,7 @@ pub use usages::tick_global_task_pools_on_main_thread;
 pub use usages::{AsyncComputeTaskPool, ComputeTaskPool, IoTaskPool};
 
 mod iter;
+pub use core_affinity;
 pub use iter::ParallelIterator;
 
 #[allow(missing_docs)]
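
Re-exporting the crate lets downstream code reach the affinity API through `bevy_tasks` without declaring its own `core_affinity` dependency. A brief sketch:

use bevy_tasks::core_affinity;

// `get_core_ids` returns `None` on platforms where affinity is unsupported.
fn pinnable_core_count() -> usize {
    core_affinity::get_core_ids().map_or(0, |ids| ids.len())
}
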
62 changes: 46 additions & 16 deletions crates/bevy_tasks/src/task_pool.rs
@@ -1,3 +1,4 @@
+use core_affinity::CoreId;
 use std::{
     future::Future,
     marker::PhantomData,
@@ -13,7 +14,7 @@ use futures_lite::{future, pin, FutureExt};
 use crate::Task;
 
 /// Used to create a [`TaskPool`]
-#[derive(Debug, Default, Clone)]
+#[derive(Clone)]
 #[must_use]
 pub struct TaskPoolBuilder {
     /// If set, we'll set up the thread pool to use at most `num_threads` threads.
@@ -24,12 +25,25 @@ pub struct TaskPoolBuilder {
     /// Allows customizing the name of the threads - helpful for debugging. If set, threads will
     /// be named <thread_name> (<thread_index>), i.e. "MyThreadPool (2)"
     thread_name: Option<String>,
+
+    core_id_gen: Option<Arc<dyn Fn() -> Option<CoreId> + Send + Sync>>,
 }
 
+impl Default for TaskPoolBuilder {
+    fn default() -> Self {
+        Self::new()
+    }
+}
+
 impl TaskPoolBuilder {
     /// Creates a new [`TaskPoolBuilder`] instance
-    pub fn new() -> Self {
-        Self::default()
+    pub const fn new() -> Self {
+        Self {
+            num_threads: None,
+            thread_name: None,
+            stack_size: None,
+            core_id_gen: None,
+        }
     }
 
/// Override the number of threads created for the pool. If unset, we default to the number
Expand All @@ -52,13 +66,19 @@ impl TaskPoolBuilder {
self
}

/// Sets a callback for getting a per-thread [`CoreId`]. If set, it'll be
/// called once when starting up each thread in the [`TaskPool`].
pub fn core_id_fn<F>(mut self, f: F) -> Self
where
F: Fn() -> Option<CoreId> + Send + Sync + 'static,
{
self.core_id_gen = Some(Arc::new(f));
self
}

/// Creates a new [`TaskPool`] based on the current options.
pub fn build(self) -> TaskPool {
TaskPool::new_internal(
self.num_threads,
self.stack_size,
self.thread_name.as_deref(),
)
TaskPool::new_internal(self)
}
}
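
A usage sketch for the new hook (the round-robin counter is illustrative, not something this diff adds): snapshot the available cores once, then hand one out per spawned thread.

use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

use bevy_tasks::{core_affinity, TaskPool, TaskPoolBuilder};

fn pinned_pool() -> TaskPool {
    let core_ids = Arc::new(core_affinity::get_core_ids().unwrap_or_default());
    let next = AtomicUsize::new(0);

    TaskPoolBuilder::new()
        .num_threads(core_ids.len().max(1))
        .thread_name("Pinned Pool".to_string())
        // Called once per thread at startup; returning `None` leaves that
        // thread unpinned.
        .core_id_fn(move || {
            let i = next.fetch_add(1, Ordering::Relaxed);
            core_ids.get(i).copied()
        })
        .build()
}
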

@@ -88,35 +108,45 @@ impl TaskPool {
         TaskPoolBuilder::new().build()
     }
 
-    fn new_internal(
-        num_threads: Option<usize>,
-        stack_size: Option<usize>,
-        thread_name: Option<&str>,
-    ) -> Self {
+    fn new_internal(builder: TaskPoolBuilder) -> Self {
         let (shutdown_tx, shutdown_rx) = async_channel::unbounded::<()>();
 
         let executor = Arc::new(async_executor::Executor::new());
 
-        let num_threads = num_threads.unwrap_or_else(crate::available_parallelism);
+        let num_threads = builder
+            .num_threads
+            .unwrap_or_else(crate::available_parallelism);
 
         let threads = (0..num_threads)
             .map(|i| {
                 let ex = Arc::clone(&executor);
                 let shutdown_rx = shutdown_rx.clone();
 
-                let thread_name = if let Some(thread_name) = thread_name {
+                let thread_name = if let Some(thread_name) = builder.thread_name.as_deref() {
                     format!("{thread_name} ({i})")
                 } else {
                     format!("TaskPool ({i})")
                 };
                 let mut thread_builder = thread::Builder::new().name(thread_name);
 
-                if let Some(stack_size) = stack_size {
+                if let Some(stack_size) = builder.stack_size {
                     thread_builder = thread_builder.stack_size(stack_size);
                 }
 
+                let core_id_gen = builder.core_id_gen.clone();
                 thread_builder
                     .spawn(move || {
+                        if let Some(core_id_gen) = core_id_gen {
+                            if let Some(core_id) = core_id_gen() {
+                                core_affinity::set_for_current(core_id);
+                                println!(
+                                    "Assigned thread {:?} to core {:?}",
+                                    std::thread::current().name(),
+                                    core_id
+                                );
+                            }
+                        }
+
                         TaskPool::LOCAL_EXECUTOR.with(|local_executor| {
                             loop {
                                 let res = std::panic::catch_unwind(|| {
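
For reference, `set_for_current` is a thin cross-platform wrapper; on Linux it boils down to `sched_setaffinity` for the calling thread. A rough, Linux-only equivalent using the `libc` crate (an illustration, not code from this PR):

#[cfg(target_os = "linux")]
fn pin_current_thread_to(core: usize) {
    unsafe {
        // Build a CPU set containing only `core`...
        let mut set: libc::cpu_set_t = std::mem::zeroed();
        libc::CPU_SET(core, &mut set);
        // ...and apply it to the calling thread (pid 0 means "this thread").
        libc::sched_setaffinity(0, std::mem::size_of::<libc::cpu_set_t>(), &set);
    }
}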