Skip to content

P3481 R2 - Incorporate changes from the "bulk issues" paper #1500

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 22 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
04c051f
Add execution policy parameter to `bulk`.
lucteo Mar 4, 2025
9801660
The functor passed to `bulk` needs to be copy-constructible.
lucteo Mar 4, 2025
4214768
Store the execution policy inside bulk operation state (if needed)
lucteo Mar 18, 2025
7f17135
Add bulk_chunked and bulk_unchunked -- first attempt
lucteo Mar 22, 2025
d04dfe5
Fix `bulk_chunked`. Improve the implementation.
lucteo Mar 23, 2025
d5801a6
Add more tests for bulk_chunked and bulk_unchunked
lucteo Mar 23, 2025
95f642a
Proper lowering of `bulk` into `bulk_chunked`.
lucteo Mar 23, 2025
3984ae5
Add test showing that `bulk` can be customized independently of `bulk…
lucteo Mar 23, 2025
f102fe9
Customize `bulk_chunked` instead of `bulk` for static thread pool.
lucteo Mar 24, 2025
e751c5e
The execution policy passed to `bulk*` is taken into consideration in…
lucteo Mar 24, 2025
1a3e019
Merge branch 'main' into P3481R2_bulk_issues2
ericniebler Mar 24, 2025
bfecde6
Fix data constructors to take policy by const reference.
lucteo Mar 25, 2025
a087e04
Merge remote-tracking branch 'origin/main' into P3481R2_bulk_issues2
ericniebler Mar 30, 2025
cbbe3ee
fix CUDA build
ericniebler Mar 30, 2025
ea7bdf9
Merge branch 'main' into P3481R2_bulk_issues2
ericniebler Apr 2, 2025
f3723e7
Merge branch 'main' into P3481R2_bulk_issues2
ericniebler Apr 3, 2025
56a7d73
Merge remote-tracking branch 'origin/main' into P3481R2_bulk_issues2
ericniebler Apr 4, 2025
ddaba6f
work around bug in nvc++
ericniebler Apr 4, 2025
67740ab
Merge remote-tracking branch 'upstream/main' into P3481R2_bulk_issues2
lucteo Apr 20, 2025
8e9547c
Workaround MSVC bug.
lucteo Apr 24, 2025
a6dfb60
Apply suggestions from code review
lucteo Apr 27, 2025
a78089f
Fix CI failure
lucteo Apr 28, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions examples/nvexec/bulk.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,9 @@ auto main() -> int {

auto snd = ex::transfer_when_all( //
sch,
fork | ex::bulk(4, bulk_fn(1)),
fork | ex::bulk(ex::par, 4, bulk_fn(1)),
fork | ex::then(then_fn(1)),
fork | ex::bulk(4, bulk_fn(2)))
fork | ex::bulk(ex::par, 4, bulk_fn(2)))
| ex::then(then_fn(2));

stdexec::sync_wait(std::move(snd));
Expand Down
7 changes: 4 additions & 3 deletions examples/nvexec/maxwell/snr.cuh
Original file line number Diff line number Diff line change
Expand Up @@ -421,8 +421,8 @@ auto maxwell_eqs_snr(
computer,
repeat_n(
n_iterations,
ex::bulk(accessor.cells, update_h(accessor))
| ex::bulk(accessor.cells, update_e(time, dt, accessor))))
ex::bulk(ex::par, accessor.cells, update_h(accessor))
| ex::bulk(ex::par, accessor.cells, update_e(time, dt, accessor))))
| ex::then(dump_vtk(write_results, accessor));
}

Expand All @@ -436,7 +436,8 @@ void run_snr(
time_storage_t time{is_gpu_scheduler(computer)};
fields_accessor accessor = grid.accessor();

auto init = ex::just() | exec::on(computer, ex::bulk(grid.cells, grid_initializer(dt, accessor)));
auto init = ex::just()
| exec::on(computer, ex::bulk(ex::par, grid.cells, grid_initializer(dt, accessor)));
stdexec::sync_wait(init);

auto snd = maxwell_eqs_snr(dt, time.get(), write_vtk, n_iterations, accessor, computer);
Expand Down
26 changes: 13 additions & 13 deletions examples/nvexec/maxwell_distributed.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -88,10 +88,8 @@ namespace distributed {
, begin(grid_begin)
, end(grid_end)
, own_cells(end - begin)
, fields_(
device_alloc<float>(
static_cast<std::size_t>(own_cells + n * 2)
* static_cast<int>(field_id::fields_count))) {
, fields_(device_alloc<float>(
static_cast<std::size_t>(own_cells + n * 2) * static_cast<int>(field_id::fields_count))) {
}

[[nodiscard]]
Expand Down Expand Up @@ -426,7 +424,7 @@ auto main(int argc, char *argv[]) -> int {

stdexec::sync_wait(
ex::schedule(gpu)
| ex::bulk(accessor.own_cells(), distributed::grid_initializer(dt, accessor)));
| ex::bulk(ex::par, accessor.own_cells(), distributed::grid_initializer(dt, accessor)));

const int prev_rank = rank == 0 ? size - 1 : rank - 1;
const int next_rank = rank == (size - 1) ? 0 : rank + 1;
Expand Down Expand Up @@ -481,13 +479,13 @@ auto main(int argc, char *argv[]) -> int {

for (std::size_t compute_step = 0; compute_step < n_iterations; compute_step++) {
auto compute_h = ex::when_all(
ex::just() | exec::on(gpu, ex::bulk(bulk_cells, bulk_h_update)),
ex::just() | exec::on(gpu_with_priority, ex::bulk(border_cells, border_h_update))
ex::just() | exec::on(gpu, ex::bulk(ex::par, bulk_cells, bulk_h_update)),
ex::just() | exec::on(gpu_with_priority, ex::bulk(ex::par, border_cells, border_h_update))
| ex::then(exchange_hx));

auto compute_e = ex::when_all(
ex::just() | exec::on(gpu, ex::bulk(bulk_cells, bulk_e_update)),
ex::just() | exec::on(gpu_with_priority, ex::bulk(border_cells, border_e_update))
ex::just() | exec::on(gpu, ex::bulk(ex::par, bulk_cells, bulk_e_update)),
ex::just() | exec::on(gpu_with_priority, ex::bulk(ex::par, border_cells, border_e_update))
| ex::then(exchange_ez));

stdexec::sync_wait(std::move(compute_h));
Expand All @@ -497,14 +495,16 @@ auto main(int argc, char *argv[]) -> int {
write();
#else
for (std::size_t compute_step = 0; compute_step < n_iterations; compute_step++) {
auto compute_h = ex::just()
| exec::on(gpu, ex::bulk(accessor.own_cells(), distributed::update_h(accessor)))
| ex::then(exchange_hx);
auto compute_h =
ex::just()
| exec::on(gpu, ex::bulk(ex::par, accessor.own_cells(), distributed::update_h(accessor)))
| ex::then(exchange_hx);

auto compute_e =
ex::just()
| exec::on(
gpu, ex::bulk(accessor.own_cells(), distributed::update_e(time.get(), dt, accessor)))
gpu,
ex::bulk(ex::par, accessor.own_cells(), distributed::update_e(time.get(), dt, accessor)))
| ex::then(exchange_ez);

stdexec::sync_wait(std::move(compute_h));
Expand Down
6 changes: 3 additions & 3 deletions examples/nvexec/split.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,13 +39,13 @@ auto main() -> int {
};
};

auto fork = ex::schedule(sch) | ex::bulk(4, bulk_fn(0)) | ex::split();
auto fork = ex::schedule(sch) | ex::bulk(ex::par, 4, bulk_fn(0)) | ex::split();

auto snd = ex::transfer_when_all(
sch,
fork | ex::bulk(4, bulk_fn(1)),
fork | ex::bulk(ex::par, 4, bulk_fn(1)),
fork | ex::then(then_fn(1)),
fork | ex::bulk(4, bulk_fn(2)))
fork | ex::bulk(ex::par, 4, bulk_fn(2)))
| ex::then(then_fn(2));

stdexec::sync_wait(std::move(snd));
Expand Down
2 changes: 1 addition & 1 deletion examples/server_theme/split_bulk.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ auto handle_multi_blur_request(const http_request& req) -> ex::sender auto {
size_t img_count = imgs.size();
// return a sender that bulk-processes the image in parallel
return ex::just(std::move(imgs))
| ex::bulk(img_count, [](size_t i, std::vector<image>& imgs) {
| ex::bulk(ex::par, img_count, [](size_t i, std::vector<image>& imgs) {
imgs[i] = apply_blur(imgs[i]);
});
})
Expand Down
5 changes: 3 additions & 2 deletions include/exec/__detail/__system_context_default_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,7 @@ namespace exec::__system_context_default_impl {

using __bulk_schedule_operation_t = __operation<decltype(stdexec::bulk(
stdexec::schedule(std::declval<__pool_scheduler_t>()),
stdexec::par,
std::declval<uint32_t>(),
std::declval<__bulk_functor>()))>;

Expand All @@ -205,8 +206,8 @@ namespace exec::__system_context_default_impl {
std::span<std::byte> __storage,
bulk_item_receiver& __r) noexcept override {
try {
auto __sndr =
stdexec::bulk(stdexec::schedule(__pool_scheduler_), __size, __bulk_functor{&__r});
auto __sndr = stdexec::bulk(
stdexec::schedule(__pool_scheduler_), stdexec::par, __size, __bulk_functor{&__r});
auto __os =
__bulk_schedule_operation_t::__construct_maybe_alloc(__storage, &__r, std::move(__sndr));
__os->start();
Expand Down
3 changes: 2 additions & 1 deletion include/exec/libdispatch_queue.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,8 @@ namespace exec {
struct transform_bulk {
template <class Data, class Sender>
auto operator()(stdexec::bulk_t, Data &&data, Sender &&sndr) {
auto [shape, fun] = std::forward<Data>(data);
auto [pol, shape, fun] = std::forward<Data>(data);
// TODO: handle non-par execution policies
return bulk_sender_t<Sender, decltype(shape), decltype(fun)>{
queue_, std::forward<Sender>(sndr), shape, std::move(fun)};
}
Expand Down
99 changes: 60 additions & 39 deletions include/exec/static_thread_pool.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -175,14 +175,14 @@ namespace exec {
// TODO: code to reconstitute a static_thread_pool_ schedule sender
};

template <class SenderId, std::integral Shape, class Fun>
template <class SenderId, bool parallelize, std::integral Shape, class Fun>
struct bulk_sender {
using Sender = stdexec::__t<SenderId>;
struct __t;
};

template <sender Sender, std::integral Shape, class Fun>
using bulk_sender_t = __t<bulk_sender<__id<__decay_t<Sender>>, Shape, Fun>>;
template <sender Sender, bool parallelize, std::integral Shape, class Fun>
using bulk_sender_t = __t<bulk_sender<__id<__decay_t<Sender>>, parallelize, Shape, Fun>>;

#if STDEXEC_MSVC()
// MSVCBUG https://developercommunity.visualstudio.com/t/Alias-template-with-pack-expansion-in-no/10437850
Expand All @@ -195,11 +195,11 @@ namespace exec {
#endif

template <class Fun, class Shape, class... Args>
requires __callable<Fun, Shape, Args&...>
requires __callable<Fun, Shape, Shape, Args&...>
using bulk_non_throwing = //
__mbool<
// If function invocation doesn't throw
__nothrow_callable<Fun, Shape, Args&...> &&
__nothrow_callable<Fun, Shape, Shape, Args&...> &&
// and emplacing a tuple doesn't throw
#if STDEXEC_MSVC()
__bulk_non_throwing<Args...>::__v
Expand All @@ -209,36 +209,45 @@ namespace exec {
// there's no need to advertise completion with `exception_ptr`
>;

template <class CvrefSender, class Receiver, class Shape, class Fun, bool MayThrow>
template <class CvrefSender, class Receiver, bool parallelize, class Shape, class Fun, bool MayThrow>
struct bulk_shared_state;

template <class CvrefSenderId, class ReceiverId, class Shape, class Fun, bool MayThrow>
template <
class CvrefSenderId,
class ReceiverId,
bool parallelize,
class Shape,
class Fun,
bool MayThrow>
struct bulk_receiver {
using CvrefSender = __cvref_t<CvrefSenderId>;
using Receiver = stdexec::__t<ReceiverId>;
struct __t;
};

template <class CvrefSender, class Receiver, class Shape, class Fun, bool MayThrow>
using bulk_receiver_t =
__t<bulk_receiver<__cvref_id<CvrefSender>, __id<Receiver>, Shape, Fun, MayThrow>>;
template <class CvrefSender, class Receiver, bool parallelize, class Shape, class Fun, bool MayThrow>
using bulk_receiver_t = __t<
bulk_receiver<__cvref_id<CvrefSender>, __id<Receiver>, parallelize, Shape, Fun, MayThrow>>;

template <class CvrefSenderId, class ReceiverId, std::integral Shape, class Fun>
template <class CvrefSenderId, class ReceiverId, bool parallelize, std::integral Shape, class Fun>
struct bulk_op_state {
using CvrefSender = stdexec::__cvref_t<CvrefSenderId>;
using Receiver = stdexec::__t<ReceiverId>;
struct __t;
};

template <class Sender, class Receiver, std::integral Shape, class Fun>
using bulk_op_state_t =
__t<bulk_op_state<__id<__decay_t<Sender>>, __id<__decay_t<Receiver>>, Shape, Fun>>;
template <class Sender, class Receiver, bool parallelize, std::integral Shape, class Fun>
using bulk_op_state_t = __t<
bulk_op_state<__id<__decay_t<Sender>>, __id<__decay_t<Receiver>>, parallelize, Shape, Fun>>;

struct transform_bulk {
template <class Data, class Sender>
auto operator()(bulk_t, Data&& data, Sender&& sndr) {
auto [shape, fun] = static_cast<Data&&>(data);
return bulk_sender_t<Sender, decltype(shape), decltype(fun)>{
auto operator()(bulk_chunked_t, Data&& data, Sender&& sndr) {
auto [pol, shape, fun] = static_cast<Data&&>(data);
using policy_t = std::remove_cvref_t<decltype(pol.__get())>;
constexpr bool parallelize = std::same_as<policy_t, parallel_policy>
|| std::same_as<policy_t, parallel_unsequenced_policy>;
return bulk_sender_t<Sender, parallelize, decltype(shape), decltype(fun)>{
pool_, static_cast<Sender&&>(sndr), shape, std::move(fun)};
}

Expand All @@ -264,7 +273,7 @@ namespace exec {
public:
struct domain : stdexec::default_domain {
// For eager customization
template <sender_expr_for<bulk_t> Sender>
template <sender_expr_for<bulk_chunked_t> Sender>
auto transform_sender(Sender&& sndr) const noexcept {
if constexpr (__completes_on<Sender, static_thread_pool_::scheduler>) {
auto sched = get_completion_scheduler<set_value_t>(get_env(sndr));
Expand All @@ -278,8 +287,8 @@ namespace exec {
}
}

// transform the generic bulk sender into a parallel thread-pool bulk sender
template <sender_expr_for<bulk_t> Sender, class Env>
// transform the generic bulk_chunked sender into a parallel thread-pool bulk sender
template <sender_expr_for<bulk_chunked_t> Sender, class Env>
auto transform_sender(Sender&& sndr, const Env& env) const noexcept {
if constexpr (__completes_on<Sender, static_thread_pool_::scheduler>) {
auto sched = get_completion_scheduler<set_value_t>(get_env(sndr));
Expand Down Expand Up @@ -680,9 +689,8 @@ namespace exec {

for (std::uint32_t index = 0; index < threadCount; ++index) {
threadStates_[index].emplace(this, index, params, numa_);
threadIndexByNumaNode_.push_back(
thread_index_by_numa_node{
.numa_node = threadStates_[index]->numa_node(), .thread_index = index});
threadIndexByNumaNode_.push_back(thread_index_by_numa_node{
.numa_node = threadStates_[index]->numa_node(), .thread_index = index});
}

// NOLINTNEXTLINE(modernize-use-ranges) we still support platforms without the std::ranges algorithms
Expand Down Expand Up @@ -1103,8 +1111,8 @@ namespace exec {

//////////////////////////////////////////////////////////////////////////////////////////////////
// What follows is the implementation for parallel bulk execution on static_thread_pool_.
template <class SenderId, std::integral Shape, class Fun>
struct static_thread_pool_::bulk_sender<SenderId, Shape, Fun>::__t {
template <class SenderId, bool parallelize, std::integral Shape, class Fun>
struct static_thread_pool_::bulk_sender<SenderId, parallelize, Shape, Fun>::__t {
using __id = bulk_sender;
using sender_concept = sender_t;

Expand Down Expand Up @@ -1135,7 +1143,8 @@ namespace exec {

template <class Self, class Receiver>
using bulk_op_state_t = //
stdexec::__t<bulk_op_state<__cvref_id<Self, Sender>, stdexec::__id<Receiver>, Shape, Fun>>;
stdexec::__t<
bulk_op_state<__cvref_id<Self, Sender>, stdexec::__id<Receiver>, parallelize, Shape, Fun>>;

template <__decays_to<__t> Self, receiver Receiver>
requires receiver_of<Receiver, __completions_t<Self, env_of_t<Receiver>>>
Expand Down Expand Up @@ -1166,7 +1175,7 @@ namespace exec {
};

//! The customized operation state for `stdexec::bulk` operations
template <class CvrefSender, class Receiver, class Shape, class Fun, bool MayThrow>
template <class CvrefSender, class Receiver, bool parallelize, class Shape, class Fun, bool MayThrow>
struct static_thread_pool_::bulk_shared_state {
//! The actual `bulk_task` holds a pointer to the shared state
//! and its `__execute` function reads from that shared state.
Expand All @@ -1184,9 +1193,7 @@ namespace exec {
// In the case that the shape is much larger than the total number of threads,
// then each call to computation will call the function many times.
auto [begin, end] = even_share(sh_state.shape_, tid, total_threads);
for (Shape i = begin; i < end; ++i) {
sh_state.fun_(i, args...);
}
sh_state.fun_(begin, end, args...);
};

auto completion = [&](auto&... args) {
Expand Down Expand Up @@ -1252,8 +1259,12 @@ namespace exec {
//! That is, we don't need an agent for each of the shape values.
[[nodiscard]]
auto num_agents_required() const -> std::uint32_t {
return static_cast<std::uint32_t>(
std::min(shape_, static_cast<Shape>(pool_.available_parallelism())));
if constexpr (parallelize) {
return static_cast<std::uint32_t>(
std::min(shape_, static_cast<Shape>(pool_.available_parallelism())));
} else {
return static_cast<std::uint32_t>(1);
}
}

template <class F>
Expand Down Expand Up @@ -1282,12 +1293,20 @@ namespace exec {
};

//! A customized receiver to allow parallel execution of `stdexec::bulk` operations:
template <class CvrefSenderId, class ReceiverId, class Shape, class Fun, bool MayThrow>
struct static_thread_pool_::bulk_receiver<CvrefSenderId, ReceiverId, Shape, Fun, MayThrow>::__t {
template <
class CvrefSenderId,
class ReceiverId,
bool parallelize,
class Shape,
class Fun,
bool MayThrow>
struct static_thread_pool_::
bulk_receiver<CvrefSenderId, ReceiverId, parallelize, Shape, Fun, MayThrow>::__t {
using __id = bulk_receiver;
using receiver_concept = receiver_t;

using shared_state = bulk_shared_state<CvrefSender, Receiver, Shape, Fun, MayThrow>;
using shared_state =
bulk_shared_state<CvrefSender, Receiver, parallelize, Shape, Fun, MayThrow>;

shared_state& shared_state_;

Expand Down Expand Up @@ -1337,8 +1356,9 @@ namespace exec {
}
};

template <class CvrefSenderId, class ReceiverId, std::integral Shape, class Fun>
struct static_thread_pool_::bulk_op_state<CvrefSenderId, ReceiverId, Shape, Fun>::__t {
template <class CvrefSenderId, class ReceiverId, bool parallelize, std::integral Shape, class Fun>
struct static_thread_pool_::bulk_op_state<CvrefSenderId, ReceiverId, parallelize, Shape, Fun>::
__t {
using __id = bulk_op_state;

static constexpr bool may_throw = //
Expand All @@ -1348,8 +1368,9 @@ namespace exec {
__mbind_front_q<bulk_non_throwing, Fun, Shape>,
__q<__mand>>>;

using bulk_rcvr = bulk_receiver_t<CvrefSender, Receiver, Shape, Fun, may_throw>;
using shared_state = bulk_shared_state<CvrefSender, Receiver, Shape, Fun, may_throw>;
using bulk_rcvr = bulk_receiver_t<CvrefSender, Receiver, parallelize, Shape, Fun, may_throw>;
using shared_state =
bulk_shared_state<CvrefSender, Receiver, parallelize, Shape, Fun, may_throw>;
using inner_op_state = connect_result_t<CvrefSender, bulk_rcvr>;

shared_state shared_state_;
Expand Down
3 changes: 2 additions & 1 deletion include/exec/system_context.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -615,7 +615,8 @@ namespace exec {
struct __transform_parallel_bulk_sender {
template <class _Data, class _Previous>
auto operator()(stdexec::bulk_t, _Data&& __data, _Previous&& __previous) const noexcept {
auto [__shape, __fn] = static_cast<_Data&&>(__data);
auto [__pol, __shape, __fn] = static_cast<_Data&&>(__data);
// TODO: handle non-par execution policies
return __parallel_bulk_sender<_Previous, decltype(__shape), decltype(__fn)>{
__sched_, static_cast<_Previous&&>(__previous), __shape, std::move(__fn)};
}
Expand Down
2 changes: 1 addition & 1 deletion include/nvexec/stream/bulk.cuh
Original file line number Diff line number Diff line change
Expand Up @@ -393,7 +393,7 @@ namespace nvexec::_strm {
struct transform_sender_for<stdexec::bulk_t> {
template <class Data, stream_completing_sender Sender>
auto operator()(__ignore, Data data, Sender&& sndr) const {
auto [shape, fun] = static_cast<Data&&>(data);
auto [policy, shape, fun] = static_cast<Data&&>(data);
using Shape = decltype(shape);
using Fn = decltype(fun);
auto sched = get_completion_scheduler<set_value_t>(get_env(sndr));
Expand Down
Loading