diff --git a/Cargo.toml b/Cargo.toml
index c6ec1f14d6bd0..624bc577600e2 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -1613,3 +1613,6 @@ inherits = "release"
 opt-level = "z"
 lto = "fat"
 codegen-units = 1
+
+[profile.release]
+lto = "fat"
\ No newline at end of file
diff --git a/crates/bevy_core/src/task_pool_options.rs b/crates/bevy_core/src/task_pool_options.rs
index 4537354a69c05..4fa50dbc4b65a 100644
--- a/crates/bevy_core/src/task_pool_options.rs
+++ b/crates/bevy_core/src/task_pool_options.rs
@@ -1,6 +1,9 @@
 use bevy_ecs::prelude::Resource;
-use bevy_tasks::{AsyncComputeTaskPool, ComputeTaskPool, IoTaskPool, TaskPoolBuilder};
+use bevy_tasks::{
+    core_affinity, AsyncComputeTaskPool, ComputeTaskPool, IoTaskPool, TaskPoolBuilder,
+};
 use bevy_utils::tracing::trace;
+use std::sync::{Arc, Mutex};
 
 /// Defines a simple way to determine how many threads to use given the number of remaining cores
 /// and number of total cores
@@ -13,6 +16,15 @@ pub struct TaskPoolThreadAssignmentPolicy {
     /// Target using this percentage of total cores, clamped by min_threads and max_threads. It is
     /// permitted to use 1.0 to try to use all remaining threads
     pub percent: f32,
+    /// If set to true, this will use [processor affinity] to forcibly pin each thread to a different
+    /// physical CPU core.
+    ///
+    /// Only has an effect on Windows, Mac OSX, Linux, Android. This is a no-op on other platforms.
+    ///
+    /// Defaults to true.
+    ///
+    /// [processor affinity]: https://en.wikipedia.org/wiki/Processor_affinity
+    pub use_core_affinity: bool,
 }
 
 impl TaskPoolThreadAssignmentPolicy {
@@ -62,6 +74,7 @@ impl Default for TaskPoolOptions {
                 min_threads: 1,
                 max_threads: 4,
                 percent: 0.25,
+                use_core_affinity: true,
             },
 
             // Use 25% of cores for async compute, at least 1, no more than 4
@@ -69,6 +82,7 @@ impl Default for TaskPoolOptions {
                 min_threads: 1,
                 max_threads: 4,
                 percent: 0.25,
+                use_core_affinity: true,
             },
 
             // Use all remaining cores for compute (at least 1)
@@ -76,6 +90,7 @@ impl Default for TaskPoolOptions {
                 min_threads: 1,
                 max_threads: std::usize::MAX,
                 percent: 1.0, // This 1.0 here means "whatever is left over"
+                use_core_affinity: true,
             },
         }
     }
@@ -98,6 +113,7 @@ impl TaskPoolOptions {
         trace!("Assigning {} cores to default task pools", total_threads);
 
         let mut remaining_threads = total_threads;
+        let core_ids = core_affinity::get_core_ids().map(|core_ids| Arc::new(Mutex::new(core_ids)));
 
         {
             // Determine the number of IO threads we will use
@@ -109,10 +125,13 @@ impl TaskPoolOptions {
             remaining_threads = remaining_threads.saturating_sub(io_threads);
 
             IoTaskPool::init(|| {
-                TaskPoolBuilder::default()
+                let mut builder = TaskPoolBuilder::new()
                     .num_threads(io_threads)
-                    .thread_name("IO Task Pool".to_string())
-                    .build()
+                    .thread_name("IO Task Pool".to_string());
+                if let Some(core_ids) = core_ids.clone() {
+                    builder = builder.core_id_fn(move || core_ids.lock().ok()?.pop());
+                }
+                builder.build()
             });
         }
 
@@ -126,10 +145,13 @@ impl TaskPoolOptions {
             remaining_threads = remaining_threads.saturating_sub(async_compute_threads);
 
             AsyncComputeTaskPool::init(|| {
-                TaskPoolBuilder::default()
+                let mut builder = TaskPoolBuilder::new()
                     .num_threads(async_compute_threads)
-                    .thread_name("Async Compute Task Pool".to_string())
-                    .build()
+                    .thread_name("Async Compute Task Pool".to_string());
+                if let Some(core_ids) = core_ids.clone() {
+                    builder = builder.core_id_fn(move || core_ids.lock().ok()?.pop());
+                }
+                builder.build()
            });
        }
 
@@ -143,10 +165,13 @@ impl TaskPoolOptions {
             trace!("Compute Threads: {}", compute_threads);
 
             ComputeTaskPool::init(|| {
-                TaskPoolBuilder::default()
+                let mut builder = TaskPoolBuilder::new()
                     .num_threads(compute_threads)
-                    .thread_name("Compute Task Pool".to_string())
-                    .build()
+                    .thread_name("Compute Task Pool".to_string());
+                if let Some(core_ids) = core_ids.clone() {
+                    builder = builder.core_id_fn(move || core_ids.lock().ok()?.pop());
+                }
+                builder.build()
             });
         }
     }
diff --git a/crates/bevy_tasks/Cargo.toml b/crates/bevy_tasks/Cargo.toml
index f86fb78d3bc23..2f19b611170f2 100644
--- a/crates/bevy_tasks/Cargo.toml
+++ b/crates/bevy_tasks/Cargo.toml
@@ -15,6 +15,7 @@ async-channel = "1.4.2"
 async-task = "4.2.0"
 once_cell = "1.7"
 concurrent-queue = "1.2.2"
+core_affinity = "0.7"
 
 [target.'cfg(target_arch = "wasm32")'.dependencies]
 wasm-bindgen-futures = "0.4"
diff --git a/crates/bevy_tasks/src/lib.rs b/crates/bevy_tasks/src/lib.rs
index 802f6c267b7cf..ab1c2bcaaa181 100644
--- a/crates/bevy_tasks/src/lib.rs
+++ b/crates/bevy_tasks/src/lib.rs
@@ -23,6 +23,7 @@ pub use usages::tick_global_task_pools_on_main_thread;
 pub use usages::{AsyncComputeTaskPool, ComputeTaskPool, IoTaskPool};
 
 mod iter;
+pub use core_affinity;
 pub use iter::ParallelIterator;
 
 #[allow(missing_docs)]
diff --git a/crates/bevy_tasks/src/task_pool.rs b/crates/bevy_tasks/src/task_pool.rs
index 099f96e93d006..11dfb92da431c 100644
--- a/crates/bevy_tasks/src/task_pool.rs
+++ b/crates/bevy_tasks/src/task_pool.rs
@@ -1,3 +1,4 @@
+use core_affinity::CoreId;
 use std::{
     future::Future,
     marker::PhantomData,
@@ -13,7 +14,7 @@ use futures_lite::{future, pin, FutureExt};
 use crate::Task;
 
 /// Used to create a [`TaskPool`]
-#[derive(Debug, Default, Clone)]
+#[derive(Clone)]
 #[must_use]
 pub struct TaskPoolBuilder {
     /// If set, we'll set up the thread pool to use at most `num_threads` threads.
@@ -24,12 +25,25 @@ pub struct TaskPoolBuilder {
     /// Allows customizing the name of the threads - helpful for debugging. If set, threads will
     /// be named <thread_name> (<thread_index>), i.e. "MyThreadPool (2)"
     thread_name: Option<String>,
+
+    core_id_gen: Option<Arc<dyn Fn() -> Option<CoreId> + Send + Sync>>,
+}
+
+impl Default for TaskPoolBuilder {
+    fn default() -> Self {
+        Self::new()
+    }
 }
 
 impl TaskPoolBuilder {
     /// Creates a new [`TaskPoolBuilder`] instance
-    pub fn new() -> Self {
-        Self::default()
+    pub const fn new() -> Self {
+        Self {
+            num_threads: None,
+            thread_name: None,
+            stack_size: None,
+            core_id_gen: None,
+        }
     }
 
     /// Override the number of threads created for the pool. If unset, we default to the number
@@ -52,13 +66,19 @@ impl TaskPoolBuilder {
         self
     }
 
+    /// Sets a callback for getting a per-thread [`CoreId`]. If set, it'll be
+    /// called once when starting up each thread in the [`TaskPool`].
+    pub fn core_id_fn<F>(mut self, f: F) -> Self
+    where
+        F: Fn() -> Option<CoreId> + Send + Sync + 'static,
+    {
+        self.core_id_gen = Some(Arc::new(f));
+        self
+    }
+
     /// Creates a new [`TaskPool`] based on the current options.
     pub fn build(self) -> TaskPool {
-        TaskPool::new_internal(
-            self.num_threads,
-            self.stack_size,
-            self.thread_name.as_deref(),
-        )
+        TaskPool::new_internal(self)
     }
 }
 
@@ -88,35 +108,45 @@ impl TaskPool {
         TaskPoolBuilder::new().build()
     }
 
-    fn new_internal(
-        num_threads: Option<usize>,
-        stack_size: Option<usize>,
-        thread_name: Option<&str>,
-    ) -> Self {
+    fn new_internal(builder: TaskPoolBuilder) -> Self {
         let (shutdown_tx, shutdown_rx) = async_channel::unbounded::<()>();
 
         let executor = Arc::new(async_executor::Executor::new());
 
-        let num_threads = num_threads.unwrap_or_else(crate::available_parallelism);
+        let num_threads = builder
+            .num_threads
+            .unwrap_or_else(crate::available_parallelism);
 
         let threads = (0..num_threads)
             .map(|i| {
                 let ex = Arc::clone(&executor);
                 let shutdown_rx = shutdown_rx.clone();
 
-                let thread_name = if let Some(thread_name) = thread_name {
+                let thread_name = if let Some(thread_name) = builder.thread_name.as_deref() {
                     format!("{thread_name} ({i})")
                 } else {
                     format!("TaskPool ({i})")
                 };
                 let mut thread_builder = thread::Builder::new().name(thread_name);
 
-                if let Some(stack_size) = stack_size {
+                if let Some(stack_size) = builder.stack_size {
                     thread_builder = thread_builder.stack_size(stack_size);
                 }
 
+                let core_id_gen = builder.core_id_gen.clone();
                 thread_builder
                     .spawn(move || {
+                        if let Some(core_id_gen) = core_id_gen {
+                            if let Some(core_id) = core_id_gen() {
+                                core_affinity::set_for_current(core_id);
+                                println!(
+                                    "Assigned thread {:?} to core {:?}",
+                                    std::thread::current().name(),
+                                    core_id
+                                );
+                            }
+                        }
+
                         TaskPool::LOCAL_EXECUTOR.with(|local_executor| {
                             loop {
                                 let res = std::panic::catch_unwind(|| {
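
For reference, the new hook can also be driven directly when building a pool by hand, outside of `TaskPoolOptions::create_default_pools`. A minimal sketch, assuming only the `core_id_fn` builder method and the `pub use core_affinity` re-export added above; the pool name, thread count, and the `main` wrapper are illustrative, not part of this change:

```rust
use std::sync::{Arc, Mutex};

use bevy_tasks::{core_affinity, TaskPoolBuilder};

fn main() {
    // Collect the available core ids once; each worker thread pops one as it starts.
    let core_ids = core_affinity::get_core_ids().map(|ids| Arc::new(Mutex::new(ids)));

    let mut builder = TaskPoolBuilder::new()
        .num_threads(4)
        .thread_name("Pinned Pool".to_string());
    if let Some(core_ids) = core_ids {
        // Same pattern as create_default_pools: hand out one CoreId per spawned thread.
        builder = builder.core_id_fn(move || core_ids.lock().ok()?.pop());
    }
    let pool = builder.build();

    pool.scope(|s| {
        s.spawn(async {
            // Work here runs on a thread pinned to a specific core.
        });
    });
}
```

This mirrors the `Arc<Mutex<Vec<CoreId>>>` pattern used in `create_default_pools`: the shared list is drained as threads start, so no two threads of the pool pin themselves to the same core, and pools built without a `core_id_fn` keep the old unpinned behavior.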