diff --git a/crates/bevy_app/src/task_pool_plugin.rs b/crates/bevy_app/src/task_pool_plugin.rs index 174bca105b6b5..b76d29fdb017a 100644 --- a/crates/bevy_app/src/task_pool_plugin.rs +++ b/crates/bevy_app/src/task_pool_plugin.rs @@ -2,7 +2,7 @@ use crate::{App, Plugin}; use alloc::string::ToString; use bevy_platform_support::sync::Arc; -use bevy_tasks::{AsyncComputeTaskPool, ComputeTaskPool, IoTaskPool, TaskPoolBuilder}; +use bevy_tasks::{ComputeTaskPool, TaskPoolBuilder}; use core::{fmt::Debug, marker::PhantomData}; use log::trace; @@ -12,7 +12,7 @@ use {crate::Last, bevy_ecs::prelude::NonSend}; #[cfg(not(target_arch = "wasm32"))] use bevy_tasks::tick_global_task_pools_on_main_thread; -/// Setup of default task pools: [`AsyncComputeTaskPool`], [`ComputeTaskPool`], [`IoTaskPool`]. +/// Setup of default task pools: [`ComputeTaskPool`]. #[derive(Default)] pub struct TaskPoolPlugin { /// Options for the [`TaskPool`](bevy_tasks::TaskPool) created at application start. @@ -40,17 +40,16 @@ fn tick_global_task_pools(_main_thread_marker: Option>) { tick_global_task_pools_on_main_thread(); } -/// Defines a simple way to determine how many threads to use given the number of remaining cores -/// and number of total cores +/// Helper for configuring and creating the default task pools. For end-users who want full control, +/// set up [`TaskPoolPlugin`] #[derive(Clone)] -pub struct TaskPoolThreadAssignmentPolicy { - /// Force using at least this many threads - pub min_threads: usize, - /// Under no circumstance use more than this many threads for this pool - pub max_threads: usize, - /// Target using this percentage of total cores, clamped by `min_threads` and `max_threads`. It is - /// permitted to use 1.0 to try to use all remaining threads - pub percent: f32, +pub struct TaskPoolOptions { + /// If the number of physical cores is less than `min_total_threads`, force using + /// `min_total_threads` + pub min_total_threads: usize, + /// If the number of physical cores is greater than `max_total_threads`, force using + /// `max_total_threads` + pub max_total_threads: usize, /// Callback that is invoked once for every created thread as it starts. /// This configuration will be ignored under wasm platform. pub on_thread_spawn: Option>, @@ -59,91 +58,25 @@ pub struct TaskPoolThreadAssignmentPolicy { pub on_thread_destroy: Option>, } -impl Debug for TaskPoolThreadAssignmentPolicy { +impl Debug for TaskPoolOptions { fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { - f.debug_struct("TaskPoolThreadAssignmentPolicy") - .field("min_threads", &self.min_threads) - .field("max_threads", &self.max_threads) - .field("percent", &self.percent) + f.debug_struct("TaskPoolOptions") + .field("min_total_threads", &self.min_total_threads) + .field("max_total_threads", &self.max_total_threads) + .field("on_thread_spawn", &self.on_thread_spawn.is_some()) + .field("on_thread_destroy", &self.on_thread_destroy.is_some()) .finish() } } -impl TaskPoolThreadAssignmentPolicy { - /// Determine the number of threads to use for this task pool - fn get_number_of_threads(&self, remaining_threads: usize, total_threads: usize) -> usize { - assert!(self.percent >= 0.0); - let proportion = total_threads as f32 * self.percent; - let mut desired = proportion as usize; - - // Equivalent to round() for positive floats without libm requirement for - // no_std compatibility - if proportion - desired as f32 >= 0.5 { - desired += 1; - } - - // Limit ourselves to the number of cores available - desired = desired.min(remaining_threads); - - // Clamp by min_threads, max_threads. (This may result in us using more threads than are - // available, this is intended. An example case where this might happen is a device with - // <= 2 threads. - desired.clamp(self.min_threads, self.max_threads) - } -} - -/// Helper for configuring and creating the default task pools. For end-users who want full control, -/// set up [`TaskPoolPlugin`] -#[derive(Clone, Debug)] -pub struct TaskPoolOptions { - /// If the number of physical cores is less than `min_total_threads`, force using - /// `min_total_threads` - pub min_total_threads: usize, - /// If the number of physical cores is greater than `max_total_threads`, force using - /// `max_total_threads` - pub max_total_threads: usize, - - /// Used to determine number of IO threads to allocate - pub io: TaskPoolThreadAssignmentPolicy, - /// Used to determine number of async compute threads to allocate - pub async_compute: TaskPoolThreadAssignmentPolicy, - /// Used to determine number of compute threads to allocate - pub compute: TaskPoolThreadAssignmentPolicy, -} - impl Default for TaskPoolOptions { fn default() -> Self { TaskPoolOptions { // By default, use however many cores are available on the system min_total_threads: 1, max_total_threads: usize::MAX, - - // Use 25% of cores for IO, at least 1, no more than 4 - io: TaskPoolThreadAssignmentPolicy { - min_threads: 1, - max_threads: 4, - percent: 0.25, - on_thread_spawn: None, - on_thread_destroy: None, - }, - - // Use 25% of cores for async compute, at least 1, no more than 4 - async_compute: TaskPoolThreadAssignmentPolicy { - min_threads: 1, - max_threads: 4, - percent: 0.25, - on_thread_spawn: None, - on_thread_destroy: None, - }, - - // Use all remaining cores for compute (at least 1) - compute: TaskPoolThreadAssignmentPolicy { - min_threads: 1, - max_threads: usize::MAX, - percent: 1.0, // This 1.0 here means "whatever is left over" - on_thread_spawn: None, - on_thread_destroy: None, - }, + on_thread_spawn: None, + on_thread_destroy: None, } } } @@ -164,101 +97,31 @@ impl TaskPoolOptions { .clamp(self.min_total_threads, self.max_total_threads); trace!("Assigning {} cores to default task pools", total_threads); - let mut remaining_threads = total_threads; - - { - // Determine the number of IO threads we will use - let io_threads = self - .io - .get_number_of_threads(remaining_threads, total_threads); + ComputeTaskPool::get_or_init(|| { + #[cfg_attr(target_arch = "wasm32", expect(unused_mut))] + let mut builder = TaskPoolBuilder::default() + .num_threads(total_threads) + .thread_name("Compute Task Pool".to_string()); - trace!("IO Threads: {}", io_threads); - remaining_threads = remaining_threads.saturating_sub(io_threads); - - IoTaskPool::get_or_init(|| { - #[cfg_attr(target_arch = "wasm32", expect(unused_mut))] - let mut builder = TaskPoolBuilder::default() - .num_threads(io_threads) - .thread_name("IO Task Pool".to_string()); - - #[cfg(not(target_arch = "wasm32"))] - { - if let Some(f) = self.io.on_thread_spawn.clone() { - builder = builder.on_thread_spawn(move || f()); - } - if let Some(f) = self.io.on_thread_destroy.clone() { - builder = builder.on_thread_destroy(move || f()); - } + #[cfg(not(target_arch = "wasm32"))] + { + if let Some(f) = self.on_thread_spawn.clone() { + builder = builder.on_thread_spawn(move || f()); } - - builder.build() - }); - } - - { - // Determine the number of async compute threads we will use - let async_compute_threads = self - .async_compute - .get_number_of_threads(remaining_threads, total_threads); - - trace!("Async Compute Threads: {}", async_compute_threads); - remaining_threads = remaining_threads.saturating_sub(async_compute_threads); - - AsyncComputeTaskPool::get_or_init(|| { - #[cfg_attr(target_arch = "wasm32", expect(unused_mut))] - let mut builder = TaskPoolBuilder::default() - .num_threads(async_compute_threads) - .thread_name("Async Compute Task Pool".to_string()); - - #[cfg(not(target_arch = "wasm32"))] - { - if let Some(f) = self.async_compute.on_thread_spawn.clone() { - builder = builder.on_thread_spawn(move || f()); - } - if let Some(f) = self.async_compute.on_thread_destroy.clone() { - builder = builder.on_thread_destroy(move || f()); - } - } - - builder.build() - }); - } - - { - // Determine the number of compute threads we will use - // This is intentionally last so that an end user can specify 1.0 as the percent - let compute_threads = self - .compute - .get_number_of_threads(remaining_threads, total_threads); - - trace!("Compute Threads: {}", compute_threads); - - ComputeTaskPool::get_or_init(|| { - #[cfg_attr(target_arch = "wasm32", expect(unused_mut))] - let mut builder = TaskPoolBuilder::default() - .num_threads(compute_threads) - .thread_name("Compute Task Pool".to_string()); - - #[cfg(not(target_arch = "wasm32"))] - { - if let Some(f) = self.compute.on_thread_spawn.clone() { - builder = builder.on_thread_spawn(move || f()); - } - if let Some(f) = self.compute.on_thread_destroy.clone() { - builder = builder.on_thread_destroy(move || f()); - } + if let Some(f) = self.on_thread_destroy.clone() { + builder = builder.on_thread_destroy(move || f()); } + } - builder.build() - }); - } + builder.build() + }); } } #[cfg(test)] mod tests { use super::*; - use bevy_tasks::prelude::{AsyncComputeTaskPool, ComputeTaskPool, IoTaskPool}; + use bevy_tasks::prelude::ComputeTaskPool; #[test] fn runs_spawn_local_tasks() { @@ -266,7 +129,7 @@ mod tests { app.add_plugins(TaskPoolPlugin::default()); let (async_tx, async_rx) = crossbeam_channel::unbounded(); - AsyncComputeTaskPool::get() + ComputeTaskPool::get() .spawn_local(async move { async_tx.send(()).unwrap(); }) @@ -280,7 +143,7 @@ mod tests { .detach(); let (io_tx, io_rx) = crossbeam_channel::unbounded(); - IoTaskPool::get() + ComputeTaskPool::get() .spawn_local(async move { io_tx.send(()).unwrap(); }) diff --git a/crates/bevy_asset/src/processor/mod.rs b/crates/bevy_asset/src/processor/mod.rs index d0db3dc90f718..b08222af0dd42 100644 --- a/crates/bevy_asset/src/processor/mod.rs +++ b/crates/bevy_asset/src/processor/mod.rs @@ -59,7 +59,7 @@ use crate::{ use alloc::{borrow::ToOwned, boxed::Box, collections::VecDeque, sync::Arc, vec, vec::Vec}; use bevy_ecs::prelude::*; use bevy_platform_support::collections::{HashMap, HashSet}; -use bevy_tasks::IoTaskPool; +use bevy_tasks::ComputeTaskPool; use futures_io::ErrorKind; use futures_lite::{AsyncReadExt, AsyncWriteExt, StreamExt}; use parking_lot::RwLock; @@ -218,7 +218,7 @@ impl AssetProcessor { pub fn process_assets(&self) { let start_time = std::time::Instant::now(); debug!("Processing Assets"); - IoTaskPool::get().scope(|scope| { + ComputeTaskPool::get().scope(|scope| { scope.spawn(async move { self.initialize().await.unwrap(); for source in self.sources().iter_processed() { @@ -368,7 +368,7 @@ impl AssetProcessor { #[cfg(any(target_arch = "wasm32", not(feature = "multi_threaded")))] error!("AddFolder event cannot be handled in single threaded mode (or Wasm) yet."); #[cfg(all(not(target_arch = "wasm32"), feature = "multi_threaded"))] - IoTaskPool::get().scope(|scope| { + ComputeTaskPool::get().scope(|scope| { scope.spawn(async move { self.process_assets_internal(scope, source, path) .await @@ -510,7 +510,7 @@ impl AssetProcessor { loop { let mut check_reprocess_queue = core::mem::take(&mut self.data.asset_infos.write().await.check_reprocess_queue); - IoTaskPool::get().scope(|scope| { + ComputeTaskPool::get().scope(|scope| { for path in check_reprocess_queue.drain(..) { let processor = self.clone(); let source = self.get_source(path.source()).unwrap(); diff --git a/crates/bevy_asset/src/server/loaders.rs b/crates/bevy_asset/src/server/loaders.rs index 1250ff666ae9f..2913f0fd92f93 100644 --- a/crates/bevy_asset/src/server/loaders.rs +++ b/crates/bevy_asset/src/server/loaders.rs @@ -5,7 +5,7 @@ use crate::{ use alloc::{boxed::Box, sync::Arc, vec::Vec}; use async_broadcast::RecvError; use bevy_platform_support::collections::HashMap; -use bevy_tasks::IoTaskPool; +use bevy_tasks::ComputeTaskPool; use bevy_utils::TypeIdMap; use core::any::TypeId; use thiserror::Error; @@ -93,7 +93,7 @@ impl AssetLoaders { match maybe_loader { MaybeAssetLoader::Ready(_) => unreachable!(), MaybeAssetLoader::Pending { sender, .. } => { - IoTaskPool::get() + ComputeTaskPool::get() .spawn(async move { let _ = sender.broadcast(loader).await; }) diff --git a/crates/bevy_asset/src/server/mod.rs b/crates/bevy_asset/src/server/mod.rs index fec52f78d0967..eb143308e4af9 100644 --- a/crates/bevy_asset/src/server/mod.rs +++ b/crates/bevy_asset/src/server/mod.rs @@ -26,7 +26,7 @@ use alloc::{ use atomicow::CowArc; use bevy_ecs::prelude::*; use bevy_platform_support::collections::HashSet; -use bevy_tasks::IoTaskPool; +use bevy_tasks::ComputeTaskPool; use core::{any::TypeId, future::Future, panic::AssertUnwindSafe, task::Poll}; use crossbeam_channel::{Receiver, Sender}; use either::Either; @@ -426,7 +426,7 @@ impl AssetServer { let owned_handle = handle.clone(); let server = self.clone(); - let task = IoTaskPool::get().spawn(async move { + let task = ComputeTaskPool::get().spawn(async move { if let Err(err) = server .load_internal(Some(owned_handle), path, false, None) .await @@ -487,7 +487,7 @@ impl AssetServer { let id = handle.id().untyped(); let server = self.clone(); - let task = IoTaskPool::get().spawn(async move { + let task = ComputeTaskPool::get().spawn(async move { let path_clone = path.clone(); match server.load_untyped_async(path).await { Ok(handle) => server.send_asset_event(InternalAssetEvent::Loaded { @@ -716,7 +716,7 @@ impl AssetServer { pub fn reload<'a>(&self, path: impl Into>) { let server = self.clone(); let path = path.into().into_owned(); - IoTaskPool::get() + ComputeTaskPool::get() .spawn(async move { let mut reloaded = false; @@ -810,7 +810,7 @@ impl AssetServer { let event_sender = self.data.asset_event_sender.clone(); - let task = IoTaskPool::get().spawn(async move { + let task = ComputeTaskPool::get().spawn(async move { match future.await { Ok(asset) => { let loaded_asset = LoadedAsset::new_with_dependencies(asset).into(); @@ -913,7 +913,7 @@ impl AssetServer { let path = path.into_owned(); let server = self.clone(); - IoTaskPool::get() + ComputeTaskPool::get() .spawn(async move { let Ok(source) = server.get_source(path.source()) else { error!( diff --git a/crates/bevy_diagnostic/src/system_information_diagnostics_plugin.rs b/crates/bevy_diagnostic/src/system_information_diagnostics_plugin.rs index 55616fca4b961..1376f849c34e1 100644 --- a/crates/bevy_diagnostic/src/system_information_diagnostics_plugin.rs +++ b/crates/bevy_diagnostic/src/system_information_diagnostics_plugin.rs @@ -71,7 +71,7 @@ pub mod internal { use bevy_ecs::resource::Resource; use bevy_ecs::{prelude::ResMut, system::Local}; use bevy_platform_support::time::Instant; - use bevy_tasks::{available_parallelism, block_on, poll_once, AsyncComputeTaskPool, Task}; + use bevy_tasks::{available_parallelism, block_on, poll_once, ComputeTaskPool, Task}; use log::info; use std::sync::Mutex; use sysinfo::{CpuRefreshKind, MemoryRefreshKind, RefreshKind, System}; @@ -123,7 +123,7 @@ pub mod internal { let last_refresh = last_refresh.get_or_insert_with(Instant::now); - let thread_pool = AsyncComputeTaskPool::get(); + let thread_pool = ComputeTaskPool::get(); // Only queue a new system refresh task when necessary // Queuing earlier than that will not give new data @@ -133,7 +133,7 @@ pub mod internal { && tasks.tasks.len() * 2 < available_parallelism() { let sys = Arc::clone(sysinfo); - let task = thread_pool.spawn(async move { + let task = thread_pool.spawn_blocking(move || { let mut sys = sys.lock().unwrap(); sys.refresh_cpu_specifics(CpuRefreshKind::nothing().with_cpu_usage()); diff --git a/crates/bevy_gltf/src/loader/mod.rs b/crates/bevy_gltf/src/loader/mod.rs index 9d400e44bc0cb..23931bec13212 100644 --- a/crates/bevy_gltf/src/loader/mod.rs +++ b/crates/bevy_gltf/src/loader/mod.rs @@ -44,7 +44,7 @@ use bevy_render::{ }; use bevy_scene::Scene; #[cfg(not(target_arch = "wasm32"))] -use bevy_tasks::IoTaskPool; +use bevy_tasks::ComputeTaskPool; use bevy_transform::components::Transform; use gltf::{ @@ -528,13 +528,13 @@ async fn load_gltf<'a, 'b, 'c>( } } else { #[cfg(not(target_arch = "wasm32"))] - IoTaskPool::get() + ComputeTaskPool::get() .scope(|scope| { gltf.textures().for_each(|gltf_texture| { let parent_path = load_context.path().parent().unwrap(); let linear_textures = &linear_textures; let buffer_data = &buffer_data; - scope.spawn(async move { + scope.spawn_blocking_async(async move { load_image( gltf_texture, buffer_data, diff --git a/crates/bevy_remote/src/http.rs b/crates/bevy_remote/src/http.rs index 4e36e4a0bfe94..4f16dadab8c96 100644 --- a/crates/bevy_remote/src/http.rs +++ b/crates/bevy_remote/src/http.rs @@ -17,7 +17,7 @@ use async_io::Async; use bevy_app::{App, Plugin, Startup}; use bevy_ecs::resource::Resource; use bevy_ecs::system::Res; -use bevy_tasks::{futures_lite::StreamExt, IoTaskPool}; +use bevy_tasks::{futures_lite::StreamExt, ComputeTaskPool}; use core::{ convert::Infallible, net::{IpAddr, Ipv4Addr}, @@ -201,7 +201,7 @@ fn start_http_server( remote_port: Res, headers: Res, ) { - IoTaskPool::get() + ComputeTaskPool::get() .spawn(server_main( address.0, remote_port.0, @@ -236,7 +236,7 @@ async fn listen( let request_sender = request_sender.clone(); let headers = headers.clone(); - IoTaskPool::get() + ComputeTaskPool::get() .spawn(async move { let _ = handle_client(client, request_sender, headers).await; }) diff --git a/crates/bevy_render/src/lib.rs b/crates/bevy_render/src/lib.rs index fcb8e624607d7..c1ebbeb0c095e 100644 --- a/crates/bevy_render/src/lib.rs +++ b/crates/bevy_render/src/lib.rs @@ -379,7 +379,7 @@ impl Plugin for RenderPlugin { }; // In wasm, spawn a task and detach it for execution #[cfg(target_arch = "wasm32")] - bevy_tasks::IoTaskPool::get() + bevy_tasks::ComputeTaskPool::get() .spawn_local(async_renderer) .detach(); // Otherwise, just block for it to complete diff --git a/crates/bevy_render/src/render_resource/pipeline_cache.rs b/crates/bevy_render/src/render_resource/pipeline_cache.rs index 37211edb04c3d..09693d79227db 100644 --- a/crates/bevy_render/src/render_resource/pipeline_cache.rs +++ b/crates/bevy_render/src/render_resource/pipeline_cache.rs @@ -14,7 +14,7 @@ use bevy_ecs::{ use bevy_platform_support::collections::{hash_map::EntryRef, HashMap, HashSet}; use bevy_tasks::Task; use bevy_utils::default; -use core::{future::Future, hash::Hash, mem, ops::Deref}; +use core::{hash::Hash, mem, ops::Deref}; use naga::valid::Capabilities; use std::sync::{Mutex, PoisonError}; use thiserror::Error; @@ -680,7 +680,7 @@ impl PipelineCache { let layout_cache = self.layout_cache.clone(); create_pipeline_task( - async move { + move || { let mut shader_cache = shader_cache.lock().unwrap(); let mut layout_cache = layout_cache.lock().unwrap(); @@ -791,7 +791,7 @@ impl PipelineCache { let layout_cache = self.layout_cache.clone(); create_pipeline_task( - async move { + move || { let mut shader_cache = shader_cache.lock().unwrap(); let mut layout_cache = layout_cache.lock().unwrap(); @@ -955,14 +955,16 @@ impl PipelineCache { feature = "multi_threaded" ))] fn create_pipeline_task( - task: impl Future> + Send + 'static, + task: impl FnOnce() -> Result + Send + 'static, sync: bool, ) -> CachedPipelineState { if !sync { - return CachedPipelineState::Creating(bevy_tasks::AsyncComputeTaskPool::get().spawn(task)); + return CachedPipelineState::Creating( + bevy_tasks::ComputeTaskPool::get().spawn_blocking(task), + ); } - match futures_lite::future::block_on(task) { + match task() { Ok(pipeline) => CachedPipelineState::Ok(pipeline), Err(err) => CachedPipelineState::Err(err), } @@ -974,10 +976,10 @@ fn create_pipeline_task( not(feature = "multi_threaded") ))] fn create_pipeline_task( - task: impl Future> + Send + 'static, + task: impl FnOnce() -> Result + Send + 'static, _sync: bool, ) -> CachedPipelineState { - match futures_lite::future::block_on(task) { + match task() { Ok(pipeline) => CachedPipelineState::Ok(pipeline), Err(err) => CachedPipelineState::Err(err), } diff --git a/crates/bevy_render/src/view/window/screenshot.rs b/crates/bevy_render/src/view/window/screenshot.rs index d8a309036edb4..8cdbf4bfebef1 100644 --- a/crates/bevy_render/src/view/window/screenshot.rs +++ b/crates/bevy_render/src/view/window/screenshot.rs @@ -25,7 +25,7 @@ use bevy_ecs::{ use bevy_image::{Image, TextureFormatPixelInfo}; use bevy_platform_support::collections::HashSet; use bevy_reflect::Reflect; -use bevy_tasks::AsyncComputeTaskPool; +use bevy_tasks::ComputeTaskPool; use bevy_utils::default; use bevy_window::{PrimaryWindow, WindowRef}; use core::ops::Deref; @@ -690,6 +690,6 @@ pub(crate) fn collect_screenshots(world: &mut World) { } }; - AsyncComputeTaskPool::get().spawn(finish).detach(); + ComputeTaskPool::get().spawn_blocking_async(finish).detach(); } } diff --git a/crates/bevy_tasks/Cargo.toml b/crates/bevy_tasks/Cargo.toml index 3f9c9f2a3a995..d2375d7b5b9a9 100644 --- a/crates/bevy_tasks/Cargo.toml +++ b/crates/bevy_tasks/Cargo.toml @@ -16,7 +16,12 @@ std = [ "edge-executor?/std", "bevy_platform_support/std", ] -multi_threaded = ["std", "dep:async-channel", "dep:concurrent-queue"] +multi_threaded = [ + "std", + "dep:async-channel", + "dep:concurrent-queue", + "dep:blocking", +] async_executor = ["std", "dep:async-executor"] edge_executor = ["dep:edge-executor"] critical-section = [ @@ -43,6 +48,7 @@ edge-executor = { version = "0.4.1", default-features = false, optional = true } async-channel = { version = "2.3.0", optional = true } async-io = { version = "2.0.0", optional = true } concurrent-queue = { version = "2.0.0", optional = true } +blocking = { version = "1", optional = true } [target.'cfg(target_arch = "wasm32")'.dependencies] wasm-bindgen-futures = "0.4" diff --git a/crates/bevy_tasks/README.md b/crates/bevy_tasks/README.md index 2af6a606f65fa..5ed0b3a96c084 100644 --- a/crates/bevy_tasks/README.md +++ b/crates/bevy_tasks/README.md @@ -25,14 +25,9 @@ bevy provides three different thread pools via which tasks of different kinds ca This currently applies to Wasm targets.) The determining factor for what kind of work should go in each pool is latency requirements: -* For CPU-intensive work (tasks that generally spin until completion) we have a standard - [`ComputeTaskPool`] and an [`AsyncComputeTaskPool`]. Work that does not need to be completed to - present the next frame should go to the [`AsyncComputeTaskPool`]. - -* For IO-intensive work (tasks that spend very little time in a "woken" state) we have an - [`IoTaskPool`] whose tasks are expected to complete very quickly. Generally speaking, they should just - await receiving data from somewhere (i.e. disk) and signal other systems when the data is ready - for consumption. (likely via channels) +For CPU-intensive work, the standard [`ComputeTaskPool`] can be used. It can also be used for +non-blocking IO-intensive work (tasks that spend very little time in a "woken" state). +Work that does not need to be completed to present the next frame should use [`TaskPool::spawn_blocking`]. ## `no_std` Support diff --git a/crates/bevy_tasks/src/lib.rs b/crates/bevy_tasks/src/lib.rs index 6055960060ac8..6bbb899bca179 100644 --- a/crates/bevy_tasks/src/lib.rs +++ b/crates/bevy_tasks/src/lib.rs @@ -69,7 +69,7 @@ pub use single_threaded_task_pool::{Scope, TaskPool, TaskPoolBuilder, ThreadExec mod usages; #[cfg(not(target_arch = "wasm32"))] pub use usages::tick_global_task_pools_on_main_thread; -pub use usages::{AsyncComputeTaskPool, ComputeTaskPool, IoTaskPool}; +pub use usages::ComputeTaskPool; #[cfg(all(not(target_arch = "wasm32"), feature = "multi_threaded"))] mod thread_executor; @@ -95,7 +95,7 @@ pub mod prelude { pub use crate::{ iter::ParallelIterator, slice::{ParallelSlice, ParallelSliceMut}, - usages::{AsyncComputeTaskPool, ComputeTaskPool, IoTaskPool}, + usages::ComputeTaskPool, }; #[cfg(feature = "std")] diff --git a/crates/bevy_tasks/src/single_threaded_task_pool.rs b/crates/bevy_tasks/src/single_threaded_task_pool.rs index fc1a73e754946..26577f088b2da 100644 --- a/crates/bevy_tasks/src/single_threaded_task_pool.rs +++ b/crates/bevy_tasks/src/single_threaded_task_pool.rs @@ -270,6 +270,34 @@ impl TaskPool { self.spawn(future) } + /// Spawns a static future on the JS event loop to be executed at a later point. + /// + /// This is potentially dangerous in browsers when running long standing computations that + /// may block, as the browser will panic if the process does not periodically yield back to + /// the browser. Consider using [`spawn_blocking_async`] instead. + /// + /// [`spawn_blocking_async`]: Self::spawn_blocking_async + pub fn spawn_blocking( + &self, + f: impl FnOnce() -> T + 'static + MaybeSend + MaybeSync, + ) -> Task + where + T: 'static + MaybeSend + MaybeSync, + { + self.spawn(async { f() }) + } + + /// Spawns a static future on the JS event loop. This is exactly the same as [`TaskPool::spawn`]. + pub fn spawn_blocking_async( + &self, + future: impl Future + 'static + MaybeSend + MaybeSync, + ) -> Task + where + T: 'static + MaybeSend + MaybeSync, + { + self.spawn(future) + } + /// Runs a function with the local executor. Typically used to tick /// the local executor on the main thread as it needs to share time with /// other things. diff --git a/crates/bevy_tasks/src/task_pool.rs b/crates/bevy_tasks/src/task_pool.rs index 819fbd1235053..b0a5d75987536 100644 --- a/crates/bevy_tasks/src/task_pool.rs +++ b/crates/bevy_tasks/src/task_pool.rs @@ -7,6 +7,7 @@ use std::{ use crate::executor::FallibleTask; use bevy_platform_support::sync::Arc; +use blocking::unblock; use concurrent_queue::ConcurrentQueue; use futures_lite::FutureExt; @@ -33,6 +34,10 @@ pub struct TaskPoolBuilder { /// If set, we'll set up the thread pool to use at most `num_threads` threads. /// Otherwise use the logical core count of the system num_threads: Option, + /// If set, this sets the maximum number of threads used for blocking operations. + /// Otherwise, it will default to the value set by the `BLOCKING_MAX_THREADS` environment variable, + /// or 500 if not set. + max_blocking_threads: Option, /// If set, we'll use the given stack size rather than the system default stack_size: Option, /// Allows customizing the name of the threads - helpful for debugging. If set, threads will @@ -49,13 +54,30 @@ impl TaskPoolBuilder { Self::default() } - /// Override the number of threads created for the pool. If unset, we default to the number + /// Override the number of threads created for the pool. If unset, it will default to the number /// of logical cores of the system pub fn num_threads(mut self, num_threads: usize) -> Self { self.num_threads = Some(num_threads); self } + /// The task pool contains a dynamically scaling group of threads for handling blocking tasks. + /// The pool will spin up and down threads as needed, unlike the threads allocated by + /// [`Self::num_threads`] which are always available. By default, zero threads will be spawned at + /// initialization, and up to `num_blocking_threads` will be spun up. Upon reaching that limit, + /// calls to [`spawn_blocking`] and [`spawn_blocking_async`] will wait until one of the threads + /// becomes available. + /// + /// By default, this will use the `BLOCKING_MAX_THREADS` environment variable to determine, + /// the maximum, or 500 if that environment variable is not set. + /// + /// [`spawn_blocking`]: TaskPool::spawn_blocking + /// [`spawn_blocking_async`]: TaskPool::spawn_blocking_async + pub fn max_blocking_threads(mut self, num_blocking_threads: usize) -> Self { + self.max_blocking_threads = Some(num_blocking_threads); + self + } + /// Override the stack size of the threads created for the pool pub fn stack_size(mut self, stack_size: usize) -> Self { self.stack_size = Some(stack_size); @@ -158,6 +180,15 @@ impl TaskPool { } fn new_internal(builder: TaskPoolBuilder) -> Self { + // if let Some(thread_count) = builder.max_blocking_threads { + // Safety: This is likely unsafe as this could be called if the TaskPoolBuilder is called from + // multiple threads. + // #[expect(unsafe_code, reason = "TaskPools are only initialized from one thread")] + // unsafe { + // env::set_var("BLOCKING_MAX_THREADS", thread_count.to_string().as_str()); + // } + // } + let (shutdown_tx, shutdown_rx) = async_channel::unbounded::<()>(); let executor = Arc::new(crate::executor::Executor::new()); @@ -554,8 +585,16 @@ impl TaskPool { /// any case, the pool will execute the task even without polling by the /// end-user. /// - /// If the provided future is non-`Send`, [`TaskPool::spawn_local`] should + /// If the provided future is non-`Send`, [`spawn_local`] should /// be used instead. + /// + /// If the provided future performs blocking IO or may have long lasting + /// CPU-bound operations, use [`spawn_blocking`] or [`spawn_blocking_async`] + /// instead. + /// + /// [`spawn_local`]: Self::spawn_local + /// [`spawn_blocking`]: Self::spawn_blocking + /// [`spawn_blocking_async`]: Self::spawn_blocking_async pub fn spawn(&self, future: impl Future + Send + 'static) -> Task where T: Send + 'static, @@ -581,6 +620,81 @@ impl TaskPool { Task::new(TaskPool::LOCAL_EXECUTOR.with(|executor| executor.spawn(future))) } + /// Runs the provided closure on a thread where blocking is acceptable. + /// + /// In general, issuing a blocking call or performing a lot of compute in a + /// future without yielding is not okay, as it may prevent the task pool + /// from driving other futures forward. This function runs the provided + /// closure on a thread dedicated to blocking operations. + /// + /// This call will spawn more blocking threads when they are requested + /// through this function until the upper limit configured. + /// This limit is very large by default (500), because `spawn_blocking` is often + /// used for various kinds of IO operations that cannot be performed + /// asynchronously. When you run CPU-bound code using `spawn_blocking`, + /// you should keep this large upper limit in mind; to run your + /// CPU-bound computations on only a few threads. Spawning too many threads + /// will cause the OS to [thrash], which may impact the performance + /// of the non-blocking tasks scheduled onto the `TaskPool`. + /// + /// Closures spawned using `spawn_blocking` cannot be cancelled. When the + /// executor is shutdown, it will wait indefinitely for all blocking operations + /// to finish. + /// + /// ## Platform Specific Behavior + /// Long running blocking operations in browser environments will panic, so the app + /// must yield back to the browser periodically. If you're targeting web platforms, + /// consider using [`Self::spawn_blocking_async`]. + /// + /// [thrash]: https://en.wikipedia.org/wiki/Thrashing_(computer_science) + pub fn spawn_blocking(&self, f: impl FnOnce() -> T + Send + 'static) -> Task + where + T: Send + 'static, + { + Task::new(unblock(f)) + } + + /// Spawns a static future onto on a thread where blocking is acceptable. + /// The returned [`Task`] is a future that can be polled for the result. + /// It can also be "detached", allowing the task to continue + /// running even if dropped. In any case, the pool will execute the task + /// even without polling by the end-user. + /// + /// This function is equivalent to calling `task_pool.spawn_blocking(|| block_on(f))`. + /// + /// If the future is expected to terminate quickly, or will not spend a + /// significant amount of time performing blocking CPU-bound or IO-bound + /// operations, [`spawn`] should be used instead. The ideal use case for + /// this function is for launching a future that may involve a combination + /// of async IO and blocking operations (i.e. loading large scenes). + /// + /// This call will spawn more blocking threads when they are requested + /// through this function until the upper limit configured. + /// This limit is very large by default (500), because `spawn_blocking` is often + /// used for various kinds of IO operations that cannot be performed + /// asynchronously. When you run CPU-bound code using `spawn_blocking`, + /// you should keep this large upper limit in mind; to run your + /// CPU-bound computations on only a few threads. Spawning too many threads + /// will cause the OS to thrash, which may impact the performance + /// of the non-blocking tasks scheduled onto the `TaskPool`. + /// + /// The returned task can be detached or cancelled; however, any long standing + /// blocking operations will continue until the future yields. + /// + /// ## Platform Specific Behavior + /// This function behaves identically to `spawn` on `wasm` targets, or if + /// the `multi-threaded` feature on the crate is not enabled. + /// + /// [`spawn`]: Self::spawn + /// [`spawn_blocking_async`]: Self::spawn_blocking_async + #[inline] + pub fn spawn_blocking_async(&self, f: impl Future + Send + 'static) -> Task + where + T: Send + 'static, + { + self.spawn_blocking(|| block_on(f)) + } + /// Runs a function with the local executor. Typically used to tick /// the local executor on the main thread as it needs to share time with /// other things. @@ -669,7 +783,7 @@ impl<'scope, 'env, T: Send + 'scope> Scope<'scope, 'env, T> { self.spawned.push(task).unwrap(); } - /// Spawns a scoped future onto the thread of the external thread executor. + /// Spawns a scoped future onto the thread pool. /// This is typically the main thread. The scope *must* outlive /// the provided future. The results of the future will be returned as a part of /// [`TaskPool::scope`]'s return value. Users should generally prefer to use @@ -685,6 +799,45 @@ impl<'scope, 'env, T: Send + 'scope> Scope<'scope, 'env, T> { // close and use an unbounded queue, so it is safe to unwrap self.spawned.push(task).unwrap(); } + + /// Spawns a closure onto the blocking thread pool. This is useful when longer running + /// work is necessary, but blocking the task pool needs to be avoided. + /// The scope *must* outlive + /// the provided future. The results of the future will be returned as a part of + /// [`TaskPool::scope`]'s return value. Users should generally prefer to use + /// [`Scope::spawn`] instead, unless the provided future needs to run on the external thread. + /// + /// For more information, see [`TaskPool::scope`]. + pub fn spawn_blocking(&self, f: impl FnOnce() -> T + Send + 'scope) + where + T: Send + 'static, + { + // We box the closure so we can name the type and transmute it to 'scope. + let f: Box T + Send + 'scope> = Box::new(f); + #[expect(unsafe_code, reason = "required to transmute lifetimes")] + // SAFETY: task is forced to complete before scope is done. + let f: Box T + Send + 'static> = unsafe { mem::transmute(f) }; + + let task = unblock(|| Ok(f())).fallible(); + + self.spawned.push(task).unwrap(); + } + + /// Spawns a scoped future onto the blocking thread pool. This is useful when longer running + /// work is necessary, but blocking the task pool needs to be avoided. + /// The scope *must* outlive + /// the provided future. The results of the future will be returned as a part of + /// [`TaskPool::scope`]'s return value. Users should generally prefer to use + /// [`Scope::spawn`] instead, unless the provided future needs to run on the external thread. + /// + /// For more information, see [`TaskPool::scope`]. + #[inline] + pub fn spawn_blocking_async(&self, f: impl Future + Send + 'scope) + where + T: Send + 'static, + { + self.spawn_blocking(|| block_on(f)); + } } impl<'scope, 'env, T> Drop for Scope<'scope, 'env, T> diff --git a/crates/bevy_tasks/src/usages.rs b/crates/bevy_tasks/src/usages.rs index 82da333ef47c6..e4d43072008e4 100644 --- a/crates/bevy_tasks/src/usages.rs +++ b/crates/bevy_tasks/src/usages.rs @@ -2,77 +2,44 @@ use super::TaskPool; use bevy_platform_support::sync::OnceLock; use core::ops::Deref; -macro_rules! taskpool { - ($(#[$attr:meta])* ($static:ident, $type:ident)) => { - static $static: OnceLock<$type> = OnceLock::new(); +static COMPUTE_TASK_POOL: OnceLock = OnceLock::new(); - $(#[$attr])* - #[derive(Debug)] - pub struct $type(TaskPool); - - impl $type { - #[doc = concat!(" Gets the global [`", stringify!($type), "`] instance, or initializes it with `f`.")] - pub fn get_or_init(f: impl FnOnce() -> TaskPool) -> &'static Self { - $static.get_or_init(|| Self(f())) - } - - #[doc = concat!(" Attempts to get the global [`", stringify!($type), "`] instance, \ - or returns `None` if it is not initialized.")] - pub fn try_get() -> Option<&'static Self> { - $static.get() - } - - #[doc = concat!(" Gets the global [`", stringify!($type), "`] instance.")] - #[doc = ""] - #[doc = " # Panics"] - #[doc = " Panics if the global instance has not been initialized yet."] - pub fn get() -> &'static Self { - $static.get().expect( - concat!( - "The ", - stringify!($type), - " has not been initialized yet. Please call ", - stringify!($type), - "::get_or_init beforehand." - ) - ) - } - } - - impl Deref for $type { - type Target = TaskPool; - - fn deref(&self) -> &Self::Target { - &self.0 - } - } - }; -} - -taskpool! { - /// A newtype for a task pool for CPU-intensive work that must be completed to - /// deliver the next frame +/// A newtype for a task pool for CPU-intensive work that must be completed to +/// deliver the next frame +/// +/// See [`TaskPool`] documentation for details on Bevy tasks. +#[derive(Debug)] +pub struct ComputeTaskPool(TaskPool); + +impl ComputeTaskPool { + /// Gets the global [`ComputeTaskPool`] instance, or initializes it with `f`. + pub fn get_or_init(f: impl FnOnce() -> TaskPool) -> &'static Self { + COMPUTE_TASK_POOL.get_or_init(|| Self(f())) + } + + /// Attempts to get the global [`ComputeTaskPool`] instance, + /// or returns `None` if it is not initialized. + pub fn try_get() -> Option<&'static Self> { + COMPUTE_TASK_POOL.get() + } + + /// Gets the global [`ComputeTaskPool`] instance." /// - /// See [`TaskPool`] documentation for details on Bevy tasks. - /// [`AsyncComputeTaskPool`] should be preferred if the work does not have to be - /// completed before the next frame. - (COMPUTE_TASK_POOL, ComputeTaskPool) + /// # Panics + /// Panics if the global instance has not been initialized yet. + pub fn get() -> &'static Self { + COMPUTE_TASK_POOL.get().expect( + "The ComputeTaskPool has not been initialized yet. Please call ComputeTaskPool::get_or_init beforehand" + ) + } } -taskpool! { - /// A newtype for a task pool for CPU-intensive work that may span across multiple frames - /// - /// See [`TaskPool`] documentation for details on Bevy tasks. - /// Use [`ComputeTaskPool`] if the work must be complete before advancing to the next frame. - (ASYNC_COMPUTE_TASK_POOL, AsyncComputeTaskPool) -} +impl Deref for ComputeTaskPool { + type Target = TaskPool; -taskpool! { - /// A newtype for a task pool for IO-intensive work (i.e. tasks that spend very little time in a - /// "woken" state) - /// - /// See [`TaskPool`] documentation for details on Bevy tasks. - (IO_TASK_POOL, IoTaskPool) + fn deref(&self) -> &Self::Target { + &self.0 + } } /// A function used by `bevy_app` to tick the global tasks pools on the main thread. @@ -87,20 +54,8 @@ pub fn tick_global_task_pools_on_main_thread() { .get() .unwrap() .with_local_executor(|compute_local_executor| { - ASYNC_COMPUTE_TASK_POOL - .get() - .unwrap() - .with_local_executor(|async_local_executor| { - IO_TASK_POOL - .get() - .unwrap() - .with_local_executor(|io_local_executor| { - for _ in 0..100 { - compute_local_executor.try_tick(); - async_local_executor.try_tick(); - io_local_executor.try_tick(); - } - }); - }); + for _ in 0..100 { + compute_local_executor.try_tick(); + } }); } diff --git a/examples/animation/animation_graph.rs b/examples/animation/animation_graph.rs index d063f321d0c67..3db70c73d101b 100644 --- a/examples/animation/animation_graph.rs +++ b/examples/animation/animation_graph.rs @@ -16,7 +16,7 @@ use argh::FromArgs; #[cfg(not(target_arch = "wasm32"))] use { - bevy::{asset::io::file::FileAssetReader, tasks::IoTaskPool}, + bevy::{asset::io::file::FileAssetReader, tasks::ComputeTaskPool}, ron::ser::PrettyConfig, std::{fs::File, path::Path}, }; @@ -173,8 +173,8 @@ fn setup_assets_programmatically( if _save { let animation_graph = animation_graph.clone(); - IoTaskPool::get() - .spawn(async move { + ComputeTaskPool::get() + .spawn_blocking(move || { let mut animation_graph_writer = File::create(Path::join( &FileAssetReader::get_base_path(), Path::join(Path::new("assets"), Path::new(ANIMATION_GRAPH_PATH)), diff --git a/examples/asset/multi_asset_sync.rs b/examples/asset/multi_asset_sync.rs index c4b4c9268480f..354ae7b077c4f 100644 --- a/examples/asset/multi_asset_sync.rs +++ b/examples/asset/multi_asset_sync.rs @@ -9,7 +9,7 @@ use std::{ }, }; -use bevy::{gltf::Gltf, prelude::*, tasks::AsyncComputeTaskPool}; +use bevy::{gltf::Gltf, prelude::*, tasks::ComputeTaskPool}; use event_listener::Event; use futures_lite::Future; @@ -153,8 +153,8 @@ fn setup_assets(mut commands: Commands, asset_server: Res) { commands.insert_resource(AsyncLoadingState(loading_state.clone())); // await the `AssetBarrierFuture`. - AsyncComputeTaskPool::get() - .spawn(async move { + ComputeTaskPool::get() + .spawn_blocking_async(async move { future.await; // Notify via `AsyncLoadingState` loading_state.store(true, Ordering::Release); diff --git a/examples/async_tasks/async_compute.rs b/examples/async_tasks/async_compute.rs index 0b5a044563528..92c16fb456d9a 100644 --- a/examples/async_tasks/async_compute.rs +++ b/examples/async_tasks/async_compute.rs @@ -1,10 +1,10 @@ -//! This example shows how to use the ECS and the [`AsyncComputeTaskPool`] +//! This example shows how to use the ECS and blocking tasks on the [`ComputeTaskPool`] //! to spawn, poll, and complete tasks across systems and system ticks. use bevy::{ ecs::{system::SystemState, world::CommandQueue}, prelude::*, - tasks::{block_on, futures_lite::future, AsyncComputeTaskPool, Task}, + tasks::{block_on, futures_lite::future, ComputeTaskPool, Task}, }; use rand::Rng; use std::time::Duration; @@ -50,11 +50,11 @@ struct ComputeTransform(Task); /// system, [`handle_tasks`], will poll the spawned tasks on subsequent /// frames/ticks, and use the results to spawn cubes fn spawn_tasks(mut commands: Commands) { - let thread_pool = AsyncComputeTaskPool::get(); + let thread_pool = ComputeTaskPool::get(); for x in 0..NUM_CUBES { for y in 0..NUM_CUBES { for z in 0..NUM_CUBES { - // Spawn new task on the AsyncComputeTaskPool; the task will be + // Spawn a new blocking task on the ComputeTaskPool; the task will be // executed in the background, and the Task future returned by // spawn() can be used to poll for the result let entity = commands.spawn_empty().id(); diff --git a/examples/scene/scene.rs b/examples/scene/scene.rs index e0072df1704ac..f052ec56ecf55 100644 --- a/examples/scene/scene.rs +++ b/examples/scene/scene.rs @@ -24,7 +24,7 @@ //! won't work on WASM because WASM typically doesn't have direct filesystem access. //! -use bevy::{asset::LoadState, prelude::*, tasks::IoTaskPool}; +use bevy::{asset::LoadState, prelude::*, tasks::ComputeTaskPool}; use core::time::Duration; use std::{fs::File, io::Write}; @@ -198,7 +198,7 @@ fn save_scene_system(world: &mut World) { // // This can't work in Wasm as there is no filesystem access. #[cfg(not(target_arch = "wasm32"))] - IoTaskPool::get() + ComputeTaskPool::get() .spawn(async move { // Write the scene RON data to file File::create(format!("assets/{NEW_SCENE_FILE_PATH}"))