From 05380eb5ac6378b88056e49c54c1ac07e68e6699 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dietmar=20K=C3=BChl?= <dietmar.kuehl@me.com> Date: Mon, 2 Sep 2024 01:41:13 +0100 Subject: [PATCH 1/4] added receiver_of and run_loop --- docs/questions.md | 5 +- include/beman/execution26/detail/connect.hpp | 3 +- include/beman/execution26/detail/receiver.hpp | 1 + .../beman/execution26/detail/receiver_of.hpp | 24 +++ include/beman/execution26/detail/run_loop.hpp | 190 ++++++++++++++++++ src/beman/execution26/CMakeLists.txt | 2 + src/beman/execution26/tests/CMakeLists.txt | 2 + .../tests/exec-recv-concepts.pass.cpp | 90 ++++++++- .../tests/exec-run-loop-general.pass.cpp | 27 +++ .../tests/exec-run-loop-types.pass.cpp | 143 +++++++++++++ 10 files changed, 481 insertions(+), 6 deletions(-) create mode 100644 include/beman/execution26/detail/receiver_of.hpp create mode 100644 include/beman/execution26/detail/run_loop.hpp create mode 100644 src/beman/execution26/tests/exec-run-loop-general.pass.cpp create mode 100644 src/beman/execution26/tests/exec-run-loop-types.pass.cpp diff --git a/docs/questions.md b/docs/questions.md index 2444d243..141fe828 100644 --- a/docs/questions.md +++ b/docs/questions.md @@ -21,4 +21,7 @@ likely observable. - [exec.sched] uses `auto(get_completion_scheduler<set_value_t>(...))` which is OK for clang but doesn't seem to compile for g++ os MSVC. - [exec.just] p2.1: movable-value<Ts> doesn't seems right: movable-value<decay_t<Ts>> -- [exec.just] Otherwise after p2.3 is missing <ts...> \ No newline at end of file +- [exec.just] Otherwise after p2.3 is missing <ts...> +- [exec.run.loop.types] p9.1: "refers remains" -> "refers to remains" +- [exec.run.loop.types] p9.2: "get_stop_token(REC(o))": REC is a receiver, any + environment would be attached to get_env(REC(o)). \ No newline at end of file diff --git a/include/beman/execution26/detail/connect.hpp b/include/beman/execution26/detail/connect.hpp index 100d1aec..e87c0e8d 100644 --- a/include/beman/execution26/detail/connect.hpp +++ b/include/beman/execution26/detail/connect.hpp @@ -18,7 +18,8 @@ namespace beman::execution26 struct connect_t { template <typename Sender, typename Receiver> - auto operator()(Sender&& sender, Receiver&& receiver) const + auto operator()(Sender&& sender, Receiver&& receiver) const + noexcept(true/*-dk:TODO*/) { auto new_sender = [&sender, &receiver]() -> decltype(auto) { return ::beman::execution26::transform_sender( diff --git a/include/beman/execution26/detail/receiver.hpp b/include/beman/execution26/detail/receiver.hpp index 301c3d26..89109d8d 100644 --- a/include/beman/execution26/detail/receiver.hpp +++ b/include/beman/execution26/detail/receiver.hpp @@ -5,6 +5,7 @@ #define INCLUDED_BEMAN_EXECUTION26_DETAIL_RECEIVER #include <beman/execution26/detail/queryable.hpp> +#include <beman/execution26/detail/get_env.hpp> #include <concepts> #include <type_traits> diff --git a/include/beman/execution26/detail/receiver_of.hpp b/include/beman/execution26/detail/receiver_of.hpp new file mode 100644 index 00000000..ed49b680 --- /dev/null +++ b/include/beman/execution26/detail/receiver_of.hpp @@ -0,0 +1,24 @@ +// include/beman/execution26/detail/receiver_of.hpp -*-C++-*- +// SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception + +#ifndef INCLUDED_BEMAN_EXECUTION26_DETAIL_RECEIVER_OF +#define INCLUDED_BEMAN_EXECUTION26_DETAIL_RECEIVER_OF + +#include <beman/execution26/detail/has_completions.hpp> +#include <beman/execution26/detail/receiver.hpp> + +// ---------------------------------------------------------------------------- + +namespace beman::execution26 +{ + template <typename Receiver, typename Completions> + concept receiver_of + = beman::execution26::receiver<Receiver> + && beman::execution26::detail::has_completions<Receiver, Completions> + ; + +} + +// ---------------------------------------------------------------------------- + +#endif diff --git a/include/beman/execution26/detail/run_loop.hpp b/include/beman/execution26/detail/run_loop.hpp new file mode 100644 index 00000000..726af4b4 --- /dev/null +++ b/include/beman/execution26/detail/run_loop.hpp @@ -0,0 +1,190 @@ +// include/beman/execution26/detail/run_loop.hpp -*-C++-*- +// SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception + +#ifndef INCLUDED_BEMAN_EXECUTION26_DETAIL_RUN_LOOP +#define INCLUDED_BEMAN_EXECUTION26_DETAIL_RUN_LOOP + +#include <beman/execution26/detail/completion_signatures.hpp> +#include <beman/execution26/detail/get_completion_scheduler.hpp> +#include <beman/execution26/detail/get_env.hpp> +#include <beman/execution26/detail/get_stop_token.hpp> +#include <beman/execution26/detail/operation_state.hpp> +#include <beman/execution26/detail/scheduler.hpp> +#include <beman/execution26/detail/sender.hpp> +#include <beman/execution26/detail/set_error.hpp> +#include <beman/execution26/detail/set_stopped.hpp> +#include <beman/execution26/detail/set_value.hpp> + +#include <exception> +#include <mutex> +#include <type_traits> +#include <utility> + +// ---------------------------------------------------------------------------- + +namespace beman::execution26 +{ + class run_loop + { + private: + struct scheduler; + + struct env + { + run_loop* loop; + + template <typename Completion> + auto query(::beman::execution26::get_completion_scheduler_t<Completion> const&) const + noexcept + -> scheduler + { + return {this->loop}; + } + }; + struct opstate_base + { + opstate_base* next; + virtual auto execute() noexcept -> void = 0; + }; + template <typename Receiver> + struct opstate + : opstate_base + { + using operation_state_concept = ::beman::execution26::operation_state_t; + + run_loop* loop; + Receiver receiver; + + template <typename R> + opstate(run_loop* loop, R&& receiver) + : loop(loop) + , receiver(::std::forward<Receiver>(receiver)) + { + } + opstate(opstate&&) = delete; + auto start() & noexcept -> void + { + try + { + this->loop->push_back(this); + } + catch(...) + { + ::beman::execution26::set_error( + ::std::move(this->receiver), + ::std::current_exception()); + } + + } + auto execute() noexcept -> void override + { + if (::beman::execution26::get_stop_token( + ::beman::execution26::get_env(this->receiver) + ).stop_requested()) + ::beman::execution26::set_stopped(::std::move(this->receiver)); + else + ::beman::execution26::set_value(::std::move(this->receiver)); + } + }; + struct sender + { + using sender_concept = ::beman::execution26::sender_t; + using completion_signatures = ::beman::execution26::completion_signatures< + ::beman::execution26::set_value_t(), + ::beman::execution26::set_error_t(::std::exception_ptr), + ::beman::execution26::set_stopped_t() + >; + + run_loop* loop; + + auto get_env() const noexcept -> env { return {this->loop}; } + template <typename Receiver> + auto connect(Receiver&& receiver) noexcept + -> opstate<::std::decay_t<Receiver>> + { + return {this->loop, ::std::forward<Receiver>(receiver)}; + } + }; + struct scheduler + { + using scheduler_concept = ::beman::execution26::scheduler_t; + + run_loop* loop; + + auto schedule() noexcept -> sender { return {this->loop}; } + auto operator== (scheduler const&) const -> bool = default; + }; + + enum class state { starting, running, finishing }; + + state current_state{state::starting}; + ::std::mutex mutex{}; + ::std::condition_variable condition{}; + opstate_base* front{}; + opstate_base* back{}; + + auto push_back(opstate_base* item) -> void + { + ::std::lock_guard guard(this->mutex); + if (auto previous_back{::std::exchange(this->back, item)}) + { + previous_back->next = item; + } + else + { + this->front = item; + this->condition.notify_one(); + } + } + auto pop_front() -> opstate_base* + { + ::std::unique_lock guard(this->mutex); + this->condition.wait(guard, + [this]{ return this->front || this->current_state == state::finishing; }); + return this->front? ::std::exchange(this->front, this->front->next): nullptr; + } + + public: + run_loop() noexcept = default; + run_loop(run_loop&&) = delete; + ~run_loop() + { + ::std::lock_guard guard(this->mutex); + { + if (this->front != nullptr || this->current_state == state::running) + ::std::terminate(); + } + } + + auto get_scheduler() -> scheduler { return {this}; } + + auto run() -> void + { + { + ::std::lock_guard guard(this->mutex); + auto current{::std::exchange(this->current_state, state::running)}; + if (state::starting != current) + { + ::std::terminate(); + } + } + + while (auto* op{this->pop_front()}) + { + op->execute(); + } + } + auto finish() -> void + { + { + ::std::lock_guard guard(this->mutex); + this->current_state = state::finishing; + } + this->condition.notify_one(); + } + }; +} + +// ---------------------------------------------------------------------------- + +#endif diff --git a/src/beman/execution26/CMakeLists.txt b/src/beman/execution26/CMakeLists.txt index 026cfcd7..60bc508c 100644 --- a/src/beman/execution26/CMakeLists.txt +++ b/src/beman/execution26/CMakeLists.txt @@ -89,6 +89,8 @@ target_sources(${TARGET_LIBRARY} ${PROJECT_SOURCE_DIR}/include/beman/execution26/detail/query_with_default.hpp ${PROJECT_SOURCE_DIR}/include/beman/execution26/detail/queryable.hpp ${PROJECT_SOURCE_DIR}/include/beman/execution26/detail/receiver.hpp + ${PROJECT_SOURCE_DIR}/include/beman/execution26/detail/receiver_of.hpp + ${PROJECT_SOURCE_DIR}/include/beman/execution26/detail/run_loop.hpp ${PROJECT_SOURCE_DIR}/include/beman/execution26/detail/sched_attrs.hpp ${PROJECT_SOURCE_DIR}/include/beman/execution26/detail/sched_env.hpp ${PROJECT_SOURCE_DIR}/include/beman/execution26/detail/schedule.hpp diff --git a/src/beman/execution26/tests/CMakeLists.txt b/src/beman/execution26/tests/CMakeLists.txt index dd58ffbb..a012ff83 100644 --- a/src/beman/execution26/tests/CMakeLists.txt +++ b/src/beman/execution26/tests/CMakeLists.txt @@ -2,6 +2,8 @@ # SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception list(APPEND execution_tests + exec-run-loop-types.pass + exec-run-loop-general.pass exec-just.pass exec-snd-expos.pass exec-recv-concepts.pass diff --git a/src/beman/execution26/tests/exec-recv-concepts.pass.cpp b/src/beman/execution26/tests/exec-recv-concepts.pass.cpp index 56c42aaa..4a3a8b55 100644 --- a/src/beman/execution26/tests/exec-recv-concepts.pass.cpp +++ b/src/beman/execution26/tests/exec-recv-concepts.pass.cpp @@ -1,6 +1,7 @@ // src/beman/execution26/tests/exec-recv-concepts.pass.cpp -*-C++-*- // SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception +#include <beman/execution26/detail/receiver_of.hpp> #include <beman/execution26/detail/has_completions.hpp> #include <beman/execution26/detail/valid_completion_for.hpp> #include <beman/execution26/execution.hpp> @@ -33,9 +34,10 @@ namespace auto set_stopped() && noexcept -> void {} }; + template <typename Concept = test_std::receiver_t> struct multi_receiver { - using receiver_concept = test_std::receiver_t; + using receiver_concept = Concept; auto set_value(int) && noexcept -> void {} auto set_value(int, arg) && noexcept -> void {} @@ -120,7 +122,7 @@ namespace static_assert(test_std::receiver<stopped_receiver>); static_assert(test_detail::has_completions< - multi_receiver, + multi_receiver<>, test_std::completion_signatures< test_std::set_value_t(int), test_std::set_value_t(int, arg), @@ -130,7 +132,7 @@ namespace > >); static_assert(not test_detail::has_completions< - multi_receiver, + multi_receiver<>, test_std::completion_signatures< test_std::set_value_t(int), test_std::set_value_t(arg, int), @@ -140,7 +142,86 @@ namespace > >); static_assert(not test_detail::has_completions< - multi_receiver, + multi_receiver<>, + test_std::completion_signatures< + test_std::set_value_t(int), + test_std::set_value_t(int, arg), + test_std::set_value_t(arg, arg), + test_std::set_error_t(int), + test_std::set_stopped_t() + > + >); + } + + auto test_receiver_of() -> void + { + static_assert(test_std::receiver_of< + value_receiver<int>, + test_std::completion_signatures<> + >); + static_assert(test_std::receiver_of< + value_receiver<int>, + test_std::completion_signatures<test_std::set_value_t(int)> + >); + static_assert(not test_std::receiver_of< + value_receiver<int>, + test_std::completion_signatures<test_std::set_value_t(int, int)> + >); + + static_assert(test_std::receiver_of< + error_receiver<int>, + test_std::completion_signatures<test_std::set_error_t(int)> + >); + static_assert(not test_std::receiver_of< + error_receiver<int>, + test_std::completion_signatures<test_std::set_error_t(error)> + >); + static_assert(test_std::receiver_of< + error_receiver<error>, + test_std::completion_signatures<test_std::set_error_t(error)> + >); + + static_assert(not test_std::receiver_of< + error_receiver<error>, + test_std::completion_signatures<test_std::set_stopped_t()> + >); + static_assert(test_std::receiver_of< + stopped_receiver, + test_std::completion_signatures<test_std::set_stopped_t()> + >); + + static_assert(test_std::receiver_of< + multi_receiver<>, + test_std::completion_signatures< + test_std::set_value_t(int), + test_std::set_value_t(int, arg), + test_std::set_value_t(arg, arg), + test_std::set_error_t(error), + test_std::set_stopped_t() + > + >); + static_assert(not test_std::receiver_of< + multi_receiver<int>, + test_std::completion_signatures< + test_std::set_value_t(int), + test_std::set_value_t(int, arg), + test_std::set_value_t(arg, arg), + test_std::set_error_t(error), + test_std::set_stopped_t() + > + >); + static_assert(not test_std::receiver_of< + multi_receiver<>, + test_std::completion_signatures< + test_std::set_value_t(int), + test_std::set_value_t(arg, int), + test_std::set_value_t(arg, arg), + test_std::set_error_t(error), + test_std::set_stopped_t() + > + >); + static_assert(not test_std::receiver_of< + multi_receiver<>, test_std::completion_signatures< test_std::set_value_t(int), test_std::set_value_t(int, arg), @@ -156,4 +237,5 @@ auto main() -> int { test_valid_completion_for(); test_has_completions(); + test_receiver_of(); } \ No newline at end of file diff --git a/src/beman/execution26/tests/exec-run-loop-general.pass.cpp b/src/beman/execution26/tests/exec-run-loop-general.pass.cpp new file mode 100644 index 00000000..6c8ece91 --- /dev/null +++ b/src/beman/execution26/tests/exec-run-loop-general.pass.cpp @@ -0,0 +1,27 @@ +// src/beman/execution26/tests/exec-run-loop-general.pass.cpp -*-C++-*- +// SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception + +#include <beman/execution26/detail/run_loop.hpp> +#include <beman/execution26/detail/scheduler.hpp> +#include <test/execution.hpp> +#include <concepts> +#include <type_traits> + +// ---------------------------------------------------------------------------- + +namespace +{ + auto use(auto&&...) {} +} + +auto main() -> int +{ + static_assert(noexcept(test_std::run_loop())); + static_assert(not std::move_constructible<test_std::run_loop>); + test_std::run_loop rl; + use(rl); + + static_assert(requires{ { rl.get_scheduler() } -> test_std::scheduler; }); + static_assert(requires{ rl.run(); }); + static_assert(requires{ rl.finish(); }); +} \ No newline at end of file diff --git a/src/beman/execution26/tests/exec-run-loop-types.pass.cpp b/src/beman/execution26/tests/exec-run-loop-types.pass.cpp new file mode 100644 index 00000000..b23aa46d --- /dev/null +++ b/src/beman/execution26/tests/exec-run-loop-types.pass.cpp @@ -0,0 +1,143 @@ +// src/beman/execution26/tests/exec-run-loop-types.pass.cpp -*-C++-*- +// SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception + +#include <beman/execution26/detail/run_loop.hpp> + +#include <beman/execution26/detail/inplace_stop_source.hpp> +#include <beman/execution26/detail/completion_signatures.hpp> +#include <beman/execution26/detail/connect.hpp> +#include <beman/execution26/detail/start.hpp> +#include <beman/execution26/detail/get_completion_signatures.hpp> +#include <beman/execution26/detail/get_env.hpp> +#include <beman/execution26/detail/get_stop_token.hpp> +#include <beman/execution26/detail/receiver_of.hpp> +#include <beman/execution26/detail/scheduler.hpp> +#include <beman/execution26/detail/sender.hpp> + +#include <test/execution.hpp> +#include <concepts> +#include <exception> + +// ---------------------------------------------------------------------------- + +namespace +{ + auto use(auto&&...) -> void {} + + enum class signal_type { none, error, stopped, value }; + + struct token_env + { + test_std::inplace_stop_token token; + auto query(test_std::get_stop_token_t const&) const noexcept + { + return this->token; + } + }; + + struct receiver + { + using receiver_concept = test_std::receiver_t; + + signal_type* result; + test_std::inplace_stop_token token; + + auto set_value() && noexcept { *result = signal_type::value; } + auto set_error(std::exception_ptr) && noexcept { *result = signal_type::error; } + auto set_stopped() && noexcept { *result = signal_type::stopped; } + + auto get_env() const noexcept -> token_env { return {this->token}; } + }; + + struct finish_receiver + { + test_std::run_loop* loop; + using receiver_concept = test_std::receiver_t; + + auto set_value() && noexcept { this->loop-> finish(); } + auto set_error(std::exception_ptr) && noexcept { this->loop-> finish(); } + auto set_stopped() && noexcept { this->loop-> finish(); } + }; + + auto test_run_loop_scheduler_equality() -> void + { + // p3: + test_std::run_loop rl1; + test_std::run_loop rl2; + + assert(rl1.get_scheduler() == rl1.get_scheduler()); + assert(rl2.get_scheduler() == rl2.get_scheduler()); + assert(rl1.get_scheduler() != rl2.get_scheduler()); + } +} + +auto main() -> int +{ + test_std::run_loop rl; + // p1: + static_assert(requires{ { rl.get_scheduler() } -> test_std::scheduler; }); + test_run_loop_scheduler_equality(); + + // p4: + auto scheduler{rl.get_scheduler()}; + static_assert(requires{ { test_std::schedule(scheduler) } noexcept -> test_std::sender; }); + + // p5: + auto sender{test_std::schedule(scheduler)}; + struct env {}; + static_assert(::std::same_as< + test_std::completion_signatures< + test_std::set_value_t(), + test_std::set_error_t(std::exception_ptr), + test_std::set_stopped_t() + >, + decltype(test_std::get_completion_signatures(sender, env{})) + >); + + // p7: + static_assert(test_std::receiver_of< + receiver, + decltype(test_std::get_completion_signatures(sender, env{})) + >); + // p7.1: + static_assert(requires{ + { test_std::connect(sender, receiver{})} noexcept -> test_std::operation_state; + }); + // p7.2: + auto e{test_std::get_env(sender)}; + static_assert(requires{ + { test_std::get_completion_scheduler<test_std::set_error_t>(e) } + noexcept -> std::same_as<decltype(scheduler)>; + { test_std::get_completion_scheduler<test_std::set_stopped_t>(e) } + noexcept -> std::same_as<decltype(scheduler)>; + { test_std::get_completion_scheduler<test_std::set_value_t>(e) } + noexcept -> std::same_as<decltype(scheduler)>; + }); + assert(scheduler == test_std::get_completion_scheduler<test_std::set_error_t>(e)); + assert(scheduler == test_std::get_completion_scheduler<test_std::set_stopped_t>(e)); + assert(scheduler == test_std::get_completion_scheduler<test_std::set_value_t>(e)); + + // p8, p9* can't be tested directly. + signal_type unstopped_result{signal_type::none}; + signal_type stopped_result{signal_type::none}; + test_std::inplace_stop_source unstopped; + test_std::inplace_stop_source stopped; + stopped.request_stop(); + + auto unstopped_op{test_std::connect(sender, + receiver{&unstopped_result, unstopped.get_token()})}; + auto stopped_op{test_std::connect(sender, + receiver{&stopped_result, stopped.get_token()})}; + auto finish_op{test_std::connect(sender, finish_receiver{&rl})}; + + test_std::start(finish_op); + test_std::start(unstopped_op); + test_std::start(stopped_op); + + rl.run(); + + assert(unstopped_result == signal_type::value); + assert(stopped_result == signal_type::stopped); + + //-dk:TODO more thorough run_loop tests +} \ No newline at end of file From 2bb2fd61aa50e00754a7c9976f739c4f0dfadb82 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dietmar=20K=C3=BChl?= <dietmar.kuehl@me.com> Date: Mon, 2 Sep 2024 01:45:07 +0100 Subject: [PATCH 2/4] added a missing header --- include/beman/execution26/detail/run_loop.hpp | 1 + 1 file changed, 1 insertion(+) diff --git a/include/beman/execution26/detail/run_loop.hpp b/include/beman/execution26/detail/run_loop.hpp index 726af4b4..5162b997 100644 --- a/include/beman/execution26/detail/run_loop.hpp +++ b/include/beman/execution26/detail/run_loop.hpp @@ -16,6 +16,7 @@ #include <beman/execution26/detail/set_value.hpp> #include <exception> +#include <condition_variable> #include <mutex> #include <type_traits> #include <utility> From 1acafe10b6b56043961608046e42391a4618d491 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dietmar=20K=C3=BChl?= <dietmar.kuehl@me.com> Date: Mon, 2 Sep 2024 02:01:06 +0100 Subject: [PATCH 3/4] removed an unnecessary scope --- include/beman/execution26/detail/run_loop.hpp | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/include/beman/execution26/detail/run_loop.hpp b/include/beman/execution26/detail/run_loop.hpp index 5162b997..26d00deb 100644 --- a/include/beman/execution26/detail/run_loop.hpp +++ b/include/beman/execution26/detail/run_loop.hpp @@ -151,10 +151,8 @@ namespace beman::execution26 ~run_loop() { ::std::lock_guard guard(this->mutex); - { - if (this->front != nullptr || this->current_state == state::running) - ::std::terminate(); - } + if (this->front != nullptr || this->current_state == state::running) + ::std::terminate(); } auto get_scheduler() -> scheduler { return {this}; } From acd3872a5221afd2bbbcaa4568a0ed00d95d38eb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dietmar=20K=C3=BChl?= <dietmar.kuehl@me.com> Date: Mon, 2 Sep 2024 02:24:38 +0100 Subject: [PATCH 4/4] added a missing initialization and dealing with the queue getting empty --- include/beman/execution26/detail/run_loop.hpp | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/include/beman/execution26/detail/run_loop.hpp b/include/beman/execution26/detail/run_loop.hpp index 26d00deb..68247190 100644 --- a/include/beman/execution26/detail/run_loop.hpp +++ b/include/beman/execution26/detail/run_loop.hpp @@ -44,7 +44,7 @@ namespace beman::execution26 }; struct opstate_base { - opstate_base* next; + opstate_base* next{}; virtual auto execute() noexcept -> void = 0; }; template <typename Receiver> @@ -142,6 +142,8 @@ namespace beman::execution26 ::std::unique_lock guard(this->mutex); this->condition.wait(guard, [this]{ return this->front || this->current_state == state::finishing; }); + if (this->front == this->back) + this->back = nullptr; return this->front? ::std::exchange(this->front, this->front->next): nullptr; }