From 04c051ff78b4d45493c61ddbb879faafab20853a Mon Sep 17 00:00:00 2001 From: Lucian Radu Teodorescu Date: Tue, 4 Mar 2025 21:55:48 +0200 Subject: [PATCH 01/16] Add execution policy parameter to `bulk`. As execution policies don't come by default with libc++, also added variants for them in stdexec. --- examples/nvexec/bulk.cpp | 4 +- examples/nvexec/maxwell/snr.cuh | 7 +- examples/nvexec/maxwell_distributed.cpp | 26 +++--- examples/nvexec/split.cpp | 6 +- examples/server_theme/split_bulk.cpp | 2 +- .../__system_context_default_impl.hpp | 5 +- include/stdexec/__detail/__bulk.hpp | 20 +++-- include/stdexec/__detail/__config.hpp | 6 ++ .../stdexec/__detail/__execution_legacy.hpp | 87 +++++++++++++++++++ include/stdexec/execution.hpp | 1 + test/exec/test_libdispatch.cpp | 8 +- test/exec/test_system_context.cpp | 6 +- test/execpools/test_asio_thread_pool.cpp | 18 ++-- test/execpools/test_taskflow_thread_pool.cpp | 18 ++-- test/execpools/test_tbb_thread_pool.cpp | 11 ++- test/nvexec/bulk.cpp | 30 ++++--- test/stdexec/algos/adaptors/test_bulk.cpp | 77 ++++++++-------- test/stdexec/cpos/test_cpo_bulk.cpp | 8 +- 18 files changed, 224 insertions(+), 116 deletions(-) create mode 100644 include/stdexec/__detail/__execution_legacy.hpp diff --git a/examples/nvexec/bulk.cpp b/examples/nvexec/bulk.cpp index 95fd53b44..dd63c1c54 100644 --- a/examples/nvexec/bulk.cpp +++ b/examples/nvexec/bulk.cpp @@ -43,9 +43,9 @@ auto main() -> int { auto snd = ex::transfer_when_all( // sch, - fork | ex::bulk(4, bulk_fn(1)), + fork | ex::bulk(ex::par, 4, bulk_fn(1)), fork | ex::then(then_fn(1)), - fork | ex::bulk(4, bulk_fn(2))) + fork | ex::bulk(ex::par, 4, bulk_fn(2))) | ex::then(then_fn(2)); stdexec::sync_wait(std::move(snd)); diff --git a/examples/nvexec/maxwell/snr.cuh b/examples/nvexec/maxwell/snr.cuh index e6890e686..7f2be75ec 100644 --- a/examples/nvexec/maxwell/snr.cuh +++ b/examples/nvexec/maxwell/snr.cuh @@ -421,8 +421,8 @@ auto maxwell_eqs_snr( computer, repeat_n( n_iterations, - 
ex::bulk(accessor.cells, update_h(accessor)) - | ex::bulk(accessor.cells, update_e(time, dt, accessor)))) + ex::bulk(ex::par, accessor.cells, update_h(accessor)) + | ex::bulk(ex::par, accessor.cells, update_e(time, dt, accessor)))) | ex::then(dump_vtk(write_results, accessor)); } @@ -436,7 +436,8 @@ void run_snr( time_storage_t time{is_gpu_scheduler(computer)}; fields_accessor accessor = grid.accessor(); - auto init = ex::just() | exec::on(computer, ex::bulk(grid.cells, grid_initializer(dt, accessor))); + auto init = ex::just() + | exec::on(computer, ex::bulk(ex::par, grid.cells, grid_initializer(dt, accessor))); stdexec::sync_wait(init); auto snd = maxwell_eqs_snr(dt, time.get(), write_vtk, n_iterations, accessor, computer); diff --git a/examples/nvexec/maxwell_distributed.cpp b/examples/nvexec/maxwell_distributed.cpp index 64d9c1bf2..031414632 100644 --- a/examples/nvexec/maxwell_distributed.cpp +++ b/examples/nvexec/maxwell_distributed.cpp @@ -88,10 +88,8 @@ namespace distributed { , begin(grid_begin) , end(grid_end) , own_cells(end - begin) - , fields_( - device_alloc( - static_cast(own_cells + n * 2) - * static_cast(field_id::fields_count))) { + , fields_(device_alloc( + static_cast(own_cells + n * 2) * static_cast(field_id::fields_count))) { } [[nodiscard]] @@ -426,7 +424,7 @@ auto main(int argc, char *argv[]) -> int { stdexec::sync_wait( ex::schedule(gpu) - | ex::bulk(accessor.own_cells(), distributed::grid_initializer(dt, accessor))); + | ex::bulk(ex::par, accessor.own_cells(), distributed::grid_initializer(dt, accessor))); const int prev_rank = rank == 0 ? size - 1 : rank - 1; const int next_rank = rank == (size - 1) ? 
0 : rank + 1; @@ -481,13 +479,13 @@ auto main(int argc, char *argv[]) -> int { for (std::size_t compute_step = 0; compute_step < n_iterations; compute_step++) { auto compute_h = ex::when_all( - ex::just() | exec::on(gpu, ex::bulk(bulk_cells, bulk_h_update)), - ex::just() | exec::on(gpu_with_priority, ex::bulk(border_cells, border_h_update)) + ex::just() | exec::on(gpu, ex::bulk(ex::par, bulk_cells, bulk_h_update)), + ex::just() | exec::on(gpu_with_priority, ex::bulk(ex::par, border_cells, border_h_update)) | ex::then(exchange_hx)); auto compute_e = ex::when_all( - ex::just() | exec::on(gpu, ex::bulk(bulk_cells, bulk_e_update)), - ex::just() | exec::on(gpu_with_priority, ex::bulk(border_cells, border_e_update)) + ex::just() | exec::on(gpu, ex::bulk(ex::par, bulk_cells, bulk_e_update)), + ex::just() | exec::on(gpu_with_priority, ex::bulk(ex::par, border_cells, border_e_update)) | ex::then(exchange_ez)); stdexec::sync_wait(std::move(compute_h)); @@ -497,14 +495,16 @@ auto main(int argc, char *argv[]) -> int { write(); #else for (std::size_t compute_step = 0; compute_step < n_iterations; compute_step++) { - auto compute_h = ex::just() - | exec::on(gpu, ex::bulk(accessor.own_cells(), distributed::update_h(accessor))) - | ex::then(exchange_hx); + auto compute_h = + ex::just() + | exec::on(gpu, ex::bulk(ex::par, accessor.own_cells(), distributed::update_h(accessor))) + | ex::then(exchange_hx); auto compute_e = ex::just() | exec::on( - gpu, ex::bulk(accessor.own_cells(), distributed::update_e(time.get(), dt, accessor))) + gpu, + ex::bulk(ex::par, accessor.own_cells(), distributed::update_e(time.get(), dt, accessor))) | ex::then(exchange_ez); stdexec::sync_wait(std::move(compute_h)); diff --git a/examples/nvexec/split.cpp b/examples/nvexec/split.cpp index 46ea32049..4190059a2 100644 --- a/examples/nvexec/split.cpp +++ b/examples/nvexec/split.cpp @@ -39,13 +39,13 @@ auto main() -> int { }; }; - auto fork = ex::schedule(sch) | ex::bulk(4, bulk_fn(0)) | ex::split(); + auto 
fork = ex::schedule(sch) | ex::bulk(ex::par, 4, bulk_fn(0)) | ex::split(); auto snd = ex::transfer_when_all( sch, - fork | ex::bulk(4, bulk_fn(1)), + fork | ex::bulk(ex::par, 4, bulk_fn(1)), fork | ex::then(then_fn(1)), - fork | ex::bulk(4, bulk_fn(2))) + fork | ex::bulk(ex::par, 4, bulk_fn(2))) | ex::then(then_fn(2)); stdexec::sync_wait(std::move(snd)); diff --git a/examples/server_theme/split_bulk.cpp b/examples/server_theme/split_bulk.cpp index 63391e815..81dcf522c 100644 --- a/examples/server_theme/split_bulk.cpp +++ b/examples/server_theme/split_bulk.cpp @@ -155,7 +155,7 @@ auto handle_multi_blur_request(const http_request& req) -> ex::sender auto { size_t img_count = imgs.size(); // return a sender that bulk-processes the image in parallel return ex::just(std::move(imgs)) - | ex::bulk(img_count, [](size_t i, std::vector& imgs) { + | ex::bulk(ex::par, img_count, [](size_t i, std::vector& imgs) { imgs[i] = apply_blur(imgs[i]); }); }) diff --git a/include/exec/__detail/__system_context_default_impl.hpp b/include/exec/__detail/__system_context_default_impl.hpp index 3ba6d6897..260317d29 100644 --- a/include/exec/__detail/__system_context_default_impl.hpp +++ b/include/exec/__detail/__system_context_default_impl.hpp @@ -184,6 +184,7 @@ namespace exec::__system_context_default_impl { using __bulk_schedule_operation_t = __operation()), + stdexec::par, std::declval(), std::declval<__bulk_functor>()))>; @@ -204,8 +205,8 @@ namespace exec::__system_context_default_impl { std::span __storage, bulk_item_receiver& __r) noexcept override { try { - auto __sndr = - stdexec::bulk(stdexec::schedule(__pool_scheduler_), __size, __bulk_functor{&__r}); + auto __sndr = stdexec::bulk( + stdexec::schedule(__pool_scheduler_), stdexec::par, __size, __bulk_functor{&__r}); auto __os = __bulk_schedule_operation_t::__construct_maybe_alloc(__storage, &__r, std::move(__sndr)); __os->start(); diff --git a/include/stdexec/__detail/__bulk.hpp b/include/stdexec/__detail/__bulk.hpp index 
96a04ab15..771df75d0 100644 --- a/include/stdexec/__detail/__bulk.hpp +++ b/include/stdexec/__detail/__bulk.hpp @@ -15,6 +15,7 @@ */ #pragma once +#include "__execution_legacy.hpp" #include "__execution_fwd.hpp" // include these after __execution_fwd.hpp @@ -35,7 +36,8 @@ namespace stdexec { ///////////////////////////////////////////////////////////////////////////// // [execution.senders.adaptors.bulk] namespace __bulk { - inline constexpr __mstring __bulk_context = "In stdexec::bulk(Sender, Shape, Function)..."_mstr; + inline constexpr __mstring __bulk_context = + "In stdexec::bulk(Sender, Policy, Shape, Function)..."_mstr; using __on_not_callable = __callable_error<__bulk_context>; template @@ -69,8 +71,9 @@ namespace stdexec { __with_error_invoke_t<__on_not_callable, _Fun, _Shape, _CvrefSender, _Env...>>; struct bulk_t { - template - STDEXEC_ATTRIBUTE((host, device)) auto operator()(_Sender&& __sndr, _Shape __shape, _Fun __fun) const + template + requires is_execution_policy_v> + STDEXEC_ATTRIBUTE((host, device)) auto operator()(_Sender&& __sndr, _Policy&& __pol, _Shape __shape, _Fun __fun) const -> __well_formed_sender auto { auto __domain = __get_early_domain(__sndr); return stdexec::transform_sender( @@ -79,11 +82,14 @@ namespace stdexec { __data{__shape, static_cast<_Fun&&>(__fun)}, static_cast<_Sender&&>(__sndr))); } - template - STDEXEC_ATTRIBUTE((always_inline)) auto - operator()(_Shape __shape, _Fun __fun) const -> __binder_back { + template + requires is_execution_policy_v> + STDEXEC_ATTRIBUTE((always_inline)) auto operator()(_Policy&& __pol, _Shape __shape, _Fun __fun) const + -> __binder_back { return { - {static_cast<_Shape&&>(__shape), static_cast<_Fun&&>(__fun)}, + {static_cast<_Policy&&>(__pol), + static_cast<_Shape&&>(__shape), + static_cast<_Fun&&>(__fun)}, {}, {} }; diff --git a/include/stdexec/__detail/__config.hpp b/include/stdexec/__detail/__config.hpp index 9de731501..d1584e308 100644 --- a/include/stdexec/__detail/__config.hpp +++ 
b/include/stdexec/__detail/__config.hpp @@ -402,6 +402,12 @@ namespace stdexec { # define STDEXEC_HAS_STD_MEMORY_RESOURCE() 0 #endif +#if __has_include(<execution>) +# define STDEXEC_HAS_EXECUTION_POLICY() 1 +#else +# define STDEXEC_HAS_EXECUTION_POLICY() 0 +#endif + #ifdef STDEXEC_ASSERT # error "Redefinition of STDEXEC_ASSERT is not permitted. Define STDEXEC_ASSERT_FN instead." #endif diff --git a/include/stdexec/__detail/__execution_legacy.hpp b/include/stdexec/__detail/__execution_legacy.hpp new file mode 100644 index 000000000..59bfc7ee6 --- /dev/null +++ b/include/stdexec/__detail/__execution_legacy.hpp @@ -0,0 +1,94 @@ +/* + * Copyright (c) 2025 Lucian Radu Teodorescu + * + * Licensed under the Apache License Version 2.0 with LLVM Exceptions + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * https://llvm.org/LICENSE.txt + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +#pragma once + +#include "__config.hpp" + +#include <type_traits> + +#if STDEXEC_HAS_EXECUTION_POLICY() +#  include <execution> +#endif + +namespace stdexec { + +#if STDEXEC_HAS_EXECUTION_POLICY() + + using sequenced_policy = std::execution::sequenced_policy; + using parallel_policy = std::execution::parallel_policy; + using parallel_unsequenced_policy = std::execution::parallel_unsequenced_policy; + using unsequenced_policy = std::execution::unsequenced_policy; + + // seq/par/par_unseq/unseq are objects, not types: bring them in with using-declarations. + using std::execution::seq; + using std::execution::par; + using std::execution::par_unseq; + using std::execution::unseq; + + using std::is_execution_policy_v; + using std::is_execution_policy; + +#else + + struct sequenced_policy { + constexpr sequenced_policy() = default; + sequenced_policy(const sequenced_policy&) = default; + sequenced_policy& operator=(const sequenced_policy&) = default; + }; + + struct parallel_policy { + constexpr parallel_policy() = default; + parallel_policy(const parallel_policy&) = default; + parallel_policy& operator=(const parallel_policy&) = default; + }; + + struct parallel_unsequenced_policy { + constexpr parallel_unsequenced_policy() = default; + parallel_unsequenced_policy(const parallel_unsequenced_policy&) = default; + parallel_unsequenced_policy& operator=(const parallel_unsequenced_policy&) = default; + }; + + struct unsequenced_policy { + constexpr unsequenced_policy() = default; + unsequenced_policy(const unsequenced_policy&) = default; + unsequenced_policy& operator=(const unsequenced_policy&) = default; + }; + + inline constexpr sequenced_policy seq{}; + inline constexpr parallel_policy par{}; + inline constexpr parallel_unsequenced_policy par_unseq{}; + inline constexpr unsequenced_policy unseq{}; + + template <class _Tp> + inline constexpr bool is_execution_policy_v = false; + + template <> + inline constexpr bool is_execution_policy_v<sequenced_policy> = true; + + template <> + inline constexpr bool is_execution_policy_v<parallel_policy> = true; + + template <> + inline constexpr bool is_execution_policy_v<parallel_unsequenced_policy> = true; + + template <> + inline 
constexpr bool is_execution_policy_v<unsequenced_policy> = true; + + template <class _Tp> + struct is_execution_policy : std::bool_constant<is_execution_policy_v<_Tp>> { }; + +#endif +} // namespace stdexec \ No newline at end of file diff --git a/include/stdexec/execution.hpp b/include/stdexec/execution.hpp index ab4bf572d..930292340 100644 --- a/include/stdexec/execution.hpp +++ b/include/stdexec/execution.hpp @@ -15,6 +15,7 @@ */ #pragma once +#include "__detail/__execution_legacy.hpp" #include "__detail/__execution_fwd.hpp" // include these after __execution_fwd.hpp diff --git a/test/exec/test_libdispatch.cpp b/test/exec/test_libdispatch.cpp index 2f6c5fae3..52c1844fc 100644 --- a/test/exec/test_libdispatch.cpp +++ b/test/exec/test_libdispatch.cpp @@ -50,8 +50,8 @@ namespace { auto add = [](auto const & data) { return std::accumulate(std::begin(data), std::end(data), 0); }; - auto sender = stdexec::transfer_just(sch, std::move(data)) // - | stdexec::bulk(size, expensive_computation) // + auto sender = stdexec::transfer_just(sch, std::move(data)) // + | stdexec::bulk(stdexec::par, size, expensive_computation) // | stdexec::then(add); auto completion_scheduler = @@ -76,8 +76,8 @@ namespace { auto add = [](auto const & data) { return std::accumulate(std::begin(data), std::end(data), 0); }; - auto sender = stdexec::transfer_just(sch, std::move(data)) // - | stdexec::bulk(size, expensive_computation) // + auto sender = stdexec::transfer_just(sch, std::move(data)) // + | stdexec::bulk(stdexec::par, size, expensive_computation) // | stdexec::then(add); diff --git a/test/exec/test_system_context.cpp b/test/exec/test_system_context.cpp index c61e44975..e76948f88 100644 --- a/test/exec/test_system_context.cpp +++ b/test/exec/test_system_context.cpp @@ -150,7 +150,7 @@ TEST_CASE("simple bulk task on system context", "[types][system_scheduler]") { std::thread::id pool_ids[num_tasks]; exec::parallel_scheduler sched = exec::get_parallel_scheduler(); - auto bulk_snd = ex::bulk(ex::schedule(sched), num_tasks, [&](unsigned long id) { + 
auto bulk_snd = ex::bulk(ex::schedule(sched), ex::par, num_tasks, [&](unsigned long id) { pool_ids[id] = std::this_thread::get_id(); }); @@ -176,8 +176,8 @@ TEST_CASE("simple bulk chaining on system context", "[types][system_scheduler]") return pool_id; }); - auto bulk_snd = - ex::bulk(std::move(snd), num_tasks, [&](unsigned long id, std::thread::id propagated_pool_id) { + auto bulk_snd = ex::bulk( + std::move(snd), ex::par, num_tasks, [&](unsigned long id, std::thread::id propagated_pool_id) { propagated_pool_ids[id] = propagated_pool_id; pool_ids[id] = std::this_thread::get_id(); }); diff --git a/test/execpools/test_asio_thread_pool.cpp b/test/execpools/test_asio_thread_pool.cpp index 73714185b..3a9037805 100644 --- a/test/execpools/test_asio_thread_pool.cpp +++ b/test/execpools/test_asio_thread_pool.cpp @@ -57,7 +57,7 @@ namespace { // clang-format off return transfer_just(sch, std::move(partials)) - | bulk(tile_count, + | bulk(ex::par, tile_count, [=](std::size_t i, std::span partials) { auto start = i * tile_size; auto end = std::min(input.size(), (i + 1) * tile_size); @@ -68,7 +68,7 @@ namespace { std::inclusive_scan(begin(partials), end(partials), begin(partials)); return std::move(partials); }) - | bulk(tile_count, + | bulk(ex::par, tile_count, [=](std::size_t i, std::span partials) { auto start = i * tile_size; auto end = std::min(input.size(), (i + 1) * tile_size); @@ -139,14 +139,12 @@ namespace { execpools::asio_thread_pool taskflow_pool; exec::static_thread_pool other_pool(1ul); { - CHECK_THROWS( - stdexec::sync_wait(starts_on(taskflow_pool.get_scheduler(), just(0)) | then([](auto) { - throw std::exception(); - }))); - CHECK_THROWS( - stdexec::sync_wait(starts_on(other_pool.get_scheduler(), just(0)) | then([](auto) { - throw std::exception(); - }))); + CHECK_THROWS(stdexec::sync_wait( + starts_on(taskflow_pool.get_scheduler(), just(0)) + | then([](auto) { throw std::exception(); }))); + CHECK_THROWS(stdexec::sync_wait( + 
starts_on(other_pool.get_scheduler(), just(0)) + | then([](auto) { throw std::exception(); }))); } // Ensure it still works normally after exceptions: { diff --git a/test/execpools/test_taskflow_thread_pool.cpp b/test/execpools/test_taskflow_thread_pool.cpp index 5a77ee351..27b693258 100644 --- a/test/execpools/test_taskflow_thread_pool.cpp +++ b/test/execpools/test_taskflow_thread_pool.cpp @@ -57,7 +57,7 @@ namespace { // clang-format off return transfer_just(sch, std::move(partials)) - | bulk(tile_count, + | bulk(ex::par, tile_count, [=](std::size_t i, std::span partials) { auto start = i * tile_size; auto end = std::min(input.size(), (i + 1) * tile_size); @@ -68,7 +68,7 @@ namespace { std::inclusive_scan(begin(partials), end(partials), begin(partials)); return std::move(partials); }) - | bulk(tile_count, + | bulk(ex::par, tile_count, [=](std::size_t i, std::span partials) { auto start = i * tile_size; auto end = std::min(input.size(), (i + 1) * tile_size); @@ -139,14 +139,12 @@ namespace { execpools::taskflow_thread_pool taskflow_pool; exec::static_thread_pool other_pool(1ul); { - CHECK_THROWS( - stdexec::sync_wait(starts_on(taskflow_pool.get_scheduler(), just(0)) | then([](auto) { - throw std::exception(); - }))); - CHECK_THROWS( - stdexec::sync_wait(starts_on(other_pool.get_scheduler(), just(0)) | then([](auto) { - throw std::exception(); - }))); + CHECK_THROWS(stdexec::sync_wait( + starts_on(taskflow_pool.get_scheduler(), just(0)) + | then([](auto) { throw std::exception(); }))); + CHECK_THROWS(stdexec::sync_wait( + starts_on(other_pool.get_scheduler(), just(0)) + | then([](auto) { throw std::exception(); }))); } // Ensure it still works normally after exceptions: { diff --git a/test/execpools/test_tbb_thread_pool.cpp b/test/execpools/test_tbb_thread_pool.cpp index 87a54f388..076b14d9d 100644 --- a/test/execpools/test_tbb_thread_pool.cpp +++ b/test/execpools/test_tbb_thread_pool.cpp @@ -57,7 +57,7 @@ namespace { // clang-format off return transfer_just(sch, 
std::move(partials)) - | bulk(tile_count, + | bulk(ex::par, tile_count, [=](std::size_t i, std::span partials) { auto start = i * tile_size; auto end = std::min(input.size(), (i + 1) * tile_size); @@ -68,7 +68,7 @@ namespace { std::inclusive_scan(begin(partials), end(partials), begin(partials)); return std::move(partials); }) - | bulk(tile_count, + | bulk(ex::par, tile_count, [=](std::size_t i, std::span partials) { auto start = i * tile_size; auto end = std::min(input.size(), (i + 1) * tile_size); @@ -144,10 +144,9 @@ namespace { CHECK_THROWS(stdexec::sync_wait(starts_on(tbb_pool.get_scheduler(), just(0)) | then([](auto) { throw std::exception(); }))); - CHECK_THROWS( - stdexec::sync_wait(starts_on(other_pool.get_scheduler(), just(0)) | then([](auto) { - throw std::exception(); - }))); + CHECK_THROWS(stdexec::sync_wait( + starts_on(other_pool.get_scheduler(), just(0)) + | then([](auto) { throw std::exception(); }))); } // Ensure it still works normally after exceptions: { diff --git a/test/nvexec/bulk.cpp b/test/nvexec/bulk.cpp index c207d901b..c9ff24e2f 100644 --- a/test/nvexec/bulk.cpp +++ b/test/nvexec/bulk.cpp @@ -14,7 +14,7 @@ namespace { TEST_CASE("nvexec bulk returns a sender", "[cuda][stream][adaptors][bulk]") { nvexec::stream_context stream_ctx{}; - auto snd = ex::bulk(ex::schedule(stream_ctx.get_scheduler()), 42, [](int) { }); + auto snd = ex::bulk(ex::schedule(stream_ctx.get_scheduler()), ex::par, 42, [](int) { }); STATIC_REQUIRE(ex::sender); (void) snd; } @@ -26,7 +26,7 @@ namespace { auto flags = flags_storage.get(); auto snd = ex::schedule(stream_ctx.get_scheduler()) // - | ex::bulk(4, [=](int idx) { + | ex::bulk(ex::par, 4, [=](int idx) { if (is_on_gpu()) { flags.set(idx); } @@ -43,7 +43,7 @@ namespace { auto flags = flags_storage.get(); auto snd = ex::transfer_just(stream_ctx.get_scheduler(), 42) // - | ex::bulk(1024, [=](int idx, int val) { + | ex::bulk(ex::par, 1024, [=](int idx, int val) { if (is_on_gpu()) { if (val == 42) { flags.set(idx); @@ 
-62,7 +62,7 @@ namespace { auto flags = flags_storage.get(); auto snd = ex::transfer_just(stream_ctx.get_scheduler(), 42, 4.2) // - | ex::bulk(2, [=](int idx, int i, double d) { + | ex::bulk(ex::par, 2, [=](int idx, int i, double d) { if (is_on_gpu()) { if (i == 42 && d == 4.2) { flags.set(idx); @@ -86,7 +86,7 @@ namespace { auto flags = flags_storage.get(); auto snd = ex::transfer_just(stream_ctx.get_scheduler(), flags) // - | ex::bulk(1024, [](int idx, const flags_t& flags) { + | ex::bulk(ex::par, 1024, [](int idx, const flags_t& flags) { if (is_on_gpu()) { flags.set(idx); } @@ -104,6 +104,7 @@ namespace { auto snd = ex::schedule(stream_ctx.get_scheduler()) // | ex::bulk( + ex::par, 2, [flags](int idx) { if (is_on_gpu()) { @@ -132,7 +133,7 @@ namespace { flags.set(2); } }) - | ex::bulk(2, [flags](int idx) { + | ex::bulk(ex::par, 2, [flags](int idx) { if (is_on_gpu()) { flags.set(idx); } @@ -149,7 +150,7 @@ namespace { auto snd = ex::schedule(stream_ctx.get_scheduler()) // | a_sender([]() -> bool { return is_on_gpu(); }) - | ex::bulk(2, [flags](int idx, bool a_sender_was_on_gpu) { + | ex::bulk(ex::par, 2, [flags](int idx, bool a_sender_was_on_gpu) { if (a_sender_was_on_gpu && is_on_gpu()) { flags.set(idx); } @@ -169,13 +170,14 @@ namespace { const int nelems = 10; cudaMallocManaged(&inout, nelems * sizeof(double)); - auto task = stdexec::transfer_just(ctx.get_scheduler(), cuda::std::span{inout, nelems}) - | stdexec::bulk( - nelems, [](std::size_t i, cuda::std::span out) { out[i] = (double) i; }) - | stdexec::let_value([](cuda::std::span out) { return stdexec::just(out); }) - | stdexec::bulk(nelems, [](std::size_t i, cuda::std::span out) { - out[i] = 2.0 * out[i]; - }); + auto task = + stdexec::transfer_just(ctx.get_scheduler(), cuda::std::span{inout, nelems}) + | stdexec::bulk( + ex::par, nelems, [](std::size_t i, cuda::std::span out) { out[i] = (double) i; }) + | stdexec::let_value([](cuda::std::span out) { return stdexec::just(out); }) + | stdexec::bulk(ex::par, 
nelems, [](std::size_t i, cuda::std::span out) { + out[i] = 2.0 * out[i]; + }); stdexec::sync_wait(std::move(task)).value(); diff --git a/test/stdexec/algos/adaptors/test_bulk.cpp b/test/stdexec/algos/adaptors/test_bulk.cpp index 388e55e8a..2c917ad91 100644 --- a/test/stdexec/algos/adaptors/test_bulk.cpp +++ b/test/stdexec/algos/adaptors/test_bulk.cpp @@ -44,28 +44,29 @@ namespace { }; TEST_CASE("bulk returns a sender", "[adaptors][bulk]") { - auto snd = ex::bulk(ex::just(19), 8, [](int, int) { }); + auto snd = ex::bulk(ex::just(19), ex::par, 8, [](int, int) { }); static_assert(ex::sender); (void) snd; } TEST_CASE("bulk with environment returns a sender", "[adaptors][bulk]") { - auto snd = ex::bulk(ex::just(19), 8, [](int, int) { }); + auto snd = ex::bulk(ex::just(19), ex::par, 8, [](int, int) { }); static_assert(ex::sender_in); (void) snd; } TEST_CASE("bulk can be piped", "[adaptors][bulk]") { - ex::sender auto snd = ex::just() | ex::bulk(42, [](int) { }); + ex::sender auto snd = ex::just() | ex::bulk(ex::par, 42, [](int) { }); (void) snd; } TEST_CASE("bulk keeps values_type from input sender", "[adaptors][bulk]") { constexpr int n = 42; - check_val_types>>(ex::just() | ex::bulk(n, [](int) { })); - check_val_types>>(ex::just(4.2) | ex::bulk(n, [](int, double) { })); + check_val_types>>(ex::just() | ex::bulk(ex::par, n, [](int) { })); + check_val_types>>(ex::just(4.2) | ex::bulk(ex::par, n, [](int, double) { + })); check_val_types>>( - ex::just(4.2, std::string{}) | ex::bulk(n, [](int, double, std::string) { })); + ex::just(4.2, std::string{}) | ex::bulk(ex::par, n, [](int, double, std::string) { })); } TEST_CASE("bulk keeps error_types from input sender", "[adaptors][bulk]") { @@ -75,15 +76,15 @@ namespace { error_scheduler sched3{43}; check_err_types>( // - ex::transfer_just(sched1) | ex::bulk(n, [](int) noexcept {})); + ex::transfer_just(sched1) | ex::bulk(ex::par, n, [](int) noexcept { })); check_err_types>( // - ex::transfer_just(sched2) | ex::bulk(n, [](int) 
noexcept {})); + ex::transfer_just(sched2) | ex::bulk(ex::par, n, [](int) noexcept { })); check_err_types>( // - ex::just_error(n) | ex::bulk(n, [](int) noexcept {})); + ex::just_error(n) | ex::bulk(ex::par, n, [](int) noexcept { })); check_err_types>( // - ex::transfer_just(sched3) | ex::bulk(n, [](int) noexcept {})); + ex::transfer_just(sched3) | ex::bulk(ex::par, n, [](int) noexcept { })); check_err_types>( // - ex::transfer_just(sched3) | ex::bulk(n, [](int) { throw std::logic_error{"err"}; })); + ex::transfer_just(sched3) | ex::bulk(ex::par, n, [](int) { throw std::logic_error{"err"}; })); } TEST_CASE("bulk can be used with a function", "[adaptors][bulk]") { @@ -91,7 +92,7 @@ namespace { static int counter[n]{}; std::fill_n(counter, n, 0); - ex::sender auto snd = ex::just() | ex::bulk(n, function); + ex::sender auto snd = ex::just() | ex::bulk(ex::par, n, function); auto op = ex::connect(std::move(snd), expect_void_receiver{}); ex::start(op); @@ -105,7 +106,7 @@ namespace { int counter[n]{0}; function_object_t fn{counter}; - ex::sender auto snd = ex::just() | ex::bulk(n, fn); + ex::sender auto snd = ex::just() | ex::bulk(ex::par, n, fn); auto op = ex::connect(std::move(snd), expect_void_receiver{}); ex::start(op); @@ -118,7 +119,7 @@ namespace { constexpr int n = 9; int counter[n]{0}; - ex::sender auto snd = ex::just() | ex::bulk(n, [&](int i) { counter[i]++; }); + ex::sender auto snd = ex::just() | ex::bulk(ex::par, n, [&](int i) { counter[i]++; }); auto op = ex::connect(std::move(snd), expect_void_receiver{}); ex::start(op); @@ -133,7 +134,7 @@ namespace { int counter[n]{0}; auto snd = ex::just(magic_number) // - | ex::bulk(n, [&](int i, int val) { + | ex::bulk(ex::par, n, [&](int i, int val) { if (val == magic_number) { counter[i]++; } @@ -152,9 +153,10 @@ namespace { std::vector vals_expected(n); std::iota(vals_expected.begin(), vals_expected.end(), 0); - auto snd = - ex::just(std::move(vals)) // - | ex::bulk(n, [&](std::size_t i, std::vector& vals) { 
vals[i] = static_cast(i); }); + auto snd = ex::just(std::move(vals)) // + | ex::bulk(ex::par, n, [&](std::size_t i, std::vector& vals) { + vals[i] = static_cast(i); + }); auto op = ex::connect(std::move(snd), expect_value_receiver{vals_expected}); ex::start(op); } @@ -164,7 +166,7 @@ namespace { constexpr int n = 2; auto snd = ex::just(magic_number) - | ex::bulk(n, [](int, int) { return function_object_t{nullptr}; }); + | ex::bulk(ex::par, n, [](int, int) { return function_object_t{nullptr}; }); auto op = ex::connect(std::move(snd), expect_value_receiver{magic_number}); ex::start(op); @@ -174,7 +176,7 @@ namespace { constexpr int n = 2; auto snd = ex::just() // - | ex::bulk(n, [](int) -> int { throw std::logic_error{"err"}; }); + | ex::bulk(ex::par, n, [](int) -> int { throw std::logic_error{"err"}; }); auto op = ex::connect(std::move(snd), expect_error_receiver{}); ex::start(op); } @@ -183,7 +185,8 @@ namespace { constexpr int n = 2; int called{}; - auto snd = ex::just_error(std::string{"err"}) | ex::bulk(n, [&called](int) { called++; }); + auto snd = ex::just_error(std::string{"err"}) + | ex::bulk(ex::par, n, [&called](int) { called++; }); auto op = ex::connect(std::move(snd), expect_error_receiver{std::string{"err"}}); ex::start(op); } @@ -192,7 +195,7 @@ namespace { constexpr int n = 2; int called{}; - auto snd = ex::just_stopped() | ex::bulk(n, [&called](int) { called++; }); + auto snd = ex::just_stopped() | ex::bulk(ex::par, n, [&called](int) { called++; }); auto op = ex::connect(std::move(snd), expect_stopped_receiver{}); ex::start(op); } @@ -206,8 +209,8 @@ namespace { std::vector counter(n, 42); auto snd = ex::transfer_just(sch) - | ex::bulk(n, [&counter](std::size_t idx) { counter[idx] = 0; }) - | ex::bulk(n, [&counter](std::size_t idx) { counter[idx]++; }); + | ex::bulk(ex::par, n, [&counter](std::size_t idx) { counter[idx] = 0; }) + | ex::bulk(ex::par, n, [&counter](std::size_t idx) { counter[idx]++; }); stdexec::sync_wait(std::move(snd)); const 
std::size_t actual = @@ -224,13 +227,14 @@ namespace { auto snd = ex::transfer_just(sch, 42) | ex::bulk( + ex::par, n, [&counter](std::size_t idx, int val) { if (val == 42) { counter[idx] = 0; } }) - | ex::bulk(n, [&counter](std::size_t idx, int val) { + | ex::bulk(ex::par, n, [&counter](std::size_t idx, int val) { if (val == 42) { counter[idx]++; } @@ -256,8 +260,10 @@ namespace { auto snd = ex::transfer_just(sch, std::move(vals)) | ex::bulk( - n, [](std::size_t idx, std::vector& vals) { vals[idx] = static_cast(idx); }) - | ex::bulk(n, [](std::size_t idx, std::vector& vals) { ++vals[idx]; }); + ex::par, + n, + [](std::size_t idx, std::vector& vals) { vals[idx] = static_cast(idx); }) + | ex::bulk(ex::par, n, [](std::size_t idx, std::vector& vals) { ++vals[idx]; }); auto [vals_actual] = stdexec::sync_wait(std::move(snd)).value(); CHECK(vals_actual == vals_expected); @@ -267,7 +273,7 @@ namespace { SECTION("With exception") { constexpr int n = 9; auto snd = ex::transfer_just(sch) - | ex::bulk(n, [](int) { throw std::runtime_error("bulk"); }); + | ex::bulk(ex::par, n, [](int) { throw std::runtime_error("bulk"); }); CHECK_THROWS_AS(stdexec::sync_wait(std::move(snd)), std::runtime_error); } @@ -278,8 +284,10 @@ namespace { std::vector counters_2(n, 0); stdexec::sender auto snd = stdexec::when_all( - stdexec::schedule(sch) | stdexec::bulk(n, [&](std::size_t id) { counters_1[id]++; }), - stdexec::schedule(sch) | stdexec::bulk(n, [&](std::size_t id) { counters_2[id]++; })); + stdexec::schedule(sch) + | stdexec::bulk(ex::par, n, [&](std::size_t id) { counters_1[id]++; }), + stdexec::schedule(sch) + | stdexec::bulk(ex::par, n, [&](std::size_t id) { counters_2[id]++; })); stdexec::sync_wait(std::move(snd)); @@ -301,7 +309,7 @@ namespace { }; auto snd = ex::just() // - | ex::continues_on(sch) | ex::bulk(tids.size(), fun); + | ex::continues_on(sch) | ex::bulk(ex::par, tids.size(), fun); CHECK(std::equal_to()(&snd.pool_, &pool)); stdexec::sync_wait(std::move(snd)); @@ -326,7 
+334,7 @@ namespace { }; auto snd = ex::just() // - | ex::bulk(tids.size(), fun); + | ex::bulk(ex::par, tids.size(), fun); stdexec::sync_wait(stdexec::starts_on(sch, std::move(snd))); // All the work should not have run on the same thread @@ -338,7 +346,8 @@ namespace { } TEST_CASE("default bulk works with non-default constructible types", "[adaptors][bulk]") { - ex::sender auto s = ex::just(non_default_constructible{42}) | ex::bulk(1, [](int, auto&) { }); + ex::sender auto s = ex::just(non_default_constructible{42}) + | ex::bulk(ex::par, 1, [](int, auto&) { }); ex::sync_wait(std::move(s)); } @@ -347,7 +356,7 @@ namespace { ex::scheduler auto sch = pool.get_scheduler(); ex::sender auto s = ex::just(non_default_constructible{42}) | ex::continues_on(sch) - | ex::bulk(1, [](int, auto&) { }); + | ex::bulk(ex::par, 1, [](int, auto&) { }); ex::sync_wait(std::move(s)); } } // namespace diff --git a/test/stdexec/cpos/test_cpo_bulk.cpp b/test/stdexec/cpos/test_cpo_bulk.cpp index 2cb128267..4995c170c 100644 --- a/test/stdexec/cpos/test_cpo_bulk.cpp +++ b/test/stdexec/cpos/test_cpo_bulk.cpp @@ -28,12 +28,12 @@ namespace { cpo_test_sender_t snd{}; { - constexpr scope_t scope = decltype(snd | ex::bulk(n, f))::scope; + constexpr scope_t scope = decltype(snd | ex::bulk(ex::par, n, f))::scope; STATIC_REQUIRE(scope == scope_t::free_standing); } { - constexpr scope_t scope = decltype(ex::bulk(snd, n, f))::scope; + constexpr scope_t scope = decltype(ex::bulk(snd, ex::par, n, f))::scope; STATIC_REQUIRE(scope == scope_t::free_standing); } } @@ -42,12 +42,12 @@ namespace { cpo_test_scheduler_t::sender_t snd{}; { - constexpr scope_t scope = decltype(snd | ex::bulk(n, f))::scope; + constexpr scope_t scope = decltype(snd | ex::bulk(ex::par, n, f))::scope; STATIC_REQUIRE(scope == scope_t::scheduler); } { - constexpr scope_t scope = decltype(ex::bulk(snd, n, f))::scope; + constexpr scope_t scope = decltype(ex::bulk(snd, ex::par, n, f))::scope; STATIC_REQUIRE(scope == scope_t::scheduler); } } 
From 980166013de348f722ce2574c4113502f771809a Mon Sep 17 00:00:00 2001 From: Lucian Radu Teodorescu Date: Tue, 4 Mar 2025 22:57:43 +0200 Subject: [PATCH 02/16] The functor passed to `bulk` needs to be copy-constructible. --- include/stdexec/__detail/__bulk.hpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/include/stdexec/__detail/__bulk.hpp b/include/stdexec/__detail/__bulk.hpp index 771df75d0..beb8fb8b7 100644 --- a/include/stdexec/__detail/__bulk.hpp +++ b/include/stdexec/__detail/__bulk.hpp @@ -71,7 +71,7 @@ namespace stdexec { __with_error_invoke_t<__on_not_callable, _Fun, _Shape, _CvrefSender, _Env...>>; struct bulk_t { - template + template requires is_execution_policy_v> STDEXEC_ATTRIBUTE((host, device)) auto operator()(_Sender&& __sndr, _Policy&& __pol, _Shape __shape, _Fun __fun) const -> __well_formed_sender auto { @@ -82,7 +82,7 @@ namespace stdexec { __data{__shape, static_cast<_Fun&&>(__fun)}, static_cast<_Sender&&>(__sndr))); } - template + template requires is_execution_policy_v> STDEXEC_ATTRIBUTE((always_inline)) auto operator()(_Policy&& __pol, _Shape __shape, _Fun __fun) const -> __binder_back { From 4214768e22b88fdb41b39df2a9e0e569d6957caf Mon Sep 17 00:00:00 2001 From: Lucian Radu Teodorescu Date: Wed, 19 Mar 2025 00:45:09 +0200 Subject: [PATCH 03/16] Store the execution policy inside bulk operation state (if needed) --- include/exec/libdispatch_queue.hpp | 3 +- include/exec/static_thread_pool.hpp | 3 +- include/exec/system_context.hpp | 3 +- include/stdexec/__detail/__bulk.hpp | 86 +++++++++++++++++-- .../stdexec/__detail/__execution_legacy.hpp | 18 ++-- 5 files changed, 97 insertions(+), 16 deletions(-) diff --git a/include/exec/libdispatch_queue.hpp b/include/exec/libdispatch_queue.hpp index 1f1b4d659..31154b349 100644 --- a/include/exec/libdispatch_queue.hpp +++ b/include/exec/libdispatch_queue.hpp @@ -100,7 +100,8 @@ namespace exec { struct transform_bulk { template auto operator()(stdexec::bulk_t, Data &&data, 
Sender &&sndr) { - auto [shape, fun] = std::forward(data); + auto [pol, shape, fun] = std::forward(data); + // TODO: handle non-par execution policies return bulk_sender_t{ queue_, std::forward(sndr), shape, std::move(fun)}; } diff --git a/include/exec/static_thread_pool.hpp b/include/exec/static_thread_pool.hpp index f25f29565..0cb4b0ffd 100644 --- a/include/exec/static_thread_pool.hpp +++ b/include/exec/static_thread_pool.hpp @@ -237,7 +237,8 @@ namespace exec { struct transform_bulk { template auto operator()(bulk_t, Data&& data, Sender&& sndr) { - auto [shape, fun] = static_cast(data); + auto [pol, shape, fun] = static_cast(data); + // TODO: handle non-par execution policies return bulk_sender_t{ pool_, static_cast(sndr), shape, std::move(fun)}; } diff --git a/include/exec/system_context.hpp b/include/exec/system_context.hpp index 4ed055bf5..686f99841 100644 --- a/include/exec/system_context.hpp +++ b/include/exec/system_context.hpp @@ -615,7 +615,8 @@ namespace exec { struct __transform_parallel_bulk_sender { template auto operator()(stdexec::bulk_t, _Data&& __data, _Previous&& __previous) const noexcept { - auto [__shape, __fn] = static_cast<_Data&&>(__data); + auto [__pol, __shape, __fn] = static_cast<_Data&&>(__data); + // TODO: handle non-par execution policies return __parallel_bulk_sender<_Previous, decltype(__shape), decltype(__fn)>{ __sched_, static_cast<_Previous&&>(__previous), __shape, std::move(__fn)}; } diff --git a/include/stdexec/__detail/__bulk.hpp b/include/stdexec/__detail/__bulk.hpp index beb8fb8b7..11bd2b886 100644 --- a/include/stdexec/__detail/__bulk.hpp +++ b/include/stdexec/__detail/__bulk.hpp @@ -40,14 +40,74 @@ namespace stdexec { "In stdexec::bulk(Sender, Policy, Shape, Function)..."_mstr; using __on_not_callable = __callable_error<__bulk_context>; - template + //! Wrapper for a policy object. + //! + //! If we wrap a standard execution policy, we don't store anything, as we know the type. + //! 
Stores the execution policy object if it's a non-standard one. + //! Provides a way to query the execution policy object. + template + struct __policy_wrapper { + _Pol __pol_; + + /*implicit*/ __policy_wrapper(_Pol __pol) + : __pol_{__pol} { + } + + const _Pol& __get() const noexcept { + return __pol_; + } + }; + + template <> + struct __policy_wrapper { + /*implicit*/ __policy_wrapper(sequenced_policy) { + } + + const sequenced_policy& __get() const noexcept { + return seq; + } + }; + + template <> + struct __policy_wrapper { + /*implicit*/ __policy_wrapper(const parallel_policy&) { + } + + const parallel_policy& __get() const noexcept { + return par; + } + }; + + template <> + struct __policy_wrapper { + /*implicit*/ __policy_wrapper(const parallel_unsequenced_policy&) { + } + + const parallel_unsequenced_policy& __get() const noexcept { + return par_unseq; + } + }; + + template <> + struct __policy_wrapper { + /*implicit*/ __policy_wrapper(const unsequenced_policy&) { + } + + const unsequenced_policy& __get() const noexcept { + return unseq; + } + }; + + template struct __data { + STDEXEC_ATTRIBUTE((no_unique_address)) __policy_wrapper<_Pol> __pol_; _Shape __shape_; STDEXEC_ATTRIBUTE((no_unique_address)) _Fun __fun_; - static constexpr auto __mbrs_ = __mliterals<&__data::__shape_, &__data::__fun_>(); + static constexpr auto __mbrs_ = + __mliterals<&__data::__pol_, &__data::__shape_, &__data::__fun_>(); }; - template - __data(_Shape, _Fun) -> __data<_Shape, _Fun>; + template + __data(_Pol, _Shape, _Fun) -> __data<_Pol, _Shape, _Fun>; template using __decay_ref = __decay_t<_Ty>&; @@ -79,7 +139,7 @@ namespace stdexec { return stdexec::transform_sender( __domain, __make_sexpr( - __data{__shape, static_cast<_Fun&&>(__fun)}, static_cast<_Sender&&>(__sndr))); + __data{__pol, __shape, static_cast<_Fun&&>(__fun)}, static_cast<_Sender&&>(__sndr))); } template @@ -94,6 +154,22 @@ namespace stdexec { {} }; } + + // This describes how to use the pieces of a bulk sender to 
find + // legacy customizations of the bulk algorithm. + using _Sender = __1; + using _Pol = __nth_member<0>(__0); + using _Shape = __nth_member<1>(__0); + using _Fun = __nth_member<2>(__0); + using __legacy_customizations_t = __types< + tag_invoke_t( + bulk_t, + get_completion_scheduler_t(get_env_t(_Sender&)), + _Sender, + _Pol, + _Shape, + _Fun), + tag_invoke_t(bulk_t, _Sender, _Pol, _Shape, _Fun)>; }; struct __bulk_impl : __sexpr_defaults { diff --git a/include/stdexec/__detail/__execution_legacy.hpp b/include/stdexec/__detail/__execution_legacy.hpp index 59bfc7ee6..1b6c0e4b6 100644 --- a/include/stdexec/__detail/__execution_legacy.hpp +++ b/include/stdexec/__detail/__execution_legacy.hpp @@ -36,34 +36,36 @@ namespace stdexec { #else + struct __hidden_construction {}; + struct sequenced_policy { - constexpr sequenced_policy() = default; + constexpr explicit sequenced_policy(__hidden_construction) {}; sequenced_policy(const sequenced_policy&) = delete; sequenced_policy& operator=(const sequenced_policy&) = delete; }; struct parallel_policy { - constexpr parallel_policy() = default; + constexpr explicit parallel_policy(__hidden_construction) {}; parallel_policy(const parallel_policy&) = delete; parallel_policy& operator=(const parallel_policy&) = delete; }; struct parallel_unsequenced_policy { - constexpr parallel_unsequenced_policy() = default; + constexpr explicit parallel_unsequenced_policy(__hidden_construction) {}; parallel_unsequenced_policy(const parallel_unsequenced_policy&) = delete; parallel_unsequenced_policy& operator=(const parallel_unsequenced_policy&) = delete; }; struct unsequenced_policy { - constexpr unsequenced_policy() = default; + constexpr explicit unsequenced_policy(__hidden_construction) {}; unsequenced_policy(const unsequenced_policy&) = delete; unsequenced_policy& operator=(const unsequenced_policy&) = delete; }; - inline constexpr sequenced_policy seq{}; - inline constexpr parallel_policy par{}; - inline constexpr 
parallel_unsequenced_policy par_unseq{}; - inline constexpr unsequenced_policy unseq{}; + inline constexpr sequenced_policy seq{__hidden_construction{}}; + inline constexpr parallel_policy par{__hidden_construction{}}; + inline constexpr parallel_unsequenced_policy par_unseq{__hidden_construction{}}; + inline constexpr unsequenced_policy unseq{__hidden_construction{}}; template inline constexpr bool is_execution_policy_v = false; From 7f17135e10e00d5ea115341f90ad210fb39f3777 Mon Sep 17 00:00:00 2001 From: Lucian Radu Teodorescu Date: Sat, 22 Mar 2025 17:35:16 +0200 Subject: [PATCH 04/16] Add bulk_chunked and bulk_unchunked -- first attempt --- include/stdexec/__detail/__bulk.hpp | 182 ++++++++++++++++++- include/stdexec/__detail/__execution_fwd.hpp | 6 + test/stdexec/algos/adaptors/test_bulk.cpp | 12 ++ 3 files changed, 193 insertions(+), 7 deletions(-) diff --git a/include/stdexec/__detail/__bulk.hpp b/include/stdexec/__detail/__bulk.hpp index 11bd2b886..ca616011b 100644 --- a/include/stdexec/__detail/__bulk.hpp +++ b/include/stdexec/__detail/__bulk.hpp @@ -38,7 +38,12 @@ namespace stdexec { namespace __bulk { inline constexpr __mstring __bulk_context = "In stdexec::bulk(Sender, Policy, Shape, Function)..."_mstr; + inline constexpr __mstring __bulk_chunked_context = + "In stdexec::bulk_chunked(Sender, Policy, Shape, Function)..."_mstr; + inline constexpr __mstring __bulk_unchunked_context = + "In stdexec::bulk_unchunked(Sender, Shape, Function)..."_mstr; using __on_not_callable = __callable_error<__bulk_context>; + using __on_not_callable2 = __callable_error<__bulk_chunked_context>; //! Wrapper for a policy object. //! 
@@ -123,14 +128,37 @@ namespace stdexec { __q<__mand>>, completion_signatures<>, __eptr_completion>; - + template + using __with_error_invoke2_t = // + __if< + __value_types_t< + __completion_signatures_of_t<_CvrefSender, _Env...>, + __mtransform< + __q<__decay_ref>, + __mbind_front<__mtry_catch_q<__nothrow_invocable_t, _Catch>, _Fun, _Shape, _Shape>>, + __q<__mand>>, + completion_signatures<>, + __eptr_completion>; + template using __completion_signatures = // transform_completion_signatures< __completion_signatures_of_t<_CvrefSender, _Env...>, __with_error_invoke_t<__on_not_callable, _Fun, _Shape, _CvrefSender, _Env...>>; - struct bulk_t { + template + using __completion_signatures2 = // + transform_completion_signatures< + __completion_signatures_of_t<_CvrefSender, _Env...>, + __with_error_invoke2_t<__on_not_callable2, _Fun, _Shape, _Shape, _CvrefSender, _Env...>>; + // TODO (now): use tag to provide appropriate error message + + struct bulk_t; + struct bulk_chunked_t; + struct bulk_unchunked_t; + + template + struct __generic_bulk_t { template requires is_execution_policy_v> STDEXEC_ATTRIBUTE((host, device)) auto operator()(_Sender&& __sndr, _Policy&& __pol, _Shape __shape, _Fun __fun) const @@ -138,14 +166,14 @@ namespace stdexec { auto __domain = __get_early_domain(__sndr); return stdexec::transform_sender( __domain, - __make_sexpr( + __make_sexpr<_Tag>( __data{__pol, __shape, static_cast<_Fun&&>(__fun)}, static_cast<_Sender&&>(__sndr))); } template requires is_execution_policy_v> STDEXEC_ATTRIBUTE((always_inline)) auto operator()(_Policy&& __pol, _Shape __shape, _Fun __fun) const - -> __binder_back { + -> __binder_back<_Tag, _Policy, _Shape, _Fun> { return { {static_cast<_Policy&&>(__pol), static_cast<_Shape&&>(__shape), @@ -163,16 +191,56 @@ namespace stdexec { using _Fun = __nth_member<2>(__0); using __legacy_customizations_t = __types< tag_invoke_t( - bulk_t, + _Tag, get_completion_scheduler_t(get_env_t(_Sender&)), _Sender, _Pol, _Shape, _Fun), - 
tag_invoke_t(bulk_t, _Sender, _Pol, _Shape, _Fun)>; + tag_invoke_t(_Tag, _Sender, _Pol, _Shape, _Fun)>; + }; + + struct bulk_t : __generic_bulk_t { }; + + struct bulk_chunked_t : __generic_bulk_t { }; + + struct bulk_unchunked_t { + template + STDEXEC_ATTRIBUTE((host, device)) auto operator()(_Sender&& __sndr, _Shape __shape, _Fun __fun) const + -> __well_formed_sender auto { + auto __domain = __get_early_domain(__sndr); + return stdexec::transform_sender( + __domain, + __make_sexpr( + __data{par, __shape, static_cast<_Fun&&>(__fun)}, static_cast<_Sender&&>(__sndr))); + } + + template + STDEXEC_ATTRIBUTE((always_inline)) auto operator()(_Shape __shape, _Fun __fun) const + -> __binder_back { + return { + {static_cast<_Shape&&>(__shape), static_cast<_Fun&&>(__fun)}, + {}, + {} + }; + } + + // This describes how to use the pieces of a bulk sender to find + // legacy customizations of the bulk algorithm. + using _Sender = __1; + using _Shape = __nth_member<1>(__0); + using _Fun = __nth_member<2>(__0); + using __legacy_customizations_t = __types< + tag_invoke_t( + bulk_unchunked_t, + get_completion_scheduler_t(get_env_t(_Sender&)), + _Sender, + _Shape, + _Fun), + tag_invoke_t(bulk_unchunked_t, _Sender, _Shape, _Fun)>; }; - struct __bulk_impl : __sexpr_defaults { + struct __bulk_base_impl : __sexpr_defaults { template using __fun_t = decltype(__decay_t<__data_of<_Sender>>::__fun_); @@ -185,7 +253,24 @@ namespace stdexec { static_assert(sender_expr_for<_Sender, bulk_t>); return {}; }; + }; + + struct __bulk_base2_impl : __sexpr_defaults { + template + using __fun_t = decltype(__decay_t<__data_of<_Sender>>::__fun_); + template + using __shape_t = decltype(__decay_t<__data_of<_Sender>>::__shape_); + + static constexpr auto get_completion_signatures = // + [](_Sender&&, _Env&&...) 
noexcept + -> __completion_signatures2<__fun_t<_Sender>, __shape_t<_Sender>, __child_of<_Sender>, _Env...> { + static_assert(sender_expr_for<_Sender, bulk_t>); + return {}; + }; + }; + + struct __bulk_chunked_impl : __bulk_base2_impl { //! This implements the core default behavior for `bulk`: //! When setting value, it loops over the shape and invokes the function. //! Note: This is not done in parallel. That is customized by the scheduler. @@ -200,6 +285,47 @@ namespace stdexec { if constexpr (same_as<_Tag, set_value_t>) { // Intercept set_value and dispatch to the bulk operation. using __shape_t = decltype(__state.__shape_); + if constexpr (noexcept(__state.__fun_(__shape_t{}, __shape_t{}, __args...))) { + // The noexcept version that doesn't need try/catch: + __state.__fun_(static_cast<__shape_t>(0), __state.__shape_, __args...); + _Tag()(static_cast<_Receiver&&>(__rcvr), static_cast<_Args&&>(__args)...); + } else { + try { + __state.__fun_(static_cast<__shape_t>(0), __state.__shape_, __args...); + _Tag()(static_cast<_Receiver&&>(__rcvr), static_cast<_Args&&>(__args)...); + } catch (...) { + stdexec::set_error(static_cast<_Receiver&&>(__rcvr), std::current_exception()); + } + } + } else { + _Tag()(static_cast<_Receiver&&>(__rcvr), static_cast<_Args&&>(__args)...); + } + }; + }; + + struct __bulk_unchunked_impl : __bulk_base_impl { + //! This implements the core default behavior for `bulk`: + //! When setting value, it loops over the shape and invokes the function. + //! Note: This is not done in parallel. That is customized by the scheduler. + //! See, e.g., static_thread_pool::bulk_receiver::__t. + static constexpr auto complete = // + []( + __ignore, + _State& __state, + _Receiver& __rcvr, + _Tag, + _Args&&... __args) noexcept -> void { + if constexpr (std::same_as<_Tag, set_value_t>) { + // Intercept set_value and dispatch to the bulk operation. 
+ using __shape_t = decltype(__state.__shape_); + constexpr bool __scheduler_available = + requires { get_completion_scheduler(get_env(__rcvr)); }; + if constexpr (__scheduler_available) { + // This default implementation doesn't run a scheduler with concurrent progres guarantees. + constexpr auto __guarantee = get_forward_progress_guarantee( + get_completion_scheduler(get_env(__rcvr))); + static_assert(__guarantee != forward_progress_guarantee::concurrent); + } if constexpr (noexcept(__state.__fun_(__shape_t{}, __args...))) { // The noexcept version that doesn't need try/catch: for (__shape_t __i{}; __i != __state.__shape_; ++__i) { @@ -221,13 +347,55 @@ namespace stdexec { } }; }; + + struct __bulk_impl : __bulk_base_impl { + //! This implements the core default behavior for `bulk`: + //! When setting value, it loops over the shape and invokes the function. + //! Note: This is not done in parallel. That is customized by the scheduler. + //! See, e.g., static_thread_pool::bulk_receiver::__t. + static constexpr auto complete = // + []( + __ignore, + _State& __state, + _Receiver& __rcvr, + _Tag, + _Args&&... __args) noexcept -> void { + if constexpr (std::same_as<_Tag, set_value_t>) { + using __shape_t = decltype(__state.__shape_); + + constexpr bool __nothrow = noexcept(__state.__fun_(__state.__shape_, __args...)); + auto __new_f = + [__func = std::move(__state.__fun_)]( + __shape_t __begin, __shape_t __end, auto&&... 
__vs) mutable noexcept(__nothrow) { + while (__begin != __end) + __func(__begin++, __vs...); + }; + + auto __chunked_data = __data{__state.__pol_, __state.__shape_, std::move(__new_f)}; + __bulk_chunked_impl::complete( + _Tag(), __chunked_data, __rcvr, _Tag(), std::forward<_Args>(__args)...); + } else { + _Tag()(static_cast<_Receiver&&>(__rcvr), static_cast<_Args&&>(__args)...); + } + }; + }; } // namespace __bulk using __bulk::bulk_t; + using __bulk::bulk_chunked_t; + using __bulk::bulk_unchunked_t; inline constexpr bulk_t bulk{}; + inline constexpr bulk_chunked_t bulk_chunked{}; + inline constexpr bulk_unchunked_t bulk_unchunked{}; template <> struct __sexpr_impl : __bulk::__bulk_impl { }; + + template <> + struct __sexpr_impl : __bulk::__bulk_chunked_impl { }; + + template <> + struct __sexpr_impl : __bulk::__bulk_unchunked_impl { }; } // namespace stdexec STDEXEC_PRAGMA_POP() diff --git a/include/stdexec/__detail/__execution_fwd.hpp b/include/stdexec/__detail/__execution_fwd.hpp index af57bd69f..01d0c398a 100644 --- a/include/stdexec/__detail/__execution_fwd.hpp +++ b/include/stdexec/__detail/__execution_fwd.hpp @@ -245,10 +245,16 @@ namespace stdexec { ////////////////////////////////////////////////////////////////////////////////////////////////// namespace __bulk { struct bulk_t; + struct bulk_chunked_t; + struct bulk_unchunked_t; } // namespace __bulk using __bulk::bulk_t; + using __bulk::bulk_chunked_t; + using __bulk::bulk_unchunked_t; extern const bulk_t bulk; + extern const bulk_chunked_t bulk_chunked; + extern const bulk_unchunked_t bulk_unchunked; ////////////////////////////////////////////////////////////////////////////////////////////////// namespace __split { diff --git a/test/stdexec/algos/adaptors/test_bulk.cpp b/test/stdexec/algos/adaptors/test_bulk.cpp index 2c917ad91..438af9c7d 100644 --- a/test/stdexec/algos/adaptors/test_bulk.cpp +++ b/test/stdexec/algos/adaptors/test_bulk.cpp @@ -49,6 +49,18 @@ namespace { (void) snd; } + // 
TEST_CASE("bulk_chunked returns a sender", "[adaptors][bulk]") { + // auto snd = ex::bulk_chunked(ex::just(19), 8, [](int, int, int) { }); + // static_assert(ex::sender); + // (void) snd; + // } + + TEST_CASE("bulk_unchunked returns a sender", "[adaptors][bulk]") { + auto snd = ex::bulk_unchunked(ex::just(19), 8, [](int, int) { }); + static_assert(ex::sender); + (void) snd; + } + TEST_CASE("bulk with environment returns a sender", "[adaptors][bulk]") { auto snd = ex::bulk(ex::just(19), ex::par, 8, [](int, int) { }); static_assert(ex::sender_in); From d04dfe52f72640d4664b7c5bdac864dbf60d97e4 Mon Sep 17 00:00:00 2001 From: Lucian Radu Teodorescu Date: Sun, 23 Mar 2025 16:49:40 +0200 Subject: [PATCH 05/16] Fix `bulk_chunked`. Improve the implementation. --- include/stdexec/__detail/__bulk.hpp | 141 +++++++++++----------- test/stdexec/algos/adaptors/test_bulk.cpp | 10 +- 2 files changed, 75 insertions(+), 76 deletions(-) diff --git a/include/stdexec/__detail/__bulk.hpp b/include/stdexec/__detail/__bulk.hpp index ca616011b..3de2a1ef1 100644 --- a/include/stdexec/__detail/__bulk.hpp +++ b/include/stdexec/__detail/__bulk.hpp @@ -36,14 +36,9 @@ namespace stdexec { ///////////////////////////////////////////////////////////////////////////// // [execution.senders.adaptors.bulk] namespace __bulk { - inline constexpr __mstring __bulk_context = - "In stdexec::bulk(Sender, Policy, Shape, Function)..."_mstr; - inline constexpr __mstring __bulk_chunked_context = - "In stdexec::bulk_chunked(Sender, Policy, Shape, Function)..."_mstr; - inline constexpr __mstring __bulk_unchunked_context = - "In stdexec::bulk_unchunked(Sender, Shape, Function)..."_mstr; - using __on_not_callable = __callable_error<__bulk_context>; - using __on_not_callable2 = __callable_error<__bulk_chunked_context>; + struct bulk_t; + struct bulk_chunked_t; + struct bulk_unchunked_t; //! Wrapper for a policy object. //! 
@@ -65,7 +60,7 @@ namespace stdexec { template <> struct __policy_wrapper { - /*implicit*/ __policy_wrapper(sequenced_policy) { + /*implicit*/ __policy_wrapper(const sequenced_policy&) { } const sequenced_policy& __get() const noexcept { @@ -114,50 +109,65 @@ namespace stdexec { template __data(_Pol, _Shape, _Fun) -> __data<_Pol, _Shape, _Fun>; + template + struct __bulk_traits; + + template <> + struct __bulk_traits { + using __on_not_callable = + __callable_error<"In stdexec::bulk(Sender, Policy, Shape, Function)..."_mstr>; + + // Curried function, after passing the required indices. + template + using __fun_curried = + __mbind_front<__mtry_catch_q<__nothrow_invocable_t, __on_not_callable>, _Fun, _Shape>; + }; + + template <> + struct __bulk_traits { + using __on_not_callable = + __callable_error<"In stdexec::bulk_chunked(Sender, Policy, Shape, Function)..."_mstr>; + + // Curried function, after passing the required indices. + template + using __fun_curried = + __mbind_front<__mtry_catch_q<__nothrow_invocable_t, __on_not_callable>, _Fun, _Shape, _Shape>; + }; + + template <> + struct __bulk_traits { + using __on_not_callable = + __callable_error<"In stdexec::bulk_unchunked(Sender, Shape, Function)..."_mstr>; + + // Curried function, after passing the required indices. 
+ template + using __fun_curried = + __mbind_front<__mtry_catch_q<__nothrow_invocable_t, __on_not_callable>, _Fun, _Shape>; + }; + template using __decay_ref = __decay_t<_Ty>&; - template + template using __with_error_invoke_t = // __if< __value_types_t< __completion_signatures_of_t<_CvrefSender, _Env...>, __mtransform< __q<__decay_ref>, - __mbind_front<__mtry_catch_q<__nothrow_invocable_t, _Catch>, _Fun, _Shape>>, + typename __bulk_traits<_AlgoTag>::template __fun_curried<_Fun, _Shape>>, __q<__mand>>, completion_signatures<>, __eptr_completion>; - template - using __with_error_invoke2_t = // - __if< - __value_types_t< - __completion_signatures_of_t<_CvrefSender, _Env...>, - __mtransform< - __q<__decay_ref>, - __mbind_front<__mtry_catch_q<__nothrow_invocable_t, _Catch>, _Fun, _Shape, _Shape>>, - __q<__mand>>, - completion_signatures<>, - __eptr_completion>; - - template + + + template using __completion_signatures = // transform_completion_signatures< __completion_signatures_of_t<_CvrefSender, _Env...>, - __with_error_invoke_t<__on_not_callable, _Fun, _Shape, _CvrefSender, _Env...>>; - - template - using __completion_signatures2 = // - transform_completion_signatures< - __completion_signatures_of_t<_CvrefSender, _Env...>, - __with_error_invoke2_t<__on_not_callable2, _Fun, _Shape, _Shape, _CvrefSender, _Env...>>; - // TODO (now): use tag to provide appropriate error message - - struct bulk_t; - struct bulk_chunked_t; - struct bulk_unchunked_t; + __with_error_invoke_t<_AlgoTag, _Fun, _Shape, _CvrefSender, _Env...>>; - template + template struct __generic_bulk_t { template requires is_execution_policy_v> @@ -166,14 +176,14 @@ namespace stdexec { auto __domain = __get_early_domain(__sndr); return stdexec::transform_sender( __domain, - __make_sexpr<_Tag>( + __make_sexpr<_AlgoTag>( __data{__pol, __shape, static_cast<_Fun&&>(__fun)}, static_cast<_Sender&&>(__sndr))); } template requires is_execution_policy_v> STDEXEC_ATTRIBUTE((always_inline)) auto operator()(_Policy&& 
__pol, _Shape __shape, _Fun __fun) const - -> __binder_back<_Tag, _Policy, _Shape, _Fun> { + -> __binder_back<_AlgoTag, _Policy, _Shape, _Fun> { return { {static_cast<_Policy&&>(__pol), static_cast<_Shape&&>(__shape), @@ -191,13 +201,13 @@ namespace stdexec { using _Fun = __nth_member<2>(__0); using __legacy_customizations_t = __types< tag_invoke_t( - _Tag, + _AlgoTag, get_completion_scheduler_t(get_env_t(_Sender&)), _Sender, _Pol, _Shape, _Fun), - tag_invoke_t(_Tag, _Sender, _Pol, _Shape, _Fun)>; + tag_invoke_t(_AlgoTag, _Sender, _Pol, _Shape, _Fun)>; }; struct bulk_t : __generic_bulk_t { }; @@ -240,39 +250,30 @@ namespace stdexec { tag_invoke_t(bulk_unchunked_t, _Sender, _Shape, _Fun)>; }; - struct __bulk_base_impl : __sexpr_defaults { - template - using __fun_t = decltype(__decay_t<__data_of<_Sender>>::__fun_); - - template - using __shape_t = decltype(__decay_t<__data_of<_Sender>>::__shape_); - - static constexpr auto get_completion_signatures = // - [](_Sender&&, _Env&&...) noexcept - -> __completion_signatures<__fun_t<_Sender>, __shape_t<_Sender>, __child_of<_Sender>, _Env...> { - static_assert(sender_expr_for<_Sender, bulk_t>); - return {}; - }; - }; - - struct __bulk_base2_impl : __sexpr_defaults { + template + struct __bulk_impl_base : __sexpr_defaults { template using __fun_t = decltype(__decay_t<__data_of<_Sender>>::__fun_); template using __shape_t = decltype(__decay_t<__data_of<_Sender>>::__shape_); - static constexpr auto get_completion_signatures = // - [](_Sender&&, _Env&&...) noexcept - -> __completion_signatures2<__fun_t<_Sender>, __shape_t<_Sender>, __child_of<_Sender>, _Env...> { + static constexpr auto get_completion_signatures = // + [](_Sender&&, _Env&&...) noexcept // + -> __completion_signatures< + _AlgoTag, + __fun_t<_Sender>, + __shape_t<_Sender>, + __child_of<_Sender>, + _Env...> { static_assert(sender_expr_for<_Sender, bulk_t>); return {}; }; }; - struct __bulk_chunked_impl : __bulk_base2_impl { - //! 
This implements the core default behavior for `bulk`: - //! When setting value, it loops over the shape and invokes the function. + struct __bulk_chunked_impl : __bulk_impl_base { + //! This implements the core default behavior for `bulk_chunked`: + //! When setting value, it calls the function with the entire range. //! Note: This is not done in parallel. That is customized by the scheduler. //! See, e.g., static_thread_pool::bulk_receiver::__t. static constexpr auto complete = // @@ -303,11 +304,11 @@ namespace stdexec { }; }; - struct __bulk_unchunked_impl : __bulk_base_impl { - //! This implements the core default behavior for `bulk`: + struct __bulk_unchunked_impl : __bulk_impl_base { + //! This implements the core default behavior for `bulk_unchunked`: //! When setting value, it loops over the shape and invokes the function. - //! Note: This is not done in parallel. That is customized by the scheduler. - //! See, e.g., static_thread_pool::bulk_receiver::__t. + //! Note: This is not done in concurrently. That is customized by the scheduler. + //! Calling it on a scheduler that is not concurrent is an error. static constexpr auto complete = // []( __ignore, @@ -348,11 +349,9 @@ namespace stdexec { }; }; - struct __bulk_impl : __bulk_base_impl { + struct __bulk_impl : __bulk_impl_base { //! This implements the core default behavior for `bulk`: - //! When setting value, it loops over the shape and invokes the function. - //! Note: This is not done in parallel. That is customized by the scheduler. - //! See, e.g., static_thread_pool::bulk_receiver::__t. + //! This is implemented in terms of `bulk_chunked`. 
static constexpr auto complete = // []( __ignore, diff --git a/test/stdexec/algos/adaptors/test_bulk.cpp b/test/stdexec/algos/adaptors/test_bulk.cpp index 438af9c7d..98383e682 100644 --- a/test/stdexec/algos/adaptors/test_bulk.cpp +++ b/test/stdexec/algos/adaptors/test_bulk.cpp @@ -49,11 +49,11 @@ namespace { (void) snd; } - // TEST_CASE("bulk_chunked returns a sender", "[adaptors][bulk]") { - // auto snd = ex::bulk_chunked(ex::just(19), 8, [](int, int, int) { }); - // static_assert(ex::sender); - // (void) snd; - // } + TEST_CASE("bulk_chunked returns a sender", "[adaptors][bulk]") { + auto snd = ex::bulk_chunked(ex::just(19), ex::par, 8, [](int, int, int) { }); + static_assert(ex::sender); + (void) snd; + } TEST_CASE("bulk_unchunked returns a sender", "[adaptors][bulk]") { auto snd = ex::bulk_unchunked(ex::just(19), 8, [](int, int) { }); From d5801a6957649f39c42f04ef2d971df8b21c7b5f Mon Sep 17 00:00:00 2001 From: Lucian Radu Teodorescu Date: Sun, 23 Mar 2025 17:50:14 +0200 Subject: [PATCH 06/16] Add more tests for bulk_chunked and bulk_unchunked --- test/stdexec/algos/adaptors/test_bulk.cpp | 621 ++++++++++++++++++++++ 1 file changed, 621 insertions(+) diff --git a/test/stdexec/algos/adaptors/test_bulk.cpp b/test/stdexec/algos/adaptors/test_bulk.cpp index 98383e682..1538056a2 100644 --- a/test/stdexec/algos/adaptors/test_bulk.cpp +++ b/test/stdexec/algos/adaptors/test_bulk.cpp @@ -17,6 +17,7 @@ #include #include +#include #include #include #include @@ -34,6 +35,13 @@ namespace { Counter[i]++; } + template + void function_range(Shape b, Shape e) { + while (b != e) { + Counter[b++]++; + } + } + template struct function_object_t { int* Counter; @@ -43,6 +51,17 @@ namespace { } }; + template + struct function_object_range_t { + int* Counter; + + void operator()(Shape b, Shape e) { + while (b != e) { + Counter[b++]++; + } + } + }; + TEST_CASE("bulk returns a sender", "[adaptors][bulk]") { auto snd = ex::bulk(ex::just(19), ex::par, 8, [](int, int) { }); 
static_assert(ex::sender); @@ -67,11 +86,33 @@ namespace { (void) snd; } + TEST_CASE("bulk_chunked with environment returns a sender", "[adaptors][bulk]") { + auto snd = ex::bulk_chunked(ex::just(19), ex::par, 8, [](int, int, int) { }); + static_assert(ex::sender_in); + (void) snd; + } + + TEST_CASE("bulk_unchunked with environment returns a sender", "[adaptors][bulk]") { + auto snd = ex::bulk_unchunked(ex::just(19), 8, [](int, int) { }); + static_assert(ex::sender_in); + (void) snd; + } + TEST_CASE("bulk can be piped", "[adaptors][bulk]") { ex::sender auto snd = ex::just() | ex::bulk(ex::par, 42, [](int) { }); (void) snd; } + TEST_CASE("bulk_chunked can be piped", "[adaptors][bulk]") { + ex::sender auto snd = ex::just() | ex::bulk_chunked(ex::par, 42, [](int, int) { }); + (void) snd; + } + + TEST_CASE("bulk_unchunked can be piped", "[adaptors][bulk]") { + ex::sender auto snd = ex::just() | ex::bulk_unchunked(42, [](int) { }); + (void) snd; + } + TEST_CASE("bulk keeps values_type from input sender", "[adaptors][bulk]") { constexpr int n = 42; check_val_types>>(ex::just() | ex::bulk(ex::par, n, [](int) { })); @@ -81,6 +122,26 @@ namespace { ex::just(4.2, std::string{}) | ex::bulk(ex::par, n, [](int, double, std::string) { })); } + TEST_CASE("bulk_chunked keeps values_type from input sender", "[adaptors][bulk]") { + constexpr int n = 42; + check_val_types>>(ex::just() | ex::bulk_chunked(ex::par, n, [](int, int) { + })); + check_val_types>>( + ex::just(4.2) | ex::bulk_chunked(ex::par, n, [](int, int, double) { })); + check_val_types>>( + ex::just(4.2, std::string{}) + | ex::bulk_chunked(ex::par, n, [](int, int, double, std::string) { })); + } + + TEST_CASE("bulk_unchunked keeps values_type from input sender", "[adaptors][bulk]") { + constexpr int n = 42; + check_val_types>>(ex::just() | ex::bulk_unchunked(n, [](int) { })); + check_val_types>>( + ex::just(4.2) | ex::bulk_unchunked(n, [](int, double) { })); + check_val_types>>( + ex::just(4.2, std::string{}) | 
ex::bulk_unchunked(n, [](int, double, std::string) { })); + } + TEST_CASE("bulk keeps error_types from input sender", "[adaptors][bulk]") { constexpr int n = 42; inline_scheduler sched1{}; @@ -99,6 +160,44 @@ namespace { ex::transfer_just(sched3) | ex::bulk(ex::par, n, [](int) { throw std::logic_error{"err"}; })); } + TEST_CASE("bulk_chunked keeps error_types from input sender", "[adaptors][bulk]") { + constexpr int n = 42; + inline_scheduler sched1{}; + error_scheduler sched2{}; + error_scheduler sched3{43}; + + check_err_types>( // + ex::transfer_just(sched1) | ex::bulk_chunked(ex::par, n, [](int, int) noexcept { })); + check_err_types>( // + ex::transfer_just(sched2) | ex::bulk_chunked(ex::par, n, [](int, int) noexcept { })); + check_err_types>( // + ex::just_error(n) | ex::bulk_chunked(ex::par, n, [](int, int) noexcept { })); + check_err_types>( // + ex::transfer_just(sched3) | ex::bulk_chunked(ex::par, n, [](int, int) noexcept { })); + check_err_types>( // + ex::transfer_just(sched3) + | ex::bulk_chunked(ex::par, n, [](int, int) { throw std::logic_error{"err"}; })); + } + + TEST_CASE("bulk_unchunked keeps error_types from input sender", "[adaptors][bulk]") { + constexpr int n = 42; + inline_scheduler sched1{}; + error_scheduler sched2{}; + error_scheduler sched3{43}; + + check_err_types>( // + ex::transfer_just(sched1) | ex::bulk_unchunked(n, [](int) noexcept { })); + check_err_types>( // + ex::transfer_just(sched2) | ex::bulk_unchunked(n, [](int) noexcept { })); + check_err_types>( // + ex::just_error(n) | ex::bulk_unchunked(n, [](int) noexcept { })); + check_err_types>( // + ex::transfer_just(sched3) | ex::bulk_unchunked(n, [](int) noexcept { })); + check_err_types>( // + ex::transfer_just(sched3) + | ex::bulk_unchunked(n, [](int) { throw std::logic_error{"err"}; })); + } + TEST_CASE("bulk can be used with a function", "[adaptors][bulk]") { constexpr int n = 9; static int counter[n]{}; @@ -113,6 +212,35 @@ namespace { } } + TEST_CASE("bulk_chunked can be 
used with a function", "[adaptors][bulk]") { + constexpr int n = 9; + static int counter[n]{}; + std::fill_n(counter, n, 0); + + ex::sender auto snd = ex::just() + | ex::bulk_chunked(ex::par, n, function_range); + auto op = ex::connect(std::move(snd), expect_void_receiver{}); + ex::start(op); + + for (int i = 0; i < n; i++) { + CHECK(counter[i] == 1); + } + } + + TEST_CASE("bulk_unchunked can be used with a function", "[adaptors][bulk]") { + constexpr int n = 9; + static int counter[n]{}; + std::fill_n(counter, n, 0); + + ex::sender auto snd = ex::just() | ex::bulk_unchunked(n, function); + auto op = ex::connect(std::move(snd), expect_void_receiver{}); + ex::start(op); + + for (int i = 0; i < n; i++) { + CHECK(counter[i] == 1); + } + } + TEST_CASE("bulk can be used with a function object", "[adaptors][bulk]") { constexpr int n = 9; int counter[n]{0}; @@ -127,6 +255,34 @@ namespace { } } + TEST_CASE("bulk_chunked can be used with a function object", "[adaptors][bulk]") { + constexpr int n = 9; + int counter[n]{0}; + function_object_range_t fn{counter}; + + ex::sender auto snd = ex::just() | ex::bulk_chunked(ex::par, n, fn); + auto op = ex::connect(std::move(snd), expect_void_receiver{}); + ex::start(op); + + for (int i = 0; i < n; i++) { + CHECK(counter[i] == 1); + } + } + + TEST_CASE("bulk_unchunked can be used with a function object", "[adaptors][bulk]") { + constexpr int n = 9; + int counter[n]{0}; + function_object_t fn{counter}; + + ex::sender auto snd = ex::just() | ex::bulk_unchunked(n, fn); + auto op = ex::connect(std::move(snd), expect_void_receiver{}); + ex::start(op); + + for (int i = 0; i < n; i++) { + CHECK(counter[i] == 1); + } + } + TEST_CASE("bulk can be used with a lambda", "[adaptors][bulk]") { constexpr int n = 9; int counter[n]{0}; @@ -140,6 +296,67 @@ namespace { } } + TEST_CASE("bulk_chunked can be used with a lambda", "[adaptors][bulk]") { + constexpr int n = 9; + int counter[n]{0}; + + ex::sender auto snd = ex::just() | 
ex::bulk_chunked(ex::par, n, [&](int b, int e) { + while (b < e) + counter[b++]++; + }); + auto op = ex::connect(std::move(snd), expect_void_receiver{}); + ex::start(op); + + for (int i = 0; i < n; i++) { + CHECK(counter[i] == 1); + } + } + + TEST_CASE("bulk_unchunked can be used with a lambda", "[adaptors][bulk]") { + constexpr int n = 9; + int counter[n]{0}; + + ex::sender auto snd = ex::just() | ex::bulk_unchunked(n, [&](int i) { counter[i]++; }); + auto op = ex::connect(std::move(snd), expect_void_receiver{}); + ex::start(op); + + for (int i = 0; i < n; i++) { + CHECK(counter[i] == 1); + } + } + + TEST_CASE("bulk works with all standard execution policies", "[adaptors][bulk]") { + ex::sender auto snd1 = ex::just() | ex::bulk(ex::seq, 9, [](int) { }); + ex::sender auto snd2 = ex::just() | ex::bulk(ex::par, 9, [](int) { }); + ex::sender auto snd3 = ex::just() | ex::bulk(ex::par_unseq, 9, [](int) { }); + ex::sender auto snd4 = ex::just() | ex::bulk(ex::unseq, 9, [](int) { }); + + static_assert(ex::sender); + static_assert(ex::sender); + static_assert(ex::sender); + static_assert(ex::sender); + (void) snd1; + (void) snd2; + (void) snd3; + (void) snd4; + } + + TEST_CASE("bulk_chunked works with all standard execution policies", "[adaptors][bulk]") { + ex::sender auto snd1 = ex::just() | ex::bulk_chunked(ex::seq, 9, [](int, int) { }); + ex::sender auto snd2 = ex::just() | ex::bulk_chunked(ex::par, 9, [](int, int) { }); + ex::sender auto snd3 = ex::just() | ex::bulk_chunked(ex::par_unseq, 9, [](int, int) { }); + ex::sender auto snd4 = ex::just() | ex::bulk_chunked(ex::unseq, 9, [](int, int) { }); + + static_assert(ex::sender); + static_assert(ex::sender); + static_assert(ex::sender); + static_assert(ex::sender); + (void) snd1; + (void) snd2; + (void) snd3; + (void) snd4; + } + TEST_CASE("bulk forwards values", "[adaptors][bulk]") { constexpr int n = 9; constexpr int magic_number = 42; @@ -159,6 +376,46 @@ namespace { } } + TEST_CASE("bulk_chunked forwards values", 
"[adaptors][bulk]") { + constexpr int n = 9; + constexpr int magic_number = 42; + int counter[n]{0}; + + auto snd = ex::just(magic_number) // + | ex::bulk_chunked(ex::par, n, [&](int b, int e, int val) { + if (val == magic_number) { + while (b < e) { + counter[b++]++; + } + } + }); + auto op = ex::connect(std::move(snd), expect_value_receiver{magic_number}); + ex::start(op); + + for (int i = 0; i < n; i++) { + CHECK(counter[i] == 1); + } + } + + TEST_CASE("bulk_unchunked forwards values", "[adaptors][bulk]") { + constexpr int n = 9; + constexpr int magic_number = 42; + int counter[n]{0}; + + auto snd = ex::just(magic_number) // + | ex::bulk_unchunked(n, [&](int i, int val) { + if (val == magic_number) { + counter[i]++; + } + }); + auto op = ex::connect(std::move(snd), expect_value_receiver{magic_number}); + ex::start(op); + + for (int i = 0; i < n; i++) { + CHECK(counter[i] == 1); + } + } + TEST_CASE("bulk forwards values that can be taken by reference", "[adaptors][bulk]") { constexpr std::size_t n = 9; std::vector vals(n, 0); @@ -173,6 +430,36 @@ namespace { ex::start(op); } + TEST_CASE("bulk_chunked forwards values that can be taken by reference", "[adaptors][bulk]") { + constexpr std::size_t n = 9; + std::vector vals(n, 0); + std::vector vals_expected(n); + std::iota(vals_expected.begin(), vals_expected.end(), 0); + + auto snd = + ex::just(std::move(vals)) // + | ex::bulk_chunked(ex::par, n, [&](std::size_t b, std::size_t e, std::vector& vals) { + while (b < e) + vals[b++] = static_cast(b); + }); + auto op = ex::connect(std::move(snd), expect_value_receiver{vals_expected}); + ex::start(op); + } + + TEST_CASE("bulk_unchunked forwards values that can be taken by reference", "[adaptors][bulk]") { + constexpr std::size_t n = 9; + std::vector vals(n, 0); + std::vector vals_expected(n); + std::iota(vals_expected.begin(), vals_expected.end(), 0); + + auto snd = ex::just(std::move(vals)) // + | ex::bulk_unchunked(n, [&](std::size_t i, std::vector& vals) { + vals[i] = 
static_cast(i); + }); + auto op = ex::connect(std::move(snd), expect_value_receiver{vals_expected}); + ex::start(op); + } + TEST_CASE("bulk cannot be used to change the value type", "[adaptors][bulk]") { constexpr int magic_number = 42; constexpr int n = 2; @@ -184,6 +471,29 @@ namespace { ex::start(op); } + TEST_CASE("bulk_chunked cannot be used to change the value type", "[adaptors][bulk]") { + constexpr int magic_number = 42; + constexpr int n = 2; + + auto snd = ex::just(magic_number) | ex::bulk_chunked(ex::par, n, [](int, int, int) { + return function_object_range_t{nullptr}; + }); + + auto op = ex::connect(std::move(snd), expect_value_receiver{magic_number}); + ex::start(op); + } + + TEST_CASE("bulk_unchunked cannot be used to change the value type", "[adaptors][bulk]") { + constexpr int magic_number = 42; + constexpr int n = 2; + + auto snd = ex::just(magic_number) + | ex::bulk_unchunked(n, [](int, int) { return function_object_t{nullptr}; }); + + auto op = ex::connect(std::move(snd), expect_value_receiver{magic_number}); + ex::start(op); + } + TEST_CASE("bulk can throw, and set_error will be called", "[adaptors][bulk]") { constexpr int n = 2; @@ -193,6 +503,24 @@ namespace { ex::start(op); } + TEST_CASE("bulk_chunked can throw, and set_error will be called", "[adaptors][bulk]") { + constexpr int n = 2; + + auto snd = ex::just() // + | ex::bulk_chunked(ex::par, n, [](int, int) -> int { throw std::logic_error{"err"}; }); + auto op = ex::connect(std::move(snd), expect_error_receiver{}); + ex::start(op); + } + + TEST_CASE("bulk_unchunked can throw, and set_error will be called", "[adaptors][bulk]") { + constexpr int n = 2; + + auto snd = ex::just() // + | ex::bulk_unchunked(n, [](int) -> int { throw std::logic_error{"err"}; }); + auto op = ex::connect(std::move(snd), expect_error_receiver{}); + ex::start(op); + } + TEST_CASE("bulk function is not called on error", "[adaptors][bulk]") { constexpr int n = 2; int called{}; @@ -203,6 +531,26 @@ namespace { 
ex::start(op); } + TEST_CASE("bulk_chunked function is not called on error", "[adaptors][bulk]") { + constexpr int n = 2; + int called{}; + + auto snd = ex::just_error(std::string{"err"}) + | ex::bulk_chunked(ex::par, n, [&called](int, int) { called++; }); + auto op = ex::connect(std::move(snd), expect_error_receiver{std::string{"err"}}); + ex::start(op); + } + + TEST_CASE("bulk_unchunked function is not called on error", "[adaptors][bulk]") { + constexpr int n = 2; + int called{}; + + auto snd = ex::just_error(std::string{"err"}) + | ex::bulk_unchunked(n, [&called](int) { called++; }); + auto op = ex::connect(std::move(snd), expect_error_receiver{std::string{"err"}}); + ex::start(op); + } + TEST_CASE("bulk function in not called on stop", "[adaptors][bulk]") { constexpr int n = 2; int called{}; @@ -212,6 +560,24 @@ namespace { ex::start(op); } + TEST_CASE("bulk_chunked function in not called on stop", "[adaptors][bulk]") { + constexpr int n = 2; + int called{}; + + auto snd = ex::just_stopped() | ex::bulk_chunked(ex::par, n, [&called](int, int) { called++; }); + auto op = ex::connect(std::move(snd), expect_stopped_receiver{}); + ex::start(op); + } + + TEST_CASE("bulk_unchunked function in not called on stop", "[adaptors][bulk]") { + constexpr int n = 2; + int called{}; + + auto snd = ex::just_stopped() | ex::bulk_unchunked(n, [&called](int) { called++; }); + auto op = ex::connect(std::move(snd), expect_stopped_receiver{}); + ex::start(op); + } + TEST_CASE("bulk works with static thread pool", "[adaptors][bulk]") { exec::static_thread_pool pool{4}; ex::scheduler auto sch = pool.get_scheduler(); @@ -308,6 +674,225 @@ namespace { } } + TEST_CASE("bulk_chunked works with static thread pool", "[adaptors][bulk]") { + exec::static_thread_pool pool{4}; + ex::scheduler auto sch = pool.get_scheduler(); + + SECTION("Without values in the set_value channel") { + for (std::size_t n = 0; n < 9u; n++) { + std::vector counter(n, 42); + + auto snd = ex::transfer_just(sch) + | 
ex::bulk_chunked( + ex::par, + n, + [&counter](std::size_t b, std::size_t e) { + while (b < e) + counter[b++] = 0; + }) + | ex::bulk_chunked(ex::par, n, [&counter](std::size_t b, std::size_t e) { + while (b < e) + counter[b++]++; + }); + stdexec::sync_wait(std::move(snd)); + + const std::size_t actual = + static_cast(std::count(counter.begin(), counter.end(), 1)); + const std::size_t expected = n; + + CHECK(expected == actual); + } + } + + SECTION("With values in the set_value channel") { + for (std::size_t n = 0; n < 9; n++) { + std::vector counter(n, 42); + + auto snd = ex::transfer_just(sch, 42) + | ex::bulk_chunked( + ex::par, + n, + [&counter](std::size_t b, std::size_t e, int val) { + if (val == 42) { + while (b < e) + counter[b++] = 0; + } + }) + | ex::bulk_chunked(ex::par, n, [&counter](std::size_t b, std::size_t e, int val) { + if (val == 42) { + while (b < e) + counter[b++]++; + } + }); + auto [val] = stdexec::sync_wait(std::move(snd)).value(); + + CHECK(val == 42); + + const std::size_t actual = + static_cast(std::count(counter.begin(), counter.end(), 1)); + const std::size_t expected = n; + + CHECK(expected == actual); + } + } + + SECTION("With values in the set_value channel that can be taken by reference") { + for (std::size_t n = 0; n < 9; n++) { + std::vector vals(n, 0); + std::vector vals_expected(n); + std::iota(vals_expected.begin(), vals_expected.end(), 1); + + auto snd = + ex::transfer_just(sch, std::move(vals)) + | ex::bulk_chunked( + ex::par, + n, + [](std::size_t b, std::size_t e, std::vector& vals) { + while (b < e) { + vals[b] = static_cast(b); + ++b; + } + }) + | ex::bulk_chunked(ex::par, n, [](std::size_t b, std::size_t e, std::vector& vals) { + while (b < e) + ++vals[b++]; + }); + auto [vals_actual] = stdexec::sync_wait(std::move(snd)).value(); + + CHECK(vals_actual == vals_expected); + } + } + + SECTION("With exception") { + constexpr int n = 9; + auto snd = ex::transfer_just(sch) | ex::bulk_chunked(ex::par, n, [](int, int) { + throw 
std::runtime_error("bulk_chunked"); + }); + + CHECK_THROWS_AS(stdexec::sync_wait(std::move(snd)), std::runtime_error); + } + + SECTION("With concurrent enqueueing") { + constexpr std::size_t n = 4; + std::vector counters_1(n, 0); + std::vector counters_2(n, 0); + + stdexec::sender auto snd = stdexec::when_all( + stdexec::schedule(sch) + | stdexec::bulk_chunked( + ex::par, + n, + [&](std::size_t b, std::size_t e) { + while (b < e) + counters_1[b++]++; + }), + stdexec::schedule(sch) + | stdexec::bulk_chunked(ex::par, n, [&](std::size_t b, std::size_t e) { + while (b < e) + counters_2[b++]++; + })); + + stdexec::sync_wait(std::move(snd)); + + CHECK(std::count(counters_1.begin(), counters_1.end(), 1) == static_cast(n)); + CHECK(std::count(counters_2.begin(), counters_2.end(), 1) == static_cast(n)); + } + } + + TEST_CASE("bulk_unchunked works with static thread pool", "[adaptors][bulk]") { + exec::static_thread_pool pool{4}; + ex::scheduler auto sch = pool.get_scheduler(); + + SECTION("Without values in the set_value channel") { + for (std::size_t n = 0; n < 9u; n++) { + std::vector counter(n, 42); + + auto snd = ex::transfer_just(sch) + | ex::bulk_unchunked(n, [&counter](std::size_t idx) { counter[idx] = 0; }) + | ex::bulk_unchunked(n, [&counter](std::size_t idx) { counter[idx]++; }); + stdexec::sync_wait(std::move(snd)); + + const std::size_t actual = + static_cast(std::count(counter.begin(), counter.end(), 1)); + const std::size_t expected = n; + + CHECK(expected == actual); + } + } + + SECTION("With values in the set_value channel") { + for (std::size_t n = 0; n < 9; n++) { + std::vector counter(n, 42); + + auto snd = ex::transfer_just(sch, 42) + | ex::bulk_unchunked( + n, + [&counter](std::size_t idx, int val) { + if (val == 42) { + counter[idx] = 0; + } + }) + | ex::bulk_unchunked(n, [&counter](std::size_t idx, int val) { + if (val == 42) { + counter[idx]++; + } + }); + auto [val] = stdexec::sync_wait(std::move(snd)).value(); + + CHECK(val == 42); + + const 
std::size_t actual = + static_cast(std::count(counter.begin(), counter.end(), 1)); + const std::size_t expected = n; + + CHECK(expected == actual); + } + } + + SECTION("With values in the set_value channel that can be taken by reference") { + for (std::size_t n = 0; n < 9; n++) { + std::vector vals(n, 0); + std::vector vals_expected(n); + std::iota(vals_expected.begin(), vals_expected.end(), 1); + + auto snd = + ex::transfer_just(sch, std::move(vals)) + | ex::bulk_unchunked( + n, [](std::size_t idx, std::vector& vals) { vals[idx] = static_cast(idx); }) + | ex::bulk_unchunked(n, [](std::size_t idx, std::vector& vals) { ++vals[idx]; }); + auto [vals_actual] = stdexec::sync_wait(std::move(snd)).value(); + + CHECK(vals_actual == vals_expected); + } + } + + SECTION("With exception") { + constexpr int n = 9; + auto snd = ex::transfer_just(sch) + | ex::bulk_unchunked(n, [](int) { throw std::runtime_error("bulk_unchunked"); }); + + CHECK_THROWS_AS(stdexec::sync_wait(std::move(snd)), std::runtime_error); + } + + SECTION("With concurrent enqueueing") { + constexpr std::size_t n = 4; + std::vector counters_1(n, 0); + std::vector counters_2(n, 0); + + stdexec::sender auto snd = stdexec::when_all( + stdexec::schedule(sch) + | stdexec::bulk_unchunked(n, [&](std::size_t id) { counters_1[id]++; }), + stdexec::schedule(sch) + | stdexec::bulk_unchunked(n, [&](std::size_t id) { counters_2[id]++; })); + + stdexec::sync_wait(std::move(snd)); + + CHECK(std::count(counters_1.begin(), counters_1.end(), 1) == static_cast(n)); + CHECK(std::count(counters_2.begin(), counters_2.end(), 1) == static_cast(n)); + } + } + + // TODO: also add similar tests for bulk_chunked and bulk_unchunked TEST_CASE("eager customization of bulk works with static thread pool", "[adaptors][bulk]") { exec::static_thread_pool pool{4}; ex::scheduler auto sch = pool.get_scheduler(); @@ -333,6 +918,7 @@ namespace { } } + // TODO: also add similar tests for bulk_chunked and bulk_unchunked TEST_CASE("lazy customization of 
bulk works with static thread pool", "[adaptors][bulk]") { exec::static_thread_pool pool{4}; ex::scheduler auto sch = pool.get_scheduler(); @@ -363,6 +949,20 @@ namespace { ex::sync_wait(std::move(s)); } + TEST_CASE("default bulk_chunked works with non-default constructible types", "[adaptors][bulk]") { + ex::sender auto s = ex::just(non_default_constructible{42}) + | ex::bulk_chunked(ex::par, 1, [](int, int, auto&) { }); + ex::sync_wait(std::move(s)); + } + + TEST_CASE( + "default bulk_unchunked works with non-default constructible types", + "[adaptors][bulk]") { + ex::sender auto s = ex::just(non_default_constructible{42}) + | ex::bulk_unchunked(1, [](int, auto&) { }); + ex::sync_wait(std::move(s)); + } + TEST_CASE("static thread pool works with non-default constructible types", "[adaptors][bulk]") { exec::static_thread_pool pool{4}; ex::scheduler auto sch = pool.get_scheduler(); @@ -371,4 +971,25 @@ namespace { | ex::bulk(ex::par, 1, [](int, auto&) { }); ex::sync_wait(std::move(s)); } + + struct my_domain { + template Sender, class... Env> + static auto transform_sender(Sender, const Env&...) { + return ex::just(std::string{"hijacked"}); + } + }; + + // TODO: fix this test + // TEST_CASE("customizing bulk_chunked also changes the behavior of bulk", "[adaptors][then]") { + // bool called{false}; + // // The customization will return a different value + // basic_inline_scheduler sched; + // auto snd = ex::just(std::string{"hello"}) + // | exec::on( + // sched, // + // ex::bulk(ex::par, 1, [&called](int, std::string x) { called = true; })) + // | exec::write(stdexec::prop{ex::get_scheduler, inline_scheduler()}); + // wait_for_value(std::move(snd), std::string{"hijacked"}); + // REQUIRE_FALSE(called); + // } } // namespace From 95f642af506da91638296719e8dcfecb7424e873 Mon Sep 17 00:00:00 2001 From: Lucian Radu Teodorescu Date: Sun, 23 Mar 2025 19:49:05 +0200 Subject: [PATCH 07/16] Proper lowering of `bulk` into `bulk_chunked`. 
--- include/stdexec/__detail/__bulk.hpp | 57 ++++++++++++----------- test/stdexec/algos/adaptors/test_bulk.cpp | 25 +++++----- 2 files changed, 41 insertions(+), 41 deletions(-) diff --git a/include/stdexec/__detail/__bulk.hpp b/include/stdexec/__detail/__bulk.hpp index 3de2a1ef1..f72308dc8 100644 --- a/include/stdexec/__detail/__bulk.hpp +++ b/include/stdexec/__detail/__bulk.hpp @@ -210,7 +210,34 @@ namespace stdexec { tag_invoke_t(_AlgoTag, _Sender, _Pol, _Shape, _Fun)>; }; - struct bulk_t : __generic_bulk_t { }; + struct bulk_t : __generic_bulk_t { + template + static auto __transform_sender_fn(const _Env&) { + return [&](__ignore, _Data&& __data, _Child&& __child) { + using __shape_t = std::remove_cvref_t; + auto __new_f = [__func = std::move(__data.__fun_)]( + __shape_t __begin, + __shape_t __end, + auto&&... __vs) mutable // + noexcept(noexcept(__data.__fun_(__begin++, __vs...))) { + while (__begin != __end) + __func(__begin++, __vs...); + }; + + // Lower `bulk` to `bulk_chunked`. If `bulk_chunked` is customized, we will see the customization. + return bulk_chunked( + static_cast<_Child&&>(__child), + __data.__pol_.__get(), + __data.__shape_, + std::move(__new_f)); + }; + } + + template + static auto transform_sender(_Sender&& __sndr, const _Env& __env) { + return __sexpr_apply(static_cast<_Sender&&>(__sndr), __transform_sender_fn(__env)); + } + }; struct bulk_chunked_t : __generic_bulk_t { }; @@ -350,33 +377,7 @@ namespace stdexec { }; struct __bulk_impl : __bulk_impl_base { - //! This implements the core default behavior for `bulk`: - //! This is implemented in terms of `bulk_chunked`. - static constexpr auto complete = // - []( - __ignore, - _State& __state, - _Receiver& __rcvr, - _Tag, - _Args&&... 
__args) noexcept -> void { - if constexpr (std::same_as<_Tag, set_value_t>) { - using __shape_t = decltype(__state.__shape_); - - constexpr bool __nothrow = noexcept(__state.__fun_(__state.__shape_, __args...)); - auto __new_f = - [__func = std::move(__state.__fun_)]( - __shape_t __begin, __shape_t __end, auto&&... __vs) mutable noexcept(__nothrow) { - while (__begin != __end) - __func(__begin++, __vs...); - }; - - auto __chunked_data = __data{__state.__pol_, __state.__shape_, std::move(__new_f)}; - __bulk_chunked_impl::complete( - _Tag(), __chunked_data, __rcvr, _Tag(), std::forward<_Args>(__args)...); - } else { - _Tag()(static_cast<_Receiver&&>(__rcvr), static_cast<_Args&&>(__args)...); - } - }; + // Implementation is handled by lowering to `bulk_chunked` in `transform_sender`. }; } // namespace __bulk diff --git a/test/stdexec/algos/adaptors/test_bulk.cpp b/test/stdexec/algos/adaptors/test_bulk.cpp index 1538056a2..b8499f4a6 100644 --- a/test/stdexec/algos/adaptors/test_bulk.cpp +++ b/test/stdexec/algos/adaptors/test_bulk.cpp @@ -979,17 +979,16 @@ namespace { } }; - // TODO: fix this test - // TEST_CASE("customizing bulk_chunked also changes the behavior of bulk", "[adaptors][then]") { - // bool called{false}; - // // The customization will return a different value - // basic_inline_scheduler sched; - // auto snd = ex::just(std::string{"hello"}) - // | exec::on( - // sched, // - // ex::bulk(ex::par, 1, [&called](int, std::string x) { called = true; })) - // | exec::write(stdexec::prop{ex::get_scheduler, inline_scheduler()}); - // wait_for_value(std::move(snd), std::string{"hijacked"}); - // REQUIRE_FALSE(called); - // } + TEST_CASE("late customizing bulk_chunked also changes the behavior of bulk", "[adaptors][then]") { + bool called{false}; + // The customization will return a different value + basic_inline_scheduler sched; + auto snd = ex::just(std::string{"hello"}) + | exec::on( + sched, // + ex::bulk(ex::par, 1, [&called](int, std::string x) { called = true; 
})) + | exec::write(stdexec::prop{ex::get_scheduler, inline_scheduler()}); + wait_for_value(std::move(snd), std::string{"hijacked"}); + REQUIRE_FALSE(called); + } } // namespace From 3984ae5e30555cff2501501eb95e11199a7c8167 Mon Sep 17 00:00:00 2001 From: Lucian Radu Teodorescu Date: Sun, 23 Mar 2025 22:28:46 +0200 Subject: [PATCH 08/16] Add test showing that `bulk` can be customized independently of `bulk_chunked`. --- test/stdexec/algos/adaptors/test_bulk.cpp | 23 +++++++++++++++++++++++ 1 file changed, 23 insertions(+) diff --git a/test/stdexec/algos/adaptors/test_bulk.cpp b/test/stdexec/algos/adaptors/test_bulk.cpp index b8499f4a6..a818b9b48 100644 --- a/test/stdexec/algos/adaptors/test_bulk.cpp +++ b/test/stdexec/algos/adaptors/test_bulk.cpp @@ -991,4 +991,27 @@ namespace { wait_for_value(std::move(snd), std::string{"hijacked"}); REQUIRE_FALSE(called); } + + struct my_domain2 { + template Sender, class... Env> + static auto transform_sender(Sender, const Env&...) { + return ex::just(std::string{"hijacked"}); + } + }; + + TEST_CASE("bulk can be customized, independently of bulk_chunked", "[adaptors][then]") { + bool called{false}; + // The customization will return a different value + basic_inline_scheduler sched; + auto snd = ex::just(std::string{"hello"}) | ex::continues_on(sched) + | ex::bulk(ex::par, 1, [&called](int, std::string x) { called = true; }); + wait_for_value(std::move(snd), std::string{"hijacked"}); + REQUIRE_FALSE(called); + + // bulk_chunked will still use the default implementation + auto snd2 = ex::just(std::string{"hello"}) | ex::continues_on(sched) + | ex::bulk_chunked(ex::par, 1, [&called](int, int, std::string x) { called = true; }); + wait_for_value(std::move(snd2), std::string{"hello"}); + REQUIRE(called); + } } // namespace From f102fe9e6ea124963dda375a4e6d7f4a01433cc4 Mon Sep 17 00:00:00 2001 From: Lucian Radu Teodorescu Date: Mon, 24 Mar 2025 21:56:44 +0200 Subject: [PATCH 09/16] Customize `bulk_chunked` instead of `bulk` for static 
thread pool. --- include/exec/static_thread_pool.hpp | 21 ++++---- test/stdexec/algos/adaptors/test_bulk.cpp | 60 +++++++++++++++++++++-- 2 files changed, 64 insertions(+), 17 deletions(-) diff --git a/include/exec/static_thread_pool.hpp b/include/exec/static_thread_pool.hpp index 0cb4b0ffd..f10adb3fa 100644 --- a/include/exec/static_thread_pool.hpp +++ b/include/exec/static_thread_pool.hpp @@ -195,11 +195,11 @@ namespace exec { #endif template - requires __callable + requires __callable using bulk_non_throwing = // __mbool< // If function invocation doesn't throw - __nothrow_callable && + __nothrow_callable && // and emplacing a tuple doesn't throw #if STDEXEC_MSVC() __bulk_non_throwing::__v @@ -236,7 +236,7 @@ namespace exec { struct transform_bulk { template - auto operator()(bulk_t, Data&& data, Sender&& sndr) { + auto operator()(bulk_chunked_t, Data&& data, Sender&& sndr) { auto [pol, shape, fun] = static_cast(data); // TODO: handle non-par execution policies return bulk_sender_t{ @@ -265,7 +265,7 @@ namespace exec { public: struct domain : stdexec::default_domain { // For eager customization - template Sender> + template Sender> auto transform_sender(Sender&& sndr) const noexcept { if constexpr (__completes_on) { auto sched = get_completion_scheduler(get_env(sndr)); @@ -279,8 +279,8 @@ namespace exec { } } - // transform the generic bulk sender into a parallel thread-pool bulk sender - template Sender, class Env> + // transform the generic bulk_chunked sender into a parallel thread-pool bulk sender + template Sender, class Env> auto transform_sender(Sender&& sndr, const Env& env) const noexcept { if constexpr (__completes_on) { auto sched = get_completion_scheduler(get_env(sndr)); @@ -679,9 +679,8 @@ namespace exec { for (std::uint32_t index = 0; index < threadCount; ++index) { threadStates_[index].emplace(this, index, params, numa_); - threadIndexByNumaNode_.push_back( - thread_index_by_numa_node{ - .numa_node = threadStates_[index]->numa_node(), 
.thread_index = index}); + threadIndexByNumaNode_.push_back(thread_index_by_numa_node{ + .numa_node = threadStates_[index]->numa_node(), .thread_index = index}); } // NOLINTNEXTLINE(modernize-use-ranges) we still support platforms without the std::ranges algorithms @@ -1158,9 +1157,7 @@ namespace exec { // In the case that the shape is much larger than the total number of threads, // then each call to computation will call the function many times. auto [begin, end] = even_share(sh_state.shape_, tid, total_threads); - for (Shape i = begin; i < end; ++i) { - sh_state.fun_(i, args...); - } + sh_state.fun_(begin, end, args...); }; auto completion = [&](auto&... args) { diff --git a/test/stdexec/algos/adaptors/test_bulk.cpp b/test/stdexec/algos/adaptors/test_bulk.cpp index a818b9b48..6976aea8a 100644 --- a/test/stdexec/algos/adaptors/test_bulk.cpp +++ b/test/stdexec/algos/adaptors/test_bulk.cpp @@ -892,7 +892,6 @@ namespace { } } - // TODO: also add similar tests for bulk_chunked and bulk_unchunked TEST_CASE("eager customization of bulk works with static thread pool", "[adaptors][bulk]") { exec::static_thread_pool pool{4}; ex::scheduler auto sch = pool.get_scheduler(); @@ -907,7 +906,6 @@ namespace { auto snd = ex::just() // | ex::continues_on(sch) | ex::bulk(ex::par, tids.size(), fun); - CHECK(std::equal_to()(&snd.pool_, &pool)); stdexec::sync_wait(std::move(snd)); // All the work should not have run on the same thread @@ -918,7 +916,34 @@ namespace { } } - // TODO: also add similar tests for bulk_chunked and bulk_unchunked + TEST_CASE( + "eager customization of bulk_chunked works with static thread pool", + "[adaptors][bulk]") { + exec::static_thread_pool pool{4}; + ex::scheduler auto sch = pool.get_scheduler(); + + SECTION("Without values in the set_value channel") { + std::vector tids(42); + + auto fun = [&tids](std::size_t b, std::size_t e) { + while (b < e) { + tids[b++] = std::this_thread::get_id(); + } + 
std::this_thread::sleep_for(std::chrono::milliseconds{10}); + }; + + auto snd = ex::just() // + | ex::continues_on(sch) | ex::bulk_chunked(ex::par, tids.size(), fun); + stdexec::sync_wait(std::move(snd)); + + // All the work should not have run on the same thread + const auto actual = static_cast(std::count(tids.begin(), tids.end(), tids[0])); + const std::size_t wrong = tids.size(); + + CHECK(actual != wrong); + } + } + TEST_CASE("lazy customization of bulk works with static thread pool", "[adaptors][bulk]") { exec::static_thread_pool pool{4}; ex::scheduler auto sch = pool.get_scheduler(); @@ -943,6 +968,32 @@ namespace { } } + TEST_CASE("lazy customization of bulk_chunked works with static thread pool", "[adaptors][bulk]") { + exec::static_thread_pool pool{4}; + ex::scheduler auto sch = pool.get_scheduler(); + + SECTION("Without values in the set_value channel") { + std::vector tids(42); + + auto fun = [&tids](std::size_t b, std::size_t e) { + while (b < e) { + tids[b++] = std::this_thread::get_id(); + } + std::this_thread::sleep_for(std::chrono::milliseconds{10}); + }; + + auto snd = ex::just() // + | ex::bulk_chunked(ex::par, tids.size(), fun); + stdexec::sync_wait(stdexec::starts_on(sch, std::move(snd))); + + // All the work should not have run on the same thread + const auto actual = static_cast(std::count(tids.begin(), tids.end(), tids[0])); + const std::size_t wrong = tids.size(); + + CHECK(actual != wrong); + } + } + TEST_CASE("default bulk works with non-default constructible types", "[adaptors][bulk]") { ex::sender auto s = ex::just(non_default_constructible{42}) | ex::bulk(ex::par, 1, [](int, auto&) { }); @@ -986,8 +1037,7 @@ namespace { auto snd = ex::just(std::string{"hello"}) | exec::on( sched, // - ex::bulk(ex::par, 1, [&called](int, std::string x) { called = true; })) - | exec::write(stdexec::prop{ex::get_scheduler, inline_scheduler()}); + ex::bulk(ex::par, 1, [&called](int, std::string x) { called = true; })); wait_for_value(std::move(snd), 
std::string{"hijacked"}); REQUIRE_FALSE(called); } From e751c5e4f5d6bf76b6049859533d22cc4c8722a1 Mon Sep 17 00:00:00 2001 From: Lucian Radu Teodorescu Date: Mon, 24 Mar 2025 23:17:52 +0200 Subject: [PATCH 10/16] The execution policy passed to `bulk*` is taken into consideration in static thread pool's customization. --- include/exec/static_thread_pool.hpp | 77 +++++++++++++++-------- test/stdexec/algos/adaptors/test_bulk.cpp | 75 ++++++++++++++++++++++ 2 files changed, 125 insertions(+), 27 deletions(-) diff --git a/include/exec/static_thread_pool.hpp b/include/exec/static_thread_pool.hpp index f10adb3fa..c32abdc3f 100644 --- a/include/exec/static_thread_pool.hpp +++ b/include/exec/static_thread_pool.hpp @@ -175,14 +175,14 @@ namespace exec { // TODO: code to reconstitute a static_thread_pool_ schedule sender }; - template + template struct bulk_sender { using Sender = stdexec::__t; struct __t; }; - template - using bulk_sender_t = __t>, Shape, Fun>>; + template + using bulk_sender_t = __t>, parallelize, Shape, Fun>>; #if STDEXEC_MSVC() // MSVCBUG https://developercommunity.visualstudio.com/t/Alias-template-with-pack-expansion-in-no/10437850 @@ -209,37 +209,45 @@ namespace exec { // there's no need to advertise completion with `exception_ptr` >; - template + template struct bulk_shared_state; - template + template < + class CvrefSenderId, + class ReceiverId, + bool parallelize, + class Shape, + class Fun, + bool MayThrow> struct bulk_receiver { using CvrefSender = __cvref_t; using Receiver = stdexec::__t; struct __t; }; - template - using bulk_receiver_t = - __t, __id, Shape, Fun, MayThrow>>; + template + using bulk_receiver_t = __t< + bulk_receiver<__cvref_id, __id, parallelize, Shape, Fun, MayThrow>>; - template + template struct bulk_op_state { using CvrefSender = stdexec::__cvref_t; using Receiver = stdexec::__t; struct __t; }; - template - using bulk_op_state_t = - __t>, __id<__decay_t>, Shape, Fun>>; + template + using bulk_op_state_t = __t< + 
bulk_op_state<__id<__decay_t>, __id<__decay_t>, parallelize, Shape, Fun>>; struct transform_bulk { template auto operator()(bulk_chunked_t, Data&& data, Sender&& sndr) { auto [pol, shape, fun] = static_cast(data); - // TODO: handle non-par execution policies - return bulk_sender_t{ + using policy_t = std::remove_cvref_t; + constexpr bool parallelize = std::same_as + || std::same_as; + return bulk_sender_t{ pool_, static_cast(sndr), shape, std::move(fun)}; } @@ -1076,8 +1084,8 @@ namespace exec { ////////////////////////////////////////////////////////////////////////////////////////////////// // What follows is the implementation for parallel bulk execution on static_thread_pool_. - template - struct static_thread_pool_::bulk_sender::__t { + template + struct static_thread_pool_::bulk_sender::__t { using __id = bulk_sender; using sender_concept = sender_t; @@ -1108,7 +1116,8 @@ namespace exec { template using bulk_op_state_t = // - stdexec::__t, stdexec::__id, Shape, Fun>>; + stdexec::__t< + bulk_op_state<__cvref_id, stdexec::__id, parallelize, Shape, Fun>>; template <__decays_to<__t> Self, receiver Receiver> requires receiver_of>> @@ -1139,7 +1148,7 @@ namespace exec { }; //! The customized operation state for `stdexec::bulk` operations - template + template struct static_thread_pool_::bulk_shared_state { //! The actual `bulk_task` holds a pointer to the shared state //! and its `__execute` function reads from that shared state. @@ -1223,8 +1232,12 @@ namespace exec { //! That is, we don't need an agent for each of the shape values. [[nodiscard]] auto num_agents_required() const -> std::uint32_t { - return static_cast( - std::min(shape_, static_cast(pool_.available_parallelism()))); + if constexpr (parallelize) { + return static_cast( + std::min(shape_, static_cast(pool_.available_parallelism()))); + } else { + return static_cast(1); + } } template @@ -1253,12 +1266,20 @@ namespace exec { }; //! 
A customized receiver to allow parallel execution of `stdexec::bulk` operations: - template - struct static_thread_pool_::bulk_receiver::__t { + template < + class CvrefSenderId, + class ReceiverId, + bool parallelize, + class Shape, + class Fun, + bool MayThrow> + struct static_thread_pool_:: + bulk_receiver::__t { using __id = bulk_receiver; using receiver_concept = receiver_t; - using shared_state = bulk_shared_state; + using shared_state = + bulk_shared_state; shared_state& shared_state_; @@ -1308,8 +1329,9 @@ namespace exec { } }; - template - struct static_thread_pool_::bulk_op_state::__t { + template + struct static_thread_pool_::bulk_op_state:: + __t { using __id = bulk_op_state; static constexpr bool may_throw = // @@ -1319,8 +1341,9 @@ namespace exec { __mbind_front_q, __q<__mand>>>; - using bulk_rcvr = bulk_receiver_t; - using shared_state = bulk_shared_state; + using bulk_rcvr = bulk_receiver_t; + using shared_state = + bulk_shared_state; using inner_op_state = connect_result_t; shared_state shared_state_; diff --git a/test/stdexec/algos/adaptors/test_bulk.cpp b/test/stdexec/algos/adaptors/test_bulk.cpp index 6976aea8a..98160c33d 100644 --- a/test/stdexec/algos/adaptors/test_bulk.cpp +++ b/test/stdexec/algos/adaptors/test_bulk.cpp @@ -1023,6 +1023,81 @@ namespace { ex::sync_wait(std::move(s)); } + template + int number_of_threads_in_bulk(Sched sch, const Policy& policy, int n) { + std::vector tids(n); + auto fun = [&tids](std::size_t idx) { + tids[idx] = std::this_thread::get_id(); + std::this_thread::sleep_for(std::chrono::milliseconds{10}); + }; + + auto snd = ex::just() // + | ex::continues_on(sch) // + | ex::bulk(policy, tids.size(), fun); + stdexec::sync_wait(std::move(snd)); + + std::sort(tids.begin(), tids.end()); + return static_cast(std::unique(tids.begin(), tids.end()) - tids.begin()); + } + + TEST_CASE( + "static thread pool execute bulk work in accordance with the execution policy", + "[adaptors][bulk]") { + exec::static_thread_pool pool{4}; 
+ ex::scheduler auto sch = pool.get_scheduler(); + + SECTION("seq execution policy") { + REQUIRE(number_of_threads_in_bulk(sch, ex::seq, 42) == 1); + } + SECTION("unseq execution policy") { + REQUIRE(number_of_threads_in_bulk(sch, ex::unseq, 42) == 1); + } + SECTION("par execution policy") { + REQUIRE(number_of_threads_in_bulk(sch, ex::par, 42) > 1); + } + SECTION("par_unseq execution policy") { + REQUIRE(number_of_threads_in_bulk(sch, ex::par_unseq, 42) > 1); + } + } + + template + int number_of_threads_in_bulk_chunked(Sched sch, const Policy& policy, int n) { + std::vector tids(n); + auto fun = [&tids](std::size_t b, std::size_t e) { + while (b < e) + tids[b++] = std::this_thread::get_id(); + std::this_thread::sleep_for(std::chrono::milliseconds{10}); + }; + + auto snd = ex::just() // + | ex::continues_on(sch) // + | ex::bulk_chunked(policy, tids.size(), fun); + stdexec::sync_wait(std::move(snd)); + + std::sort(tids.begin(), tids.end()); + return static_cast(std::unique(tids.begin(), tids.end()) - tids.begin()); + } + + TEST_CASE( + "static thread pool execute bulk_chunked work in accordance with the execution policy", + "[adaptors][bulk]") { + exec::static_thread_pool pool{4}; + ex::scheduler auto sch = pool.get_scheduler(); + + SECTION("seq execution policy") { + REQUIRE(number_of_threads_in_bulk_chunked(sch, ex::seq, 42) == 1); + } + SECTION("unseq execution policy") { + REQUIRE(number_of_threads_in_bulk_chunked(sch, ex::unseq, 42) == 1); + } + SECTION("par execution policy") { + REQUIRE(number_of_threads_in_bulk_chunked(sch, ex::par, 42) > 1); + } + SECTION("par_unseq execution policy") { + REQUIRE(number_of_threads_in_bulk_chunked(sch, ex::par_unseq, 42) > 1); + } + } + struct my_domain { template Sender, class... Env> static auto transform_sender(Sender, const Env&...) 
{ From bfecde64ac3743a0e88572bc46d9c6eeaf23ffec Mon Sep 17 00:00:00 2001 From: Lucian Radu Teodorescu Date: Tue, 25 Mar 2025 21:46:21 +0200 Subject: [PATCH 11/16] Fix data constructors to take policy by const reference. --- include/stdexec/__detail/__bulk.hpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/include/stdexec/__detail/__bulk.hpp b/include/stdexec/__detail/__bulk.hpp index f72308dc8..64ff08528 100644 --- a/include/stdexec/__detail/__bulk.hpp +++ b/include/stdexec/__detail/__bulk.hpp @@ -107,7 +107,7 @@ namespace stdexec { __mliterals<&__data::__pol_, &__data::__shape_, &__data::__fun_>(); }; template - __data(_Pol, _Shape, _Fun) -> __data<_Pol, _Shape, _Fun>; + __data(const _Pol&, _Shape, _Fun) -> __data<_Pol, _Shape, _Fun>; template struct __bulk_traits; From cbbe3ee9d3f06933c8801e72d6a5351aaa227f71 Mon Sep 17 00:00:00 2001 From: Eric Niebler Date: Sun, 30 Mar 2025 11:49:27 -0700 Subject: [PATCH 12/16] fix CUDA build --- include/nvexec/stream/bulk.cuh | 2 +- include/nvexec/stream/sync_wait.cuh | 9 --------- 2 files changed, 1 insertion(+), 10 deletions(-) diff --git a/include/nvexec/stream/bulk.cuh b/include/nvexec/stream/bulk.cuh index d1fff1f8b..ad6506f02 100644 --- a/include/nvexec/stream/bulk.cuh +++ b/include/nvexec/stream/bulk.cuh @@ -393,7 +393,7 @@ namespace nvexec::_strm { struct transform_sender_for { template auto operator()(__ignore, Data data, Sender&& sndr) const { - auto [shape, fun] = static_cast(data); + auto [policy, shape, fun] = static_cast(data); using Shape = decltype(shape); using Fn = decltype(fun); auto sched = get_completion_scheduler(get_env(sndr)); diff --git a/include/nvexec/stream/sync_wait.cuh b/include/nvexec/stream/sync_wait.cuh index 7f82ea199..8c2d53c36 100644 --- a/include/nvexec/stream/sync_wait.cuh +++ b/include/nvexec/stream/sync_wait.cuh @@ -195,15 +195,6 @@ namespace nvexec::_strm { return std::move(std::get<1>(state.data_)); } - -#if STDEXEC_EDG() - // For reporting better diagnostics with 
nvc++ - template > - auto operator()( - context_state_t context_state, - _Sender&&, - [[maybe_unused]] _Error __diagnostic = {}) const -> std::optional> = delete; -#endif }; } // namespace _sync_wait From ddaba6f495ecd879deb6116a05302aa35062f6e3 Mon Sep 17 00:00:00 2001 From: Eric Niebler Date: Fri, 4 Apr 2025 10:13:11 -0700 Subject: [PATCH 13/16] work around bug in nvc++ --- test/stdexec/algos/adaptors/test_bulk.cpp | 24 +++++++++++------------ 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/test/stdexec/algos/adaptors/test_bulk.cpp b/test/stdexec/algos/adaptors/test_bulk.cpp index 20606ac38..f48e89efb 100644 --- a/test/stdexec/algos/adaptors/test_bulk.cpp +++ b/test/stdexec/algos/adaptors/test_bulk.cpp @@ -200,44 +200,44 @@ namespace { TEST_CASE("bulk can be used with a function", "[adaptors][bulk]") { constexpr int n = 9; - static int counter[n]{}; - std::fill_n(counter, n, 0); + static int counter1[n]{}; + std::fill_n(counter1, n, 0); - ex::sender auto snd = ex::just() | ex::bulk(ex::par, n, function); + ex::sender auto snd = ex::just() | ex::bulk(ex::par, n, function); auto op = ex::connect(std::move(snd), expect_void_receiver{}); ex::start(op); - for (int i: counter) { + for (int i: counter1) { CHECK(i == 1); } } TEST_CASE("bulk_chunked can be used with a function", "[adaptors][bulk]") { constexpr int n = 9; - static int counter[n]{}; - std::fill_n(counter, n, 0); + static int counter2[n]{}; + std::fill_n(counter2, n, 0); ex::sender auto snd = ex::just() - | ex::bulk_chunked(ex::par, n, function_range); + | ex::bulk_chunked(ex::par, n, function_range); auto op = ex::connect(std::move(snd), expect_void_receiver{}); ex::start(op); for (int i = 0; i < n; i++) { - CHECK(counter[i] == 1); + CHECK(counter2[i] == 1); } } TEST_CASE("bulk_unchunked can be used with a function", "[adaptors][bulk]") { constexpr int n = 9; - static int counter[n]{}; - std::fill_n(counter, n, 0); + static int counter3[n]{}; + std::fill_n(counter3, n, 0); - ex::sender auto 
snd = ex::just() | ex::bulk_unchunked(n, function); + ex::sender auto snd = ex::just() | ex::bulk_unchunked(n, function); auto op = ex::connect(std::move(snd), expect_void_receiver{}); ex::start(op); for (int i = 0; i < n; i++) { - CHECK(counter[i] == 1); + CHECK(counter3[i] == 1); } } From 8e9547c6ac1e4245c1166d15b0c09da5f136a0fe Mon Sep 17 00:00:00 2001 From: Lucian Radu Teodorescu Date: Thu, 24 Apr 2025 11:35:55 +0300 Subject: [PATCH 14/16] Workaround MSVC bug. --- include/stdexec/__detail/__bulk.hpp | 12 ++++++++---- test/stdexec/algos/adaptors/test_bulk.cpp | 3 +++ 2 files changed, 11 insertions(+), 4 deletions(-) diff --git a/include/stdexec/__detail/__bulk.hpp b/include/stdexec/__detail/__bulk.hpp index 64ff08528..baf45f86c 100644 --- a/include/stdexec/__detail/__bulk.hpp +++ b/include/stdexec/__detail/__bulk.hpp @@ -219,10 +219,14 @@ namespace stdexec { __shape_t __begin, __shape_t __end, auto&&... __vs) mutable // - noexcept(noexcept(__data.__fun_(__begin++, __vs...))) { - while (__begin != __end) - __func(__begin++, __vs...); - }; +#if !STDEXEC_MSVC() + // MSVCBUG https://developercommunity.visualstudio.com/t/noexcept-expression-in-lambda-template-n/10718680 + noexcept(noexcept(__data.__fun_(__begin++, __vs...))) +#endif + { + while (__begin != __end) + __func(__begin++, __vs...); + }; // Lower `bulk` to `bulk_chunked`. If `bulk_chunked` is customized, we will see the customization. 
return bulk_chunked( diff --git a/test/stdexec/algos/adaptors/test_bulk.cpp b/test/stdexec/algos/adaptors/test_bulk.cpp index f48e89efb..6bae13d0f 100644 --- a/test/stdexec/algos/adaptors/test_bulk.cpp +++ b/test/stdexec/algos/adaptors/test_bulk.cpp @@ -148,6 +148,8 @@ namespace { error_scheduler sched2{}; error_scheduler sched3{43}; +#if !STDEXEC_MSVC() + // MSVCBUG https://developercommunity.visualstudio.com/t/noexcept-expression-in-lambda-template-n/10718680 check_err_types>( // ex::transfer_just(sched1) | ex::bulk(ex::par, n, [](int) noexcept { })); check_err_types>( // @@ -158,6 +160,7 @@ namespace { ex::transfer_just(sched3) | ex::bulk(ex::par, n, [](int) noexcept { })); check_err_types>( // ex::transfer_just(sched3) | ex::bulk(ex::par, n, [](int) { throw std::logic_error{"err"}; })); +#endif } TEST_CASE("bulk_chunked keeps error_types from input sender", "[adaptors][bulk]") { From a6dfb60d4da0d7d91b50aebce82593e878a17ff4 Mon Sep 17 00:00:00 2001 From: Lucian Radu Teodorescu Date: Sun, 27 Apr 2025 20:58:57 +0300 Subject: [PATCH 15/16] Apply suggestions from code review Co-authored-by: Eric Niebler --- include/stdexec/__detail/__bulk.hpp | 47 +++++++++++------------------ 1 file changed, 17 insertions(+), 30 deletions(-) diff --git a/include/stdexec/__detail/__bulk.hpp b/include/stdexec/__detail/__bulk.hpp index baf45f86c..b2dfaa2a7 100644 --- a/include/stdexec/__detail/__bulk.hpp +++ b/include/stdexec/__detail/__bulk.hpp @@ -192,22 +192,6 @@ namespace stdexec { {} }; } - - // This describes how to use the pieces of a bulk sender to find - // legacy customizations of the bulk algorithm. 
- using _Sender = __1; - using _Pol = __nth_member<0>(__0); - using _Shape = __nth_member<1>(__0); - using _Fun = __nth_member<2>(__0); - using __legacy_customizations_t = __types< - tag_invoke_t( - _AlgoTag, - get_completion_scheduler_t(get_env_t(_Sender&)), - _Sender, - _Pol, - _Shape, - _Fun), - tag_invoke_t(_AlgoTag, _Sender, _Pol, _Shape, _Fun)>; }; struct bulk_t : __generic_bulk_t { @@ -241,6 +225,23 @@ namespace stdexec { static auto transform_sender(_Sender&& __sndr, const _Env& __env) { return __sexpr_apply(static_cast<_Sender&&>(__sndr), __transform_sender_fn(__env)); } + using __generic_bulk_t::operator(); + + template + [[deprecated("The bulk algorithm now requires an execution policy such as stdexec::par as an argument.")]] + STDEXEC_ATTRIBUTE((host, device)) auto operator()(_Sender&& __sndr, _Shape __shape, _Fun __fun) const { + return (*this)( + static_cast<_Sender&&>(__sndr), + par, + static_cast<_Shape&&>(__shape), + static_cast<_Fun&&>(__fun)); + } + + template + [[deprecated("The bulk algorithm now requires an execution policy such as stdexec::par as an argument.")]] + STDEXEC_ATTRIBUTE((always_inline)) auto operator()(_Shape __shape, _Fun __fun) const { + return (*this)(static_cast<_Shape&&>(__shape), static_cast<_Fun&&>(__fun)); + } }; struct bulk_chunked_t : __generic_bulk_t { }; @@ -265,20 +266,6 @@ namespace stdexec { {} }; } - - // This describes how to use the pieces of a bulk sender to find - // legacy customizations of the bulk algorithm. 
- using _Sender = __1; - using _Shape = __nth_member<1>(__0); - using _Fun = __nth_member<2>(__0); - using __legacy_customizations_t = __types< - tag_invoke_t( - bulk_unchunked_t, - get_completion_scheduler_t(get_env_t(_Sender&)), - _Sender, - _Shape, - _Fun), - tag_invoke_t(bulk_unchunked_t, _Sender, _Shape, _Fun)>; }; template From a78089f732550c0efa1fdd64fa7c643bf83bac45 Mon Sep 17 00:00:00 2001 From: Lucian Radu Teodorescu Date: Mon, 28 Apr 2025 19:30:17 +0300 Subject: [PATCH 16/16] Fix CI failure Move the deprecated definitions in the base class, to avoid some weird shadowing error. --- include/stdexec/__detail/__bulk.hpp | 33 ++++++++++++++--------------- 1 file changed, 16 insertions(+), 17 deletions(-) diff --git a/include/stdexec/__detail/__bulk.hpp b/include/stdexec/__detail/__bulk.hpp index b2dfaa2a7..5ec621efd 100644 --- a/include/stdexec/__detail/__bulk.hpp +++ b/include/stdexec/__detail/__bulk.hpp @@ -192,6 +192,22 @@ namespace stdexec { {} }; } + + template + [[deprecated("The bulk algorithm now requires an execution policy such as stdexec::par as an argument.")]] + STDEXEC_ATTRIBUTE((host, device)) auto operator()(_Sender&& __sndr, _Shape __shape, _Fun __fun) const { + return (*this)( + static_cast<_Sender&&>(__sndr), + par, + static_cast<_Shape&&>(__shape), + static_cast<_Fun&&>(__fun)); + } + + template + [[deprecated("The bulk algorithm now requires an execution policy such as stdexec::par as an argument.")]] + STDEXEC_ATTRIBUTE((always_inline)) auto operator()(_Shape __shape, _Fun __fun) const { + return (*this)(static_cast<_Shape&&>(__shape), static_cast<_Fun&&>(__fun)); + } }; struct bulk_t : __generic_bulk_t { @@ -225,23 +241,6 @@ namespace stdexec { static auto transform_sender(_Sender&& __sndr, const _Env& __env) { return __sexpr_apply(static_cast<_Sender&&>(__sndr), __transform_sender_fn(__env)); } - using __generic_bulk_t::operator(); - - template - [[deprecated("The bulk algorithm now requires an execution policy such as stdexec::par as 
an argument.")]] - STDEXEC_ATTRIBUTE((host, device)) auto operator()(_Sender&& __sndr, _Shape __shape, _Fun __fun) const { - return (*this)( - static_cast<_Sender&&>(__sndr), - par, - static_cast<_Shape&&>(__shape), - static_cast<_Fun&&>(__fun)); - } - - template - [[deprecated("The bulk algorithm now requires an execution policy such as stdexec::par as an argument.")]] - STDEXEC_ATTRIBUTE((always_inline)) auto operator()(_Shape __shape, _Fun __fun) const { - return (*this)(static_cast<_Shape&&>(__shape), static_cast<_Fun&&>(__fun)); - } }; struct bulk_chunked_t : __generic_bulk_t { };