From b0859bc640dbd1cc8b92dbe40c14a51bd620b6ce Mon Sep 17 00:00:00 2001 From: Maikel Nadolski Date: Sun, 24 Sep 2023 22:14:35 +0200 Subject: [PATCH] Implement parallelism and cardinality --- include/exec/sequence/empty_sequence.hpp | 7 ++ include/exec/sequence/iterate.hpp | 11 +++- include/exec/sequence_senders.hpp | 84 +++++++++++++++++++++++- test/exec/sequence/test_iterate.cpp | 11 ++++ 4 files changed, 110 insertions(+), 3 deletions(-) diff --git a/include/exec/sequence/empty_sequence.hpp b/include/exec/sequence/empty_sequence.hpp index 0be237b62..05f8273c1 100644 --- a/include/exec/sequence/empty_sequence.hpp +++ b/include/exec/sequence/empty_sequence.hpp @@ -50,6 +50,13 @@ namespace exec { return stdexec::__t<__operation>>>{ static_cast<_Rcvr&&>(__rcvr)}; } + + template <__decays_to<__t> _Self> + friend auto tag_invoke(get_env_t, _Self&&) noexcept { + return make_env( + with(parallelism, lock_step), + with(cardinality, std::integral_constant{})); + } }; }; diff --git a/include/exec/sequence/iterate.hpp b/include/exec/sequence/iterate.hpp index 96ef4550f..6107a3a10 100644 --- a/include/exec/sequence/iterate.hpp +++ b/include/exec/sequence/iterate.hpp @@ -205,8 +205,15 @@ namespace exec { return {}; } - static empty_env get_env(__ignore) noexcept { - return {}; + static make_env_t> get_env(__ignore) noexcept { + return make_env(with(parallelism, lock_step)); + } + + template _SeqExpr> + requires std::ranges::sized_range> + static auto get_env(const _SeqExpr& __seq) noexcept { + auto&& __rng = apply_sender(__seq, stdexec::__detail::__get_data{}); + return make_env(with(parallelism, lock_step), with(cardinality, std::ranges::size(__rng))); } }; } diff --git a/include/exec/sequence_senders.hpp b/include/exec/sequence_senders.hpp index 5dbc92cc8..f5792282b 100644 --- a/include/exec/sequence_senders.hpp +++ b/include/exec/sequence_senders.hpp @@ -18,6 +18,8 @@ #include "../stdexec/execution.hpp" +#include "./env.hpp" + namespace exec { struct sequence_tag { }; @@ -193,7 +195,7 @@ namespace exec { using item_types_of_t = decltype(get_item_types(stdexec::__declval<_Sender>(), stdexec::__declval<_Env>())); - template + template concept sequence_sender = // stdexec::sender<_Sender, _Env> && // enable_sequence_sender>; @@ -454,4 +456,84 @@ namespace exec { } } } + + namespace __sequence_queries { + using namespace stdexec; + + inline struct unbounded_t { + } unbounded; + + struct cardinality_t { + template + requires tag_invocable + constexpr tag_invoke_result_t operator()(const Env& env) const + noexcept(nothrow_tag_invocable) { + return tag_invoke(*this, env); + } + + friend constexpr bool tag_invoke(forwarding_query_t, cardinality_t) noexcept { + return true; + } + }; + + inline struct many_sender_t { + } many_sender; + + inline struct lock_step_t { + } lock_step; + + struct parallelism_t { + template + requires tag_invocable + constexpr tag_invoke_result_t operator()(const Env& env) const + noexcept(nothrow_tag_invocable) { + return tag_invoke(*this, env); + } + + friend constexpr bool tag_invoke(forwarding_query_t, parallelism_t) noexcept { + return true; + } + }; + } + + using __sequence_queries::cardinality_t; + inline constexpr cardinality_t cardinality; + + using __sequence_queries::parallelism_t; + inline constexpr parallelism_t parallelism; + + using __sequence_queries::unbounded_t; + using __sequence_queries::unbounded; + + using __sequence_queries::lock_step_t; + using __sequence_queries::lock_step; + + using __sequence_queries::many_sender_t; + using __sequence_queries::many_sender; + + namespace __get_sequence_env { + using namespace stdexec; + + struct get_sequence_env_t { + template + constexpr env_of_t operator()(const Sequence& seq) const noexcept { + return stdexec::get_env(seq); + } + + template + requires(!sequence_sender) + constexpr auto operator()(const Sequence& seq) const noexcept { + return make_env( + stdexec::get_env(seq), + with(cardinality, std::integral_constant{}), + with(parallelism, lock_step)); + } + }; + } + + using __get_sequence_env::get_sequence_env_t; + inline constexpr get_sequence_env_t get_sequence_env; + + template + using sequence_env_of_t = stdexec::__call_result_t; } \ No newline at end of file diff --git a/test/exec/sequence/test_iterate.cpp b/test/exec/sequence/test_iterate.cpp index bc867e4d2..222ada231 100644 --- a/test/exec/sequence/test_iterate.cpp +++ b/test/exec/sequence/test_iterate.cpp @@ -129,4 +129,15 @@ TEST_CASE("iterate - sum up an array with custom domain", "[sequence_senders][it CHECK(sum == (42 + 43 + 44 + 1)); } +TEST_CASE("iterate - is lock step") { + std::array array{42, 43, 44}; + auto iterate = exec::iterate(std::views::all(array)); + STATIC_REQUIRE(exec::sequence_sender_in); + STATIC_REQUIRE(stdexec::sender_expr_for); + using parallelism_t = decltype(exec::parallelism(stdexec::get_env(iterate))); + STATIC_REQUIRE(std::same_as); + auto size = exec::cardinality(stdexec::get_env(iterate)); + CHECK(size == 3); +} + #endif // STDEXEC_HAS_STD_RANGES()