From a3f8c9b9909830a7349337e1b2111a6e1f1eed87 Mon Sep 17 00:00:00 2001 From: Kirk Shoop Date: Mon, 31 Oct 2022 10:18:43 -0700 Subject: [PATCH 01/13] tail_sender concepts --- include/exec/tail_sender.hpp | 166 +++++++++++++++++++++++++++++++++ test/exec/test_tail_sender.cpp | 58 ++++++++++++ 2 files changed, 224 insertions(+) create mode 100644 include/exec/tail_sender.hpp create mode 100644 test/exec/test_tail_sender.cpp diff --git a/include/exec/tail_sender.hpp b/include/exec/tail_sender.hpp new file mode 100644 index 000000000..2166fb40e --- /dev/null +++ b/include/exec/tail_sender.hpp @@ -0,0 +1,166 @@ +/* + * Copyright (c) 2021-2022 NVIDIA Corporation + * + * Licensed under the Apache License Version 2.0 with LLVM Exceptions + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * https://llvm.org/LICENSE.txt + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#pragma once + +#include "../stdexec/execution.hpp" +#include "env.hpp" + +namespace exec { + using namespace stdexec; + + template + concept __contextually_convertible_to_bool = + requires(const T c) { + { (static_cast(c) ? false : false) } -> same_as; + }; + + template + static constexpr bool __nothrow_contextually_convertible_to_bool_v = + noexcept((std::declval() ? (void)0 : (void)0)); + + template + concept __nothrow_contextually_convertible_to_bool = + __contextually_convertible_to_bool && + __nothrow_contextually_convertible_to_bool_v; + + namespace __unwind { + struct unwind_t { + template + requires std::tag_invocable + void operator()(_Op& __op) const noexcept(std::nothrow_tag_invocable) { + (void) tag_invoke(unwind_t{}, __op); + } + }; + } + using __unwind::unwind_t; + inline constexpr unwind_t unwind{}; + + template + struct c_t { + }; + template + inline constexpr c_t<_T> c_v{}; + + namespace __sender_queries { + template + const _Ty& __cref_fn(const _Ty&); + template + using __cref_t = + decltype(__sender_queries::__cref_fn(__declval<_Ty>())); + + struct always_completes_inline_t { + template + requires std::tag_invocable, __cref_t<_Env>> + constexpr bool operator()(_Sender&& __s, _Env&& __e) const noexcept { + static_assert(same_as, __cref_t<_Env>>>); + static_assert(std::nothrow_tag_invocable, __cref_t<_Env>>); + return tag_invoke(always_completes_inline_t{}, std::as_const(__s), std::as_const(__e)); + } + template + requires std::tag_invocable, c_t<_Env>> + constexpr bool operator()(c_t<_Sender>&& __s, c_t<_Env>&& __e) const noexcept { + static_assert(same_as, c_t<_Env>>>); + static_assert(std::nothrow_tag_invocable, c_t<_Env>>); + return tag_invoke(always_completes_inline_t{}, __s, __e); + } + constexpr bool operator()(auto&&, auto&&) const noexcept { + return false; + } + }; + } // namespace __sender_queries + using __sender_queries::always_completes_inline_t; + inline constexpr always_completes_inline_t always_completes_inline{}; + + template + concept tail_operation_state = + operation_state<_TailOperationState> && + std::is_nothrow_destructible_v<_TailOperationState> && + std::is_trivially_destructible_v<_TailOperationState> && + (!std::is_copy_constructible_v<_TailOperationState>) && + (!std::is_move_constructible_v<_TailOperationState>) && + (!std::is_copy_assignable_v<_TailOperationState>) && + (!std::is_move_assignable_v<_TailOperationState>) && + requires (_TailOperationState& __o) { + { unwind(__o) } noexcept; + }; + + template + constexpr bool always_completes_inline_v = + always_completes_inline(c_v<_Sender>, c_v<_Env>); + + template + concept tail_sender = + sender<_TailSender, _Env> && + same_as<__single_sender_value_t<_TailSender, _Env>, void> && + always_completes_inline_v<_TailSender, _Env> && + std::is_nothrow_move_constructible_v<_TailSender> && + std::is_nothrow_destructible_v<_TailSender> && + std::is_trivially_destructible_v<_TailSender>; + + template + concept tail_receiver = + receiver<_TailReceiver> && + std::is_nothrow_copy_constructible_v<_TailReceiver> && + std::is_nothrow_move_constructible_v<_TailReceiver> && + std::is_nothrow_copy_assignable_v<_TailReceiver> && + std::is_nothrow_move_assignable_v<_TailReceiver> && + std::is_nothrow_destructible_v<_TailReceiver> && + std::is_trivially_destructible_v<_TailReceiver>; + + template + using __next_tail_from_operation_t = + __call_result_t; + + template + using __next_tail_from_sender_to_t = + __next_tail_from_operation_t>; + + template + concept __tail_sender_or_void = + same_as<_TailSender, void> || tail_sender<_TailSender, _Env>; + + template + concept tail_sender_to = + tail_sender<_TailSender> && + tail_receiver<_TailReceiver> && + requires(_TailSender&& __s, _TailReceiver&& __r) { + { stdexec::connect((_TailSender&&) __s, (_TailReceiver&&) __r) } noexcept -> + tail_operation_state; + } && + __tail_sender_or_void<__next_tail_from_sender_to_t<_TailSender, _TailReceiver>>; + + template + concept __terminal_tail_sender_to = + tail_sender_to<_TailSender, _TailReceiver> && + same_as<__next_tail_from_sender_to_t<_TailSender, _TailReceiver>, void>; + + template + concept __recursive_tail_sender_to = + tail_sender_to<_TailSender, _TailReceiver> && + tail_operation_state> && + __one_of<__next_tail_from_sender_to_t<_TailSender, _TailReceiver>, _ValidTailSender...>; + + template + concept __nullable_tail_operation_state = + tail_operation_state<_TailOperationState> && + __nothrow_contextually_convertible_to_bool<_TailOperationState>; + + template + concept __nullable_tail_sender_to = + tail_sender_to<_TailSender, _TailReceiver> && + __nullable_tail_operation_state>; + +} // namespace exec diff --git a/test/exec/test_tail_sender.cpp b/test/exec/test_tail_sender.cpp new file mode 100644 index 000000000..33fd54205 --- /dev/null +++ b/test/exec/test_tail_sender.cpp @@ -0,0 +1,58 @@ +/* + * Copyright (c) 2021-2022 NVIDIA Corporation + * + * Licensed under the Apache License Version 2.0 with LLVM Exceptions + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * https://llvm.org/LICENSE.txt + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include + +#include + +using namespace std; +namespace ex = stdexec; + +//! Tail Sender +struct ATailSender { + using completion_signatures = ex::completion_signatures; + + template + struct operation { + Receiver rcvr_; + + friend auto tag_invoke(ex::start_t, operation& self) noexcept { + return ex::set_value(std::move(self.rcvr_)); + } + + friend void tag_invoke(exec::unwind_t, operation& self) noexcept { + ex::set_stopped(std::move(self.rcvr_)); + } + }; + + template + friend auto tag_invoke(ex::connect_t, ATailSender&& self, Receiver&& rcvr) + -> operation> { + return {std::forward(rcvr)}; + } + + template + friend constexpr bool tag_invoke( + exec::always_completes_inline_t, exec::c_t, exec::c_t<_Env>) noexcept { + return true; + } +}; + +TEST_CASE("Test ATailSender is a tail_sender", "[tail_sender]") { + static_assert(exec::tail_sender); + CHECK(exec::tail_sender); +} From b1d44179254d3ed152d1164de99502ec6b256319 Mon Sep 17 00:00:00 2001 From: Kirk Shoop Date: Mon, 31 Oct 2022 13:58:07 -0700 Subject: [PATCH 02/13] add __null_tail_sender --- include/exec/tail_sender.hpp | 36 ++++++++++++++++++++++++++++++++++ test/exec/test_tail_sender.cpp | 5 +++++ 2 files changed, 41 insertions(+) diff --git a/include/exec/tail_sender.hpp b/include/exec/tail_sender.hpp index 2166fb40e..004742dad 100644 --- a/include/exec/tail_sender.hpp +++ b/include/exec/tail_sender.hpp @@ -163,4 +163,40 @@ namespace exec { tail_sender_to<_TailSender, _TailReceiver> && __nullable_tail_operation_state>; + + struct __null_tail_receiver { + void set_value() noexcept {} + void set_error(std::exception_ptr) noexcept {} + void set_done() noexcept {} + }; + + struct __null_tail_sender { + struct op { + // this is a nullable_tail_sender that always returns false to prevent + // callers from calling start() and unwind() + inline constexpr operator bool() const noexcept { return false; } + friend void tag_invoke(start_t, op& self) noexcept { + std::terminate(); + } + + friend void tag_invoke(unwind_t, op& self) noexcept { + std::terminate(); + } + }; + + using completion_signatures = completion_signatures; + + template + friend auto tag_invoke(connect_t, __null_tail_sender&&, Receiver&&) + -> op { + return {}; + } + + template + friend constexpr bool tag_invoke( + exec::always_completes_inline_t, exec::c_t<__null_tail_sender>, exec::c_t<_Env>) noexcept { + return true; + } + }; + } // namespace exec diff --git a/test/exec/test_tail_sender.cpp b/test/exec/test_tail_sender.cpp index 33fd54205..2d48b60de 100644 --- a/test/exec/test_tail_sender.cpp +++ b/test/exec/test_tail_sender.cpp @@ -56,3 +56,8 @@ TEST_CASE("Test ATailSender is a tail_sender", "[tail_sender]") { static_assert(exec::tail_sender); CHECK(exec::tail_sender); } + +TEST_CASE("Test __null_tail_sender is a tail_sender", "[tail_sender]") { + static_assert(exec::tail_sender); + CHECK(exec::tail_sender); +} From 5699467e82cacd9c7d7af535b72427fe9a039317 Mon Sep 17 00:00:00 2001 From: Kirk Shoop Date: Mon, 31 Oct 2022 14:19:44 -0700 Subject: [PATCH 03/13] add maybe_tail_sender --- include/exec/tail_sender.hpp | 46 ++++++++++++++++++++++++++++++++-- test/exec/test_tail_sender.cpp | 5 ++++ 2 files changed, 49 insertions(+), 2 deletions(-) diff --git a/include/exec/tail_sender.hpp b/include/exec/tail_sender.hpp index 004742dad..b7f80f391 100644 --- a/include/exec/tail_sender.hpp +++ b/include/exec/tail_sender.hpp @@ -186,8 +186,8 @@ namespace exec { using completion_signatures = completion_signatures; - template - friend auto tag_invoke(connect_t, __null_tail_sender&&, Receiver&&) + template + friend auto tag_invoke(connect_t, __null_tail_sender&&, _TailReceiver&&) -> op { return {}; } @@ -199,4 +199,46 @@ namespace exec { } }; + template + struct maybe_tail_sender { + maybe_tail_sender() noexcept = default; + maybe_tail_sender(__null_tail_sender) noexcept {} + maybe_tail_sender(_TailSender __t) noexcept : tail_sender_(__t) {} + template + struct op { + using op_t = connect_result_t<_TailSender, _TailReceiver>; + explicit op() {} + explicit op(_TailSender __t, _TailReceiver __r) : op_(stdexec::connect(__t, __r)) {} + operator bool() const noexcept { return !!op_ && !!*op_; } + + friend auto tag_invoke(start_t, op& __self) noexcept { + if (!__self.op_ || !*__self.op_) { std::terminate(); } + return stdexec::start(*__self.op_); + } + + friend void tag_invoke(unwind_t, op& __self) noexcept { + if (!__self.op_ || !*__self.op_) { std::terminate(); } + exec::unwind(*__self.op_); + } + std::optional op_; + }; + + using completion_signatures = completion_signatures; + + template + friend auto tag_invoke(connect_t, maybe_tail_sender&& __self, _TailReceiver&& __r) + -> op<_TailReceiver> { + if (!__self.tail_sender_) { return {}; } + return {((maybe_tail_sender&&)__self).tail_sender_, __r}; + } + + template + friend constexpr bool tag_invoke( + exec::always_completes_inline_t, exec::c_t, exec::c_t<_Env>) noexcept { + return true; + } + + private: + std::optional<_TailSender> tail_sender_; + }; } // namespace exec diff --git a/test/exec/test_tail_sender.cpp b/test/exec/test_tail_sender.cpp index 2d48b60de..b0951d413 100644 --- a/test/exec/test_tail_sender.cpp +++ b/test/exec/test_tail_sender.cpp @@ -61,3 +61,8 @@ TEST_CASE("Test __null_tail_sender is a tail_sender", "[tail_sender]") { static_assert(exec::tail_sender); CHECK(exec::tail_sender); } + +TEST_CASE("Test maybe_tail_sender is a tail_sender", "[tail_sender]") { + static_assert(exec::tail_sender>); + CHECK(exec::tail_sender>); +} From 374043dc586a62badb49985db824c72c14b522d7 Mon Sep 17 00:00:00 2001 From: Kirk Shoop Date: Tue, 1 Nov 2022 08:29:26 -0700 Subject: [PATCH 04/13] start, set_value, set_error, and set_stopped may return a tail_sender --- include/stdexec/execution.hpp | 20 ++++++++++++-------- 1 file changed, 12 insertions(+), 8 deletions(-) diff --git a/include/stdexec/execution.hpp b/include/stdexec/execution.hpp index c8af26026..4edfe32a3 100644 --- a/include/stdexec/execution.hpp +++ b/include/stdexec/execution.hpp @@ -334,9 +334,10 @@ namespace stdexec { template requires tag_invocable STDEXEC_DETAIL_CUDACC_HOST_DEVICE // - void operator()(_Receiver&& __rcvr, _As&&... __as) const noexcept { + auto operator()(_Receiver&& __rcvr, _As&&... __as) const noexcept + -> tag_invoke_result_t { static_assert(nothrow_tag_invocable); - (void) tag_invoke(set_value_t{}, (_Receiver&&) __rcvr, (_As&&) __as...); + return tag_invoke(set_value_t{}, (_Receiver&&) __rcvr, (_As&&) __as...); } }; @@ -348,9 +349,10 @@ namespace stdexec { template requires tag_invocable STDEXEC_DETAIL_CUDACC_HOST_DEVICE // - void operator()(_Receiver&& __rcvr, _Error&& __err) const noexcept { + auto operator()(_Receiver&& __rcvr, _Error&& __err) const noexcept + -> tag_invoke_result_t { static_assert(nothrow_tag_invocable); - (void) tag_invoke(set_error_t{}, (_Receiver&&) __rcvr, (_Error&&) __err); + return tag_invoke(set_error_t{}, (_Receiver&&) __rcvr, (_Error&&) __err); } }; @@ -362,9 +364,10 @@ namespace stdexec { template requires tag_invocable STDEXEC_DETAIL_CUDACC_HOST_DEVICE // - void operator()(_Receiver&& __rcvr) const noexcept { + auto operator()(_Receiver&& __rcvr) const noexcept + -> tag_invoke_result_t { static_assert(nothrow_tag_invocable); - (void) tag_invoke(set_stopped_t{}, (_Receiver&&) __rcvr); + return tag_invoke(set_stopped_t{}, (_Receiver&&) __rcvr); } }; } // namespace __receivers @@ -1210,8 +1213,9 @@ namespace stdexec { struct start_t { template requires tag_invocable - void operator()(_Op& __op) const noexcept(nothrow_tag_invocable) { - (void) tag_invoke(start_t{}, __op); + auto operator()(_Op& __op) const noexcept(nothrow_tag_invocable) + -> tag_invoke_result_t { + return tag_invoke(start_t{}, __op); } }; } From ca7e6f2ac5911fba43b40c1301cce91553787507 Mon Sep 17 00:00:00 2001 From: Kirk Shoop Date: Tue, 1 Nov 2022 08:29:53 -0700 Subject: [PATCH 05/13] add __start_until_nullable --- include/exec/tail_sender.hpp | 192 ++++++++++++++++++++++++++++++--- test/exec/test_tail_sender.cpp | 124 ++++++++++++++++++++- 2 files changed, 301 insertions(+), 15 deletions(-) diff --git a/include/exec/tail_sender.hpp b/include/exec/tail_sender.hpp index b7f80f391..93ab47b0f 100644 --- a/include/exec/tail_sender.hpp +++ b/include/exec/tail_sender.hpp @@ -21,17 +21,17 @@ namespace exec { using namespace stdexec; - template + template concept __contextually_convertible_to_bool = requires(const T c) { { (static_cast(c) ? false : false) } -> same_as; }; - template + template static constexpr bool __nothrow_contextually_convertible_to_bool_v = noexcept((std::declval() ? (void)0 : (void)0)); - template + template concept __nothrow_contextually_convertible_to_bool = __contextually_convertible_to_bool && __nothrow_contextually_convertible_to_bool_v; @@ -165,16 +165,25 @@ namespace exec { struct __null_tail_receiver { - void set_value() noexcept {} - void set_error(std::exception_ptr) noexcept {} - void set_done() noexcept {} + friend void tag_invoke(set_value_t, __null_tail_receiver&&, auto&&...) noexcept {} + friend void tag_invoke(set_stopped_t, __null_tail_receiver&&) noexcept {} + friend __empty_env tag_invoke(get_env_t, const __null_tail_receiver& __self) { + return {}; + } }; struct __null_tail_sender { struct op { + op() = default; + op(const op&) = delete; + op(op&&) = delete; + op& operator=(const op&) = delete; + op& operator=(op&&) = delete; + // this is a nullable_tail_sender that always returns false to prevent // callers from calling start() and unwind() inline constexpr operator bool() const noexcept { return false; } + friend void tag_invoke(start_t, op& self) noexcept { std::terminate(); } @@ -187,7 +196,7 @@ namespace exec { using completion_signatures = completion_signatures; template - friend auto tag_invoke(connect_t, __null_tail_sender&&, _TailReceiver&&) + friend auto tag_invoke(connect_t, __null_tail_sender&&, _TailReceiver&&) noexcept -> op { return {}; } @@ -207,17 +216,40 @@ namespace exec { template struct op { using op_t = connect_result_t<_TailSender, _TailReceiver>; - explicit op() {} - explicit op(_TailSender __t, _TailReceiver __r) : op_(stdexec::connect(__t, __r)) {} - operator bool() const noexcept { return !!op_ && !!*op_; } + op() = default; + op(const op&) = delete; + op(op&&) = delete; + op& operator=(const op&) = delete; + op& operator=(op&&) = delete; + + explicit op(_TailSender __t, _TailReceiver __r) + : op_(stdexec::__conv{ + [&] { + return stdexec::connect(__t, __r); + } + }) {} + operator bool() const noexcept { + if constexpr (__nullable_tail_sender_to<_TailSender, _TailReceiver>) { + return !!op_ && !!*op_; + } else { + return !!op_; + } + } + [[nodiscard]] friend auto tag_invoke(start_t, op& __self) noexcept { - if (!__self.op_ || !*__self.op_) { std::terminate(); } + if (!__self.op_) { std::terminate(); } + if constexpr (__nullable_tail_sender_to<_TailSender, _TailReceiver>) { + if (!*__self.op_) { std::terminate(); } + } return stdexec::start(*__self.op_); } friend void tag_invoke(unwind_t, op& __self) noexcept { - if (!__self.op_ || !*__self.op_) { std::terminate(); } + if (!__self.op_) { std::terminate(); } + if constexpr (__nullable_tail_sender_to<_TailSender, _TailReceiver>) { + if (!*__self.op_) { std::terminate(); } + } exec::unwind(*__self.op_); } std::optional op_; @@ -226,10 +258,11 @@ namespace exec { using completion_signatures = completion_signatures; template - friend auto tag_invoke(connect_t, maybe_tail_sender&& __self, _TailReceiver&& __r) + [[nodiscard]] + friend auto tag_invoke(connect_t, maybe_tail_sender&& __self, _TailReceiver&& __r) noexcept -> op<_TailReceiver> { if (!__self.tail_sender_) { return {}; } - return {((maybe_tail_sender&&)__self).tail_sender_, __r}; + return op<_TailReceiver>{*((maybe_tail_sender&&)__self).tail_sender_, __r}; } template @@ -241,4 +274,135 @@ namespace exec { private: std::optional<_TailSender> tail_sender_; }; + + template + struct scoped_tail_sender { + explicit scoped_tail_sender(_TailSender __t, _TailReceiver __r = _TailReceiver{}) noexcept + : t_(__t) + , r_(__r) + , valid_(true) {} + + scoped_tail_sender(scoped_tail_sender&& other) noexcept + : t_(other.s_) + , r_(other.r_) + , valid_(std::exchange(other.valid_, false)) {} + + ~scoped_tail_sender() { + if (valid_) { + auto op = stdexec::connect(t_, r_); + if constexpr (__nullable_tail_sender_to<_TailSender, _TailReceiver>) { + if (!!op) { + exec::unwind(op); + } + } else { + exec::unwind(op); + } + } + } + + _TailSender get() noexcept { return t_; } + + _TailSender release() noexcept { + valid_ = false; + return t_; + } + + private: + _TailSender t_; + _TailReceiver r_; + bool valid_; + }; + + template + auto __start_until_nullable(_TailSender __t, _TailReceiver __r) { + if constexpr (__nullable_tail_sender_to<_TailSender, _TailReceiver>) { + return __t; + } else if constexpr (__terminal_tail_sender_to<_TailSender, _TailReceiver>) { + // restrict scope of op + { + auto op = stdexec::connect(std::move(__t), std::move(__r)); + stdexec::start(op); + } + return __null_tail_sender{}; + } else { + auto op = stdexec::connect(std::move(__t), __r); + return __start_until_nullable(stdexec::start(op), std::move(__r)); + } + } + +#if 0 + template + auto __start_next(_Next next, _TailReceiver r) { + if constexpr (__one_of<_Next, _TailSender, _Prev...>) { + static_assert( + (nullable_tail_sender_to<_TailSender, _TailReceiver> || + (nullable_tail_sender_to<_Prev, _TailReceiver> || ...)), + "At least one tail_sender in a cycle must be nullable to avoid " + "entering an infinite loop"); + return __start_until_nullable(next, std::move(r)); + } else { + using result_type = + decltype(__start_sequential(next, r, type_list<_TailSender, _Prev...>{})); + if constexpr (same_as) { + // Let the loop in resume_tail_sender() handle checking the boolean. + return next; + } else { + return __start_sequential(next, std::move(r), type_list<_TailSender, _Prev...>{}); + } + } + } + + template + auto _start_sequential(_TailSender c, _TailReceiver r, type_list<_Prev...>) { + static_assert( + _tail_sender<_TailSender>, "_start_sequential: must be called with a tail_sender"); + if constexpr (_terminal_tail_sender_to<_TailSender, _TailReceiver>) { + if constexpr (nullable_tail_sender_to<_TailSender, _TailReceiver>) { + return c; + } else { + // restrict scope of op + { + auto op = unifex::connect(std::move(c), std::move(r)); + unifex::start(op); + } + return null_tail_sender{}; + } + } else { + using next_t = next_tail_sender_to_t<_TailSender, _TailReceiver>; + using result_type = decltype(_start_next( + std::declval(), r)); + if constexpr (std::is_void_v) { + // restrict scope of op + { + auto op = unifex::connect(std::move(c), std::move(r)); + unifex::start(op); + } + return null_tail_sender{}; + } else if constexpr (same_as) { + auto op = unifex::connect(std::move(c), std::move(r)); + return unifex::start(op); + } else if constexpr (nullable_tail_sender_to<_TailSender, _TailReceiver>) { + auto op = unifex::connect(std::move(c), r); + using result_type = variant_tail_sender< + null_tail_sender, + decltype(_start_next( + unifex::start(op), r))>; + if (!op) { + return result_type{null_tail_sender{}}; + } + return result_type{ + _start_next(unifex::start(op), r)}; + } else { + auto op = unifex::connect(std::move(c), r); + return _start_next(unifex::start(op), r); + } + } + } + + template // + auto _start_sequential(_TailSender c, _TailReceiver r) { + return _start_sequential(c, r, type_list<>{}); + } +#endif + } // namespace exec diff --git a/test/exec/test_tail_sender.cpp b/test/exec/test_tail_sender.cpp index b0951d413..cb82d865f 100644 --- a/test/exec/test_tail_sender.cpp +++ b/test/exec/test_tail_sender.cpp @@ -30,6 +30,13 @@ struct ATailSender { struct operation { Receiver rcvr_; + operation(Receiver __r) : rcvr_(__r) {} + + operation(const operation&) = delete; + operation(operation&&) = delete; + operation& operator=(const operation&) = delete; + operation& operator=(operation&&) = delete; + [[nodiscard]] friend auto tag_invoke(ex::start_t, operation& self) noexcept { return ex::set_value(std::move(self.rcvr_)); } @@ -40,7 +47,7 @@ struct ATailSender { }; template - friend auto tag_invoke(ex::connect_t, ATailSender&& self, Receiver&& rcvr) + friend auto tag_invoke(ex::connect_t, ATailSender self, Receiver&& rcvr) noexcept -> operation> { return {std::forward(rcvr)}; } @@ -52,17 +59,132 @@ struct ATailSender { } }; +struct ATailReceiver { + int* called; + friend void tag_invoke(ex::set_value_t, ATailReceiver&& __self, auto&&...) noexcept { ++*__self.called; } + friend void tag_invoke(ex::set_stopped_t, ATailReceiver&& __self) noexcept { ++*__self.called; } + friend ex::__debug_env_t tag_invoke(ex::get_env_t, const ATailReceiver&) { + return {{}}; + } +}; + + +template +struct ANestTailReceiver { + std::decay_t nested_tail_sender; + int* called; + [[nodiscard]] + friend std::decay_t tag_invoke(ex::set_value_t, ANestTailReceiver&& __self, auto&&...) noexcept { + ++*__self.called; + return __self.nested_tail_sender; + } + [[nodiscard]] + friend std::decay_t tag_invoke(ex::set_stopped_t, ANestTailReceiver&& __self) noexcept { + ++*__self.called; + return __self.nested_tail_sender; + } + friend ex::__debug_env_t tag_invoke(ex::get_env_t, const ANestTailReceiver&) { + return {{}}; + } +}; + TEST_CASE("Test ATailSender is a tail_sender", "[tail_sender]") { static_assert(exec::tail_sender); + static_assert(exec::__terminal_tail_sender_to); CHECK(exec::tail_sender); + CHECK(exec::__terminal_tail_sender_to); } TEST_CASE("Test __null_tail_sender is a tail_sender", "[tail_sender]") { static_assert(exec::tail_sender); + static_assert(exec::__terminal_tail_sender_to); CHECK(exec::tail_sender); + CHECK(exec::__terminal_tail_sender_to); } TEST_CASE("Test maybe_tail_sender is a tail_sender", "[tail_sender]") { static_assert(exec::tail_sender>); + static_assert(exec::__nullable_tail_sender_to, ATailReceiver>); CHECK(exec::tail_sender>); + CHECK(exec::__nullable_tail_sender_to, ATailReceiver>); +} + +TEST_CASE("Test scoped_tail_sender", "[tail_sender]") { + int called = 0; + { + exec::scoped_tail_sender exit{ATailSender{}, ATailReceiver{&called}}; + CHECK(called == 0); + } + CHECK(called == 1); +} + +TEST_CASE("Test __start_until_nullable()", "[tail_sender]") { + static_assert(exec::__terminal_tail_sender_to, ATailReceiver>); + CHECK(exec::__terminal_tail_sender_to, ATailReceiver>); + + int called = 0; + CHECK(called == 0); + exec::maybe_tail_sender maybe = + exec::__start_until_nullable(exec::maybe_tail_sender{}, ATailReceiver{&called}); + CHECK(called == 0); + auto op0 = ex::connect(std::move(maybe), ATailReceiver{&called}); + CHECK(called == 0); + CHECK(!op0); + + called = 0; + maybe = + exec::__start_until_nullable(exec::maybe_tail_sender{ATailSender{}}, ATailReceiver{&called}); + CHECK(called == 0); + auto op1 = ex::connect(std::move(maybe), ATailReceiver{&called}); + CHECK(called == 0); + CHECK(!!op1); + ex::start(op1); + CHECK(called == 1); + + called = 0; + maybe = + exec::__start_until_nullable( + ATailSender{}, + ANestTailReceiver>{ATailSender{}, &called}); + CHECK(called == 1); + auto op2 = ex::connect(std::move(maybe), ATailReceiver{&called}); + CHECK(called == 1); + CHECK(!!op2); + ex::start(op2); + CHECK(called == 2); +} + +#if 0 +TEST_CASE("Test __resume_until_nullable()", "[tail_sender]") { + int called = 0; + CHECK(called == 0); + exec::maybe_tail_sender maybe = + exec::__resume_until_nullable(exec::maybe_tail_sender{}, ATailReceiver{&called}); + CHECK(called == 0); + auto op0 = ex::connect(std::move(maybe), ATailReceiver{&called}); + CHECK(called == 0); + CHECK(!op0); + + called = 0; + maybe = + exec::__resume_until_nullable(exec::maybe_tail_sender{ATailSender{}}, ATailReceiver{&called}); + CHECK(called == 0); + auto op1 = ex::connect(std::move(maybe), ATailReceiver{&called}); + CHECK(called == 0); + CHECK(!!op1); + ex::start(op1); + CHECK(called == 1); + + called = 0; + maybe = + exec::__resume_until_nullable( + ATailSender{}, + ANestTailReceiver>{ATailSender{}, &called}); + CHECK(called == 1); + auto op2 = ex::connect(std::move(maybe), ATailReceiver{&called}); + CHECK(called == 1); + CHECK(!!op2); + ex::start(op2); + CHECK(called == 2); } +#endif From e80f3a6488a889e5f5c759e324aa1366db12b841 Mon Sep 17 00:00:00 2001 From: Kirk Shoop Date: Wed, 2 Nov 2022 08:33:55 -0700 Subject: [PATCH 06/13] add __start_sequential --- include/exec/tail_sender.hpp | 155 +++++++++++++++++++-------------- test/exec/test_tail_sender.cpp | 72 ++++++++++----- 2 files changed, 141 insertions(+), 86 deletions(-) diff --git a/include/exec/tail_sender.hpp b/include/exec/tail_sender.hpp index 93ab47b0f..180cb0e65 100644 --- a/include/exec/tail_sender.hpp +++ b/include/exec/tail_sender.hpp @@ -209,6 +209,7 @@ namespace exec { }; template + requires (!same_as<__null_tail_sender, _TailSender>) struct maybe_tail_sender { maybe_tail_sender() noexcept = default; maybe_tail_sender(__null_tail_sender) noexcept {} @@ -313,96 +314,124 @@ namespace exec { bool valid_; }; - template - auto __start_until_nullable(_TailSender __t, _TailReceiver __r) { - if constexpr (__nullable_tail_sender_to<_TailSender, _TailReceiver>) { - return __t; - } else if constexpr (__terminal_tail_sender_to<_TailSender, _TailReceiver>) { - // restrict scope of op - { - auto op = stdexec::connect(std::move(__t), std::move(__r)); - stdexec::start(op); + namespace __start_until_nullable_ { + + struct __start_until_nullable_t; + + template + struct __start_until_nullable_result; + + template + using __start_until_nullable_result_t = typename __start_until_nullable_result<_TailSender, _TailReceiver>::type; + + template + struct __start_until_nullable_result { + using type = + __if< + __bool<__nullable_tail_sender_to<_TailSender, _TailReceiver>>, _TailSender, + __if< + __bool<__terminal_tail_sender_to<_TailSender, _TailReceiver>>, __null_tail_sender, + __minvoke<__with_default<__q<__start_until_nullable_result_t>, __null_tail_sender>, + __next_tail_from_sender_to_t<_TailSender, _TailReceiver>, + _TailReceiver> + > + >; + }; + + struct __start_until_nullable_t { + template + auto operator()(_TailSender __t, _TailReceiver __r) const noexcept + -> __start_until_nullable_result_t<_TailSender, _TailReceiver> { + if constexpr (__nullable_tail_sender_to<_TailSender, _TailReceiver>) { + return __t; + } else if constexpr (__terminal_tail_sender_to<_TailSender, _TailReceiver>) { + // restrict scope of op + { + auto op = stdexec::connect(std::move(__t), std::move(__r)); + stdexec::start(op); + } + return __null_tail_sender{}; + } else { + auto op = stdexec::connect(std::move(__t), __r); + return __start_until_nullable_t{}(stdexec::start(op), std::move(__r)); + } } - return __null_tail_sender{}; - } else { - auto op = stdexec::connect(std::move(__t), __r); - return __start_until_nullable(stdexec::start(op), std::move(__r)); - } - } + }; + + } // namespace __start_until_nullable_ + using __start_until_nullable_::__start_until_nullable_t; + inline constexpr __start_until_nullable_t __start_until_nullable{}; -#if 0 - template - auto __start_next(_Next next, _TailReceiver r) { - if constexpr (__one_of<_Next, _TailSender, _Prev...>) { + template + auto __start_next(_NextTailSender __next, _TailReceiver __r) { + if constexpr (__one_of<_NextTailSender, _TailSender, _PrevTailSenders...>) { static_assert( - (nullable_tail_sender_to<_TailSender, _TailReceiver> || - (nullable_tail_sender_to<_Prev, _TailReceiver> || ...)), + (__nullable_tail_sender_to<_TailSender, _TailReceiver> || + (__nullable_tail_sender_to<_PrevTailSenders, _TailReceiver> || ...)), "At least one tail_sender in a cycle must be nullable to avoid " "entering an infinite loop"); - return __start_until_nullable(next, std::move(r)); + return __start_until_nullable(__next, std::move(__r)); } else { using result_type = - decltype(__start_sequential(next, r, type_list<_TailSender, _Prev...>{})); - if constexpr (same_as) { + decltype(__start_sequential<_NextTailSender, _TailReceiver, _TailSender, _PrevTailSenders...>(__next, __r)); + if constexpr (same_as) { // Let the loop in resume_tail_sender() handle checking the boolean. - return next; + return __next; } else { - return __start_sequential(next, std::move(r), type_list<_TailSender, _Prev...>{}); + return __start_sequential<_NextTailSender, _TailReceiver, _TailSender, _PrevTailSenders...>(__next, std::move(__r)); } } } - template - auto _start_sequential(_TailSender c, _TailReceiver r, type_list<_Prev...>) { - static_assert( - _tail_sender<_TailSender>, "_start_sequential: must be called with a tail_sender"); - if constexpr (_terminal_tail_sender_to<_TailSender, _TailReceiver>) { - if constexpr (nullable_tail_sender_to<_TailSender, _TailReceiver>) { + template + auto __start_sequential(_TailSender c, _TailReceiver r) { + if constexpr (__terminal_tail_sender_to<_TailSender, _TailReceiver>) { + if constexpr (__nullable_tail_sender_to<_TailSender, _TailReceiver>) { return c; } else { // restrict scope of op { - auto op = unifex::connect(std::move(c), std::move(r)); - unifex::start(op); + auto op = stdexec::connect(std::move(c), std::move(r)); + stdexec::start(op); } - return null_tail_sender{}; + return __null_tail_sender{}; } } else { - using next_t = next_tail_sender_to_t<_TailSender, _TailReceiver>; - using result_type = decltype(_start_next( + using next_t = __next_tail_from_sender_to_t<_TailSender, _TailReceiver>; + using result_type = decltype(__start_next( std::declval(), r)); - if constexpr (std::is_void_v) { - // restrict scope of op - { - auto op = unifex::connect(std::move(c), std::move(r)); - unifex::start(op); + if constexpr (same_as) { + static_assert(__nullable_tail_sender_to<_TailSender, _TailReceiver>, "recursing tail_sender must be nullable"); + auto op = stdexec::connect(std::move(c), std::move(r)); + // using result_type = variant_tail_sender< + // __null_tail_sender, + // next_t>; + if (!op) { + return //result_type{ + next_t{};//__null_tail_sender{}}; } - return null_tail_sender{}; - } else if constexpr (same_as) { - auto op = unifex::connect(std::move(c), std::move(r)); - return unifex::start(op); - } else if constexpr (nullable_tail_sender_to<_TailSender, _TailReceiver>) { - auto op = unifex::connect(std::move(c), r); - using result_type = variant_tail_sender< - null_tail_sender, - decltype(_start_next( - unifex::start(op), r))>; + return //result_type{ + stdexec::start(op);//}; + } else if constexpr (__nullable_tail_sender_to<_TailSender, _TailReceiver>) { + auto op = stdexec::connect(std::move(c), r); + // using result_type = variant_tail_sender< + // __null_tail_sender, + // decltype(_start_next( + // stdexec::start(op), r))>; + // if (!op) { + // return result_type{__null_tail_sender{}}; + // } if (!op) { - return result_type{null_tail_sender{}}; + return //result_type{ + __null_tail_sender{};//}; } - return result_type{ - _start_next(unifex::start(op), r)}; + return //result_type{ + __start_next(stdexec::start(op), r);//}; } else { - auto op = unifex::connect(std::move(c), r); - return _start_next(unifex::start(op), r); + auto op = stdexec::connect(std::move(c), r); + return __start_next(stdexec::start(op), r); } } } - template // - auto _start_sequential(_TailSender c, _TailReceiver r) { - return _start_sequential(c, r, type_list<>{}); - } -#endif - } // namespace exec diff --git a/test/exec/test_tail_sender.cpp b/test/exec/test_tail_sender.cpp index cb82d865f..cea8a9ef1 100644 --- a/test/exec/test_tail_sender.cpp +++ b/test/exec/test_tail_sender.cpp @@ -98,8 +98,10 @@ TEST_CASE("Test ATailSender is a tail_sender", "[tail_sender]") { TEST_CASE("Test __null_tail_sender is a tail_sender", "[tail_sender]") { static_assert(exec::tail_sender); static_assert(exec::__terminal_tail_sender_to); + static_assert(exec::__nullable_tail_sender_to); CHECK(exec::tail_sender); CHECK(exec::__terminal_tail_sender_to); + CHECK(exec::__nullable_tail_sender_to); } TEST_CASE("Test maybe_tail_sender is a tail_sender", "[tail_sender]") { @@ -119,9 +121,9 @@ TEST_CASE("Test scoped_tail_sender", "[tail_sender]") { } TEST_CASE("Test __start_until_nullable()", "[tail_sender]") { - static_assert(exec::__terminal_tail_sender_to, ATailReceiver>); - CHECK(exec::__terminal_tail_sender_to, ATailReceiver>); + // return sender arg when it is nullable + // an empty maybe_tail_sender arg is empty when returned int called = 0; CHECK(called == 0); exec::maybe_tail_sender maybe = @@ -131,6 +133,8 @@ TEST_CASE("Test __start_until_nullable()", "[tail_sender]") { CHECK(called == 0); CHECK(!op0); + // return sender arg when it is nullable + // a valid maybe_tail_sender arg is valid when returned called = 0; maybe = exec::__start_until_nullable(exec::maybe_tail_sender{ATailSender{}}, ATailReceiver{&called}); @@ -141,6 +145,7 @@ TEST_CASE("Test __start_until_nullable()", "[tail_sender]") { ex::start(op1); CHECK(called == 1); + // return the nullable sender that was passed through set_value and start called = 0; maybe = exec::__start_until_nullable( @@ -154,20 +159,12 @@ TEST_CASE("Test __start_until_nullable()", "[tail_sender]") { CHECK(called == 2); } -#if 0 -TEST_CASE("Test __resume_until_nullable()", "[tail_sender]") { +TEST_CASE("Test __start_next()", "[tail_sender]") { int called = 0; CHECK(called == 0); - exec::maybe_tail_sender maybe = - exec::__resume_until_nullable(exec::maybe_tail_sender{}, ATailReceiver{&called}); - CHECK(called == 0); - auto op0 = ex::connect(std::move(maybe), ATailReceiver{&called}); - CHECK(called == 0); - CHECK(!op0); - - called = 0; - maybe = - exec::__resume_until_nullable(exec::maybe_tail_sender{ATailSender{}}, ATailReceiver{&called}); + auto maybe = exec::__start_next, ATailSender>( + exec::maybe_tail_sender{ATailSender{}}, + ANestTailReceiver>{ATailSender{}, &called}); CHECK(called == 0); auto op1 = ex::connect(std::move(maybe), ATailReceiver{&called}); CHECK(called == 0); @@ -175,16 +172,45 @@ TEST_CASE("Test __resume_until_nullable()", "[tail_sender]") { ex::start(op1); CHECK(called == 1); - called = 0; - maybe = - exec::__resume_until_nullable( - ATailSender{}, - ANestTailReceiver>{ATailSender{}, &called}); + // static_assert that this is infinite.. + // exec::__start_next(ATailSender{ATailSender{}}, + // ANestTailReceiver{ATailSender{}, &called}); +} + +TEST_CASE("Test __start_sequential()", "[tail_sender]") { + int called = 0; + CHECK(called == 0); + exec::maybe_tail_sender maybe = exec::__start_sequential( + exec::maybe_tail_sender{ATailSender{}}, + ANestTailReceiver>{ATailSender{}, &called}); CHECK(called == 1); - auto op2 = ex::connect(std::move(maybe), ATailReceiver{&called}); + auto op1 = ex::connect(std::move(maybe), ATailReceiver{&called}); CHECK(called == 1); - CHECK(!!op2); - ex::start(op2); + CHECK(!!op1); + ex::start(op1); CHECK(called == 2); + + called = 0; + CHECK(called == 0); + maybe = exec::__start_sequential( + exec::maybe_tail_sender{}, + ANestTailReceiver>{ATailSender{}, &called}); + CHECK(called == 0); + auto op2 = ex::connect(std::move(maybe), ATailReceiver{&called}); + CHECK(called == 0); + CHECK(!op2); + + called = 0; + CHECK(called == 0); + exec::__null_tail_sender empty = exec::__start_sequential( + exec::__null_tail_sender{}, + ANestTailReceiver{exec::__null_tail_sender{}, &called}); + CHECK(called == 0); + auto op3 = ex::connect(std::move(empty), ATailReceiver{&called}); + CHECK(called == 0); + CHECK(!op3); + + // static_assert that this is infinite.. + // exec::__start_next(ATailSender{ATailSender{}}, + // ANestTailReceiver{ATailSender{}, &called}); } -#endif From 6af616a64cf9baf4e9b506df050757a1a580cb7c Mon Sep 17 00:00:00 2001 From: Kirk Shoop Date: Sat, 5 Nov 2022 13:23:10 -0700 Subject: [PATCH 07/13] add variant_tail_sender --- include/exec/variant_tail_sender.hpp | 242 +++++++++++++++++++++++++++ 1 file changed, 242 insertions(+) create mode 100644 include/exec/variant_tail_sender.hpp diff --git a/include/exec/variant_tail_sender.hpp b/include/exec/variant_tail_sender.hpp new file mode 100644 index 000000000..e75a75a6a --- /dev/null +++ b/include/exec/variant_tail_sender.hpp @@ -0,0 +1,242 @@ +/* + * Copyright (c) 2021-2022 NVIDIA Corporation + * + * Licensed under the Apache License Version 2.0 with LLVM Exceptions + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * https://llvm.org/LICENSE.txt + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#pragma once + +#include "../stdexec/execution.hpp" +#include "env.hpp" + +#include + +namespace exec { + using namespace stdexec; + + template + struct __variant_tail_sender : private std::variant<_TailSenderN...> { + static_assert(sizeof...(_TailSenderN) >= 1, "variant_tail_sender requires at least one sender"); + static_assert((tail_sender<_TailSenderN> && ...), "variant_tail_sender requires all senders to be tail_sender"); + + using __senders_t = std::variant<_TailSenderN...>; + + using __senders_t::__senders_t; + using __senders_t::operator=; + using __senders_t::index; + using __senders_t::emplace; + using __senders_t::swap; + + // __variant_tail_sender() = default; + // template + // __variant_tail_sender(const __variant_tail_sender<_OtherTailSenders...>& __o) + // : __senders_t(static_cast&>(__o)) {} + // template + // __variant_tail_sender(__variant_tail_sender<_OtherTailSenders...>&& __o) + // : __senders_t(static_cast&&>(__o)) {} + // template + // __variant_tail_sender& operator=(const __variant_tail_sender<_OtherTailSenders...>& __o) { + // __senders_t::operator=(__o); + // return *this; + // } + // template + // __variant_tail_sender& operator=(__variant_tail_sender<_OtherTailSenders...>&& __o) { + // __senders_t::operator=(std::move(__o)); + // return *this; + // } + + // template + // requires (__is_not_instance_of<_An, __variant_tail_sender> && ...) + // __variant_tail_sender(_An&&... __an) : __senders_t((_An&&)__an...) {} + // template + // requires __is_not_instance_of<_A, __variant_tail_sender> + // __variant_tail_sender& operator=(_A&& __a) { + // __senders_t::operator=((_A&&)__a); + // return *this; + // } + + template + struct op { + using __opn_t = __variant>...>; + using __start_result_t = __variant_tail_sender...>; + + op(const op&) = delete; + op(op&&) = delete; + op& operator=(const op&) = delete; + op& operator=(op&&) = delete; + + explicit op(__senders_t&& __t, _TailReceiver __r) { + std::visit([&, this](auto&& __t) -> void { + using _T = std::remove_cvref_t; + if constexpr (tail_sender<_T>) { + static_assert(tail_sender_to<_T, _TailReceiver>, "variant-tail-sender member cannot connect"); + using op_t = connect_result_t<_T, _TailReceiver>; + using opt_t = std::optional; + __opn_.template emplace(); + opt_t& opt = std::get(__opn_); + opt.~opt_t(); + new (&opt) opt_t{stdexec::__conv{ + [&] () -> op_t { + return stdexec::connect((decltype(__t)&&)__t, __r); + } + }}; + } else { + std::terminate(); + } + }, (__senders_t&&)__t); + } + + operator bool() const noexcept { + return std::visit([&](auto&& __op) -> bool { + using _Opt = std::decay_t; + if constexpr (__is_instance_of_<_Opt, std::optional>) { + auto& op = *__op; + using _Op = std::decay_t; + if constexpr (__nullable_tail_operation_state<_Op>) { + return !!op; + } + return true; + } else { + std::terminate(); + } + }, __opn_); + } + + [[nodiscard]] + friend __start_result_t tag_invoke(start_t, op& __self) noexcept { + return std::visit([&](auto&& __op) -> __start_result_t { + using _Opt = std::decay_t; + if constexpr (__is_instance_of_<_Opt, std::optional>) { + auto& op = *__op; + using _Op = std::decay_t; + if constexpr (__nullable_tail_operation_state<_Op>) { + if (!op) { + return __start_result_t{}; + } + } + if constexpr (__terminal_tail_operation_state<_Op>) { + stdexec::start(op); + return __start_result_t{}; + } else { + return result_from<__start_result_t>(stdexec::start(op)); + } + } else { + std::terminate(); + } + }, __self.__opn_); + } + + friend void tag_invoke(unwind_t, op& __self) noexcept { + return std::visit([&](auto&& __op) -> void { + using _Opt = std::decay_t; + if constexpr (__is_instance_of_<_Opt, std::optional>) { + exec::unwind(*__op); + } else { + std::terminate(); + } + }, __self.__opn_); + } + __opn_t __opn_; + }; + + using completion_signatures = completion_signatures; + + template + [[nodiscard]] + friend auto tag_invoke(connect_t, __variant_tail_sender&& __self, _TailReceiver&& __r) noexcept + -> op> { + return op>{(__variant_tail_sender&&)__self, (_TailReceiver&&)__r}; + } + + template + friend constexpr bool tag_invoke( + exec::always_completes_inline_t, exec::c_t<__variant_tail_sender>, exec::c_t<_Env>) noexcept { + return true; + } + + private: + template + friend struct __variant_tail_sender; + + template + friend constexpr _To variant_cast(__variant_tail_sender __f) noexcept { + return std::visit([](_U&& __u) -> _To { + if constexpr (stdexec::__v<__mapply<__contains<_U>, _To>>) { + return _To{(_U&&) __u}; + } else { + printf("variant_cast\n"); fflush(stdout); + std::terminate(); + } + }, std::move(static_cast<__senders_t&>(__f))); + } + + template + friend constexpr std::decay_t<_To> get(__variant_tail_sender __f) noexcept { + static_assert(stdexec::__v<__mapply<__contains>, __variant_tail_sender>>, "get does not have _To as an alternative"); + if (!holds_alternative>(__f)) { + printf("get\n"); fflush(stdout); + std::terminate(); + } + return std::get>(std::move(static_cast<__senders_t&>(__f))); + } + + template< class T> + friend inline constexpr bool holds_alternative( const __variant_tail_sender& v ) noexcept { + return std::holds_alternative(v); + } + }; + + template class _T> + struct __mflattener_of { + + template + struct __push_back_flatten; + + template > + struct __mflatten { + template + using __f = + __mapply< + _Continuation, + __minvoke<__fold_right<__types<>, __push_back_flatten<__q<__types>>>, _Ts...>>; + }; + + template + struct __push_back_flatten { + + template + struct __f_; + template class _List, class... _ListItems, template class _Instance, class... _InstanceItems> + struct __f_, _Instance<_InstanceItems...>> { + using __t = __minvoke<__mflatten<_Continuation>, _ListItems..., _InstanceItems...>; + }; + template class _List, class... _ListItems, class _Item> + struct __f_, _Item> { + using __t = __minvoke<_Continuation, _ListItems..., _Item>; + }; + template + using __f = __t<__f_<__is_instance_of<_Item, _T>, _List, _Item>>; + }; + }; + + template + using variant_tail_sender = + __minvoke< + __if_c< + sizeof...(_TailSenderN) != 0, + __transform<__q, + __mflattener_of<__variant_tail_sender>::__mflatten< + __munique<__q<__variant_tail_sender>>>>, + __mconst<__not_a_variant>>, + _TailSenderN...>; + +} // namespace exec From 86b486af44df28623407f33a30a02297ea6e4cf6 Mon Sep 17 00:00:00 2001 From: Kirk Shoop Date: Sat, 5 Nov 2022 13:28:46 -0700 Subject: [PATCH 08/13] add resume_tail_senders_until_one_remaining --- include/exec/tail_sender.hpp | 408 +++++++++++++++++++++------------ test/exec/test_tail_sender.cpp | 91 +++++++- 2 files changed, 344 insertions(+), 155 deletions(-) diff --git a/include/exec/tail_sender.hpp b/include/exec/tail_sender.hpp index 180cb0e65..2a97413af 100644 --- a/include/exec/tail_sender.hpp +++ b/include/exec/tail_sender.hpp @@ -28,8 +28,8 @@ namespace exec { }; template - static constexpr bool __nothrow_contextually_convertible_to_bool_v = - noexcept((std::declval() ? (void)0 : (void)0)); + static constexpr bool __nothrow_contextually_convertible_to_bool_v = + noexcept((std::declval() ? (void)0 : (void)0)); template concept __nothrow_contextually_convertible_to_bool = @@ -39,10 +39,10 @@ namespace exec { namespace __unwind { struct unwind_t { template - requires std::tag_invocable - void operator()(_Op& __op) const noexcept(std::nothrow_tag_invocable) { - (void) tag_invoke(unwind_t{}, __op); - } + requires std::tag_invocable + void operator()(_Op& __op) const noexcept(std::nothrow_tag_invocable) { + (void) tag_invoke(unwind_t{}, __op); + } }; } using __unwind::unwind_t; @@ -63,19 +63,19 @@ namespace exec { struct always_completes_inline_t { template - requires std::tag_invocable, __cref_t<_Env>> - constexpr bool operator()(_Sender&& __s, _Env&& __e) const noexcept { - static_assert(same_as, __cref_t<_Env>>>); - static_assert(std::nothrow_tag_invocable, __cref_t<_Env>>); - return tag_invoke(always_completes_inline_t{}, std::as_const(__s), std::as_const(__e)); - } + requires std::tag_invocable, __cref_t<_Env>> + constexpr bool operator()(_Sender&& __s, _Env&& __e) const noexcept { + static_assert(same_as, __cref_t<_Env>>>); + static_assert(std::nothrow_tag_invocable, __cref_t<_Env>>); + return tag_invoke(always_completes_inline_t{}, std::as_const(__s), std::as_const(__e)); + } template - requires std::tag_invocable, c_t<_Env>> - constexpr bool operator()(c_t<_Sender>&& __s, c_t<_Env>&& __e) const noexcept { - static_assert(same_as, c_t<_Env>>>); - static_assert(std::nothrow_tag_invocable, c_t<_Env>>); - return tag_invoke(always_completes_inline_t{}, __s, __e); - } + requires std::tag_invocable>, c_t>> + constexpr bool operator()(c_t>&& __s, c_t>&& __e) const noexcept { + static_assert(same_as>, c_t>>>); + static_assert(std::nothrow_tag_invocable>, c_t>>); + return tag_invoke(always_completes_inline_t{}, __s, __e); + } constexpr bool operator()(auto&&, auto&&) const noexcept { return false; } @@ -99,7 +99,7 @@ namespace exec { template constexpr bool always_completes_inline_v = - always_completes_inline(c_v<_Sender>, c_v<_Env>); + always_completes_inline(c_v>, c_v>); template concept tail_sender = @@ -120,17 +120,68 @@ namespace exec { std::is_nothrow_destructible_v<_TailReceiver> && std::is_trivially_destructible_v<_TailReceiver>; + struct __null_tail_receiver { + friend void tag_invoke(set_value_t, __null_tail_receiver&&, auto&&...) noexcept {} + friend void tag_invoke(set_stopped_t, __null_tail_receiver&&) noexcept {} + friend __empty_env tag_invoke(get_env_t, const __null_tail_receiver& __self) { + return {}; + } + }; + + struct __null_tail_sender { + struct op : __immovable { + op() = default; + + // this is a nullable_tail_sender that always returns false to prevent + // callers from calling start() and unwind() + inline constexpr operator bool() const noexcept { return false; } + + friend void tag_invoke(start_t, op& self) noexcept { + printf("__null_tail_sender start\n"); fflush(stdout); + std::terminate(); + } + + friend void tag_invoke(unwind_t, op& self) noexcept { + printf("__null_tail_sender unwind\n"); fflush(stdout); + std::terminate(); + } + }; + + using completion_signatures = completion_signatures; + + template + friend auto tag_invoke(connect_t, __null_tail_sender&&, _TailReceiver&&) noexcept + -> op { + return {}; + } + + template + friend constexpr bool tag_invoke( + exec::always_completes_inline_t, exec::c_t<__null_tail_sender>, exec::c_t<_Env>) noexcept { + return true; + } + }; + + template + struct __variant_tail_sender; + template using __next_tail_from_operation_t = __call_result_t; + template + using next_tail_from_operation_t = + __if<__bool>>, + __null_tail_sender, + __call_result_t>; + template using __next_tail_from_sender_to_t = __next_tail_from_operation_t>; - template - concept __tail_sender_or_void = - same_as<_TailSender, void> || tail_sender<_TailSender, _Env>; + template + using next_tail_from_sender_to_t = + next_tail_from_operation_t>; template concept tail_sender_to = @@ -140,7 +191,12 @@ namespace exec { { stdexec::connect((_TailSender&&) __s, (_TailReceiver&&) __r) } noexcept -> tail_operation_state; } && - __tail_sender_or_void<__next_tail_from_sender_to_t<_TailSender, _TailReceiver>>; + tail_sender>; + + template + concept __terminal_tail_operation_state = + tail_operation_state<_TailOperationState> && + same_as<__next_tail_from_operation_t<_TailOperationState>, void>; template concept __terminal_tail_sender_to = @@ -151,7 +207,7 @@ namespace exec { concept __recursive_tail_sender_to = tail_sender_to<_TailSender, _TailReceiver> && tail_operation_state> && - __one_of<__next_tail_from_sender_to_t<_TailSender, _TailReceiver>, _ValidTailSender...>; + __one_of, _ValidTailSender...>; template concept __nullable_tail_operation_state = @@ -163,50 +219,31 @@ namespace exec { tail_sender_to<_TailSender, _TailReceiver> && __nullable_tail_operation_state>; +} // namespace exec - struct __null_tail_receiver { - friend void tag_invoke(set_value_t, __null_tail_receiver&&, auto&&...) noexcept {} - friend void tag_invoke(set_stopped_t, __null_tail_receiver&&) noexcept {} - friend __empty_env tag_invoke(get_env_t, const __null_tail_receiver& __self) { - return {}; - } - }; - struct __null_tail_sender { - struct op { - op() = default; - op(const op&) = delete; - op(op&&) = delete; - op& operator=(const op&) = delete; - op& operator=(op&&) = delete; - // this is a nullable_tail_sender that always returns false to prevent - // callers from calling start() and unwind() - inline constexpr operator bool() const noexcept { return false; } +#include "variant_tail_sender.hpp" - friend void tag_invoke(start_t, op& self) noexcept { - std::terminate(); - } - friend void tag_invoke(unwind_t, op& self) noexcept { - std::terminate(); - } - }; - using completion_signatures = completion_signatures; - - template - friend auto tag_invoke(connect_t, __null_tail_sender&&, _TailReceiver&&) noexcept - -> op { - return {}; - } +namespace exec { - template - friend constexpr bool tag_invoke( - exec::always_completes_inline_t, exec::c_t<__null_tail_sender>, exec::c_t<_Env>) noexcept { - return true; + template + constexpr std::decay_t<_To> result_from(_From&& __f) noexcept { + if constexpr ( + __is_instance_of, __variant_tail_sender> && + __is_instance_of, __variant_tail_sender>) { + return variant_cast>((_From&&)__f); + } else if constexpr ( + __is_instance_of, __variant_tail_sender> && + !__is_instance_of, __variant_tail_sender>) { + return get>((_From&&)__f); + } else { + static_assert(std::is_constructible_v<_To, _From>, "result_from cannot convert"); + return (_From&&)__f; + } } - }; template requires (!same_as<__null_tail_sender, _TailSender>) @@ -215,13 +252,9 @@ namespace exec { maybe_tail_sender(__null_tail_sender) noexcept {} maybe_tail_sender(_TailSender __t) noexcept : tail_sender_(__t) {} template - struct op { + struct op : __immovable { using op_t = connect_result_t<_TailSender, _TailReceiver>; op() = default; - op(const op&) = delete; - op(op&&) = delete; - op& operator=(const op&) = delete; - op& operator=(op&&) = delete; explicit op(_TailSender __t, _TailReceiver __r) : op_(stdexec::__conv{ @@ -239,17 +272,29 @@ namespace exec { [[nodiscard]] friend auto tag_invoke(start_t, op& __self) noexcept { - if (!__self.op_) { std::terminate(); } + if (!__self.op_) { + printf("maybe_tail_sender start optional\n"); fflush(stdout); + std::terminate(); + } if constexpr (__nullable_tail_sender_to<_TailSender, _TailReceiver>) { - if (!*__self.op_) { std::terminate(); } + if (!*__self.op_) { + printf("maybe_tail_sender start nullable\n"); fflush(stdout); + std::terminate(); + } } return stdexec::start(*__self.op_); } friend void tag_invoke(unwind_t, op& __self) noexcept { - if (!__self.op_) { std::terminate(); } + if (!__self.op_) { + printf("maybe_tail_sender unwind optional\n"); fflush(stdout); + std::terminate(); + } if constexpr (__nullable_tail_sender_to<_TailSender, _TailReceiver>) { - if (!*__self.op_) { std::terminate(); } + if (!*__self.op_) { + printf("maybe_tail_sender unwind nullable\n"); fflush(stdout); + std::terminate(); + } } exec::unwind(*__self.op_); } @@ -260,17 +305,17 @@ namespace exec { template [[nodiscard]] - friend auto tag_invoke(connect_t, maybe_tail_sender&& __self, _TailReceiver&& __r) noexcept - -> op<_TailReceiver> { - if (!__self.tail_sender_) { return {}; } - return op<_TailReceiver>{*((maybe_tail_sender&&)__self).tail_sender_, __r}; - } + friend auto tag_invoke(connect_t, maybe_tail_sender&& __self, _TailReceiver&& __r) noexcept + -> op> { + if (!__self.tail_sender_) { return {}; } + return op>{*((maybe_tail_sender&&)__self).tail_sender_, __r}; + } template - friend constexpr bool tag_invoke( + friend constexpr bool tag_invoke( exec::always_completes_inline_t, exec::c_t, exec::c_t<_Env>) noexcept { - return true; - } + return true; + } private: std::optional<_TailSender> tail_sender_; @@ -314,29 +359,47 @@ namespace exec { bool valid_; }; + struct __all_resumed_tail_sender { + + using completion_signatures = completion_signatures; + + template + friend auto tag_invoke(connect_t, __all_resumed_tail_sender&&, _TailReceiver&& __r) noexcept + -> __call_result_t { + return stdexec::connect(__null_tail_sender{}, __r); + } + + template + friend constexpr bool tag_invoke( + exec::always_completes_inline_t, exec::c_t<__all_resumed_tail_sender>, exec::c_t<_Env>) noexcept { + return true; + } + }; + namespace __start_until_nullable_ { struct __start_until_nullable_t; template - struct __start_until_nullable_result; + struct __start_until_nullable_result; template - using __start_until_nullable_result_t = typename __start_until_nullable_result<_TailSender, _TailReceiver>::type; + using __start_until_nullable_result_t = + typename __start_until_nullable_result<_TailSender, _TailReceiver>::type; template - struct __start_until_nullable_result { - using type = - __if< - __bool<__nullable_tail_sender_to<_TailSender, _TailReceiver>>, _TailSender, + struct __start_until_nullable_result { + using type = __if< - __bool<__terminal_tail_sender_to<_TailSender, _TailReceiver>>, __null_tail_sender, - __minvoke<__with_default<__q<__start_until_nullable_result_t>, __null_tail_sender>, - __next_tail_from_sender_to_t<_TailSender, _TailReceiver>, - _TailReceiver> - > - >; - }; + __bool<__nullable_tail_sender_to<_TailSender, _TailReceiver>>, _TailSender, + __if< + __bool<__terminal_tail_sender_to<_TailSender, _TailReceiver>>, __all_resumed_tail_sender, + __minvoke<__with_default<__q<__start_until_nullable_result_t>, __all_resumed_tail_sender>, + __next_tail_from_sender_to_t<_TailSender, _TailReceiver>, + _TailReceiver> + > + >; + }; struct __start_until_nullable_t { template @@ -350,7 +413,7 @@ namespace exec { auto op = stdexec::connect(std::move(__t), std::move(__r)); stdexec::start(op); } - return __null_tail_sender{}; + return __all_resumed_tail_sender{}; } else { auto op = stdexec::connect(std::move(__t), __r); return __start_until_nullable_t{}(stdexec::start(op), std::move(__r)); @@ -363,7 +426,45 @@ namespace exec { inline constexpr __start_until_nullable_t __start_until_nullable{}; template - auto __start_next(_NextTailSender __next, _TailReceiver __r) { + auto __start_next(_NextTailSender __next, _TailReceiver __r) noexcept; + + template + struct __start_sequential_result; + + template + using __start_sequential_result_t = typename __start_sequential_result<_TailSender, _TailReceiver>::type; + + template + struct __start_sequential_result { + using next_t = next_tail_from_sender_to_t<_TailSender, _TailReceiver>; + using start_next_t = __call_result_t< + decltype(__start_next), + next_t, + _TailReceiver>; + using type = + __if< + __bool< + __nullable_tail_sender_to<_TailSender, _TailReceiver> && + __terminal_tail_sender_to<_TailSender, _TailReceiver>>, + _TailSender, + __if< // elseif + __bool<__nullable_tail_sender_to<_TailSender, _TailReceiver>>, + variant_tail_sender<__all_resumed_tail_sender, start_next_t>, + __if< // elseif + __bool>, + start_next_t, + __all_resumed_tail_sender // else + > + > + >; + }; + + template + auto __start_sequential(_TailSender c, _TailReceiver r) noexcept + -> __start_sequential_result_t<_TailSender, _TailReceiver, _PrevTailSenders...>; + + template + auto __start_next(_NextTailSender __next, _TailReceiver __r) noexcept { if constexpr (__one_of<_NextTailSender, _TailSender, _PrevTailSenders...>) { static_assert( (__nullable_tail_sender_to<_TailSender, _TailReceiver> || @@ -372,66 +473,85 @@ namespace exec { "entering an infinite loop"); return __start_until_nullable(__next, std::move(__r)); } else { - using result_type = - decltype(__start_sequential<_NextTailSender, _TailReceiver, _TailSender, _PrevTailSenders...>(__next, __r)); - if constexpr (same_as) { - // Let the loop in resume_tail_sender() handle checking the boolean. - return __next; - } else { - return __start_sequential<_NextTailSender, _TailReceiver, _TailSender, _PrevTailSenders...>(__next, std::move(__r)); - } + return __start_sequential<_NextTailSender, _TailReceiver, _TailSender, _PrevTailSenders...>(__next, std::move(__r)); } } template - auto __start_sequential(_TailSender c, _TailReceiver r) { - if constexpr (__terminal_tail_sender_to<_TailSender, _TailReceiver>) { - if constexpr (__nullable_tail_sender_to<_TailSender, _TailReceiver>) { - return c; - } else { - // restrict scope of op - { - auto op = stdexec::connect(std::move(c), std::move(r)); - stdexec::start(op); - } - return __null_tail_sender{}; + auto __start_sequential(_TailSender c, _TailReceiver r) noexcept + -> __start_sequential_result_t<_TailSender, _TailReceiver, _PrevTailSenders...> { + using next_t = next_tail_from_sender_to_t<_TailSender, _TailReceiver>; + using result_t = __start_sequential_result_t<_TailSender, _TailReceiver, _PrevTailSenders...>; + + if constexpr ( + __nullable_tail_sender_to<_TailSender, _TailReceiver> && + __terminal_tail_sender_to<_TailSender, _TailReceiver>) { + // halt the recursion + return c; + } else if constexpr (__nullable_tail_sender_to<_TailSender, _TailReceiver>) { + // recurse if the nullable tail-sender is valid otherwise return + // a nullable and terminal tail-sender + auto op = stdexec::connect(std::move(c), r); + if (!op) { + return __all_resumed_tail_sender{}; } + return result_from( + __start_next( + stdexec::start(op), r)); + } else if constexpr (!__terminal_tail_sender_to<_TailSender, _TailReceiver>) { + auto op = stdexec::connect(std::move(c), r); + return result_from( + __start_next( + stdexec::start(op), r)); } else { - using next_t = __next_tail_from_sender_to_t<_TailSender, _TailReceiver>; - using result_type = decltype(__start_next( - std::declval(), r)); - if constexpr (same_as) { - static_assert(__nullable_tail_sender_to<_TailSender, _TailReceiver>, "recursing tail_sender must be nullable"); - auto op = stdexec::connect(std::move(c), std::move(r)); - // using result_type = variant_tail_sender< - // __null_tail_sender, - // next_t>; - if (!op) { - return //result_type{ - next_t{};//__null_tail_sender{}}; - } - return //result_type{ - stdexec::start(op);//}; - } else if constexpr (__nullable_tail_sender_to<_TailSender, _TailReceiver>) { - auto op = stdexec::connect(std::move(c), r); - // using result_type = variant_tail_sender< - // __null_tail_sender, - // decltype(_start_next( - // stdexec::start(op), r))>; - // if (!op) { - // return result_type{__null_tail_sender{}}; - // } - if (!op) { - return //result_type{ - __null_tail_sender{};//}; - } - return //result_type{ - __start_next(stdexec::start(op), r);//}; - } else { - auto op = stdexec::connect(std::move(c), r); - return __start_next(stdexec::start(op), r); + // run the terminal and not nullable tail-sender and return + // a nullable and terminal tail-sender + auto op = stdexec::connect(std::move(c), r); + stdexec::start(op); + return __all_resumed_tail_sender{}; + } + } + + + template + inline __null_tail_sender resume_tail_senders_until_one_remaining(_TailReceiver&&) noexcept { + return {}; + } + + template + C resume_tail_senders_until_one_remaining(_TailReceiver&&, C c) noexcept { + return c; + } + + template + auto _resume_tail_senders_until_one_remaining(_TailReceiver&& __r, std::index_sequence, Cs... cs) noexcept { + using result_type = variant_tail_sender; + result_type result; + + auto cs2_tuple = std::make_tuple( + variant_tail_sender< + __all_resumed_tail_sender, + decltype(__start_sequential(__start_sequential(cs, __r), __r))>{ + __start_sequential(__start_sequential(cs, __r), __r)}...); + while (true) { + std::size_t remaining = sizeof...(cs); + ((remaining > 1 ? + (!holds_alternative<__all_resumed_tail_sender>(std::get(cs2_tuple)) ? + (void)(result = result_from( + std::get(cs2_tuple) = result_from(cs2_tuple))>( + __start_sequential(std::get(cs2_tuple), __r)))) : + (void)--remaining) : + (void)(result = result_from(std::get(cs2_tuple)))), ...); + + if (remaining <= 1) { + return result; } } } + template + auto resume_tail_senders_until_one_remaining(_TailReceiver&& __r, Cs... cs) noexcept { + return _resume_tail_senders_until_one_remaining(__r, std::index_sequence_for{}, cs...); + } + } // namespace exec diff --git a/test/exec/test_tail_sender.cpp b/test/exec/test_tail_sender.cpp index cea8a9ef1..6c9a22dac 100644 --- a/test/exec/test_tail_sender.cpp +++ b/test/exec/test_tail_sender.cpp @@ -126,8 +126,10 @@ TEST_CASE("Test __start_until_nullable()", "[tail_sender]") { // an empty maybe_tail_sender arg is empty when returned int called = 0; CHECK(called == 0); - exec::maybe_tail_sender maybe = - exec::__start_until_nullable(exec::maybe_tail_sender{}, ATailReceiver{&called}); + exec::maybe_tail_sender + maybe = exec::__start_until_nullable( + exec::maybe_tail_sender{}, + ATailReceiver{&called}); CHECK(called == 0); auto op0 = ex::connect(std::move(maybe), ATailReceiver{&called}); CHECK(called == 0); @@ -162,15 +164,18 @@ TEST_CASE("Test __start_until_nullable()", "[tail_sender]") { TEST_CASE("Test __start_next()", "[tail_sender]") { int called = 0; CHECK(called == 0); - auto maybe = exec::__start_next, ATailSender>( - exec::maybe_tail_sender{ATailSender{}}, - ANestTailReceiver>{ATailSender{}, &called}); - CHECK(called == 0); + exec::__variant_tail_sender< + exec::__all_resumed_tail_sender, + exec::maybe_tail_sender> + maybe = exec::__start_next, ATailSender>( + exec::maybe_tail_sender{ATailSender{}}, + ANestTailReceiver>{ATailSender{}, &called}); + CHECK(called == 1); auto op1 = ex::connect(std::move(maybe), ATailReceiver{&called}); - CHECK(called == 0); + CHECK(called == 1); CHECK(!!op1); ex::start(op1); - CHECK(called == 1); + CHECK(called == 2); // static_assert that this is infinite.. // exec::__start_next(ATailSender{ATailSender{}}, @@ -180,9 +185,12 @@ TEST_CASE("Test __start_next()", "[tail_sender]") { TEST_CASE("Test __start_sequential()", "[tail_sender]") { int called = 0; CHECK(called == 0); - exec::maybe_tail_sender maybe = exec::__start_sequential( - exec::maybe_tail_sender{ATailSender{}}, - ANestTailReceiver>{ATailSender{}, &called}); + exec::__variant_tail_sender< + exec::__all_resumed_tail_sender, + exec::maybe_tail_sender> + maybe = exec::__start_sequential( + exec::maybe_tail_sender{ATailSender{}}, + ANestTailReceiver>{ATailSender{}, &called}); CHECK(called == 1); auto op1 = ex::connect(std::move(maybe), ATailReceiver{&called}); CHECK(called == 1); @@ -214,3 +222,64 @@ TEST_CASE("Test __start_sequential()", "[tail_sender]") { // exec::__start_next(ATailSender{ATailSender{}}, // ANestTailReceiver{ATailSender{}, &called}); } + +TEST_CASE("Test resume_tail_senders_until_one_remaining()", "[tail_sender]") { + int called = 0; + CHECK(called == 0); + exec::__variant_tail_sender< + exec::__all_resumed_tail_sender, + exec::__null_tail_sender, + exec::maybe_tail_sender> + maybe = exec::resume_tail_senders_until_one_remaining( + ANestTailReceiver>{ATailSender{}, &called}, + exec::maybe_tail_sender{}, + ATailSender{}, + exec::__null_tail_sender{} + ); + CHECK(called == 8); + auto op1 = ex::connect(std::move(maybe), ATailReceiver{&called}); + CHECK(called == 8); + CHECK(!!op1); + ex::start(op1); + CHECK(called == 9); + + called = 0; + CHECK(called == 0); + maybe = exec::resume_tail_senders_until_one_remaining( + ANestTailReceiver>{exec::maybe_tail_sender{}, &called}, + exec::maybe_tail_sender{ATailSender{}}, + ATailSender{}, + exec::__null_tail_sender{} + ); + CHECK(called == 2); + auto op2 = ex::connect(std::move(maybe), ATailReceiver{&called}); + CHECK(called == 2); + CHECK(!op2); + + called = 0; + CHECK(called == 0); + exec::__variant_tail_sender + empty = exec::resume_tail_senders_until_one_remaining( + ANestTailReceiver{exec::__null_tail_sender{}, &called}, + exec::maybe_tail_sender{ATailSender{}}, + ATailSender{}, + exec::__null_tail_sender{} + ); + CHECK(called == 2); + auto op3 = ex::connect(std::move(empty), ATailReceiver{&called}); + CHECK(called == 2); + CHECK(!op3); + + called = 0; + CHECK(called == 0); + empty = exec::resume_tail_senders_until_one_remaining( + ANestTailReceiver{exec::__null_tail_sender{}, &called}, + exec::maybe_tail_sender{}, + ATailSender{}, + exec::__null_tail_sender{} + ); + CHECK(called == 1); + auto op4 = ex::connect(std::move(empty), ATailReceiver{&called}); + CHECK(called == 1); + CHECK(!op4); +} From c71ebd0ba7d77ccfbe47479bbff40f0f0636a0b8 Mon Sep 17 00:00:00 2001 From: Kirk Shoop Date: Thu, 29 Dec 2022 21:20:52 -0800 Subject: [PATCH 09/13] fixes post rebase --- include/exec/variant_tail_sender.hpp | 2 +- test/exec/test_tail_sender.cpp | 8 ++++---- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/include/exec/variant_tail_sender.hpp b/include/exec/variant_tail_sender.hpp index e75a75a6a..2d82cd156 100644 --- a/include/exec/variant_tail_sender.hpp +++ b/include/exec/variant_tail_sender.hpp @@ -207,7 +207,7 @@ namespace exec { using __f = __mapply< _Continuation, - __minvoke<__fold_right<__types<>, __push_back_flatten<__q<__types>>>, _Ts...>>; + __minvoke<__mfold_right<__types<>, __push_back_flatten<__q<__types>>>, _Ts...>>; }; template diff --git a/test/exec/test_tail_sender.cpp b/test/exec/test_tail_sender.cpp index 6c9a22dac..7b0838f88 100644 --- a/test/exec/test_tail_sender.cpp +++ b/test/exec/test_tail_sender.cpp @@ -63,8 +63,8 @@ struct ATailReceiver { int* called; friend void tag_invoke(ex::set_value_t, ATailReceiver&& __self, auto&&...) noexcept { ++*__self.called; } friend void tag_invoke(ex::set_stopped_t, ATailReceiver&& __self) noexcept { ++*__self.called; } - friend ex::__debug_env_t tag_invoke(ex::get_env_t, const ATailReceiver&) { - return {{}}; + friend ex::__empty_env tag_invoke(ex::get_env_t, const ATailReceiver&) { + return {}; } }; @@ -83,8 +83,8 @@ struct ANestTailReceiver { ++*__self.called; return __self.nested_tail_sender; } - friend ex::__debug_env_t tag_invoke(ex::get_env_t, const ANestTailReceiver&) { - return {{}}; + friend ex::__empty_env tag_invoke(ex::get_env_t, const ANestTailReceiver&) { + return {}; } }; From d05357c45173b3dc825d8a9a1788fcdbca896799 Mon Sep 17 00:00:00 2001 From: Kirk Shoop Date: Mon, 6 Feb 2023 11:43:32 -0800 Subject: [PATCH 10/13] adding some tests --- include/exec/tail_sender.hpp | 435 +++++++++++++++++++++++++++++++++ test/exec/test_tail_sender.cpp | 74 +++++- 2 files changed, 507 insertions(+), 2 deletions(-) diff --git a/include/exec/tail_sender.hpp b/include/exec/tail_sender.hpp index 2a97413af..187d8213b 100644 --- a/include/exec/tail_sender.hpp +++ b/include/exec/tail_sender.hpp @@ -554,4 +554,439 @@ namespace exec { return _resume_tail_senders_until_one_remaining(__r, std::index_sequence_for{}, cs...); } + template + void resume_tail_senders(_TailReceiver&& __r, Cs... cs) noexcept { + auto __last_tail = _resume_tail_senders_until_one_remaining(__r, std::index_sequence_for{}, cs...); + for(;;) { + auto __op = connect(__last_tail, __r); + if (!__op) { + return; + } + if constexpr (__terminal_tail_sender_to) { + start(__last_tail); + return; + } else { + __last_tail = __start_sequential(start(__last_tail), __r); + } + } + } + + ///////////////////////////////////////////////////////////////////////////// + // run_loop + namespace __loop { + class run_loop; + + struct __task : __immovable { + __task* __next_ = this; + union { + void (*__execute_)(__task*) noexcept; + __task* __tail_; + }; + + void __execute() noexcept { (*__execute_)(this); } + }; + + template + struct __operation { + using _Receiver = stdexec::__t<_ReceiverId>; + + struct __t : __task { + using __id = __operation; + + run_loop* __loop_; + [[no_unique_address]] _Receiver __rcvr_; + + static void __execute_impl(__task* __p) noexcept { + auto& __rcvr = ((__t*) __p)->__rcvr_; + try { + if (get_stop_token(get_env(__rcvr)).stop_requested()) { + set_stopped((_Receiver&&) __rcvr); + } else { + set_value((_Receiver&&) __rcvr); + } + } catch(...) { + set_error((_Receiver&&) __rcvr, std::current_exception()); + } + } + + explicit __t(__task* __tail) noexcept + : __task{.__tail_ = __tail} {} + __t(__task* __next, run_loop* __loop, _Receiver __rcvr) + : __task{{}, __next, {&__execute_impl}} + , __loop_{__loop} + , __rcvr_{(_Receiver&&) __rcvr} {} + + friend void tag_invoke(start_t, __t& __self) noexcept { + __self.__start_(); + } + + void __start_() noexcept; + }; + }; + + template + struct __run_operation { + using _Receiver = stdexec::__t<_ReceiverId>; + + struct __t { + using __id = __run_operation; + + run_loop* __loop_; + [[no_unique_address]] _Receiver __rcvr_; + + __t(run_loop* __loop, _Receiver __rcvr) + : __loop_{__loop} + , __rcvr_{(_Receiver&&) __rcvr} {} + + friend void tag_invoke(start_t, __t& __self) noexcept { + __self.__start_(); + } + + void __start_() noexcept; + }; + }; + + class run_loop { + template + using __completion_signatures_ = completion_signatures; + + template + friend struct __operation; + public: + struct __scheduler { + using __t = __scheduler; + using __id = __scheduler; + bool operator==(const __scheduler&) const noexcept = default; + + private: + struct __schedule_task { + using __t = __schedule_task; + using __id = __schedule_task; + using completion_signatures = + __completion_signatures_< + set_value_t(), + set_error_t(std::exception_ptr), + set_stopped_t()>; + + private: + friend __scheduler; + + template + using __operation = stdexec::__t<__operation>>; + + template + friend __operation<_Receiver> + tag_invoke(connect_t, const __schedule_task& __self, _Receiver __rcvr) { + return __self.__connect_((_Receiver &&) __rcvr); + } + + template + __operation<_Receiver> __connect_(_Receiver&& __rcvr) const { + return {&__loop_->__head_, __loop_, (_Receiver &&) __rcvr}; + } + + template + friend __scheduler + tag_invoke(get_completion_scheduler_t<_CPO>, const __schedule_task& __self) noexcept { + return __scheduler{__self.__loop_}; + } + + explicit __schedule_task(run_loop* __loop) noexcept + : __loop_(__loop) + {} + + run_loop* const __loop_; + }; + + friend run_loop; + + explicit __scheduler(run_loop* __loop) noexcept + : __loop_(__loop) {} + + friend __schedule_task tag_invoke(schedule_t, const __scheduler& __self) noexcept { + return __self.__schedule(); + } + + friend stdexec::forward_progress_guarantee tag_invoke( + get_forward_progress_guarantee_t, const __scheduler&) noexcept { + return stdexec::forward_progress_guarantee::parallel; + } + + // BUGBUG NOT TO SPEC + friend bool tag_invoke( + this_thread::execute_may_block_caller_t, const __scheduler&) noexcept { + return false; + } + + __schedule_task __schedule() const noexcept { + return __schedule_task{__loop_}; + } + + run_loop* __loop_; + }; + __scheduler get_scheduler() noexcept { + return __scheduler{this}; + } + + struct __run_sender { + using __t = __run_sender; + using __id = __run_sender; + using completion_signatures = + __completion_signatures_< + set_value_t(), + set_stopped_t()>; + + private: + friend __scheduler; + + template + using __operation = stdexec::__t<__operation>>; + + template + friend __operation<_Receiver> + tag_invoke(connect_t, const __run_sender& __self, _Receiver __rcvr) { + return __self.__connect_((_Receiver &&) __rcvr); + } + + template + __operation<_Receiver> __connect_(_Receiver&& __rcvr) const { + return {&__loop_->__head_, __loop_, (_Receiver &&) __rcvr}; + } + + template + friend __scheduler + tag_invoke(get_completion_scheduler_t<_CPO>, const __run_sender& __self) noexcept { + return __scheduler{__self.__loop_}; + } + + explicit __run_sender(run_loop* __loop) noexcept + : __loop_(__loop) + {} + + run_loop* __loop_; + }; + template + __run_sender run(_Sender&& __s); + + void run(); + + void finish(); + + private: + void __push_back_(__task* __task); + __task* __pop_front_(); + + std::mutex __mutex_; + std::condition_variable __cv_; + __task __head_{.__tail_ = &__head_}; + bool __stop_ = false; + }; + + template + inline void __operation<_ReceiverId>::__t::__start_() noexcept try { + __loop_->__push_back_(this); + } catch(...) { + set_error((_Receiver&&) __rcvr_, std::current_exception()); + } + + template + run_loop::__run_sender run_loop::run(_Sender&& __s) { + return {this, (_Sender &&)__s}; + } + + inline void run_loop::run() { + for (__task* __task; (__task = __pop_front_()) != &__head_;) { + __task->__execute(); + } + } + + inline void run_loop::finish() { + std::unique_lock __lock{__mutex_}; + __stop_ = true; + __cv_.notify_all(); + } + + inline void run_loop::__push_back_(__task* __task) { + std::unique_lock __lock{__mutex_}; + __task->__next_ = &__head_; + __head_.__tail_ = __head_.__tail_->__next_ = __task; + __cv_.notify_one(); + } + + inline __task* run_loop::__pop_front_() { + std::unique_lock __lock{__mutex_}; + __cv_.wait(__lock, [this]{ return __head_.__next_ != &__head_ || __stop_; }); + if (__head_.__tail_ == __head_.__next_) + __head_.__tail_ = &__head_; + return std::exchange(__head_.__next_, __head_.__next_->__next_); + } + } // namespace __loop + + // NOT TO SPEC + using run_loop = __loop::run_loop; + + ///////////////////////////////////////////////////////////////////////////// + // [execution.senders.consumers.sync_wait] + // [execution.senders.consumers.sync_wait_with_variant] + namespace __sync_wait { + template + using __into_variant_result_t = + decltype(stdexec::into_variant(__declval<_Sender>())); + + struct __env { + using __t = __env; + using __id = __env; + stdexec::run_loop::__scheduler __sched_; + + friend auto tag_invoke(stdexec::get_scheduler_t, const __env& __self) noexcept + -> stdexec::run_loop::__scheduler { + return __self.__sched_; + } + + friend auto tag_invoke(stdexec::get_delegatee_scheduler_t, const __env& __self) noexcept + -> stdexec::run_loop::__scheduler { + return __self.__sched_; + } + }; + + // What should sync_wait(just_stopped()) return? + template + using __sync_wait_result_impl = + __value_types_of_t< + _Sender, + __env, + __transform<__q, _Continuation>, + __q<__msingle>>; + + template _Sender> + using __sync_wait_result_t = + __sync_wait_result_impl<_Sender, __q>; + + template + using __sync_wait_with_variant_result_t = + __sync_wait_result_t<__into_variant_result_t<_Sender>>; + + template + struct __state { + using _Tuple = std::tuple<_Values...>; + std::variant __data_{}; + }; + + template + struct __receiver { + struct __t { + using __id = __receiver; + __state<_Values...>* __state_; + stdexec::run_loop* __loop_; + template + void __set_error(_Error __err) noexcept { + if constexpr (__decays_to<_Error, std::exception_ptr>) + __state_->__data_.template emplace<2>((_Error&&) __err); + else if constexpr (__decays_to<_Error, std::error_code>) + __state_->__data_.template emplace<2>(std::make_exception_ptr(std::system_error(__err))); + else + __state_->__data_.template emplace<2>(std::make_exception_ptr((_Error&&) __err)); + __loop_->finish(); + } + template + requires constructible_from, _As...> + friend void tag_invoke(stdexec::set_value_t, __t&& __rcvr, _As&&... __as) noexcept try { + __rcvr.__state_->__data_.template emplace<1>((_As&&) __as...); + __rcvr.__loop_->finish(); + } catch(...) { + __rcvr.__set_error(std::current_exception()); + } + template + friend void tag_invoke(stdexec::set_error_t, __t&& __rcvr, _Error __err) noexcept { + __rcvr.__set_error((_Error &&) __err); + } + friend void tag_invoke(stdexec::set_stopped_t __d, __t&& __rcvr) noexcept { + __rcvr.__state_->__data_.template emplace<3>(__d); + __rcvr.__loop_->finish(); + } + friend __env + tag_invoke(stdexec::get_env_t, const __t& __rcvr) noexcept { + return {__rcvr.__loop_->get_scheduler()}; + } + }; + }; + + template + using __into_variant_result_t = + decltype(stdexec::into_variant(__declval<_Sender>())); + + //////////////////////////////////////////////////////////////////////////// + // [execution.senders.consumers.sync_wait] + struct sync_wait_t { + template + using __receiver_t = __t<__sync_wait_result_impl<_Sender, __q<__receiver>>>; + + // TODO: constrain on return type + template <__single_value_variant_sender<__env> _Sender> // NOT TO SPEC + requires + __tag_invocable_with_completion_scheduler< + sync_wait_t, set_value_t, _Sender> + tag_invoke_result_t< + sync_wait_t, + __completion_scheduler_for<_Sender, set_value_t>, + _Sender> + operator()(_Sender&& __sndr) const noexcept( + nothrow_tag_invocable< + sync_wait_t, + __completion_scheduler_for<_Sender, set_value_t>, + _Sender>) { + auto __sched = + get_completion_scheduler(__sndr); + return tag_invoke(sync_wait_t{}, std::move(__sched), (_Sender&&) __sndr); + } + + // TODO: constrain on return type + template <__single_value_variant_sender<__env> _Sender> // NOT TO SPEC + requires + (!__tag_invocable_with_completion_scheduler< + sync_wait_t, set_value_t, _Sender>) && + tag_invocable + tag_invoke_result_t + operator()(_Sender&& __sndr) const noexcept( + nothrow_tag_invocable) { + return tag_invoke(sync_wait_t{}, (_Sender&&) __sndr); + } + + template <__single_value_variant_sender<__env> _Sender> + requires + (!__tag_invocable_with_completion_scheduler< + sync_wait_t, set_value_t, _Sender>) && + (!tag_invocable) && + sender<_Sender, __env> && + sender_to<_Sender, __receiver_t<_Sender>> + auto operator()(_Sender&& __sndr) const + -> std::optional<__sync_wait_result_t<_Sender>> { + using state_t = __sync_wait_result_impl<_Sender, __q<__state>>; + using tail_t = next_tail_from_sender_to_t<_Sender, __receiver_t<_Sender>>; + state_t __state {}; + run_loop __loop; + + // Launch the sender with a continuation that will fill in a variant + // and notify a condition variable. + auto __op_state = + connect((_Sender&&) __sndr, __receiver_t<_Sender>{&__state, &__loop}); + tail_t __tail = start(__op_state); + + // Wait for the variant to be filled in. + auto __tail_run = __loop.run(just()); + + resume_tail_senders(__null_tail_sender{}, __tail, __tail_run); + + if (__state.__data_.index() == 2) + std::rethrow_exception(std::get<2>(__state.__data_)); + + if (__state.__data_.index() == 3) + return std::nullopt; + + return std::move(std::get<1>(__state.__data_)); + } + }; + } // namespace __sync_wait + using __sync_wait::sync_wait_t; + inline constexpr sync_wait_t sync_wait{}; } // namespace exec diff --git a/test/exec/test_tail_sender.cpp b/test/exec/test_tail_sender.cpp index 7b0838f88..52f668ca7 100644 --- a/test/exec/test_tail_sender.cpp +++ b/test/exec/test_tail_sender.cpp @@ -22,6 +22,8 @@ using namespace std; namespace ex = stdexec; +namespace { + //! Tail Sender struct ATailSender { using completion_signatures = ex::completion_signatures; @@ -63,12 +65,13 @@ struct ATailReceiver { int* called; friend void tag_invoke(ex::set_value_t, ATailReceiver&& __self, auto&&...) noexcept { ++*__self.called; } friend void tag_invoke(ex::set_stopped_t, ATailReceiver&& __self) noexcept { ++*__self.called; } - friend ex::__empty_env tag_invoke(ex::get_env_t, const ATailReceiver&) { + template + requires same_as<_Self, ATailReceiver> + friend ex::__empty_env tag_invoke(ex::get_env_t, const _Self&) { return {}; } }; - template struct ANestTailReceiver { std::decay_t nested_tail_sender; @@ -88,6 +91,70 @@ struct ANestTailReceiver { } }; +// struct ATailSender { +// using completion_signatures = ex::completion_signatures; + +// template +// struct operation { +// Receiver rcvr_; + +// operation(Receiver __r) : rcvr_(__r) {} + +// operation(const operation&) = delete; +// operation(operation&&) = delete; +// operation& operator=(const operation&) = delete; +// operation& operator=(operation&&) = delete; +// [[nodiscard]] +// friend auto tag_invoke(ex::start_t, operation& self) noexcept { +// return ex::set_value(std::move(self.rcvr_)); +// } + +// friend void tag_invoke(exec::unwind_t, operation& self) noexcept { +// ex::set_stopped(std::move(self.rcvr_)); +// } +// }; + +// template +// friend auto tag_invoke(ex::connect_t, ATailSender self, Receiver&& rcvr) noexcept +// -> operation> { +// return {std::forward(rcvr)}; +// } + +// template +// friend constexpr bool tag_invoke( +// exec::always_completes_inline_t, exec::c_t, exec::c_t<_Env>) noexcept { +// return true; +// } +// }; + +struct ASenderWithTail { + using completion_signatures = ex::completion_signatures; + + template + struct operation { + Receiver rcvr_; + + operation(Receiver __r) : rcvr_(__r) {} + + operation(const operation&) = delete; + operation(operation&&) = delete; + operation& operator=(const operation&) = delete; + operation& operator=(operation&&) = delete; + [[nodiscard]] + friend auto tag_invoke(ex::start_t, operation& self) noexcept { + return ex::set_value(std::move(self.rcvr_)); + } + }; + + template + friend auto tag_invoke(ex::connect_t, ATailSender self, Receiver&& rcvr) noexcept + -> operation> { + return {std::forward(rcvr)}; + } +}; + +} + TEST_CASE("Test ATailSender is a tail_sender", "[tail_sender]") { static_assert(exec::tail_sender); static_assert(exec::__terminal_tail_sender_to); @@ -283,3 +350,6 @@ TEST_CASE("Test resume_tail_senders_until_one_remaining()", "[tail_sender]") { CHECK(called == 1); CHECK(!op4); } + +TEST_CASE("Test sync_wait() with start() that returns a tail_sender", "[tail_sender]") { +} From 715c973c1d2814c37f4791948bacfd64afb09a83 Mon Sep 17 00:00:00 2001 From: Eric Niebler Date: Wed, 22 Mar 2023 17:10:09 +0000 Subject: [PATCH 11/13] apply clang-format --- include/exec/any_sender_of.hpp | 3 +- include/exec/finally.hpp | 3 +- include/exec/tail_sender.hpp | 1161 +++++++++++++------------- include/exec/variant_tail_sender.hpp | 224 ++--- include/stdexec/concepts.hpp | 2 +- include/stdexec/execution.hpp | 12 +- include/stdexec/functional.hpp | 2 +- include/tbbexec/tbb_thread_pool.hpp | 4 +- test/exec/test_tail_sender.cpp | 313 +++---- 9 files changed, 905 insertions(+), 819 deletions(-) diff --git a/include/exec/any_sender_of.hpp b/include/exec/any_sender_of.hpp index 2c50ad1af..3adc4a202 100644 --- a/include/exec/any_sender_of.hpp +++ b/include/exec/any_sender_of.hpp @@ -408,7 +408,8 @@ namespace exec { } template - friend void tag_invoke(__move_construct_t, __mtype<_Tp>, __t& __self, __t&& __other) noexcept { + friend void + tag_invoke(__move_construct_t, __mtype<_Tp>, __t& __self, __t&& __other) noexcept { if (!__other.__object_pointer_) { return; } diff --git a/include/exec/finally.hpp b/include/exec/finally.hpp index a6d14bc1e..392b079b1 100644 --- a/include/exec/finally.hpp +++ b/include/exec/finally.hpp @@ -250,7 +250,8 @@ namespace exec { requires receiver_of< _Rec, __completion_signatures_t<_InitialSender, _FinalSender, env_of_t<_Rec>>> - friend __op_t< _Self, _Rec> tag_invoke(connect_t, _Self&& __self, _Rec&& __receiver) noexcept { + friend __op_t< _Self, _Rec> + tag_invoke(connect_t, _Self&& __self, _Rec&& __receiver) noexcept { return { ((_Self&&) __self).__initial_sender_, ((_Self&&) __self).__final_sender_, diff --git a/include/exec/tail_sender.hpp b/include/exec/tail_sender.hpp index 187d8213b..afe70bbe5 100644 --- a/include/exec/tail_sender.hpp +++ b/include/exec/tail_sender.hpp @@ -22,35 +22,33 @@ namespace exec { using namespace stdexec; template - concept __contextually_convertible_to_bool = - requires(const T c) { - { (static_cast(c) ? false : false) } -> same_as; - }; + concept __contextually_convertible_to_bool = requires(const T c) { + { (static_cast(c) ? false : false) } -> same_as; + }; template - static constexpr bool __nothrow_contextually_convertible_to_bool_v = - noexcept((std::declval() ? (void)0 : (void)0)); + static constexpr bool __nothrow_contextually_convertible_to_bool_v = + noexcept((std::declval() ? (void) 0 : (void) 0)); template - concept __nothrow_contextually_convertible_to_bool = - __contextually_convertible_to_bool && - __nothrow_contextually_convertible_to_bool_v; + concept __nothrow_contextually_convertible_to_bool = + __contextually_convertible_to_bool && __nothrow_contextually_convertible_to_bool_v; namespace __unwind { struct unwind_t { template - requires std::tag_invocable - void operator()(_Op& __op) const noexcept(std::nothrow_tag_invocable) { - (void) tag_invoke(unwind_t{}, __op); - } + requires std::tag_invocable + void operator()(_Op& __op) const noexcept(std::nothrow_tag_invocable) { + (void) tag_invoke(unwind_t{}, __op); + } }; } + using __unwind::unwind_t; inline constexpr unwind_t unwind{}; template - struct c_t { - }; + struct c_t { }; template inline constexpr c_t<_T> c_v{}; @@ -58,71 +56,93 @@ namespace exec { template const _Ty& __cref_fn(const _Ty&); template - using __cref_t = - decltype(__sender_queries::__cref_fn(__declval<_Ty>())); + using __cref_t = decltype(__sender_queries::__cref_fn(__declval<_Ty>())); struct always_completes_inline_t { template - requires std::tag_invocable, __cref_t<_Env>> - constexpr bool operator()(_Sender&& __s, _Env&& __e) const noexcept { - static_assert(same_as, __cref_t<_Env>>>); - static_assert(std::nothrow_tag_invocable, __cref_t<_Env>>); - return tag_invoke(always_completes_inline_t{}, std::as_const(__s), std::as_const(__e)); - } + requires std::tag_invocable, __cref_t<_Env>> + constexpr bool operator()(_Sender&& __s, _Env&& __e) const noexcept { + static_assert( + same_as< + bool, + tag_invoke_result_t, __cref_t<_Env>>>); + static_assert( + std::nothrow_tag_invocable, __cref_t<_Env>>); + return tag_invoke(always_completes_inline_t{}, std::as_const(__s), std::as_const(__e)); + } + template - requires std::tag_invocable>, c_t>> - constexpr bool operator()(c_t>&& __s, c_t>&& __e) const noexcept { - static_assert(same_as>, c_t>>>); - static_assert(std::nothrow_tag_invocable>, c_t>>); - return tag_invoke(always_completes_inline_t{}, __s, __e); - } + requires std::tag_invocable< + always_completes_inline_t, + c_t>, + c_t>> + constexpr bool + operator()(c_t>&& __s, c_t>&& __e) const noexcept { + static_assert( + same_as< + bool, + tag_invoke_result_t< + always_completes_inline_t, + c_t>, + c_t>>>); + static_assert(std::nothrow_tag_invocable< + always_completes_inline_t, + c_t>, + c_t>>); + return tag_invoke(always_completes_inline_t{}, __s, __e); + } + constexpr bool operator()(auto&&, auto&&) const noexcept { - return false; + return false; } }; } // namespace __sender_queries + using __sender_queries::always_completes_inline_t; inline constexpr always_completes_inline_t always_completes_inline{}; template - concept tail_operation_state = - operation_state<_TailOperationState> && - std::is_nothrow_destructible_v<_TailOperationState> && - std::is_trivially_destructible_v<_TailOperationState> && - (!std::is_copy_constructible_v<_TailOperationState>) && - (!std::is_move_constructible_v<_TailOperationState>) && - (!std::is_copy_assignable_v<_TailOperationState>) && - (!std::is_move_assignable_v<_TailOperationState>) && - requires (_TailOperationState& __o) { - { unwind(__o) } noexcept; - }; + concept tail_operation_state = + operation_state<_TailOperationState> && std::is_nothrow_destructible_v<_TailOperationState> + && std::is_trivially_destructible_v<_TailOperationState> + && (!std::is_copy_constructible_v<_TailOperationState>) &&( + !std::is_move_constructible_v< + _TailOperationState>) &&(!std:: + is_copy_assignable_v< + _TailOperationState>) &&(!std:: + is_move_assignable_v< + _TailOperationState>) &&requires(_TailOperationState& + __o) { + { unwind(__o) } noexcept; + }; template - constexpr bool always_completes_inline_v = - always_completes_inline(c_v>, c_v>); + constexpr bool always_completes_inline_v = + always_completes_inline(c_v>, c_v>); template - concept tail_sender = - sender<_TailSender, _Env> && - same_as<__single_sender_value_t<_TailSender, _Env>, void> && - always_completes_inline_v<_TailSender, _Env> && - std::is_nothrow_move_constructible_v<_TailSender> && - std::is_nothrow_destructible_v<_TailSender> && - std::is_trivially_destructible_v<_TailSender>; + concept tail_sender = + sender<_TailSender, _Env> && same_as<__single_sender_value_t<_TailSender, _Env>, void> + && always_completes_inline_v<_TailSender, _Env> + && std::is_nothrow_move_constructible_v<_TailSender> + && std::is_nothrow_destructible_v<_TailSender> && std::is_trivially_destructible_v<_TailSender>; template - concept tail_receiver = - receiver<_TailReceiver> && - std::is_nothrow_copy_constructible_v<_TailReceiver> && - std::is_nothrow_move_constructible_v<_TailReceiver> && - std::is_nothrow_copy_assignable_v<_TailReceiver> && - std::is_nothrow_move_assignable_v<_TailReceiver> && - std::is_nothrow_destructible_v<_TailReceiver> && - std::is_trivially_destructible_v<_TailReceiver>; + concept tail_receiver = + receiver<_TailReceiver> && std::is_nothrow_copy_constructible_v<_TailReceiver> + && std::is_nothrow_move_constructible_v<_TailReceiver> + && std::is_nothrow_copy_assignable_v<_TailReceiver> + && std::is_nothrow_move_assignable_v<_TailReceiver> + && std::is_nothrow_destructible_v<_TailReceiver> + && std::is_trivially_destructible_v<_TailReceiver>; struct __null_tail_receiver { - friend void tag_invoke(set_value_t, __null_tail_receiver&&, auto&&...) noexcept {} - friend void tag_invoke(set_stopped_t, __null_tail_receiver&&) noexcept {} + friend void tag_invoke(set_value_t, __null_tail_receiver&&, auto&&...) noexcept { + } + + friend void tag_invoke(set_stopped_t, __null_tail_receiver&&) noexcept { + } + friend __empty_env tag_invoke(get_env_t, const __null_tail_receiver& __self) { return {}; } @@ -134,15 +154,19 @@ namespace exec { // this is a nullable_tail_sender that always returns false to prevent // callers from calling start() and unwind() - inline constexpr operator bool() const noexcept { return false; } + inline constexpr operator bool() const noexcept { + return false; + } friend void tag_invoke(start_t, op& self) noexcept { - printf("__null_tail_sender start\n"); fflush(stdout); + printf("__null_tail_sender start\n"); + fflush(stdout); std::terminate(); } friend void tag_invoke(unwind_t, op& self) noexcept { - printf("__null_tail_sender unwind\n"); fflush(stdout); + printf("__null_tail_sender unwind\n"); + fflush(stdout); std::terminate(); } }; @@ -150,426 +174,453 @@ namespace exec { using completion_signatures = completion_signatures; template - friend auto tag_invoke(connect_t, __null_tail_sender&&, _TailReceiver&&) noexcept - -> op { - return {}; - } + friend auto tag_invoke(connect_t, __null_tail_sender&&, _TailReceiver&&) noexcept -> op { + return {}; + } - template - friend constexpr bool tag_invoke( - exec::always_completes_inline_t, exec::c_t<__null_tail_sender>, exec::c_t<_Env>) noexcept { - return true; - } + template + friend constexpr bool tag_invoke( + exec::always_completes_inline_t, + exec::c_t<__null_tail_sender>, + exec::c_t<_Env>) noexcept { + return true; + } }; template struct __variant_tail_sender; template - using __next_tail_from_operation_t = - __call_result_t; + using __next_tail_from_operation_t = __call_result_t; template - using next_tail_from_operation_t = - __if<__bool>>, - __null_tail_sender, - __call_result_t>; + using next_tail_from_operation_t = __if< + __bool>>, + __null_tail_sender, + __call_result_t>; template - using __next_tail_from_sender_to_t = - __next_tail_from_operation_t>; + using __next_tail_from_sender_to_t = + __next_tail_from_operation_t>; template - using next_tail_from_sender_to_t = - next_tail_from_operation_t>; + using next_tail_from_sender_to_t = + next_tail_from_operation_t>; template - concept tail_sender_to = - tail_sender<_TailSender> && - tail_receiver<_TailReceiver> && - requires(_TailSender&& __s, _TailReceiver&& __r) { - { stdexec::connect((_TailSender&&) __s, (_TailReceiver&&) __r) } noexcept -> - tail_operation_state; - } && - tail_sender>; + concept tail_sender_to = tail_sender<_TailSender> && tail_receiver<_TailReceiver> + && requires(_TailSender&& __s, _TailReceiver&& __r) { + { + stdexec::connect((_TailSender&&) __s, (_TailReceiver&&) __r) + } noexcept -> tail_operation_state; + } && tail_sender>; template - concept __terminal_tail_operation_state = - tail_operation_state<_TailOperationState> && - same_as<__next_tail_from_operation_t<_TailOperationState>, void>; + concept __terminal_tail_operation_state = + tail_operation_state<_TailOperationState> + && same_as<__next_tail_from_operation_t<_TailOperationState>, void>; template - concept __terminal_tail_sender_to = - tail_sender_to<_TailSender, _TailReceiver> && - same_as<__next_tail_from_sender_to_t<_TailSender, _TailReceiver>, void>; + concept __terminal_tail_sender_to = + tail_sender_to<_TailSender, _TailReceiver> + && same_as<__next_tail_from_sender_to_t<_TailSender, _TailReceiver>, void>; template - concept __recursive_tail_sender_to = - tail_sender_to<_TailSender, _TailReceiver> && - tail_operation_state> && - __one_of, _ValidTailSender...>; + concept __recursive_tail_sender_to = + tail_sender_to<_TailSender, _TailReceiver> + && tail_operation_state> + && __one_of, _ValidTailSender...>; template - concept __nullable_tail_operation_state = - tail_operation_state<_TailOperationState> && - __nothrow_contextually_convertible_to_bool<_TailOperationState>; + concept __nullable_tail_operation_state = + tail_operation_state<_TailOperationState> + && __nothrow_contextually_convertible_to_bool<_TailOperationState>; template - concept __nullable_tail_sender_to = - tail_sender_to<_TailSender, _TailReceiver> && - __nullable_tail_operation_state>; + concept __nullable_tail_sender_to = + tail_sender_to<_TailSender, _TailReceiver> + && __nullable_tail_operation_state>; } // namespace exec +#include "variant_tail_sender.hpp" +namespace exec { -#include "variant_tail_sender.hpp" + template + constexpr std::decay_t<_To> result_from(_From&& __f) noexcept { + if constexpr ( + __is_instance_of, __variant_tail_sender> + && __is_instance_of, __variant_tail_sender>) { + return variant_cast>((_From&&) __f); + } else if constexpr ( + __is_instance_of, __variant_tail_sender> + && !__is_instance_of, __variant_tail_sender>) { + return get>((_From&&) __f); + } else { + static_assert(std::is_constructible_v<_To, _From>, "result_from cannot convert"); + return (_From&&) __f; + } + } + template + requires(!same_as<__null_tail_sender, _TailSender>) + struct maybe_tail_sender { + maybe_tail_sender() noexcept = default; + maybe_tail_sender(__null_tail_sender) noexcept { + } -namespace exec { + maybe_tail_sender(_TailSender __t) noexcept + : tail_sender_(__t) { + } - template - constexpr std::decay_t<_To> result_from(_From&& __f) noexcept { - if constexpr ( - __is_instance_of, __variant_tail_sender> && - __is_instance_of, __variant_tail_sender>) { - return variant_cast>((_From&&)__f); - } else if constexpr ( - __is_instance_of, __variant_tail_sender> && - !__is_instance_of, __variant_tail_sender>) { - return get>((_From&&)__f); - } else { - static_assert(std::is_constructible_v<_To, _From>, "result_from cannot convert"); - return (_From&&)__f; + template + struct op : __immovable { + using op_t = connect_result_t<_TailSender, _TailReceiver>; + op() = default; + + explicit op(_TailSender __t, _TailReceiver __r) + : op_(stdexec::__conv{[&] { + return stdexec::connect(__t, __r); + }}) { } - } - template - requires (!same_as<__null_tail_sender, _TailSender>) - struct maybe_tail_sender { - maybe_tail_sender() noexcept = default; - maybe_tail_sender(__null_tail_sender) noexcept {} - maybe_tail_sender(_TailSender __t) noexcept : tail_sender_(__t) {} - template - struct op : __immovable { - using op_t = connect_result_t<_TailSender, _TailReceiver>; - op() = default; - - explicit op(_TailSender __t, _TailReceiver __r) - : op_(stdexec::__conv{ - [&] { - return stdexec::connect(__t, __r); - } - }) {} - operator bool() const noexcept { - if constexpr (__nullable_tail_sender_to<_TailSender, _TailReceiver>) { - return !!op_ && !!*op_; - } else { - return !!op_; - } + operator bool() const noexcept { + if constexpr (__nullable_tail_sender_to<_TailSender, _TailReceiver>) { + return !!op_ && !!*op_; + } else { + return !!op_; } + } - [[nodiscard]] - friend auto tag_invoke(start_t, op& __self) noexcept { - if (!__self.op_) { - printf("maybe_tail_sender start optional\n"); fflush(stdout); + [[nodiscard]] friend auto tag_invoke(start_t, op& __self) noexcept { + if (!__self.op_) { + printf("maybe_tail_sender start optional\n"); + fflush(stdout); + std::terminate(); + } + if constexpr (__nullable_tail_sender_to<_TailSender, _TailReceiver>) { + if (!*__self.op_) { + printf("maybe_tail_sender start nullable\n"); + fflush(stdout); std::terminate(); } - if constexpr (__nullable_tail_sender_to<_TailSender, _TailReceiver>) { - if (!*__self.op_) { - printf("maybe_tail_sender start nullable\n"); fflush(stdout); - std::terminate(); - } - } - return stdexec::start(*__self.op_); } + return stdexec::start(*__self.op_); + } - friend void tag_invoke(unwind_t, op& __self) noexcept { - if (!__self.op_) { - printf("maybe_tail_sender unwind optional\n"); fflush(stdout); + friend void tag_invoke(unwind_t, op& __self) noexcept { + if (!__self.op_) { + printf("maybe_tail_sender unwind optional\n"); + fflush(stdout); + std::terminate(); + } + if constexpr (__nullable_tail_sender_to<_TailSender, _TailReceiver>) { + if (!*__self.op_) { + printf("maybe_tail_sender unwind nullable\n"); + fflush(stdout); std::terminate(); } - if constexpr (__nullable_tail_sender_to<_TailSender, _TailReceiver>) { - if (!*__self.op_) { - printf("maybe_tail_sender unwind nullable\n"); fflush(stdout); - std::terminate(); - } - } - exec::unwind(*__self.op_); } - std::optional op_; - }; + exec::unwind(*__self.op_); + } - using completion_signatures = completion_signatures; + std::optional op_; + }; - template - [[nodiscard]] - friend auto tag_invoke(connect_t, maybe_tail_sender&& __self, _TailReceiver&& __r) noexcept - -> op> { - if (!__self.tail_sender_) { return {}; } - return op>{*((maybe_tail_sender&&)__self).tail_sender_, __r}; - } + using completion_signatures = completion_signatures; - template - friend constexpr bool tag_invoke( - exec::always_completes_inline_t, exec::c_t, exec::c_t<_Env>) noexcept { - return true; - } + template + [[nodiscard]] friend auto + tag_invoke(connect_t, maybe_tail_sender&& __self, _TailReceiver&& __r) noexcept + -> op> { + if (!__self.tail_sender_) { + return {}; + } + return op>{*((maybe_tail_sender&&) __self).tail_sender_, __r}; + } - private: - std::optional<_TailSender> tail_sender_; - }; + template + friend constexpr bool tag_invoke( + exec::always_completes_inline_t, + exec::c_t, + exec::c_t<_Env>) noexcept { + return true; + } - template - struct scoped_tail_sender { - explicit scoped_tail_sender(_TailSender __t, _TailReceiver __r = _TailReceiver{}) noexcept - : t_(__t) - , r_(__r) - , valid_(true) {} - - scoped_tail_sender(scoped_tail_sender&& other) noexcept - : t_(other.s_) - , r_(other.r_) - , valid_(std::exchange(other.valid_, false)) {} - - ~scoped_tail_sender() { - if (valid_) { - auto op = stdexec::connect(t_, r_); - if constexpr (__nullable_tail_sender_to<_TailSender, _TailReceiver>) { - if (!!op) { - exec::unwind(op); - } - } else { + private: + std::optional<_TailSender> tail_sender_; + }; + + template + struct scoped_tail_sender { + explicit scoped_tail_sender(_TailSender __t, _TailReceiver __r = _TailReceiver{}) noexcept + : t_(__t) + , r_(__r) + , valid_(true) { + } + + scoped_tail_sender(scoped_tail_sender&& other) noexcept + : t_(other.s_) + , r_(other.r_) + , valid_(std::exchange(other.valid_, false)) { + } + + ~scoped_tail_sender() { + if (valid_) { + auto op = stdexec::connect(t_, r_); + if constexpr (__nullable_tail_sender_to<_TailSender, _TailReceiver>) { + if (!!op) { exec::unwind(op); } + } else { + exec::unwind(op); } } + } - _TailSender get() noexcept { return t_; } + _TailSender get() noexcept { + return t_; + } - _TailSender release() noexcept { - valid_ = false; - return t_; - } + _TailSender release() noexcept { + valid_ = false; + return t_; + } - private: - _TailSender t_; - _TailReceiver r_; - bool valid_; - }; + private: + _TailSender t_; + _TailReceiver r_; + bool valid_; + }; struct __all_resumed_tail_sender { using completion_signatures = completion_signatures; template - friend auto tag_invoke(connect_t, __all_resumed_tail_sender&&, _TailReceiver&& __r) noexcept - -> __call_result_t { - return stdexec::connect(__null_tail_sender{}, __r); - } + friend auto tag_invoke(connect_t, __all_resumed_tail_sender&&, _TailReceiver&& __r) noexcept + -> __call_result_t { + return stdexec::connect(__null_tail_sender{}, __r); + } - template - friend constexpr bool tag_invoke( - exec::always_completes_inline_t, exec::c_t<__all_resumed_tail_sender>, exec::c_t<_Env>) noexcept { - return true; - } + template + friend constexpr bool tag_invoke( + exec::always_completes_inline_t, + exec::c_t<__all_resumed_tail_sender>, + exec::c_t<_Env>) noexcept { + return true; + } }; namespace __start_until_nullable_ { struct __start_until_nullable_t; - template - struct __start_until_nullable_result; - - template - using __start_until_nullable_result_t = - typename __start_until_nullable_result<_TailSender, _TailReceiver>::type; - - template - struct __start_until_nullable_result { - using type = - __if< - __bool<__nullable_tail_sender_to<_TailSender, _TailReceiver>>, _TailSender, - __if< - __bool<__terminal_tail_sender_to<_TailSender, _TailReceiver>>, __all_resumed_tail_sender, - __minvoke<__with_default<__q<__start_until_nullable_result_t>, __all_resumed_tail_sender>, - __next_tail_from_sender_to_t<_TailSender, _TailReceiver>, - _TailReceiver> - > - >; - }; + template + struct __start_until_nullable_result; + + template + using __start_until_nullable_result_t = + typename __start_until_nullable_result<_TailSender, _TailReceiver>::type; + + template + struct __start_until_nullable_result { + using type = __if< + __bool<__nullable_tail_sender_to<_TailSender, _TailReceiver>>, + _TailSender, + __if< + __bool<__terminal_tail_sender_to<_TailSender, _TailReceiver>>, + __all_resumed_tail_sender, + __minvoke< + __with_default<__q<__start_until_nullable_result_t>, __all_resumed_tail_sender>, + __next_tail_from_sender_to_t<_TailSender, _TailReceiver>, + _TailReceiver> > >; + }; struct __start_until_nullable_t { - template - auto operator()(_TailSender __t, _TailReceiver __r) const noexcept - -> __start_until_nullable_result_t<_TailSender, _TailReceiver> { - if constexpr (__nullable_tail_sender_to<_TailSender, _TailReceiver>) { - return __t; - } else if constexpr (__terminal_tail_sender_to<_TailSender, _TailReceiver>) { - // restrict scope of op - { - auto op = stdexec::connect(std::move(__t), std::move(__r)); - stdexec::start(op); - } - return __all_resumed_tail_sender{}; - } else { - auto op = stdexec::connect(std::move(__t), __r); - return __start_until_nullable_t{}(stdexec::start(op), std::move(__r)); + template + auto operator()(_TailSender __t, _TailReceiver __r) const noexcept + -> __start_until_nullable_result_t<_TailSender, _TailReceiver> { + if constexpr (__nullable_tail_sender_to<_TailSender, _TailReceiver>) { + return __t; + } else if constexpr (__terminal_tail_sender_to<_TailSender, _TailReceiver>) { + // restrict scope of op + { + auto op = stdexec::connect(std::move(__t), std::move(__r)); + stdexec::start(op); } + return __all_resumed_tail_sender{}; + } else { + auto op = stdexec::connect(std::move(__t), __r); + return __start_until_nullable_t{}(stdexec::start(op), std::move(__r)); } + } }; } // namespace __start_until_nullable_ + using __start_until_nullable_::__start_until_nullable_t; inline constexpr __start_until_nullable_t __start_until_nullable{}; - template - auto __start_next(_NextTailSender __next, _TailReceiver __r) noexcept; - - template - struct __start_sequential_result; - - template - using __start_sequential_result_t = typename __start_sequential_result<_TailSender, _TailReceiver>::type; - - template - struct __start_sequential_result { - using next_t = next_tail_from_sender_to_t<_TailSender, _TailReceiver>; - using start_next_t = __call_result_t< - decltype(__start_next), - next_t, - _TailReceiver>; - using type = - __if< - __bool< - __nullable_tail_sender_to<_TailSender, _TailReceiver> && - __terminal_tail_sender_to<_TailSender, _TailReceiver>>, - _TailSender, - __if< // elseif - __bool<__nullable_tail_sender_to<_TailSender, _TailReceiver>>, - variant_tail_sender<__all_resumed_tail_sender, start_next_t>, - __if< // elseif - __bool>, - start_next_t, - __all_resumed_tail_sender // else - > - > - >; - }; - - template - auto __start_sequential(_TailSender c, _TailReceiver r) noexcept - -> __start_sequential_result_t<_TailSender, _TailReceiver, _PrevTailSenders...>; + template < + tail_sender _NextTailSender, + tail_sender _TailSender, + tail_receiver _TailReceiver, + tail_sender... _PrevTailSenders> + auto __start_next(_NextTailSender __next, _TailReceiver __r) noexcept; + + template + struct __start_sequential_result; + + template + using __start_sequential_result_t = + typename __start_sequential_result<_TailSender, _TailReceiver>::type; + + template + struct __start_sequential_result { + using next_t = next_tail_from_sender_to_t<_TailSender, _TailReceiver>; + using start_next_t = __call_result_t< + decltype(__start_next), + next_t, + _TailReceiver>; + using type = __if< + __bool< + __nullable_tail_sender_to<_TailSender, _TailReceiver> + && __terminal_tail_sender_to<_TailSender, _TailReceiver>>, + _TailSender, + __if< // elseif + __bool<__nullable_tail_sender_to<_TailSender, _TailReceiver>>, + variant_tail_sender<__all_resumed_tail_sender, start_next_t>, + __if< // elseif + __bool>, + start_next_t, + __all_resumed_tail_sender // else + > > >; + }; - template - auto __start_next(_NextTailSender __next, _TailReceiver __r) noexcept { - if constexpr (__one_of<_NextTailSender, _TailSender, _PrevTailSenders...>) { - static_assert( - (__nullable_tail_sender_to<_TailSender, _TailReceiver> || - (__nullable_tail_sender_to<_PrevTailSenders, _TailReceiver> || ...)), - "At least one tail_sender in a cycle must be nullable to avoid " - "entering an infinite loop"); - return __start_until_nullable(__next, std::move(__r)); - } else { - return __start_sequential<_NextTailSender, _TailReceiver, _TailSender, _PrevTailSenders...>(__next, std::move(__r)); - } + template + auto __start_sequential(_TailSender c, _TailReceiver r) noexcept + -> __start_sequential_result_t<_TailSender, _TailReceiver, _PrevTailSenders...>; + + template < + tail_sender _NextTailSender, + tail_sender _TailSender, + tail_receiver _TailReceiver, + tail_sender... _PrevTailSenders> + auto __start_next(_NextTailSender __next, _TailReceiver __r) noexcept { + if constexpr (__one_of<_NextTailSender, _TailSender, _PrevTailSenders...>) { + static_assert( + (__nullable_tail_sender_to<_TailSender, _TailReceiver> + || (__nullable_tail_sender_to<_PrevTailSenders, _TailReceiver> || ...)), + "At least one tail_sender in a cycle must be nullable to avoid " + "entering an infinite loop"); + return __start_until_nullable(__next, std::move(__r)); + } else { + return __start_sequential<_NextTailSender, _TailReceiver, _TailSender, _PrevTailSenders...>( + __next, std::move(__r)); } + } - template - auto __start_sequential(_TailSender c, _TailReceiver r) noexcept - -> __start_sequential_result_t<_TailSender, _TailReceiver, _PrevTailSenders...> { - using next_t = next_tail_from_sender_to_t<_TailSender, _TailReceiver>; - using result_t = __start_sequential_result_t<_TailSender, _TailReceiver, _PrevTailSenders...>; - - if constexpr ( - __nullable_tail_sender_to<_TailSender, _TailReceiver> && - __terminal_tail_sender_to<_TailSender, _TailReceiver>) { - // halt the recursion - return c; - } else if constexpr (__nullable_tail_sender_to<_TailSender, _TailReceiver>) { - // recurse if the nullable tail-sender is valid otherwise return - // a nullable and terminal tail-sender - auto op = stdexec::connect(std::move(c), r); - if (!op) { - return __all_resumed_tail_sender{}; - } - return result_from( - __start_next( - stdexec::start(op), r)); - } else if constexpr (!__terminal_tail_sender_to<_TailSender, _TailReceiver>) { - auto op = stdexec::connect(std::move(c), r); - return result_from( - __start_next( - stdexec::start(op), r)); - } else { - // run the terminal and not nullable tail-sender and return - // a nullable and terminal tail-sender - auto op = stdexec::connect(std::move(c), r); - stdexec::start(op); + template + auto __start_sequential(_TailSender c, _TailReceiver r) noexcept + -> __start_sequential_result_t<_TailSender, _TailReceiver, _PrevTailSenders...> { + using next_t = next_tail_from_sender_to_t<_TailSender, _TailReceiver>; + using result_t = __start_sequential_result_t<_TailSender, _TailReceiver, _PrevTailSenders...>; + + if constexpr ( + __nullable_tail_sender_to<_TailSender, _TailReceiver> + && __terminal_tail_sender_to<_TailSender, _TailReceiver>) { + // halt the recursion + return c; + } else if constexpr (__nullable_tail_sender_to<_TailSender, _TailReceiver>) { + // recurse if the nullable tail-sender is valid otherwise return + // a nullable and terminal tail-sender + auto op = stdexec::connect(std::move(c), r); + if (!op) { return __all_resumed_tail_sender{}; } + return result_from( + __start_next( + stdexec::start(op), r)); + } else if constexpr (!__terminal_tail_sender_to<_TailSender, _TailReceiver>) { + auto op = stdexec::connect(std::move(c), r); + return result_from( + __start_next( + stdexec::start(op), r)); + } else { + // run the terminal and not nullable tail-sender and return + // a nullable and terminal tail-sender + auto op = stdexec::connect(std::move(c), r); + stdexec::start(op); + return __all_resumed_tail_sender{}; } + } + template + inline __null_tail_sender resume_tail_senders_until_one_remaining(_TailReceiver&&) noexcept { + return {}; + } - template - inline __null_tail_sender resume_tail_senders_until_one_remaining(_TailReceiver&&) noexcept { - return {}; - } - - template - C resume_tail_senders_until_one_remaining(_TailReceiver&&, C c) noexcept { - return c; - } - - template - auto _resume_tail_senders_until_one_remaining(_TailReceiver&& __r, std::index_sequence, Cs... cs) noexcept { - using result_type = variant_tail_sender; - result_type result; + template + C resume_tail_senders_until_one_remaining(_TailReceiver&&, C c) noexcept { + return c; + } - auto cs2_tuple = std::make_tuple( - variant_tail_sender< - __all_resumed_tail_sender, - decltype(__start_sequential(__start_sequential(cs, __r), __r))>{ - __start_sequential(__start_sequential(cs, __r), __r)}...); - while (true) { - std::size_t remaining = sizeof...(cs); - ((remaining > 1 ? - (!holds_alternative<__all_resumed_tail_sender>(std::get(cs2_tuple)) ? - (void)(result = result_from( - std::get(cs2_tuple) = result_from(cs2_tuple))>( - __start_sequential(std::get(cs2_tuple), __r)))) : - (void)--remaining) : - (void)(result = result_from(std::get(cs2_tuple)))), ...); - - if (remaining <= 1) { - return result; - } + template + auto _resume_tail_senders_until_one_remaining( + _TailReceiver&& __r, + std::index_sequence, + Cs... cs) noexcept { + using result_type = + variant_tail_sender; + result_type result; + + auto cs2_tuple = std::make_tuple( + variant_tail_sender< + __all_resumed_tail_sender, + decltype(__start_sequential(__start_sequential(cs, __r), __r))>{ + __start_sequential(__start_sequential(cs, __r), __r)}...); + while (true) { + std::size_t remaining = sizeof...(cs); + ((remaining > 1 ? ( + !holds_alternative<__all_resumed_tail_sender>(std::get(cs2_tuple)) + ? (void) (result = result_from( + std::get(cs2_tuple) = result_from(cs2_tuple))>( + __start_sequential(std::get(cs2_tuple), __r)))) + : (void) --remaining) + : (void) (result = result_from(std::get(cs2_tuple)))), + ...); + + if (remaining <= 1) { + return result; } } + } - template - auto resume_tail_senders_until_one_remaining(_TailReceiver&& __r, Cs... cs) noexcept { - return _resume_tail_senders_until_one_remaining(__r, std::index_sequence_for{}, cs...); - } + template + auto resume_tail_senders_until_one_remaining(_TailReceiver&& __r, Cs... cs) noexcept { + return _resume_tail_senders_until_one_remaining(__r, std::index_sequence_for{}, cs...); + } - template - void resume_tail_senders(_TailReceiver&& __r, Cs... cs) noexcept { - auto __last_tail = _resume_tail_senders_until_one_remaining(__r, std::index_sequence_for{}, cs...); - for(;;) { - auto __op = connect(__last_tail, __r); - if (!__op) { - return; - } - if constexpr (__terminal_tail_sender_to) { - start(__last_tail); - return; - } else { - __last_tail = __start_sequential(start(__last_tail), __r); - } + template + void resume_tail_senders(_TailReceiver&& __r, Cs... cs) noexcept { + auto __last_tail = _resume_tail_senders_until_one_remaining( + __r, std::index_sequence_for{}, cs...); + for (;;) { + auto __op = connect(__last_tail, __r); + if (!__op) { + return; + } + if constexpr (__terminal_tail_sender_to) { + start(__last_tail); + return; + } else { + __last_tail = __start_sequential(start(__last_tail), __r); } } + } ///////////////////////////////////////////////////////////////////////////// // run_loop @@ -578,80 +629,87 @@ namespace exec { struct __task : __immovable { __task* __next_ = this; + union { void (*__execute_)(__task*) noexcept; __task* __tail_; }; - void __execute() noexcept { (*__execute_)(this); } + void __execute() noexcept { + (*__execute_)(this); + } }; template - struct __operation { - using _Receiver = stdexec::__t<_ReceiverId>; - - struct __t : __task { - using __id = __operation; - - run_loop* __loop_; - [[no_unique_address]] _Receiver __rcvr_; - - static void __execute_impl(__task* __p) noexcept { - auto& __rcvr = ((__t*) __p)->__rcvr_; - try { - if (get_stop_token(get_env(__rcvr)).stop_requested()) { - set_stopped((_Receiver&&) __rcvr); - } else { - set_value((_Receiver&&) __rcvr); - } - } catch(...) { - set_error((_Receiver&&) __rcvr, std::current_exception()); + struct __operation { + using _Receiver = stdexec::__t<_ReceiverId>; + + struct __t : __task { + using __id = __operation; + + run_loop* __loop_; + [[no_unique_address]] _Receiver __rcvr_; + + static void __execute_impl(__task* __p) noexcept { + auto& __rcvr = ((__t*) __p)->__rcvr_; + try { + if (get_stop_token(get_env(__rcvr)).stop_requested()) { + set_stopped((_Receiver&&) __rcvr); + } else { + set_value((_Receiver&&) __rcvr); } + } catch (...) { + set_error((_Receiver&&) __rcvr, std::current_exception()); } + } - explicit __t(__task* __tail) noexcept - : __task{.__tail_ = __tail} {} - __t(__task* __next, run_loop* __loop, _Receiver __rcvr) - : __task{{}, __next, {&__execute_impl}} - , __loop_{__loop} - , __rcvr_{(_Receiver&&) __rcvr} {} + explicit __t(__task* __tail) noexcept + : __task{.__tail_ = __tail} { + } - friend void tag_invoke(start_t, __t& __self) noexcept { - __self.__start_(); - } + __t(__task* __next, run_loop* __loop, _Receiver __rcvr) + : __task{{}, __next, {&__execute_impl}} + , __loop_{__loop} + , __rcvr_{(_Receiver&&) __rcvr} { + } - void __start_() noexcept; - }; + friend void tag_invoke(start_t, __t& __self) noexcept { + __self.__start_(); + } + + void __start_() noexcept; }; + }; template - struct __run_operation { - using _Receiver = stdexec::__t<_ReceiverId>; + struct __run_operation { + using _Receiver = stdexec::__t<_ReceiverId>; - struct __t { - using __id = __run_operation; + struct __t { + using __id = __run_operation; - run_loop* __loop_; - [[no_unique_address]] _Receiver __rcvr_; + run_loop* __loop_; + [[no_unique_address]] _Receiver __rcvr_; - __t(run_loop* __loop, _Receiver __rcvr) - : __loop_{__loop} - , __rcvr_{(_Receiver&&) __rcvr} {} + __t(run_loop* __loop, _Receiver __rcvr) + : __loop_{__loop} + , __rcvr_{(_Receiver&&) __rcvr} { + } - friend void tag_invoke(start_t, __t& __self) noexcept { - __self.__start_(); - } + friend void tag_invoke(start_t, __t& __self) noexcept { + __self.__start_(); + } - void __start_() noexcept; - }; + void __start_() noexcept; }; + }; class run_loop { - template - using __completion_signatures_ = completion_signatures; + template + using __completion_signatures_ = completion_signatures; template - friend struct __operation; + friend struct __operation; public: struct __scheduler { using __t = __scheduler; @@ -662,38 +720,37 @@ namespace exec { struct __schedule_task { using __t = __schedule_task; using __id = __schedule_task; - using completion_signatures = - __completion_signatures_< - set_value_t(), - set_error_t(std::exception_ptr), - set_stopped_t()>; + using completion_signatures = __completion_signatures_< + set_value_t(), + set_error_t(std::exception_ptr), + set_stopped_t()>; private: friend __scheduler; template - using __operation = stdexec::__t<__operation>>; + using __operation = stdexec::__t<__operation>>; template friend __operation<_Receiver> - tag_invoke(connect_t, const __schedule_task& __self, _Receiver __rcvr) { - return __self.__connect_((_Receiver &&) __rcvr); + tag_invoke(connect_t, const __schedule_task& __self, _Receiver __rcvr) { + return __self.__connect_((_Receiver&&) __rcvr); } template __operation<_Receiver> __connect_(_Receiver&& __rcvr) const { - return {&__loop_->__head_, __loop_, (_Receiver &&) __rcvr}; + return {&__loop_->__head_, __loop_, (_Receiver&&) __rcvr}; } template friend __scheduler - tag_invoke(get_completion_scheduler_t<_CPO>, const __schedule_task& __self) noexcept { + tag_invoke(get_completion_scheduler_t<_CPO>, const __schedule_task& __self) noexcept { return __scheduler{__self.__loop_}; } explicit __schedule_task(run_loop* __loop) noexcept - : __loop_(__loop) - {} + : __loop_(__loop) { + } run_loop* const __loop_; }; @@ -701,20 +758,21 @@ namespace exec { friend run_loop; explicit __scheduler(run_loop* __loop) noexcept - : __loop_(__loop) {} + : __loop_(__loop) { + } friend __schedule_task tag_invoke(schedule_t, const __scheduler& __self) noexcept { return __self.__schedule(); } - friend stdexec::forward_progress_guarantee tag_invoke( - get_forward_progress_guarantee_t, const __scheduler&) noexcept { + friend stdexec::forward_progress_guarantee + tag_invoke(get_forward_progress_guarantee_t, const __scheduler&) noexcept { return stdexec::forward_progress_guarantee::parallel; } // BUGBUG NOT TO SPEC - friend bool tag_invoke( - this_thread::execute_may_block_caller_t, const __scheduler&) noexcept { + friend bool + tag_invoke(this_thread::execute_may_block_caller_t, const __scheduler&) noexcept { return false; } @@ -724,6 +782,7 @@ namespace exec { run_loop* __loop_; }; + __scheduler get_scheduler() noexcept { return __scheduler{this}; } @@ -731,42 +790,39 @@ namespace exec { struct __run_sender { using __t = __run_sender; using __id = __run_sender; - using completion_signatures = - __completion_signatures_< - set_value_t(), - set_stopped_t()>; + using completion_signatures = __completion_signatures_< set_value_t(), set_stopped_t()>; - private: + private: friend __scheduler; template - using __operation = stdexec::__t<__operation>>; + using __operation = stdexec::__t<__operation>>; template friend __operation<_Receiver> - tag_invoke(connect_t, const __run_sender& __self, _Receiver __rcvr) { - return __self.__connect_((_Receiver &&) __rcvr); + tag_invoke(connect_t, const __run_sender& __self, _Receiver __rcvr) { + return __self.__connect_((_Receiver&&) __rcvr); } template __operation<_Receiver> __connect_(_Receiver&& __rcvr) const { - return {&__loop_->__head_, __loop_, (_Receiver &&) __rcvr}; + return {&__loop_->__head_, __loop_, (_Receiver&&) __rcvr}; } template friend __scheduler - tag_invoke(get_completion_scheduler_t<_CPO>, const __run_sender& __self) noexcept { + tag_invoke(get_completion_scheduler_t<_CPO>, const __run_sender& __self) noexcept { return __scheduler{__self.__loop_}; } explicit __run_sender(run_loop* __loop) noexcept - : __loop_(__loop) - {} + : __loop_(__loop) { + } run_loop* __loop_; }; - template - __run_sender run(_Sender&& __s); + template + __run_sender run(_Sender&& __s); void run(); @@ -785,13 +841,14 @@ namespace exec { template inline void __operation<_ReceiverId>::__t::__start_() noexcept try { __loop_->__push_back_(this); - } catch(...) { + } catch (...) { + set_error((_Receiver&&) __rcvr_, std::current_exception()); } - template - run_loop::__run_sender run_loop::run(_Sender&& __s) { - return {this, (_Sender &&)__s}; + template + run_loop::__run_sender run_loop::run(_Sender&& __s) { + return {this, (_Sender&&) __s}; } inline void run_loop::run() { @@ -815,7 +872,7 @@ namespace exec { inline __task* run_loop::__pop_front_() { std::unique_lock __lock{__mutex_}; - __cv_.wait(__lock, [this]{ return __head_.__next_ != &__head_ || __stop_; }); + __cv_.wait(__lock, [this] { return __head_.__next_ != &__head_ || __stop_; }); if (__head_.__tail_ == __head_.__next_) __head_.__tail_ = &__head_; return std::exchange(__head_.__next_, __head_.__next_->__next_); @@ -830,8 +887,7 @@ namespace exec { // [execution.senders.consumers.sync_wait_with_variant] namespace __sync_wait { template - using __into_variant_result_t = - decltype(stdexec::into_variant(__declval<_Sender>())); + using __into_variant_result_t = decltype(stdexec::into_variant(__declval<_Sender>())); struct __env { using __t = __env; @@ -851,125 +907,111 @@ namespace exec { // What should sync_wait(just_stopped()) return? template - using __sync_wait_result_impl = - __value_types_of_t< - _Sender, - __env, - __transform<__q, _Continuation>, - __q<__msingle>>; + using __sync_wait_result_impl = + __value_types_of_t< _Sender, __env, __transform<__q, _Continuation>, __q<__msingle>>; template _Sender> - using __sync_wait_result_t = - __sync_wait_result_impl<_Sender, __q>; + using __sync_wait_result_t = __sync_wait_result_impl<_Sender, __q>; template - using __sync_wait_with_variant_result_t = - __sync_wait_result_t<__into_variant_result_t<_Sender>>; + using __sync_wait_with_variant_result_t = + __sync_wait_result_t<__into_variant_result_t<_Sender>>; template - struct __state { - using _Tuple = std::tuple<_Values...>; - std::variant __data_{}; - }; + struct __state { + using _Tuple = std::tuple<_Values...>; + std::variant __data_{}; + }; template - struct __receiver { - struct __t { - using __id = __receiver; - __state<_Values...>* __state_; - stdexec::run_loop* __loop_; - template - void __set_error(_Error __err) noexcept { - if constexpr (__decays_to<_Error, std::exception_ptr>) - __state_->__data_.template emplace<2>((_Error&&) __err); - else if constexpr (__decays_to<_Error, std::error_code>) - __state_->__data_.template emplace<2>(std::make_exception_ptr(std::system_error(__err))); - else - __state_->__data_.template emplace<2>(std::make_exception_ptr((_Error&&) __err)); - __loop_->finish(); - } - template - requires constructible_from, _As...> - friend void tag_invoke(stdexec::set_value_t, __t&& __rcvr, _As&&... __as) noexcept try { - __rcvr.__state_->__data_.template emplace<1>((_As&&) __as...); - __rcvr.__loop_->finish(); - } catch(...) { - __rcvr.__set_error(std::current_exception()); - } - template - friend void tag_invoke(stdexec::set_error_t, __t&& __rcvr, _Error __err) noexcept { - __rcvr.__set_error((_Error &&) __err); - } - friend void tag_invoke(stdexec::set_stopped_t __d, __t&& __rcvr) noexcept { - __rcvr.__state_->__data_.template emplace<3>(__d); - __rcvr.__loop_->finish(); - } - friend __env - tag_invoke(stdexec::get_env_t, const __t& __rcvr) noexcept { - return {__rcvr.__loop_->get_scheduler()}; - } - }; + struct __receiver { + struct __t { + using __id = __receiver; + __state<_Values...>* __state_; + stdexec::run_loop* __loop_; + + template + void __set_error(_Error __err) noexcept { + if constexpr (__decays_to<_Error, std::exception_ptr>) + __state_->__data_.template emplace<2>((_Error&&) __err); + else if constexpr (__decays_to<_Error, std::error_code>) + __state_->__data_.template emplace<2>( + std::make_exception_ptr(std::system_error(__err))); + else + __state_->__data_.template emplace<2>(std::make_exception_ptr((_Error&&) __err)); + __loop_->finish(); + } + + template + requires constructible_from, _As...> + friend void tag_invoke(stdexec::set_value_t, __t&& __rcvr, _As&&... __as) noexcept try { + __rcvr.__state_->__data_.template emplace<1>((_As&&) __as...); + __rcvr.__loop_->finish(); + } catch (...) { + + __rcvr.__set_error(std::current_exception()); + } + + template + friend void tag_invoke(stdexec::set_error_t, __t&& __rcvr, _Error __err) noexcept { + __rcvr.__set_error((_Error&&) __err); + } + + friend void tag_invoke(stdexec::set_stopped_t __d, __t&& __rcvr) noexcept { + __rcvr.__state_->__data_.template emplace<3>(__d); + __rcvr.__loop_->finish(); + } + + friend __env tag_invoke(stdexec::get_env_t, const __t& __rcvr) noexcept { + return {__rcvr.__loop_->get_scheduler()}; + } }; + }; template - using __into_variant_result_t = - decltype(stdexec::into_variant(__declval<_Sender>())); + using __into_variant_result_t = decltype(stdexec::into_variant(__declval<_Sender>())); //////////////////////////////////////////////////////////////////////////// // [execution.senders.consumers.sync_wait] struct sync_wait_t { template - using __receiver_t = __t<__sync_wait_result_impl<_Sender, __q<__receiver>>>; + using __receiver_t = __t<__sync_wait_result_impl<_Sender, __q<__receiver>>>; // TODO: constrain on return type template <__single_value_variant_sender<__env> _Sender> // NOT TO SPEC - requires - __tag_invocable_with_completion_scheduler< - sync_wait_t, set_value_t, _Sender> - tag_invoke_result_t< - sync_wait_t, - __completion_scheduler_for<_Sender, set_value_t>, - _Sender> - operator()(_Sender&& __sndr) const noexcept( - nothrow_tag_invocable< - sync_wait_t, - __completion_scheduler_for<_Sender, set_value_t>, - _Sender>) { - auto __sched = - get_completion_scheduler(__sndr); + requires __tag_invocable_with_completion_scheduler< sync_wait_t, set_value_t, _Sender> + tag_invoke_result_t< sync_wait_t, __completion_scheduler_for<_Sender, set_value_t>, _Sender> + operator()(_Sender&& __sndr) const + noexcept(nothrow_tag_invocable< + sync_wait_t, + __completion_scheduler_for<_Sender, set_value_t>, + _Sender>) { + auto __sched = get_completion_scheduler(__sndr); return tag_invoke(sync_wait_t{}, std::move(__sched), (_Sender&&) __sndr); } // TODO: constrain on return type template <__single_value_variant_sender<__env> _Sender> // NOT TO SPEC - requires - (!__tag_invocable_with_completion_scheduler< - sync_wait_t, set_value_t, _Sender>) && - tag_invocable - tag_invoke_result_t - operator()(_Sender&& __sndr) const noexcept( - nothrow_tag_invocable) { + requires(!__tag_invocable_with_completion_scheduler< sync_wait_t, set_value_t, _Sender>) + && tag_invocable + tag_invoke_result_t operator()(_Sender&& __sndr) const + noexcept(nothrow_tag_invocable) { return tag_invoke(sync_wait_t{}, (_Sender&&) __sndr); } template <__single_value_variant_sender<__env> _Sender> - requires - (!__tag_invocable_with_completion_scheduler< - sync_wait_t, set_value_t, _Sender>) && - (!tag_invocable) && - sender<_Sender, __env> && - sender_to<_Sender, __receiver_t<_Sender>> - auto operator()(_Sender&& __sndr) const - -> std::optional<__sync_wait_result_t<_Sender>> { + requires(!__tag_invocable_with_completion_scheduler< sync_wait_t, set_value_t, _Sender>) + && (!tag_invocable) && sender<_Sender, __env> + && sender_to<_Sender, __receiver_t<_Sender>> + auto operator()(_Sender&& __sndr) const -> std::optional<__sync_wait_result_t<_Sender>> { using state_t = __sync_wait_result_impl<_Sender, __q<__state>>; using tail_t = next_tail_from_sender_to_t<_Sender, __receiver_t<_Sender>>; - state_t __state {}; + state_t __state{}; run_loop __loop; // Launch the sender with a continuation that will fill in a variant // and notify a condition variable. - auto __op_state = - connect((_Sender&&) __sndr, __receiver_t<_Sender>{&__state, &__loop}); + auto __op_state = connect((_Sender&&) __sndr, __receiver_t<_Sender>{&__state, &__loop}); tail_t __tail = start(__op_state); // Wait for the variant to be filled in. @@ -987,6 +1029,7 @@ namespace exec { } }; } // namespace __sync_wait + using __sync_wait::sync_wait_t; inline constexpr sync_wait_t sync_wait{}; } // namespace exec diff --git a/include/exec/variant_tail_sender.hpp b/include/exec/variant_tail_sender.hpp index 2d82cd156..64c3e2e34 100644 --- a/include/exec/variant_tail_sender.hpp +++ b/include/exec/variant_tail_sender.hpp @@ -26,7 +26,9 @@ namespace exec { template struct __variant_tail_sender : private std::variant<_TailSenderN...> { static_assert(sizeof...(_TailSenderN) >= 1, "variant_tail_sender requires at least one sender"); - static_assert((tail_sender<_TailSenderN> && ...), "variant_tail_sender requires all senders to be tail_sender"); + static_assert( + (tail_sender<_TailSenderN> && ...), + "variant_tail_sender requires all senders to be tail_sender"); using __senders_t = std::variant<_TailSenderN...>; @@ -65,38 +67,42 @@ namespace exec { // } template - struct op { - using __opn_t = __variant>...>; - using __start_result_t = __variant_tail_sender...>; - - op(const op&) = delete; - op(op&&) = delete; - op& operator=(const op&) = delete; - op& operator=(op&&) = delete; - - explicit op(__senders_t&& __t, _TailReceiver __r) { - std::visit([&, this](auto&& __t) -> void { - using _T = std::remove_cvref_t; - if constexpr (tail_sender<_T>) { - static_assert(tail_sender_to<_T, _TailReceiver>, "variant-tail-sender member cannot connect"); - using op_t = connect_result_t<_T, _TailReceiver>; - using opt_t = std::optional; - __opn_.template emplace(); - opt_t& opt = std::get(__opn_); - opt.~opt_t(); - new (&opt) opt_t{stdexec::__conv{ - [&] () -> op_t { - return stdexec::connect((decltype(__t)&&)__t, __r); - } - }}; - } else { - std::terminate(); - } - }, (__senders_t&&)__t); - } + struct op { + using __opn_t = + __variant>...>; + using __start_result_t = + __variant_tail_sender...>; - operator bool() const noexcept { - return std::visit([&](auto&& __op) -> bool { + op(const op&) = delete; + op(op&&) = delete; + op& operator=(const op&) = delete; + op& operator=(op&&) = delete; + + explicit op(__senders_t&& __t, _TailReceiver __r) { + std::visit( + [&, this](auto&& __t) -> void { + using _T = std::remove_cvref_t; + if constexpr (tail_sender<_T>) { + static_assert( + tail_sender_to<_T, _TailReceiver>, "variant-tail-sender member cannot connect"); + using op_t = connect_result_t<_T, _TailReceiver>; + using opt_t = std::optional; + __opn_.template emplace(); + opt_t& opt = std::get(__opn_); + opt.~opt_t(); + new (&opt) opt_t{stdexec::__conv{[&]() -> op_t { + return stdexec::connect((decltype(__t)&&) __t, __r); + }}}; + } else { + std::terminate(); + } + }, + (__senders_t&&) __t); + } + + operator bool() const noexcept { + return std::visit( + [&](auto&& __op) -> bool { using _Opt = std::decay_t; if constexpr (__is_instance_of_<_Opt, std::optional>) { auto& op = *__op; @@ -108,12 +114,13 @@ namespace exec { } else { std::terminate(); } - }, __opn_); - } + }, + __opn_); + } - [[nodiscard]] - friend __start_result_t tag_invoke(start_t, op& __self) noexcept { - return std::visit([&](auto&& __op) -> __start_result_t { + [[nodiscard]] friend __start_result_t tag_invoke(start_t, op& __self) noexcept { + return std::visit( + [&](auto&& __op) -> __start_result_t { using _Opt = std::decay_t; if constexpr (__is_instance_of_<_Opt, std::optional>) { auto& op = *__op; @@ -132,111 +139,130 @@ namespace exec { } else { std::terminate(); } - }, __self.__opn_); - } + }, + __self.__opn_); + } - friend void tag_invoke(unwind_t, op& __self) noexcept { - return std::visit([&](auto&& __op) -> void { + friend void tag_invoke(unwind_t, op& __self) noexcept { + return std::visit( + [&](auto&& __op) -> void { using _Opt = std::decay_t; if constexpr (__is_instance_of_<_Opt, std::optional>) { exec::unwind(*__op); } else { std::terminate(); } - }, __self.__opn_); - } - __opn_t __opn_; - }; + }, + __self.__opn_); + } + + __opn_t __opn_; + }; using completion_signatures = completion_signatures; template - [[nodiscard]] - friend auto tag_invoke(connect_t, __variant_tail_sender&& __self, _TailReceiver&& __r) noexcept - -> op> { - return op>{(__variant_tail_sender&&)__self, (_TailReceiver&&)__r}; + [[nodiscard]] friend auto + tag_invoke(connect_t, __variant_tail_sender&& __self, _TailReceiver&& __r) noexcept + -> op> { + return op>{ + (__variant_tail_sender&&) __self, (_TailReceiver&&) __r}; } - template + template friend constexpr bool tag_invoke( - exec::always_completes_inline_t, exec::c_t<__variant_tail_sender>, exec::c_t<_Env>) noexcept { + exec::always_completes_inline_t, + exec::c_t<__variant_tail_sender>, + exec::c_t<_Env>) noexcept { return true; } - private: + private: template friend struct __variant_tail_sender; template friend constexpr _To variant_cast(__variant_tail_sender __f) noexcept { - return std::visit([](_U&& __u) -> _To { - if constexpr (stdexec::__v<__mapply<__contains<_U>, _To>>) { - return _To{(_U&&) __u}; - } else { - printf("variant_cast\n"); fflush(stdout); - std::terminate(); - } - }, std::move(static_cast<__senders_t&>(__f))); + return std::visit( + [](_U&& __u) -> _To { + if constexpr (stdexec::__v<__mapply<__contains<_U>, _To>>) { + return _To{(_U&&) __u}; + } else { + printf("variant_cast\n"); + fflush(stdout); + std::terminate(); + } + }, + std::move(static_cast<__senders_t&>(__f))); } - template + template friend constexpr std::decay_t<_To> get(__variant_tail_sender __f) noexcept { - static_assert(stdexec::__v<__mapply<__contains>, __variant_tail_sender>>, "get does not have _To as an alternative"); + static_assert( + stdexec::__v<__mapply<__contains>, __variant_tail_sender>>, + "get does not have _To as an alternative"); if (!holds_alternative>(__f)) { - printf("get\n"); fflush(stdout); + printf("get\n"); + fflush(stdout); std::terminate(); } return std::get>(std::move(static_cast<__senders_t&>(__f))); } - template< class T> - friend inline constexpr bool holds_alternative( const __variant_tail_sender& v ) noexcept { + template < class T> + friend inline constexpr bool holds_alternative(const __variant_tail_sender& v) noexcept { return std::holds_alternative(v); } }; - template class _T> + template