Skip to content

Commit d6915ef

Browse files
james7132 and joseph-gio
authored and committed
refactor: Extract parallel queue abstraction (bevyengine#7348)
# Objective There's a repeating pattern of `ThreadLocal<Cell<Vec<T>>>` which is very useful for low overhead, low contention multithreaded queues that have cropped up in a few places in the engine. This pattern is surprisingly useful when building deferred mutation across multiple threads, as noted by its use in `ParallelCommands`. However, `ThreadLocal<Cell<Vec<T>>>` is not only a mouthful, it's also hard to ensure the thread-local queue is replaced after it's been temporarily removed from the `Cell`. ## Solution Wrap the pattern into `bevy_utils::Parallel<T>` which codifies the entire pattern and ensures the user follows the contract. Instead of fetching individual cells, removing the value, mutating it, and replacing it, `Parallel::get` returns a `ParRef<'a, T>` which contains the temporarily removed value and a reference back to the cell, and will write the mutated value back to the cell upon being dropped. I would like to use this to simplify the remaining part of bevyengine#4899 that has not been adopted/merged. --- ## Changelog TODO --------- Co-authored-by: Joseph <[email protected]>
1 parent df83064 commit d6915ef

File tree

6 files changed

+90
-28
lines changed

6 files changed

+90
-28
lines changed

crates/bevy_ecs/Cargo.toml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@ bevy_utils = { path = "../bevy_utils", version = "0.13.0" }
2323
bevy_ecs_macros = { path = "macros", version = "0.13.0" }
2424

2525
async-channel = "2.1.0"
26-
thread_local = "1.1.4"
2726
fixedbitset = "0.4.2"
2827
rustc-hash = "1.1"
2928
downcast-rs = "1.2"

crates/bevy_ecs/src/system/commands/parallel_scope.rs

Lines changed: 8 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,4 @@
1-
use std::cell::Cell;
2-
3-
use thread_local::ThreadLocal;
1+
use bevy_utils::Parallel;
42

53
use crate::{
64
self as bevy_ecs,
@@ -13,7 +11,7 @@ use super::{CommandQueue, Commands};
1311

1412
#[derive(Default)]
1513
struct ParallelCommandQueue {
16-
thread_local_storage: ThreadLocal<Cell<CommandQueue>>,
14+
thread_queues: Parallel<CommandQueue>,
1715
}
1816

1917
/// An alternative to [`Commands`] that can be used in parallel contexts, such as those in [`Query::par_iter`](crate::system::Query::par_iter)
@@ -53,8 +51,8 @@ impl SystemBuffer for ParallelCommandQueue {
5351
fn apply(&mut self, _system_meta: &SystemMeta, world: &mut World) {
5452
#[cfg(feature = "trace")]
5553
let _system_span = _system_meta.commands_span.enter();
56-
for cq in &mut self.thread_local_storage {
57-
cq.get_mut().apply(world);
54+
for cq in self.thread_queues.iter_mut() {
55+
cq.apply(world);
5856
}
5957
}
6058
}
@@ -64,16 +62,9 @@ impl<'w, 's> ParallelCommands<'w, 's> {
6462
///
6563
/// For an example, see the type-level documentation for [`ParallelCommands`].
6664
pub fn command_scope<R>(&self, f: impl FnOnce(Commands) -> R) -> R {
67-
let store = &self.state.thread_local_storage;
68-
let command_queue_cell = store.get_or_default();
69-
let mut command_queue = command_queue_cell.take();
70-
71-
let r = f(Commands::new_from_entities(
72-
&mut command_queue,
73-
self.entities,
74-
));
75-
76-
command_queue_cell.set(command_queue);
77-
r
65+
self.state.thread_queues.scope(|queue| {
66+
let commands = Commands::new_from_entities(queue, self.entities);
67+
f(commands)
68+
})
7869
}
7970
}

crates/bevy_render/src/view/visibility/mod.rs

Lines changed: 7 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,7 @@ use bevy_ecs::prelude::*;
99
use bevy_hierarchy::{Children, Parent};
1010
use bevy_reflect::{std_traits::ReflectDefault, Reflect};
1111
use bevy_transform::{components::GlobalTransform, TransformSystem};
12-
use std::cell::Cell;
13-
use thread_local::ThreadLocal;
12+
use bevy_utils::Parallel;
1413

1514
use crate::deterministic::DeterministicRenderingConfig;
1615
use crate::{
@@ -372,7 +371,7 @@ fn reset_view_visibility(mut query: Query<&mut ViewVisibility>) {
372371
/// [`ViewVisibility`] of all entities, and for each view also compute the [`VisibleEntities`]
373372
/// for that view.
374373
pub fn check_visibility(
375-
mut thread_queues: Local<ThreadLocal<Cell<Vec<Entity>>>>,
374+
mut thread_queues: Local<Parallel<Vec<Entity>>>,
376375
mut view_query: Query<(
377376
&mut VisibleEntities,
378377
&Frustum,
@@ -440,15 +439,13 @@ pub fn check_visibility(
440439
}
441440

442441
view_visibility.set();
443-
let cell = thread_queues.get_or_default();
444-
let mut queue = cell.take();
445-
queue.push(entity);
446-
cell.set(queue);
442+
thread_queues.scope(|queue| {
443+
queue.push(entity);
444+
});
447445
});
448446

449-
for cell in &mut thread_queues {
450-
visible_entities.entities.append(cell.get_mut());
451-
}
447+
visible_entities.entities.clear();
448+
thread_queues.drain_into(&mut visible_entities.entities);
452449
if deterministic_rendering_config.stable_sort_z_fighting {
453450
// We can use the faster unstable sort here because
454451
// the values (`Entity`) are guaranteed to be unique.

crates/bevy_utils/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ hashbrown = { version = "0.14", features = ["serde"] }
2020
bevy_utils_proc_macros = { version = "0.13.0", path = "macros" }
2121
petgraph = "0.6"
2222
thiserror = "1.0"
23+
thread_local = "1.0"
2324
nonmax = "0.5"
2425
smallvec = { version = "1.11", features = ["serde", "union", "const_generics"] }
2526

crates/bevy_utils/src/lib.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ mod default;
2222
mod float_ord;
2323
pub mod intern;
2424
mod once;
25+
mod parallel_queue;
2526

2627
pub use crate::uuid::Uuid;
2728
pub use ahash::{AHasher, RandomState};
@@ -30,6 +31,7 @@ pub use cow_arc::*;
3031
pub use default::default;
3132
pub use float_ord::*;
3233
pub use hashbrown;
34+
pub use parallel_queue::*;
3335
pub use petgraph;
3436
pub use smallvec;
3537
pub use thiserror;
Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
use core::cell::Cell;
2+
use thread_local::ThreadLocal;
3+
4+
/// A cohesive set of thread-local values of a given type.
5+
///
6+
/// Mutable references can be fetched if `T: Default` via [`Parallel::scope`].
7+
#[derive(Default)]
8+
pub struct Parallel<T: Send> {
9+
locals: ThreadLocal<Cell<T>>,
10+
}
11+
12+
impl<T: Send> Parallel<T> {
13+
/// Gets a mutable iterator over all of the per-thread queues.
14+
pub fn iter_mut(&mut self) -> impl Iterator<Item = &'_ mut T> {
15+
self.locals.iter_mut().map(|cell| cell.get_mut())
16+
}
17+
18+
/// Clears all of the stored thread local values.
19+
pub fn clear(&mut self) {
20+
self.locals.clear();
21+
}
22+
}
23+
24+
impl<T: Default + Send> Parallel<T> {
25+
/// Retrieves the thread-local value for the current thread and runs `f` on it.
26+
///
27+
/// If there is no thread-local value, it will be initialized to its default.
28+
pub fn scope<R>(&self, f: impl FnOnce(&mut T) -> R) -> R {
29+
let cell = self.locals.get_or_default();
30+
let mut value = cell.take();
31+
let ret = f(&mut value);
32+
cell.set(value);
33+
ret
34+
}
35+
}
36+
37+
impl<T, I> Parallel<I>
38+
where
39+
I: IntoIterator<Item = T> + Default + Send + 'static,
40+
{
41+
/// Drains all enqueued items from all threads and returns an iterator over them.
42+
///
43+
/// Unlike [`Vec::drain`], this will piecemeal remove chunks of the data stored.
44+
/// If iteration is terminated part way, the rest of the enqueued items in the same
45+
/// chunk will be dropped, and the rest of the undrained elements will remain.
46+
///
47+
/// The ordering is not guaranteed.
48+
pub fn drain<B>(&mut self) -> impl Iterator<Item = T> + '_
49+
where
50+
B: FromIterator<T>,
51+
{
52+
self.locals.iter_mut().flat_map(|item| item.take())
53+
}
54+
}
55+
56+
impl<T: Send> Parallel<Vec<T>> {
57+
/// Collects all enqueued items from all threads and appends them to the end of a
58+
/// single Vec.
59+
///
60+
/// The ordering is not guaranteed.
61+
pub fn drain_into(&mut self, out: &mut Vec<T>) {
62+
let size = self
63+
.locals
64+
.iter_mut()
65+
.map(|queue| queue.get_mut().len())
66+
.sum();
67+
out.reserve(size);
68+
for queue in self.locals.iter_mut() {
69+
out.append(queue.get_mut());
70+
}
71+
}
72+
}

0 commit comments

Comments
 (0)