Skip to content

Commit 362294f

Browse files
authored
Try #4740:
2 parents 947d3f9 + 8c722e3 commit 362294f

File tree

14 files changed

+379
-204
lines changed

14 files changed

+379
-204
lines changed

benches/benches/bevy_tasks/iter.rs

+3-3
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ fn bench_overhead(c: &mut Criterion) {
3434
let mut v = (0..10000).collect::<Vec<usize>>();
3535
let mut group = c.benchmark_group("overhead_par_iter");
3636
for thread_count in &[1, 2, 4, 8, 16, 32] {
37-
let pool = TaskPoolBuilder::new().num_threads(*thread_count).build();
37+
let pool = TaskPoolBuilder::new().compute_threads(*thread_count).build();
3838
group.bench_with_input(
3939
BenchmarkId::new("threads", thread_count),
4040
thread_count,
@@ -69,7 +69,7 @@ fn bench_for_each(c: &mut Criterion) {
6969
let mut v = (0..10000).collect::<Vec<usize>>();
7070
let mut group = c.benchmark_group("for_each_par_iter");
7171
for thread_count in &[1, 2, 4, 8, 16, 32] {
72-
let pool = TaskPoolBuilder::new().num_threads(*thread_count).build();
72+
let pool = TaskPoolBuilder::new().compute_threads(*thread_count).build();
7373
group.bench_with_input(
7474
BenchmarkId::new("threads", thread_count),
7575
thread_count,
@@ -115,7 +115,7 @@ fn bench_many_maps(c: &mut Criterion) {
115115
let v = (0..10000).collect::<Vec<usize>>();
116116
let mut group = c.benchmark_group("many_maps_par_iter");
117117
for thread_count in &[1, 2, 4, 8, 16, 32] {
118-
let pool = TaskPoolBuilder::new().num_threads(*thread_count).build();
118+
let pool = TaskPoolBuilder::new().compute_threads(*thread_count).build();
119119
group.bench_with_input(
120120
BenchmarkId::new("threads", thread_count),
121121
thread_count,

crates/bevy_asset/src/asset_server.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -379,7 +379,7 @@ impl AssetServer {
379379
let owned_path = asset_path.to_owned();
380380
self.server
381381
.task_pool
382-
.spawn(async move {
382+
.spawn_io(async move {
383383
if let Err(err) = server.load_async(owned_path, force).await {
384384
warn!("{}", err);
385385
}

crates/bevy_asset/src/debug_asset_server.rs

+4-4
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ use bevy_ecs::{
44
schedule::SystemLabel,
55
system::{NonSendMut, Res, ResMut, SystemState},
66
};
7-
use bevy_tasks::{IoTaskPool, TaskPoolBuilder};
7+
use bevy_tasks::TaskPoolBuilder;
88
use bevy_utils::HashMap;
99
use std::{
1010
ops::{Deref, DerefMut},
@@ -60,12 +60,12 @@ impl Plugin for DebugAssetServerPlugin {
6060
fn build(&self, app: &mut bevy_app::App) {
6161
let mut debug_asset_app = App::new();
6262
debug_asset_app
63-
.insert_resource(IoTaskPool(
63+
.insert_resource(
6464
TaskPoolBuilder::default()
65-
.num_threads(2)
65+
.io_threads(2)
6666
.thread_name("Debug Asset Server IO Task Pool".to_string())
6767
.build(),
68-
))
68+
)
6969
.insert_resource(AssetServerSettings {
7070
asset_folder: "crates".to_string(),
7171
watch_for_changes: true,

crates/bevy_asset/src/lib.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ pub use path::*;
3030

3131
use bevy_app::{prelude::Plugin, App};
3232
use bevy_ecs::schedule::{StageLabel, SystemStage};
33-
use bevy_tasks::IoTaskPool;
33+
use bevy_tasks::TaskPool;
3434

3535
/// The names of asset stages in an App Schedule
3636
#[derive(Debug, Hash, PartialEq, Eq, Clone, StageLabel)]
@@ -82,7 +82,7 @@ pub fn create_platform_default_asset_io(app: &mut App) -> Box<dyn AssetIo> {
8282
impl Plugin for AssetPlugin {
8383
fn build(&self, app: &mut App) {
8484
if !app.world.contains_resource::<AssetServer>() {
85-
let task_pool = app.world.resource::<IoTaskPool>().0.clone();
85+
let task_pool = app.world.resource::<TaskPool>().clone();
8686

8787
let source = create_platform_default_asset_io(app);
8888

crates/bevy_core/src/task_pool_options.rs

+33-49
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
use bevy_ecs::world::World;
2-
use bevy_tasks::{AsyncComputeTaskPool, ComputeTaskPool, IoTaskPool, TaskPoolBuilder};
2+
use bevy_tasks::{TaskPool, TaskPoolBuilder};
33
use bevy_utils::tracing::trace;
44

55
/// Defines a simple way to determine how many threads to use given the number of remaining cores
@@ -100,54 +100,38 @@ impl DefaultTaskPoolOptions {
100100

101101
let mut remaining_threads = total_threads;
102102

103-
if !world.contains_resource::<IoTaskPool>() {
104-
// Determine the number of IO threads we will use
105-
let io_threads = self
106-
.io
107-
.get_number_of_threads(remaining_threads, total_threads);
108-
109-
trace!("IO Threads: {}", io_threads);
110-
remaining_threads = remaining_threads.saturating_sub(io_threads);
111-
112-
world.insert_resource(IoTaskPool(
113-
TaskPoolBuilder::default()
114-
.num_threads(io_threads)
115-
.thread_name("IO Task Pool".to_string())
116-
.build(),
117-
));
118-
}
119-
120-
if !world.contains_resource::<AsyncComputeTaskPool>() {
121-
// Determine the number of async compute threads we will use
122-
let async_compute_threads = self
123-
.async_compute
124-
.get_number_of_threads(remaining_threads, total_threads);
125-
126-
trace!("Async Compute Threads: {}", async_compute_threads);
127-
remaining_threads = remaining_threads.saturating_sub(async_compute_threads);
128-
129-
world.insert_resource(AsyncComputeTaskPool(
130-
TaskPoolBuilder::default()
131-
.num_threads(async_compute_threads)
132-
.thread_name("Async Compute Task Pool".to_string())
133-
.build(),
134-
));
135-
}
136-
137-
if !world.contains_resource::<ComputeTaskPool>() {
138-
// Determine the number of compute threads we will use
139-
// This is intentionally last so that an end user can specify 1.0 as the percent
140-
let compute_threads = self
141-
.compute
142-
.get_number_of_threads(remaining_threads, total_threads);
143-
144-
trace!("Compute Threads: {}", compute_threads);
145-
world.insert_resource(ComputeTaskPool(
146-
TaskPoolBuilder::default()
147-
.num_threads(compute_threads)
148-
.thread_name("Compute Task Pool".to_string())
149-
.build(),
150-
));
103+
if world.contains_resource::<TaskPool>() {
104+
return;
151105
}
106+
// Determine the number of IO threads we will use
107+
let io_threads = self
108+
.io
109+
.get_number_of_threads(remaining_threads, total_threads);
110+
111+
trace!("IO Threads: {}", io_threads);
112+
remaining_threads = remaining_threads.saturating_sub(io_threads);
113+
114+
// Determine the number of async compute threads we will use
115+
let async_compute_threads = self
116+
.async_compute
117+
.get_number_of_threads(remaining_threads, total_threads);
118+
119+
trace!("Async Compute Threads: {}", async_compute_threads);
120+
remaining_threads = remaining_threads.saturating_sub(async_compute_threads);
121+
122+
// Determine the number of compute threads we will use
123+
// This is intentionally last so that an end user can specify 1.0 as the percent
124+
let compute_threads = self
125+
.compute
126+
.get_number_of_threads(remaining_threads, total_threads);
127+
128+
world.insert_resource(
129+
TaskPoolBuilder::default()
130+
.compute_threads(compute_threads)
131+
.async_compute_threads(async_compute_threads)
132+
.io_threads(io_threads)
133+
.thread_name("Task Pool".to_string())
134+
.build(),
135+
);
152136
}
153137
}

crates/bevy_ecs/src/schedule/executor_parallel.rs

+3-5
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ use crate::{
55
world::World,
66
};
77
use async_channel::{Receiver, Sender};
8-
use bevy_tasks::{ComputeTaskPool, Scope, TaskPool};
8+
use bevy_tasks::{Scope, TaskPool};
99
#[cfg(feature = "trace")]
1010
use bevy_utils::tracing::Instrument;
1111
use fixedbitset::FixedBitSet;
@@ -123,10 +123,8 @@ impl ParallelSystemExecutor for ParallelExecutor {
123123
}
124124
}
125125

126-
let compute_pool = world
127-
.get_resource_or_insert_with(|| ComputeTaskPool(TaskPool::default()))
128-
.clone();
129-
compute_pool.scope(|scope| {
126+
let task_pool = world.get_resource_or_insert_with(TaskPool::default).clone();
127+
task_pool.scope(|scope| {
130128
self.prepare_systems(scope, systems, world);
131129
let parallel_executor = async {
132130
// All systems have been ran if there are no queued or running systems.

crates/bevy_tasks/examples/busy_behavior.rs

+3-3
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,13 @@
11
use bevy_tasks::TaskPoolBuilder;
22

3-
// This sample demonstrates creating a thread pool with 4 tasks and spawning 40 tasks that spin
4-
// for 100ms. It's expected to take about a second to run (assuming the machine has >= 4 logical
3+
// This sample demonstrates creating a thread pool with 4 compute threads and spawning 40 tasks that
4+
// spin for 100ms. It's expected to take about a second to run (assuming the machine has >= 4 logical
55
// cores)
66

77
fn main() {
88
let pool = TaskPoolBuilder::new()
99
.thread_name("Busy Behavior ThreadPool".to_string())
10-
.num_threads(4)
10+
.compute_threads(4)
1111
.build();
1212

1313
let t0 = instant::Instant::now();

crates/bevy_tasks/src/lib.rs

+4-4
Original file line numberDiff line numberDiff line change
@@ -17,19 +17,19 @@ mod single_threaded_task_pool;
1717
#[cfg(target_arch = "wasm32")]
1818
pub use single_threaded_task_pool::{Scope, TaskPool, TaskPoolBuilder};
1919

20-
mod usages;
21-
pub use usages::{AsyncComputeTaskPool, ComputeTaskPool, IoTaskPool};
22-
2320
mod iter;
2421
pub use iter::ParallelIterator;
2522

2623
#[allow(missing_docs)]
2724
pub mod prelude {
25+
#[cfg(target_arch = "wasm32")]
26+
pub use crate::single_threaded_task_pool::TaskPool;
27+
#[cfg(not(target_arch = "wasm32"))]
28+
pub use crate::task_pool::TaskPool;
2829
#[doc(hidden)]
2930
pub use crate::{
3031
iter::ParallelIterator,
3132
slice::{ParallelSlice, ParallelSliceMut},
32-
usages::{AsyncComputeTaskPool, ComputeTaskPool, IoTaskPool},
3333
};
3434
}
3535

crates/bevy_tasks/src/single_threaded_task_pool.rs

+88-3
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,19 @@ impl TaskPoolBuilder {
1414
Self::default()
1515
}
1616

17-
/// No op on the single threaded task pool
18-
pub fn num_threads(self, _num_threads: usize) -> Self {
17+
/// Override the number of compute-priority threads created for the pool. If unset, this defaults to the number
18+
/// of logical cores of the system
19+
pub fn compute_threads(self, num_threads: usize) -> Self {
20+
self
21+
}
22+
23+
/// Override the number of async-compute priority threads created for the pool. If unset, this defaults to 0.
24+
pub fn async_compute_threads(self, num_threads: usize) -> Self {
25+
self
26+
}
27+
28+
/// Override the number of IO-priority threads created for the pool. If unset, this defaults to 0.
29+
pub fn io_threads(self, num_threads: usize) -> Self {
1930
self
2031
}
2132

@@ -37,6 +48,20 @@ impl TaskPoolBuilder {
3748

3849
/// A thread pool for executing tasks. Tasks are futures that are being automatically driven by
3950
/// the pool on threads owned by the pool. In this case - main thread only.
51+
///
52+
/// # Scheduling Semantics
53+
/// Each thread in the pool is assigned to one of three priority groups: Compute, IO, and Async
54+
/// Compute. Compute is higher priority than IO, which are both higher priority than async compute.
55+
/// Every task is assigned to a group upon being spawned. A lower priority thread will always prioritize
56+
/// its specific tasks (i.e. IO tasks on a IO thread), but will run higher priority tasks if it would
57+
/// otherwise be sitting idle.
58+
///
59+
/// For example, under heavy compute workloads, compute tasks will be scheduled to run on the IO and
60+
/// async compute thread groups, but any IO task will take precedence over any compute task on the IO
61+
/// threads. Likewise, async compute tasks will never be scheduled on a compute or IO thread.
62+
///
63+
/// By default, all threads in the pool are dedicated to compute group. Thread counts can be altered
64+
/// via [`TaskPoolBuilder`] when constructing the pool.
4065
#[derive(Debug, Default, Clone)]
4166
pub struct TaskPool {}
4267

@@ -106,6 +131,44 @@ impl TaskPool {
106131
FakeTask
107132
}
108133

134+
/// Spawns a static future onto the JS event loop. For now it returns a FakeTask
135+
/// instance with no-op detach method. Returning real Task is possible here, but tricky:
136+
/// future is running on JS event loop, Task is running on async_executor::LocalExecutor
137+
/// so some proxy future is needed. Moreover currently we don't have long-living
138+
/// LocalExecutor here (above `spawn` implementation creates temporary one)
139+
/// But for typical use cases it seems that current implementation should be sufficient:
140+
/// caller can spawn long-running future writing results to some channel / event queue
141+
/// and simply call detach on returned Task (like AssetServer does) - spawned future
142+
/// can write results to some channel / event queue.
143+
pub fn spawn_async_compute<T>(&self, future: impl Future<Output = T> + 'static) -> FakeTask
144+
where
145+
T: Send + 'static,
146+
{
147+
wasm_bindgen_futures::spawn_local(async move {
148+
future.await;
149+
});
150+
FakeTask
151+
}
152+
153+
/// Spawns a static future onto the JS event loop. For now it returns a FakeTask
154+
/// instance with no-op detach method. Returning real Task is possible here, but tricky:
155+
/// future is running on JS event loop, Task is running on async_executor::LocalExecutor
156+
/// so some proxy future is needed. Moreover currently we don't have long-living
157+
/// LocalExecutor here (above `spawn` implementation creates temporary one)
158+
/// But for typical use cases it seems that current implementation should be sufficient:
159+
/// caller can spawn long-running future writing results to some channel / event queue
160+
/// and simply call detach on returned Task (like AssetServer does) - spawned future
161+
/// can write results to some channel / event queue.
162+
pub fn spawn_io<T>(&self, future: impl Future<Output = T> + 'static) -> FakeTask
163+
where
164+
T: Send + 'static,
165+
{
166+
wasm_bindgen_futures::spawn_local(async move {
167+
future.await;
168+
});
169+
FakeTask
170+
}
171+
109172
/// Spawns a static future on the JS event loop. This is exactly the same as [`TaskPool::spawn`].
110173
pub fn spawn_local<T>(&self, future: impl Future<Output = T> + 'static) -> FakeTask
111174
where
@@ -141,7 +204,29 @@ impl<'scope, T: Send + 'scope> Scope<'scope, T> {
141204
/// On the single threaded task pool, it just calls [`Scope::spawn_local`].
142205
///
143206
/// For more information, see [`TaskPool::scope`].
144-
pub fn spawn<Fut: Future<Output = T> + 'scope + Send>(&mut self, f: Fut) {
207+
pub fn spawn<Fut: Future<Output = T> + 'scope>(&mut self, f: Fut) {
208+
self.spawn_local(f);
209+
}
210+
211+
/// Spawns a scoped future onto the thread-local executor. The scope *must* outlive
212+
/// the provided future. The results of the future will be returned as a part of
213+
/// [`TaskPool::scope`]'s return value.
214+
///
215+
/// On the single threaded task pool, it just calls [`Scope::spawn_local`].
216+
///
217+
/// For more information, see [`TaskPool::scope`].
218+
pub fn spawn_async_compute<Fut: Future<Output = T> + 'scope>(&mut self, f: Fut) {
219+
self.spawn_local(f);
220+
}
221+
222+
/// Spawns a scoped future onto the thread-local executor. The scope *must* outlive
223+
/// the provided future. The results of the future will be returned as a part of
224+
/// [`TaskPool::scope`]'s return value.
225+
///
226+
/// On the single threaded task pool, it just calls [`Scope::spawn_local`].
227+
///
228+
/// For more information, see [`TaskPool::scope`].
229+
pub fn spawn_io<Fut: Future<Output = T> + 'scope>(&mut self, f: Fut) {
145230
self.spawn_local(f);
146231
}
147232

0 commit comments

Comments
 (0)