diff --git a/.git-blame-ignore-revs b/.git-blame-ignore-revs index 6025f472d..a5f6a9fd9 100644 --- a/.git-blame-ignore-revs +++ b/.git-blame-ignore-revs @@ -6,3 +6,6 @@ # Fixed access modifier offset 9150f682444f8f45c02bf858b767c1a0db81e548 + +# Format tail_sender branch +715c973c1d2814c37f4791948bacfd64afb09a83 diff --git a/include/exec/any_sender_of.hpp b/include/exec/any_sender_of.hpp index 2c50ad1af..3adc4a202 100644 --- a/include/exec/any_sender_of.hpp +++ b/include/exec/any_sender_of.hpp @@ -408,7 +408,8 @@ namespace exec { } template - friend void tag_invoke(__move_construct_t, __mtype<_Tp>, __t& __self, __t&& __other) noexcept { + friend void + tag_invoke(__move_construct_t, __mtype<_Tp>, __t& __self, __t&& __other) noexcept { if (!__other.__object_pointer_) { return; } diff --git a/include/exec/finally.hpp b/include/exec/finally.hpp index a6d14bc1e..392b079b1 100644 --- a/include/exec/finally.hpp +++ b/include/exec/finally.hpp @@ -250,7 +250,8 @@ namespace exec { requires receiver_of< _Rec, __completion_signatures_t<_InitialSender, _FinalSender, env_of_t<_Rec>>> - friend __op_t< _Self, _Rec> tag_invoke(connect_t, _Self&& __self, _Rec&& __receiver) noexcept { + friend __op_t< _Self, _Rec> + tag_invoke(connect_t, _Self&& __self, _Rec&& __receiver) noexcept { return { ((_Self&&) __self).__initial_sender_, ((_Self&&) __self).__final_sender_, diff --git a/include/exec/tail_sender.hpp b/include/exec/tail_sender.hpp new file mode 100644 index 000000000..3bf068d35 --- /dev/null +++ b/include/exec/tail_sender.hpp @@ -0,0 +1,1035 @@ +/* + * Copyright (c) 2021-2022 NVIDIA Corporation + * + * Licensed under the Apache License Version 2.0 with LLVM Exceptions + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * https://llvm.org/LICENSE.txt + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#pragma once + +#include "../stdexec/execution.hpp" +#include "env.hpp" + +namespace exec { + using namespace stdexec; + + template + concept __contextually_convertible_to_bool = requires(const _Ty __val) { + { (static_cast(__val) ? false : false) } -> same_as; + }; + + template + static constexpr bool __nothrow_contextually_convertible_to_bool_v = + noexcept((std::declval() ? (void) 0 : (void) 0)); + + template + concept __nothrow_contextually_convertible_to_bool = + __contextually_convertible_to_bool<_Ty> && __nothrow_contextually_convertible_to_bool_v<_Ty>; + + namespace __unwind { + struct unwind_t { + template + requires std::tag_invocable + void operator()(_Op& __op) const noexcept(std::nothrow_tag_invocable) { + (void) tag_invoke(unwind_t{}, __op); + } + }; + } + + using __unwind::unwind_t; + inline constexpr unwind_t unwind{}; + + namespace __sender_queries { + template + const _Ty& __cref_fn(const _Ty&); + template + using __cref_t = decltype(__sender_queries::__cref_fn(__declval<_Ty>())); + + struct always_completes_inline_t { + template + requires std::tag_invocable, __cref_t<_Env>> + constexpr bool operator()(_Sender&& __sndr, _Env&& __env) const noexcept { + static_assert( + same_as< + bool, + tag_invoke_result_t, __cref_t<_Env>>>); + static_assert( + std::nothrow_tag_invocable, __cref_t<_Env>>); + return tag_invoke(always_completes_inline_t{}, std::as_const(__sndr), std::as_const(__env)); + } + + template + requires std::tag_invocable< + always_completes_inline_t, + __mtype>, + __mtype>> + constexpr bool operator()( + __mtype> __sndr, + __mtype> __env) const noexcept { + static_assert( + same_as< + bool, + tag_invoke_result_t< + always_completes_inline_t, + __mtype>, + __mtype>>>); + static_assert(std::nothrow_tag_invocable< + always_completes_inline_t, + __mtype>, + __mtype>>); + return tag_invoke(always_completes_inline_t{}, __sndr, __env); + } + + constexpr bool operator()(auto&&, auto&&) const noexcept { + return false; + } + }; + } // namespace __sender_queries + + using __sender_queries::always_completes_inline_t; + inline constexpr always_completes_inline_t always_completes_inline{}; + + template + concept tail_operation_state = + operation_state<_TailOperationState> && std::is_nothrow_destructible_v<_TailOperationState> + && std::is_trivially_destructible_v<_TailOperationState> + && (!std::is_copy_constructible_v<_TailOperationState>) // + &&(!std::is_move_constructible_v< _TailOperationState>) // + &&(!std::is_copy_assignable_v< _TailOperationState>) // + &&(!std::is_move_assignable_v< _TailOperationState>) // + &&requires(_TailOperationState& __op) { + { unwind(__op) } noexcept; + }; + + template + inline constexpr bool always_completes_inline_v = + always_completes_inline(__mtype>{}, __mtype>{}); + + template + concept tail_sender = + sender<_TailSender, _Env> && same_as<__single_sender_value_t<_TailSender, _Env>, void> + && always_completes_inline_v<_TailSender, _Env> + && std::is_nothrow_move_constructible_v<_TailSender> + && std::is_nothrow_destructible_v<_TailSender> && std::is_trivially_destructible_v<_TailSender>; + + template + concept tail_receiver = + receiver<_TailReceiver> && std::is_nothrow_copy_constructible_v<_TailReceiver> + && std::is_nothrow_move_constructible_v<_TailReceiver> + && std::is_nothrow_copy_assignable_v<_TailReceiver> + && std::is_nothrow_move_assignable_v<_TailReceiver> + && std::is_nothrow_destructible_v<_TailReceiver> + && std::is_trivially_destructible_v<_TailReceiver>; + + struct __null_tail_receiver { + friend void tag_invoke(set_value_t, __null_tail_receiver&&, auto&&...) noexcept { + } + + friend void tag_invoke(set_stopped_t, __null_tail_receiver&&) noexcept { + } + + friend __empty_env tag_invoke(get_env_t, const __null_tail_receiver& __self) { + return {}; + } + }; + + struct __null_tail_sender { + struct __operation : __immovable { + __operation() = default; + + // this is a nullable_tail_sender that always returns false to prevent + // callers from calling start() and unwind() + inline constexpr operator bool() const noexcept { + return false; + } + + friend void tag_invoke(start_t, __operation& self) noexcept { + printf("__null_tail_sender start\n"); + fflush(stdout); + std::terminate(); + } + + friend void tag_invoke(unwind_t, __operation& self) noexcept { + printf("__null_tail_sender unwind\n"); + fflush(stdout); + std::terminate(); + } + }; + + using completion_signatures = completion_signatures; + + template + friend auto tag_invoke(connect_t, __null_tail_sender&&, _TailReceiver&&) noexcept -> op { + return {}; + } + + template + friend constexpr bool tag_invoke( + exec::always_completes_inline_t, + exec::__mtype<__null_tail_sender>, + exec::__mtype<_Env>) noexcept { + return true; + } + }; + + template + struct __variant_tail_sender; + + template + using __next_tail_from_operation_t = __call_result_t; + + template + using next_tail_from_operation_t = __if< + __bool>>, + __null_tail_sender, + __call_result_t>; + + template + using __next_tail_from_sender_to_t = + __next_tail_from_operation_t>; + + template + using next_tail_from_sender_to_t = + next_tail_from_operation_t>; + + template + concept tail_sender_to = // + tail_sender<_TailSender> // + && tail_receiver<_TailReceiver> // + && requires(_TailSender&& __sndr, _TailReceiver&& __rcvr) { // + { // + stdexec::connect((_TailSender&&) __sndr, (_TailReceiver&&) __rcvr) // + } noexcept -> tail_operation_state; // + } // + && tail_sender>; + + template + concept __terminal_tail_operation_state = + tail_operation_state<_TailOperationState> + && same_as<__next_tail_from_operation_t<_TailOperationState>, void>; + + template + concept __terminal_tail_sender_to = + tail_sender_to<_TailSender, _TailReceiver> + && same_as<__next_tail_from_sender_to_t<_TailSender, _TailReceiver>, void>; + + template + concept __recursive_tail_sender_to = + tail_sender_to<_TailSender, _TailReceiver> + && tail_operation_state> + && __one_of, _ValidTailSender...>; + + template + concept __nullable_tail_operation_state = + tail_operation_state<_TailOperationState> + && __nothrow_contextually_convertible_to_bool<_TailOperationState>; + + template + concept __nullable_tail_sender_to = + tail_sender_to<_TailSender, _TailReceiver> + && __nullable_tail_operation_state>; + +} // namespace exec + +#include "variant_tail_sender.hpp" + +namespace exec { + + template + constexpr std::decay_t<_To> result_from(_From&& __f) noexcept { + if constexpr ( + __is_instance_of, __variant_tail_sender> + && __is_instance_of, __variant_tail_sender>) { + return variant_cast>((_From&&) __f); + } else if constexpr ( + __is_instance_of, __variant_tail_sender> + && !__is_instance_of, __variant_tail_sender>) { + return get>((_From&&) __f); + } else { + static_assert(std::is_constructible_v<_To, _From>, "result_from cannot convert"); + return (_From&&) __f; + } + } + + template + requires(!same_as<__null_tail_sender, _TailSender>) + struct maybe_tail_sender { + maybe_tail_sender() noexcept = default; + + maybe_tail_sender(__null_tail_sender) noexcept { + } + + maybe_tail_sender(_TailSender __t) noexcept + : tail_sender_(__t) { + } + + template + struct op : __immovable { + using op_t = connect_result_t<_TailSender, _TailReceiver>; + op() = default; + + explicit op(_TailSender __t, _TailReceiver __rcvr) + : op_(stdexec::__conv{[&] { + return stdexec::connect(__t, __rcvr); + }}) { + } + + operator bool() const noexcept { + if constexpr (__nullable_tail_sender_to<_TailSender, _TailReceiver>) { + return !!op_ && !!*op_; + } else { + return !!op_; + } + } + + [[nodiscard]] friend auto tag_invoke(start_t, op& __self) noexcept { + if (!__self.op_) { + printf("maybe_tail_sender start optional\n"); + fflush(stdout); + std::terminate(); + } + if constexpr (__nullable_tail_sender_to<_TailSender, _TailReceiver>) { + if (!*__self.op_) { + printf("maybe_tail_sender start nullable\n"); + fflush(stdout); + std::terminate(); + } + } + return stdexec::start(*__self.op_); + } + + friend void tag_invoke(unwind_t, op& __self) noexcept { + if (!__self.op_) { + printf("maybe_tail_sender unwind optional\n"); + fflush(stdout); + std::terminate(); + } + if constexpr (__nullable_tail_sender_to<_TailSender, _TailReceiver>) { + if (!*__self.op_) { + printf("maybe_tail_sender unwind nullable\n"); + fflush(stdout); + std::terminate(); + } + } + exec::unwind(*__self.op_); + } + + std::optional op_; + }; + + using completion_signatures = completion_signatures; + + template + [[nodiscard]] friend auto + tag_invoke(connect_t, maybe_tail_sender&& __self, _TailReceiver&& __rcvr) noexcept + -> op> { + if (!__self.tail_sender_) { + return {}; + } + return op>{*((maybe_tail_sender&&) __self).tail_sender_, __rcvr}; + } + + template + friend constexpr bool tag_invoke( + exec::always_completes_inline_t, + exec::__mtype, + exec::__mtype<_Env>) noexcept { + return true; + } + + private: + std::optional<_TailSender> tail_sender_; + }; + + template + struct scoped_tail_sender { + explicit scoped_tail_sender(_TailSender __t, _TailReceiver __rcvr = _TailReceiver{}) noexcept + : t_(__t) + , r_(__rcvr) + , valid_(true) { + } + + scoped_tail_sender(scoped_tail_sender&& other) noexcept + : t_(other.s_) + , r_(other.r_) + , valid_(std::exchange(other.valid_, false)) { + } + + ~scoped_tail_sender() { + if (valid_) { + auto op = stdexec::connect(t_, r_); + if constexpr (__nullable_tail_sender_to<_TailSender, _TailReceiver>) { + if (!!op) { + exec::unwind(op); + } + } else { + exec::unwind(op); + } + } + } + + _TailSender get() noexcept { + return t_; + } + + _TailSender release() noexcept { + valid_ = false; + return t_; + } + + private: + _TailSender t_; + _TailReceiver r_; + bool valid_; + }; + + struct __all_resumed_tail_sender { + + using completion_signatures = completion_signatures; + + template + friend auto tag_invoke(connect_t, __all_resumed_tail_sender&&, _TailReceiver&& __rcvr) noexcept + -> __call_result_t { + return stdexec::connect(__null_tail_sender{}, __rcvr); + } + + template + friend constexpr bool tag_invoke( + exec::always_completes_inline_t, + exec::__mtype<__all_resumed_tail_sender>, + exec::__mtype<_Env>) noexcept { + return true; + } + }; + + namespace __start_until_nullable_ { + + struct __start_until_nullable_t; + + template + struct __start_until_nullable_result; + + template + using __start_until_nullable_result_t = + typename __start_until_nullable_result<_TailSender, _TailReceiver>::type; + + template + struct __start_until_nullable_result { + using type = __if< + __bool<__nullable_tail_sender_to<_TailSender, _TailReceiver>>, + _TailSender, + __if< + __bool<__terminal_tail_sender_to<_TailSender, _TailReceiver>>, + __all_resumed_tail_sender, + __minvoke< + __with_default<__q<__start_until_nullable_result_t>, __all_resumed_tail_sender>, + __next_tail_from_sender_to_t<_TailSender, _TailReceiver>, + _TailReceiver> > >; + }; + + struct __start_until_nullable_t { + template + auto operator()(_TailSender __t, _TailReceiver __rcvr) const noexcept + -> __start_until_nullable_result_t<_TailSender, _TailReceiver> { + if constexpr (__nullable_tail_sender_to<_TailSender, _TailReceiver>) { + return __t; + } else if constexpr (__terminal_tail_sender_to<_TailSender, _TailReceiver>) { + // restrict scope of op + { + auto op = stdexec::connect(std::move(__t), std::move(__rcvr)); + stdexec::start(op); + } + return __all_resumed_tail_sender{}; + } else { + auto op = stdexec::connect(std::move(__t), __rcvr); + return __start_until_nullable_t{}(stdexec::start(op), std::move(__rcvr)); + } + } + }; + + } // namespace __start_until_nullable_ + + using __start_until_nullable_::__start_until_nullable_t; + inline constexpr __start_until_nullable_t __start_until_nullable{}; + + template < + tail_sender _NextTailSender, + tail_sender _TailSender, + tail_receiver _TailReceiver, + tail_sender... _PrevTailSenders> + auto __start_next(_NextTailSender __next, _TailReceiver __rcvr) noexcept; + + template + struct __start_sequential_result; + + template + using __start_sequential_result_t = + typename __start_sequential_result<_TailSender, _TailReceiver>::type; + + template + struct __start_sequential_result { + using next_t = next_tail_from_sender_to_t<_TailSender, _TailReceiver>; + using start_next_t = __call_result_t< + decltype(__start_next), + next_t, + _TailReceiver>; + using type = __if< + __bool< + __nullable_tail_sender_to<_TailSender, _TailReceiver> + && __terminal_tail_sender_to<_TailSender, _TailReceiver>>, + _TailSender, + __if< // elseif + __bool<__nullable_tail_sender_to<_TailSender, _TailReceiver>>, + variant_tail_sender<__all_resumed_tail_sender, start_next_t>, + __if< // elseif + __bool>, + start_next_t, + __all_resumed_tail_sender // else + > > >; + }; + + template + auto __start_sequential(_TailSender __sndr, _TailReceiver __rcvr) noexcept + -> __start_sequential_result_t<_TailSender, _TailReceiver, _PrevTailSenders...>; + + template < + tail_sender _NextTailSender, + tail_sender _TailSender, + tail_receiver _TailReceiver, + tail_sender... _PrevTailSenders> + auto __start_next(_NextTailSender __next, _TailReceiver __rcvr) noexcept { + if constexpr (__one_of<_NextTailSender, _TailSender, _PrevTailSenders...>) { + static_assert( + (__nullable_tail_sender_to<_TailSender, _TailReceiver> + || (__nullable_tail_sender_to<_PrevTailSenders, _TailReceiver> || ...)), + "At least one tail_sender in a cycle must be nullable to avoid " + "entering an infinite loop"); + return __start_until_nullable(__next, std::move(__rcvr)); + } else { + return __start_sequential<_NextTailSender, _TailReceiver, _TailSender, _PrevTailSenders...>( + __next, std::move(__rcvr)); + } + } + + template + auto __start_sequential(_TailSender __sndr, _TailReceiver __rcvr) noexcept + -> __start_sequential_result_t<_TailSender, _TailReceiver, _PrevTailSenders...> { + using next_t = next_tail_from_sender_to_t<_TailSender, _TailReceiver>; + using result_t = __start_sequential_result_t<_TailSender, _TailReceiver, _PrevTailSenders...>; + + if constexpr ( + __nullable_tail_sender_to<_TailSender, _TailReceiver> + && __terminal_tail_sender_to<_TailSender, _TailReceiver>) { + // halt the recursion + return __sndr; + } else if constexpr (__nullable_tail_sender_to<_TailSender, _TailReceiver>) { + // recurse if the nullable tail-sender is valid otherwise return + // a nullable and terminal tail-sender + auto op = stdexec::connect(std::move(__sndr), __rcvr); + if (!op) { + return __all_resumed_tail_sender{}; + } + return result_from( + __start_next( + stdexec::start(op), __rcvr)); + } else if constexpr (!__terminal_tail_sender_to<_TailSender, _TailReceiver>) { + auto op = stdexec::connect(std::move(__sndr), __rcvr); + return result_from( + __start_next( + stdexec::start(op), __rcvr)); + } else { + // run the terminal and not nullable tail-sender and return + // a nullable and terminal tail-sender + auto op = stdexec::connect(std::move(__sndr), __rcvr); + stdexec::start(op); + return __all_resumed_tail_sender{}; + } + } + + template + inline __null_tail_sender resume_tail_senders_until_one_remaining(_TailReceiver&&) noexcept { + return {}; + } + + template + _TailSender + resume_tail_senders_until_one_remaining(_TailReceiver&&, _TailSender __sndr) noexcept { + return __sndr; + } + + template + auto _resume_tail_senders_until_one_remaining( + _TailReceiver&& __rcvr, + std::index_sequence<_Is...>, + _TailSenders... __sndrs) noexcept { + using result_type = variant_tail_sender; + result_type result; + + auto cs2_tuple = std::make_tuple( + variant_tail_sender< + __all_resumed_tail_sender, + decltype(__start_sequential(__start_sequential(__sndrs, __rcvr), __rcvr))>{ + __start_sequential(__start_sequential(__sndrs, __rcvr), __rcvr)}...); + while (true) { + std::size_t remaining = sizeof...(__sndrs); + ((remaining > 1 ? ( + !holds_alternative<__all_resumed_tail_sender>(std::get<_Is>(cs2_tuple)) + ? (void) (result = result_from( + std::get<_Is>(cs2_tuple) = result_from(cs2_tuple))>( + __start_sequential(std::get<_Is>(cs2_tuple), __rcvr)))) + : (void) --remaining) + : (void) (result = result_from(std::get<_Is>(cs2_tuple)))), + ...); + + if (remaining <= 1) { + return result; + } + } + } + + template + auto resume_tail_senders_until_one_remaining( + _TailReceiver&& __rcvr, + _TailSenders... __sndrs) noexcept { + return _resume_tail_senders_until_one_remaining( + __rcvr, std::index_sequence_for<_TailSenders...>{}, __sndrs...); + } + + template + void resume_tail_senders(_TailReceiver&& __rcvr, _TailSenders... __sndrs) noexcept { + auto __last_tail = _resume_tail_senders_until_one_remaining( + __rcvr, std::index_sequence_for<_TailSenders...>{}, __sndrs...); + for (;;) { + auto __op = connect(__last_tail, __rcvr); + if (!__op) { + return; + } + if constexpr (__terminal_tail_sender_to) { + start(__last_tail); + return; + } else { + __last_tail = __start_sequential(start(__last_tail), __rcvr); + } + } + } + + ///////////////////////////////////////////////////////////////////////////// + // run_loop + namespace __loop { + class run_loop; + + struct __task : __immovable { + __task* __next_ = this; + + union { + void (*__execute_)(__task*) noexcept; + __task* __tail_; + }; + + void __execute() noexcept { + (*__execute_)(this); + } + }; + + template + struct __operation { + using _Receiver = stdexec::__t<_ReceiverId>; + + struct __t : __task { + using __id = __operation; + + run_loop* __loop_; + [[no_unique_address]] _Receiver __rcvr_; + + static void __execute_impl(__task* __p) noexcept { + auto& __rcvr = ((__t*) __p)->__rcvr_; + try { + if (get_stop_token(get_env(__rcvr)).stop_requested()) { + set_stopped((_Receiver&&) __rcvr); + } else { + set_value((_Receiver&&) __rcvr); + } + } catch (...) { + set_error((_Receiver&&) __rcvr, std::current_exception()); + } + } + + explicit __t(__task* __tail) noexcept + : __task{.__tail_ = __tail} { + } + + __t(__task* __next, run_loop* __loop, _Receiver __rcvr) + : __task{{}, __next, {&__execute_impl}} + , __loop_{__loop} + , __rcvr_{(_Receiver&&) __rcvr} { + } + + friend void tag_invoke(start_t, __t& __self) noexcept { + __self.__start_(); + } + + void __start_() noexcept; + }; + }; + + template + struct __run_operation { + using _Receiver = stdexec::__t<_ReceiverId>; + + struct __t { + using __id = __run_operation; + + run_loop* __loop_; + [[no_unique_address]] _Receiver __rcvr_; + + __t(run_loop* __loop, _Receiver __rcvr) + : __loop_{__loop} + , __rcvr_{(_Receiver&&) __rcvr} { + } + + friend void tag_invoke(start_t, __t& __self) noexcept { + __self.__start_(); + } + + void __start_() noexcept; + }; + }; + + class run_loop { + template + using __completion_signatures_ = completion_signatures; + + template + friend struct __operation; + public: + struct __scheduler { + using __t = __scheduler; + using __id = __scheduler; + bool operator==(const __scheduler&) const noexcept = default; + + private: + struct __schedule_task { + using __t = __schedule_task; + using __id = __schedule_task; + using completion_signatures = __completion_signatures_< + set_value_t(), + set_error_t(std::exception_ptr), + set_stopped_t()>; + + private: + friend __scheduler; + + template + using __operation = stdexec::__t<__operation>>; + + template + friend __operation<_Receiver> + tag_invoke(connect_t, const __schedule_task& __self, _Receiver __rcvr) { + return __self.__connect_((_Receiver&&) __rcvr); + } + + template + __operation<_Receiver> __connect_(_Receiver&& __rcvr) const { + return {&__loop_->__head_, __loop_, (_Receiver&&) __rcvr}; + } + + template + friend __scheduler + tag_invoke(get_completion_scheduler_t<_CPO>, const __schedule_task& __self) noexcept { + return __scheduler{__self.__loop_}; + } + + explicit __schedule_task(run_loop* __loop) noexcept + : __loop_(__loop) { + } + + run_loop* const __loop_; + }; + + friend run_loop; + + explicit __scheduler(run_loop* __loop) noexcept + : __loop_(__loop) { + } + + friend __schedule_task tag_invoke(schedule_t, const __scheduler& __self) noexcept { + return __self.__schedule(); + } + + friend stdexec::forward_progress_guarantee + tag_invoke(get_forward_progress_guarantee_t, const __scheduler&) noexcept { + return stdexec::forward_progress_guarantee::parallel; + } + + // BUGBUG NOT TO SPEC + friend bool + tag_invoke(this_thread::execute_may_block_caller_t, const __scheduler&) noexcept { + return false; + } + + __schedule_task __schedule() const noexcept { + return __schedule_task{__loop_}; + } + + run_loop* __loop_; + }; + + __scheduler get_scheduler() noexcept { + return __scheduler{this}; + } + + struct __run_sender { + using __t = __run_sender; + using __id = __run_sender; + using completion_signatures = __completion_signatures_< set_value_t(), set_stopped_t()>; + + private: + friend __scheduler; + + template + using __operation = stdexec::__t<__operation>>; + + template + friend __operation<_Receiver> + tag_invoke(connect_t, const __run_sender& __self, _Receiver __rcvr) { + return __self.__connect_((_Receiver&&) __rcvr); + } + + template + __operation<_Receiver> __connect_(_Receiver&& __rcvr) const { + return {&__loop_->__head_, __loop_, (_Receiver&&) __rcvr}; + } + + template + friend __scheduler + tag_invoke(get_completion_scheduler_t<_CPO>, const __run_sender& __self) noexcept { + return __scheduler{__self.__loop_}; + } + + explicit __run_sender(run_loop* __loop) noexcept + : __loop_(__loop) { + } + + run_loop* __loop_; + }; + template + __run_sender run(_Sender&& __sndr); + + void run(); + + void finish(); + + private: + void __push_back_(__task* __task); + __task* __pop_front_(); + + std::mutex __mutex_; + std::condition_variable __cv_; + __task __head_{.__tail_ = &__head_}; + bool __stop_ = false; + }; + + template + inline void __operation<_ReceiverId>::__t::__start_() noexcept try { + __loop_->__push_back_(this); + } catch (...) { + + set_error((_Receiver&&) __rcvr_, std::current_exception()); + } + + template + run_loop::__run_sender run_loop::run(_Sender&& __sndr) { + return {this, (_Sender&&) __sndr}; + } + + inline void run_loop::run() { + for (__task* __task; (__task = __pop_front_()) != &__head_;) { + __task->__execute(); + } + } + + inline void run_loop::finish() { + std::unique_lock __lock{__mutex_}; + __stop_ = true; + __cv_.notify_all(); + } + + inline void run_loop::__push_back_(__task* __task) { + std::unique_lock __lock{__mutex_}; + __task->__next_ = &__head_; + __head_.__tail_ = __head_.__tail_->__next_ = __task; + __cv_.notify_one(); + } + + inline __task* run_loop::__pop_front_() { + std::unique_lock __lock{__mutex_}; + __cv_.wait(__lock, [this] { return __head_.__next_ != &__head_ || __stop_; }); + if (__head_.__tail_ == __head_.__next_) + __head_.__tail_ = &__head_; + return std::exchange(__head_.__next_, __head_.__next_->__next_); + } + } // namespace __loop + + // NOT TO SPEC + using run_loop = __loop::run_loop; + + ///////////////////////////////////////////////////////////////////////////// + // [execution.senders.consumers.sync_wait] + // [execution.senders.consumers.sync_wait_with_variant] + namespace __sync_wait { + template + using __into_variant_result_t = decltype(stdexec::into_variant(__declval<_Sender>())); + + struct __env { + using __t = __env; + using __id = __env; + stdexec::run_loop::__scheduler __sched_; + + friend auto tag_invoke(stdexec::get_scheduler_t, const __env& __self) noexcept + -> stdexec::run_loop::__scheduler { + return __self.__sched_; + } + + friend auto tag_invoke(stdexec::get_delegatee_scheduler_t, const __env& __self) noexcept + -> stdexec::run_loop::__scheduler { + return __self.__sched_; + } + }; + + // What should sync_wait(just_stopped()) return? + template + using __sync_wait_result_impl = + __value_types_of_t< _Sender, __env, __transform<__q, _Continuation>, __q<__msingle>>; + + template _Sender> + using __sync_wait_result_t = __sync_wait_result_impl<_Sender, __q>; + + template + using __sync_wait_with_variant_result_t = + __sync_wait_result_t<__into_variant_result_t<_Sender>>; + + template + struct __state { + using _Tuple = std::tuple<_Values...>; + std::variant __data_{}; + }; + + template + struct __receiver { + struct __t { + using __id = __receiver; + __state<_Values...>* __state_; + stdexec::run_loop* __loop_; + + template + void __set_error(_Error __err) noexcept { + if constexpr (__decays_to<_Error, std::exception_ptr>) + __state_->__data_.template emplace<2>((_Error&&) __err); + else if constexpr (__decays_to<_Error, std::error_code>) + __state_->__data_.template emplace<2>( + std::make_exception_ptr(std::system_error(__err))); + else + __state_->__data_.template emplace<2>(std::make_exception_ptr((_Error&&) __err)); + __loop_->finish(); + } + + template + requires constructible_from, _As...> + friend void tag_invoke(stdexec::set_value_t, __t&& __rcvr, _As&&... __as) noexcept try { + __rcvr.__state_->__data_.template emplace<1>((_As&&) __as...); + __rcvr.__loop_->finish(); + } catch (...) { + + __rcvr.__set_error(std::current_exception()); + } + + template + friend void tag_invoke(stdexec::set_error_t, __t&& __rcvr, _Error __err) noexcept { + __rcvr.__set_error((_Error&&) __err); + } + + friend void tag_invoke(stdexec::set_stopped_t __d, __t&& __rcvr) noexcept { + __rcvr.__state_->__data_.template emplace<3>(__d); + __rcvr.__loop_->finish(); + } + + friend __env tag_invoke(stdexec::get_env_t, const __t& __rcvr) noexcept { + return {__rcvr.__loop_->get_scheduler()}; + } + }; + }; + + template + using __into_variant_result_t = decltype(stdexec::into_variant(__declval<_Sender>())); + + //////////////////////////////////////////////////////////////////////////// + // [execution.senders.consumers.sync_wait] + struct sync_wait_t { + template + using __receiver_t = __t<__sync_wait_result_impl<_Sender, __q<__receiver>>>; + + // TODO: constrain on return type + template <__single_value_variant_sender<__env> _Sender> // NOT TO SPEC + requires __tag_invocable_with_completion_scheduler< sync_wait_t, set_value_t, _Sender> + tag_invoke_result_t< sync_wait_t, __completion_scheduler_for<_Sender, set_value_t>, _Sender> + operator()(_Sender&& __sndr) const + noexcept(nothrow_tag_invocable< + sync_wait_t, + __completion_scheduler_for<_Sender, set_value_t>, + _Sender>) { + auto __sched = get_completion_scheduler(__sndr); + return tag_invoke(sync_wait_t{}, std::move(__sched), (_Sender&&) __sndr); + } + + // TODO: constrain on return type + template <__single_value_variant_sender<__env> _Sender> // NOT TO SPEC + requires(!__tag_invocable_with_completion_scheduler< sync_wait_t, set_value_t, _Sender>) + && tag_invocable + tag_invoke_result_t operator()(_Sender&& __sndr) const + noexcept(nothrow_tag_invocable) { + return tag_invoke(sync_wait_t{}, (_Sender&&) __sndr); + } + + template <__single_value_variant_sender<__env> _Sender> + requires(!__tag_invocable_with_completion_scheduler< sync_wait_t, set_value_t, _Sender>) + && (!tag_invocable) && sender<_Sender, __env> + && sender_to<_Sender, __receiver_t<_Sender>> + auto operator()(_Sender&& __sndr) const -> std::optional<__sync_wait_result_t<_Sender>> { + using state_t = __sync_wait_result_impl<_Sender, __q<__state>>; + using tail_t = next_tail_from_sender_to_t<_Sender, __receiver_t<_Sender>>; + state_t __state{}; + run_loop __loop; + + // Launch the sender with a continuation that will fill in a variant + // and notify a condition variable. + auto __op_state = connect((_Sender&&) __sndr, __receiver_t<_Sender>{&__state, &__loop}); + tail_t __tail = start(__op_state); + + // Wait for the variant to be filled in. + auto __tail_run = __loop.run(just()); + + resume_tail_senders(__null_tail_sender{}, __tail, __tail_run); + + if (__state.__data_.index() == 2) + std::rethrow_exception(std::get<2>(__state.__data_)); + + if (__state.__data_.index() == 3) + return std::nullopt; + + return std::move(std::get<1>(__state.__data_)); + } + }; + } // namespace __sync_wait + + using __sync_wait::sync_wait_t; + inline constexpr sync_wait_t sync_wait{}; +} // namespace exec diff --git a/include/exec/variant_tail_sender.hpp b/include/exec/variant_tail_sender.hpp new file mode 100644 index 000000000..6c7bbc643 --- /dev/null +++ b/include/exec/variant_tail_sender.hpp @@ -0,0 +1,268 @@ +/* + * Copyright (c) 2021-2022 NVIDIA Corporation + * + * Licensed under the Apache License Version 2.0 with LLVM Exceptions + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * https://llvm.org/LICENSE.txt + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#pragma once + +#include "../stdexec/execution.hpp" +#include "env.hpp" + +#include + +namespace exec { + using namespace stdexec; + + template + struct __variant_tail_sender : private std::variant<_TailSenderN...> { + static_assert(sizeof...(_TailSenderN) >= 1, "variant_tail_sender requires at least one sender"); + static_assert( + (tail_sender<_TailSenderN> && ...), + "variant_tail_sender requires all senders to be tail_sender"); + + using __senders_t = std::variant<_TailSenderN...>; + + using __senders_t::__senders_t; + using __senders_t::operator=; + using __senders_t::index; + using __senders_t::emplace; + using __senders_t::swap; + + // __variant_tail_sender() = default; + // template + // __variant_tail_sender(const __variant_tail_sender<_OtherTailSenders...>& __o) + // : __senders_t(static_cast&>(__o)) {} + // template + // __variant_tail_sender(__variant_tail_sender<_OtherTailSenders...>&& __o) + // : __senders_t(static_cast&&>(__o)) {} + // template + // __variant_tail_sender& operator=(const __variant_tail_sender<_OtherTailSenders...>& __o) { + // __senders_t::operator=(__o); + // return *this; + // } + // template + // __variant_tail_sender& operator=(__variant_tail_sender<_OtherTailSenders...>&& __o) { + // __senders_t::operator=(std::move(__o)); + // return *this; + // } + + // template + // requires (__is_not_instance_of<_An, __variant_tail_sender> && ...) + // __variant_tail_sender(_An&&... __an) : __senders_t((_An&&)__an...) {} + // template + // requires __is_not_instance_of<_A, __variant_tail_sender> + // __variant_tail_sender& operator=(_A&& __a) { + // __senders_t::operator=((_A&&)__a); + // return *this; + // } + + template + struct op { + using __opn_t = + __variant>...>; + using __start_result_t = + __variant_tail_sender...>; + + op(const op&) = delete; + op(op&&) = delete; + op& operator=(const op&) = delete; + op& operator=(op&&) = delete; + + explicit op(__senders_t&& __t, _TailReceiver __r) { + std::visit( + [&, this](auto&& __t) -> void { + using _T = std::remove_cvref_t; + if constexpr (tail_sender<_T>) { + static_assert( + tail_sender_to<_T, _TailReceiver>, "variant-tail-sender member cannot connect"); + using op_t = connect_result_t<_T, _TailReceiver>; + using opt_t = std::optional; + __opn_.template emplace(); + opt_t& opt = std::get(__opn_); + opt.~opt_t(); + new (&opt) opt_t{stdexec::__conv{[&]() -> op_t { + return stdexec::connect((decltype(__t)&&) __t, __r); + }}}; + } else { + std::terminate(); + } + }, + (__senders_t&&) __t); + } + + operator bool() const noexcept { + return std::visit( + [&](auto&& __op) -> bool { + using _Opt = std::decay_t; + if constexpr (__is_instance_of_<_Opt, std::optional>) { + auto& op = *__op; + using _Op = std::decay_t; + if constexpr (__nullable_tail_operation_state<_Op>) { + return !!op; + } + return true; + } else { + std::terminate(); + } + }, + __opn_); + } + + [[nodiscard]] friend __start_result_t tag_invoke(start_t, op& __self) noexcept { + return std::visit( + [&](auto&& __op) -> __start_result_t { + using _Opt = std::decay_t; + if constexpr (__is_instance_of_<_Opt, std::optional>) { + auto& op = *__op; + using _Op = std::decay_t; + if constexpr (__nullable_tail_operation_state<_Op>) { + if (!op) { + return __start_result_t{}; + } + } + if constexpr (__terminal_tail_operation_state<_Op>) { + stdexec::start(op); + return __start_result_t{}; + } else { + return result_from<__start_result_t>(stdexec::start(op)); + } + } else { + std::terminate(); + } + }, + __self.__opn_); + } + + friend void tag_invoke(unwind_t, op& __self) noexcept { + return std::visit( + [&](auto&& __op) -> void { + using _Opt = std::decay_t; + if constexpr (__is_instance_of_<_Opt, std::optional>) { + exec::unwind(*__op); + } else { + std::terminate(); + } + }, + __self.__opn_); + } + + __opn_t __opn_; + }; + + using completion_signatures = completion_signatures; + + template + [[nodiscard]] friend auto + tag_invoke(connect_t, __variant_tail_sender&& __self, _TailReceiver&& __r) noexcept + -> op> { + return op>{ + (__variant_tail_sender&&) __self, (_TailReceiver&&) __r}; + } + + template + friend constexpr bool tag_invoke( + exec::always_completes_inline_t, + exec::__mtype<__variant_tail_sender>, + exec::__mtype<_Env>) noexcept { + return true; + } + + private: + template + friend struct __variant_tail_sender; + + template + friend constexpr _To variant_cast(__variant_tail_sender __f) noexcept { + return std::visit( + [](_U&& __u) -> _To { + if constexpr (stdexec::__v<__mapply<__contains<_U>, _To>>) { + return _To{(_U&&) __u}; + } else { + printf("variant_cast\n"); + fflush(stdout); + std::terminate(); + } + }, + std::move(static_cast<__senders_t&>(__f))); + } + + template + friend constexpr std::decay_t<_To> get(__variant_tail_sender __f) noexcept { + static_assert( + stdexec::__v<__mapply<__contains>, __variant_tail_sender>>, + "get does not have _To as an alternative"); + if (!holds_alternative>(__f)) { + printf("get\n"); + fflush(stdout); + std::terminate(); + } + return std::get>(std::move(static_cast<__senders_t&>(__f))); + } + + template < class T> + friend inline constexpr bool holds_alternative(const __variant_tail_sender& v) noexcept { + return std::holds_alternative(v); + } + }; + + template