Skip to content

Commit ea19965

Browse files
authored
feat(benches): implement thread binding with hwlocality (#337)
* add hwlocality dep & gating feature
* add utils function for thread binding
* add rayon threadpool init
* add std threads binding in `cut-edges`
* fix clippy lint
* remove feat from default
* disable --all-features in CI
1 parent 41a42e2 commit ea19965

File tree

7 files changed

+272
-27
lines changed

7 files changed

+272
-27
lines changed

.github/workflows/build.yml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ jobs:
2323
- uses: dtolnay/rust-toolchain@stable
2424
- uses: Swatinem/rust-cache@v2
2525
- name: Build crates
26-
run: cargo build --all --all-features
26+
run: cargo build --all # --all-features
2727
build_examples:
2828
name: Build examples for ${{ matrix.os }}
2929
strategy:
@@ -51,4 +51,4 @@ jobs:
5151
- uses: dtolnay/rust-toolchain@stable
5252
- uses: Swatinem/rust-cache@v2
5353
- name: Build benchmarks
54-
run: cargo build --benches --all-features
54+
run: cargo build --benches # --all-features

.github/workflows/rust-test.yml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ jobs:
2929
- uses: dtolnay/rust-toolchain@stable
3030
- uses: Swatinem/rust-cache@v2
3131
- name: Run Clippy
32-
run: cargo clippy --all-features -- -D warnings
32+
run: cargo clippy # --all-features -- -D warnings
3333
tests:
3434
runs-on: ubuntu-22.04
3535
steps:
@@ -39,4 +39,4 @@ jobs:
3939
- uses: dtolnay/rust-toolchain@stable
4040
- uses: Swatinem/rust-cache@v2
4141
- name: Run Tests
42-
run: cargo test --all --all-features
42+
run: cargo test --all # --all-features

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@ smallvec = "2.0.0-alpha.10"
5454

5555
# benchmarks
5656
criterion = "0.6.0"
57+
hwlocality = "1.0.0-alpha.7"
5758
iai-callgrind = "0.14.0"
5859
rand = "0.9.0-alpha.2"
5960

benches/Cargo.toml

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,15 +10,18 @@ authors.workspace = true
1010
publish = false
1111

1212
[features]
13+
# default = ["bind-threads"] # enable back when CI is fixed with all the deps
1314
_single_precision = []
15+
bind-threads = ["dep:hwlocality"]
1416
profiling = []
1517

1618
# deps
1719

1820
[dependencies]
19-
clap = { workspace =true, features = ["derive"] }
21+
clap = { workspace = true, features = ["derive"] }
2022
cfg-if.workspace = true
2123
honeycomb.workspace = true
24+
hwlocality = { workspace = true, optional = true }
2225
rayon.workspace = true
2326
rand = { workspace = true, features = ["small_rng"] }
2427

benches/src/cut_edges.rs

Lines changed: 82 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -11,17 +11,25 @@ use honeycomb::{
1111
prelude::{CMap2, CMapBuilder, CoordsFloat, DartIdType, EdgeIdType},
1212
};
1313

14-
use crate::{cli::CutEdgesArgs, prof_start, prof_stop, utils::hash_file};
14+
use crate::{
15+
cli::CutEdgesArgs,
16+
prof_start, prof_stop,
17+
utils::{get_num_threads, hash_file},
18+
};
1519

1620
// const MAX_RETRY: u8 = 10;
1721

1822
pub fn bench_cut_edges<T: CoordsFloat>(args: CutEdgesArgs) -> CMap2<T> {
1923
let input_map = args.input.to_str().unwrap();
2024
let target_len = T::from(args.target_length).unwrap();
2125

22-
let n_threads = std::thread::available_parallelism()
23-
.map(|n| n.get())
24-
.unwrap_or(1);
26+
let n_threads = if let Ok(val) = get_num_threads() {
27+
val
28+
} else {
29+
std::thread::available_parallelism()
30+
.map(|n| n.get())
31+
.unwrap_or(1)
32+
};
2533

2634
// load map from file
2735
let mut instant = Instant::now();
@@ -215,25 +223,77 @@ fn dispatch_std_threads<T: CoordsFloat>(
215223
.zip(darts.chunks(6))
216224
.map(|(e, sl)| (e, sl.try_into().unwrap()))
217225
.collect();
218-
std::thread::scope(|s| {
219-
let mut handles = Vec::new();
220-
for wl in units.chunks(1 + units.len() / n_threads) {
221-
handles.push(s.spawn(|| {
222-
let mut n = 0;
223-
wl.iter().for_each(|&(e, new_darts)| {
224-
let mut n_retry = 0;
225-
if map.is_i_free::<2>(e as DartIdType) {
226-
while !process_outer_edge(map, &mut n_retry, e, new_darts).is_validated() {}
227-
} else {
228-
while !process_inner_edge(map, &mut n_retry, e, new_darts).is_validated() {}
226+
227+
#[cfg(feature = "bind-threads")]
228+
{
229+
use std::sync::Arc;
230+
231+
use hwlocality::{Topology, cpu::binding::CpuBindingFlags};
232+
233+
use crate::utils::get_proc_list;
234+
235+
let topo = Arc::new(Topology::new().unwrap());
236+
let mut cores = get_proc_list(&topo).unwrap_or_default().into_iter().cycle();
237+
std::thread::scope(|s| {
238+
let mut handles = Vec::new();
239+
for wl in units.chunks(1 + units.len() / n_threads) {
240+
let topo = topo.clone();
241+
let core = cores.next();
242+
handles.push(s.spawn(move || {
243+
// bind
244+
if let Some(c) = core {
245+
let tid = hwlocality::current_thread_id();
246+
topo.bind_thread_cpu(tid, &c, CpuBindingFlags::empty())
247+
.unwrap();
229248
}
230-
n += n_retry as u32;
231-
});
232-
n
233-
})); // s.spawn
234-
} // for wl in workloads
235-
handles.into_iter().map(|h| h.join().unwrap()).sum()
236-
}) // std::thread::scope
249+
// work
250+
let mut n = 0;
251+
wl.iter().for_each(|&(e, new_darts)| {
252+
let mut n_retry = 0;
253+
if map.is_i_free::<2>(e as DartIdType) {
254+
while !process_outer_edge(map, &mut n_retry, e, new_darts)
255+
.is_validated()
256+
{}
257+
} else {
258+
while !process_inner_edge(map, &mut n_retry, e, new_darts)
259+
.is_validated()
260+
{}
261+
}
262+
n += n_retry as u32;
263+
});
264+
n
265+
})); // s.spawn
266+
} // for wl in workloads
267+
handles.into_iter().map(|h| h.join().unwrap()).sum()
268+
}) // std::thread::scope
269+
}
270+
271+
#[cfg(not(feature = "bind-threads"))]
272+
{
273+
std::thread::scope(|s| {
274+
let mut handles = Vec::new();
275+
for wl in units.chunks(1 + units.len() / n_threads) {
276+
handles.push(s.spawn(|| {
277+
let mut n = 0;
278+
wl.iter().for_each(|&(e, new_darts)| {
279+
let mut n_retry = 0;
280+
if map.is_i_free::<2>(e as DartIdType) {
281+
while !process_outer_edge(map, &mut n_retry, e, new_darts)
282+
.is_validated()
283+
{}
284+
} else {
285+
while !process_inner_edge(map, &mut n_retry, e, new_darts)
286+
.is_validated()
287+
{}
288+
}
289+
n += n_retry as u32;
290+
});
291+
n
292+
})); // s.spawn
293+
} // for wl in workloads
294+
handles.into_iter().map(|h| h.join().unwrap()).sum()
295+
}) // std::thread::scope
296+
}
237297
}
238298

239299
#[inline]

benches/src/main.rs

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,41 @@ use honeycomb_benches::{
1414
};
1515

1616
fn main() {
17+
#[cfg(feature = "bind-threads")]
18+
{
19+
use std::sync::Arc;
20+
21+
use honeycomb_benches::utils::get_proc_list;
22+
use hwlocality::{Topology, cpu::binding::CpuBindingFlags};
23+
use rayon::ThreadPoolBuilder;
24+
25+
let builder = ThreadPoolBuilder::new();
26+
let topo = Arc::new(Topology::new().unwrap());
27+
if let Some(cores) = get_proc_list(&topo) {
28+
let mut cores = cores.into_iter().cycle();
29+
builder
30+
.spawn_handler(|t_builder| {
31+
let topo = topo.clone();
32+
let core = cores.next().expect("E: unreachable"); // due to cycle
33+
34+
std::thread::spawn(move || {
35+
// bind
36+
let tid = hwlocality::current_thread_id();
37+
topo.bind_thread_cpu(tid, &core, CpuBindingFlags::empty())
38+
.unwrap();
39+
// work
40+
t_builder.run();
41+
});
42+
43+
Ok(())
44+
})
45+
.build_global()
46+
.unwrap();
47+
} else {
48+
builder.build_global().unwrap()
49+
}
50+
}
51+
1752
let cli = Cli::parse();
1853

1954
if cli.simple_precision {

benches/src/utils.rs

Lines changed: 146 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,14 @@ use std::fs::File;
33
use std::hash::Hasher;
44
use std::io::Read;
55

6+
#[cfg(feature = "bind-threads")]
7+
use hwlocality::{
8+
Topology,
9+
cpu::cpuset::CpuSet,
10+
object::types::ObjectType,
11+
topology::support::{DiscoverySupport, FeatureSupport},
12+
};
13+
614
cfg_if::cfg_if! {
715
if #[cfg(feature = "_single_precision")] {
816
/// Floating-point type alias.
@@ -33,6 +41,144 @@ pub fn hash_file(path: &str) -> Result<u64, std::io::Error> {
3341
Ok(hasher.finish())
3442
}
3543

44+
/// Environment variable controlling the number of worker threads.
pub const NUM_THREADS_VAR: &str = "RAYON_NUM_THREADS";

/// Read the requested number of threads from [`NUM_THREADS_VAR`].
///
/// # Errors
///
/// Returns the error message as a `String` when the variable is unset,
/// non-unicode, or does not parse as a `usize`.
pub fn get_num_threads() -> Result<usize, String> {
    std::env::var(NUM_THREADS_VAR)
        .map_err(|e| e.to_string())
        .and_then(|raw| raw.parse::<usize>().map_err(|e| e.to_string()))
}
52+
53+
/// Thread-binding policy, read from the [`RAYON_PROC_BIND_VAR`] environment variable.
///
/// Policy names mirror the OpenMP `OMP_PROC_BIND` values; see [`get_proc_list`]
/// for how each policy orders the bind targets.
#[cfg(feature = "bind-threads")]
#[derive(Debug, Default)]
pub enum BindingPolicy {
    /// Disable thread binding.
    /// Corresponding `RAYON_PROC_BIND` value: `false`.
    Disable,
    /// Enable thread binding & list both PUs of an SMT core consecutively, i.e.
    /// fill each core before moving on to the next one.
    /// Corresponding `RAYON_PROC_BIND` value: `close`.
    Close,
    /// Enable thread binding & list the first PU of every core before any
    /// second (SMT) PU, i.e. spread threads across cores first.
    /// Corresponding `RAYON_PROC_BIND` value: `spread`. Default value.
    #[default]
    Spread,
}
67+
68+
/// Environment variable controlling the thread-binding policy.
///
/// The name of this variable and its possible values (`false`, `close`,
/// `spread`) reflect the OpenMP equivalents. See [`BindingPolicy`] for the
/// semantics of each value; an unset or empty variable selects the default
/// (`spread`).
#[cfg(feature = "bind-threads")]
pub const RAYON_PROC_BIND_VAR: &str = "RAYON_PROC_BIND";
73+
74+
#[cfg(feature = "bind-threads")]
impl BindingPolicy {
    /// Read the binding policy from [`RAYON_PROC_BIND_VAR`].
    ///
    /// Unset, empty, or invalid values fall back to the default policy
    /// ([`BindingPolicy::Spread`]); a warning is printed on stderr for
    /// unrecognized or non-unicode values.
    fn from_env() -> Self {
        // Missing / non-unicode values short-circuit to the default policy.
        let raw = match std::env::var(RAYON_PROC_BIND_VAR) {
            Ok(v) => v,
            Err(std::env::VarError::NotPresent) => return Self::default(),
            Err(std::env::VarError::NotUnicode(_)) => {
                eprintln!("W: non-unicode RAYON_PROC_BIND value");
                eprintln!("  continuing with default (spread)");
                return Self::default();
            }
        };
        // Accepted values are case-insensitive, mirroring OMP_PROC_BIND.
        match raw.to_lowercase().as_str() {
            "false" => Self::Disable,
            "close" => Self::Close,
            "spread" => Self::Spread,
            "" => Self::default(),
            _ => {
                eprintln!("W: unrecognized RAYON_PROC_BIND value (!= false | close | spread)");
                eprintln!("  continuing with default (spread)");
                Self::default()
            }
        }
    }
}
102+
103+
/// Check that the current platform provides the `hwloc` features required for
/// per-thread CPU binding.
///
/// # Errors
///
/// Returns a description of the missing capability if either PU enumeration or
/// per-thread CPU binding (get & set) is unsupported.
#[cfg(feature = "bind-threads")]
pub fn check_hwloc_support(topology: &Topology) -> Result<(), String> {
    // PU discovery is needed to build the list of bind targets.
    if !topology.supports(FeatureSupport::discovery, DiscoverySupport::pu_count) {
        return Err("missing PU reporting support".to_string());
    }
    // Binding is applied thread by thread, so both getting & setting a single
    // thread's binding must be supported.
    if !topology
        .feature_support()
        .cpu_binding()
        .is_some_and(|s| s.get_thread() && s.set_thread())
    {
        return Err("missing binding support".to_string());
    }

    Ok(())
}
118+
119+
/// Return a list of bind targets ordered according to the desired policy.
///
/// The desired policy is read from an environment variable (see
/// [`RAYON_PROC_BIND_VAR`]). For details on each policy, see [`BindingPolicy`].
///
/// Returns `None` when binding is disabled. The returned list is used by
/// iterating over a `cycle`d version of it, which corresponds to a round-robin
/// assignment of threads to targets.
///
/// # Panics
///
/// Panics if a core reports a PU count other than 1 (no HT/SMT) or 2 (HT/SMT).
#[cfg(feature = "bind-threads")]
pub fn get_proc_list(topology: &Topology) -> Option<Vec<CpuSet>> {
    let binding_policy = BindingPolicy::from_env();
    let core_depth = topology
        .depth_or_below_for_type(ObjectType::Core)
        .expect("E: unreachable");

    match binding_policy {
        BindingPolicy::Disable => None,
        BindingPolicy::Close => {
            // list PUs core by core: both PUs of an SMT core end up adjacent
            let mut pu_set = Vec::with_capacity(256);
            topology.objects_at_depth(core_depth).for_each(|c| {
                let target = c.cpuset().expect("E: unreachable").clone_target();
                // match required bc some modern CPUs have HT/SMT on only some of their cores :)
                match target.weight() {
                    Some(1 | 2) => target
                        .iter_set()
                        .map(CpuSet::from)
                        .for_each(|t| pu_set.push(t)),
                    Some(_) | None => {
                        panic!("E: architecture too cursed")
                    }
                }
            });
            Some(pu_set)
        }
        BindingPolicy::Spread => {
            // list the first PU of every core, then the second PU of SMT cores
            let mut first_pu_set = Vec::with_capacity(128);
            let mut second_pu_set = Vec::with_capacity(128);
            topology.objects_at_depth(core_depth).for_each(|c| {
                let target = c.cpuset().expect("E: unreachable").clone_target();
                // match required bc some modern CPUs have HT/SMT on only some of their cores :)
                match target.weight() {
                    Some(1) => {
                        // one PU per core -> no HT/SMT
                        first_pu_set.push(target);
                    }
                    Some(2) => {
                        // two PUs per core -> HT/SMT
                        let [first_pu, second_pu]: [CpuSet; 2] = target
                            .iter_set()
                            .map(CpuSet::from)
                            .collect::<Vec<_>>()
                            .try_into()
                            .expect("E: unreachable");
                        first_pu_set.push(first_pu);
                        second_pu_set.push(second_pu);
                    }
                    Some(_) | None => {
                        panic!("E: architecture too cursed")
                    }
                }
            });
            first_pu_set.append(&mut second_pu_set);
            Some(first_pu_set)
        }
    }
}
181+
36182
#[cfg(feature = "profiling")]
37183
pub static mut PERF_FIFO: Option<File> = None;
38184

0 commit comments

Comments
 (0)