Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

added receiver_of and run_loop #16

Merged
merged 4 commits into from
Sep 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion docs/questions.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,4 +21,7 @@ likely observable.
- [exec.sched] uses `auto(get_completion_scheduler<set_value_t>(...))`
which is OK for clang but doesn't seem to compile for g++ os MSVC.
- [exec.just] p2.1: movable-value<Ts> doesn't seems right: movable-value<decay_t<Ts>>
- [exec.just] Otherwise after p2.3 is missing <ts...>
- [exec.just] Otherwise after p2.3 is missing <ts...>
- [exec.run.loop.types] p9.1: "refers remains" -> "refers to remains"
- [exec.run.loop.types] p9.2: "get_stop_token(REC(o))": REC is a receiver, any
environment would be attached to get_env(REC(o)).
3 changes: 2 additions & 1 deletion include/beman/execution26/detail/connect.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@ namespace beman::execution26
struct connect_t
{
template <typename Sender, typename Receiver>
auto operator()(Sender&& sender, Receiver&& receiver) const
auto operator()(Sender&& sender, Receiver&& receiver) const
noexcept(true/*-dk:TODO*/)
{
auto new_sender = [&sender, &receiver]() -> decltype(auto) {
return ::beman::execution26::transform_sender(
Expand Down
1 change: 1 addition & 0 deletions include/beman/execution26/detail/receiver.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
#define INCLUDED_BEMAN_EXECUTION26_DETAIL_RECEIVER

#include <beman/execution26/detail/queryable.hpp>
#include <beman/execution26/detail/get_env.hpp>
#include <concepts>
#include <type_traits>

Expand Down
24 changes: 24 additions & 0 deletions include/beman/execution26/detail/receiver_of.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
// include/beman/execution26/detail/receiver_of.hpp -*-C++-*-
// SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception

#ifndef INCLUDED_BEMAN_EXECUTION26_DETAIL_RECEIVER_OF
#define INCLUDED_BEMAN_EXECUTION26_DETAIL_RECEIVER_OF

#include <beman/execution26/detail/has_completions.hpp>
#include <beman/execution26/detail/receiver.hpp>

// ----------------------------------------------------------------------------

namespace beman::execution26
{
template <typename Receiver, typename Completions>
concept receiver_of
= beman::execution26::receiver<Receiver>
&& beman::execution26::detail::has_completions<Receiver, Completions>
;

}

// ----------------------------------------------------------------------------

#endif
191 changes: 191 additions & 0 deletions include/beman/execution26/detail/run_loop.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,191 @@
// include/beman/execution26/detail/run_loop.hpp -*-C++-*-
// SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception

#ifndef INCLUDED_BEMAN_EXECUTION26_DETAIL_RUN_LOOP
#define INCLUDED_BEMAN_EXECUTION26_DETAIL_RUN_LOOP

#include <beman/execution26/detail/completion_signatures.hpp>
#include <beman/execution26/detail/get_completion_scheduler.hpp>
#include <beman/execution26/detail/get_env.hpp>
#include <beman/execution26/detail/get_stop_token.hpp>
#include <beman/execution26/detail/operation_state.hpp>
#include <beman/execution26/detail/scheduler.hpp>
#include <beman/execution26/detail/sender.hpp>
#include <beman/execution26/detail/set_error.hpp>
#include <beman/execution26/detail/set_stopped.hpp>
#include <beman/execution26/detail/set_value.hpp>

#include <exception>
#include <condition_variable>
#include <mutex>
#include <type_traits>
#include <utility>

// ----------------------------------------------------------------------------

namespace beman::execution26
{
class run_loop
{
private:
struct scheduler;

struct env
{
run_loop* loop;

template <typename Completion>
auto query(::beman::execution26::get_completion_scheduler_t<Completion> const&) const
noexcept
-> scheduler
{
return {this->loop};
}
};
struct opstate_base
{
opstate_base* next{};
virtual auto execute() noexcept -> void = 0;
};
template <typename Receiver>
struct opstate
: opstate_base
{
using operation_state_concept = ::beman::execution26::operation_state_t;

run_loop* loop;
Receiver receiver;

template <typename R>
opstate(run_loop* loop, R&& receiver)
: loop(loop)
, receiver(::std::forward<Receiver>(receiver))
{
}
opstate(opstate&&) = delete;
auto start() & noexcept -> void
{
try
{
this->loop->push_back(this);
}
catch(...)
{
::beman::execution26::set_error(
::std::move(this->receiver),
::std::current_exception());
}

}
auto execute() noexcept -> void override
{
if (::beman::execution26::get_stop_token(
::beman::execution26::get_env(this->receiver)
).stop_requested())
::beman::execution26::set_stopped(::std::move(this->receiver));
else
::beman::execution26::set_value(::std::move(this->receiver));
}
};
struct sender
{
using sender_concept = ::beman::execution26::sender_t;
using completion_signatures = ::beman::execution26::completion_signatures<
::beman::execution26::set_value_t(),
::beman::execution26::set_error_t(::std::exception_ptr),
::beman::execution26::set_stopped_t()
>;

run_loop* loop;

auto get_env() const noexcept -> env { return {this->loop}; }
template <typename Receiver>
auto connect(Receiver&& receiver) noexcept
-> opstate<::std::decay_t<Receiver>>
{
return {this->loop, ::std::forward<Receiver>(receiver)};
}
};
struct scheduler
{
using scheduler_concept = ::beman::execution26::scheduler_t;

run_loop* loop;

auto schedule() noexcept -> sender { return {this->loop}; }
auto operator== (scheduler const&) const -> bool = default;
};

enum class state { starting, running, finishing };

state current_state{state::starting};
::std::mutex mutex{};
::std::condition_variable condition{};
opstate_base* front{};
opstate_base* back{};

auto push_back(opstate_base* item) -> void
{
::std::lock_guard guard(this->mutex);
if (auto previous_back{::std::exchange(this->back, item)})
{
previous_back->next = item;
}
else
{
this->front = item;
this->condition.notify_one();
}
}
auto pop_front() -> opstate_base*
{
::std::unique_lock guard(this->mutex);
this->condition.wait(guard,
[this]{ return this->front || this->current_state == state::finishing; });
if (this->front == this->back)
this->back = nullptr;
return this->front? ::std::exchange(this->front, this->front->next): nullptr;
}

public:
run_loop() noexcept = default;
run_loop(run_loop&&) = delete;
~run_loop()
{
::std::lock_guard guard(this->mutex);
if (this->front != nullptr || this->current_state == state::running)
::std::terminate();
}

auto get_scheduler() -> scheduler { return {this}; }

auto run() -> void
{
{
::std::lock_guard guard(this->mutex);
auto current{::std::exchange(this->current_state, state::running)};
if (state::starting != current)
{
::std::terminate();
}
}

while (auto* op{this->pop_front()})
{
op->execute();
}
}
auto finish() -> void
{
{
::std::lock_guard guard(this->mutex);
this->current_state = state::finishing;
}
this->condition.notify_one();
}
};
}

// ----------------------------------------------------------------------------

#endif
2 changes: 2 additions & 0 deletions src/beman/execution26/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,8 @@ target_sources(${TARGET_LIBRARY}
${PROJECT_SOURCE_DIR}/include/beman/execution26/detail/query_with_default.hpp
${PROJECT_SOURCE_DIR}/include/beman/execution26/detail/queryable.hpp
${PROJECT_SOURCE_DIR}/include/beman/execution26/detail/receiver.hpp
${PROJECT_SOURCE_DIR}/include/beman/execution26/detail/receiver_of.hpp
${PROJECT_SOURCE_DIR}/include/beman/execution26/detail/run_loop.hpp
${PROJECT_SOURCE_DIR}/include/beman/execution26/detail/sched_attrs.hpp
${PROJECT_SOURCE_DIR}/include/beman/execution26/detail/sched_env.hpp
${PROJECT_SOURCE_DIR}/include/beman/execution26/detail/schedule.hpp
Expand Down
2 changes: 2 additions & 0 deletions src/beman/execution26/tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
# SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception

list(APPEND execution_tests
exec-run-loop-types.pass
exec-run-loop-general.pass
exec-just.pass
exec-snd-expos.pass
exec-recv-concepts.pass
Expand Down
90 changes: 86 additions & 4 deletions src/beman/execution26/tests/exec-recv-concepts.pass.cpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
// src/beman/execution26/tests/exec-recv-concepts.pass.cpp -*-C++-*-
// SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception

#include <beman/execution26/detail/receiver_of.hpp>
#include <beman/execution26/detail/has_completions.hpp>
#include <beman/execution26/detail/valid_completion_for.hpp>
#include <beman/execution26/execution.hpp>
Expand Down Expand Up @@ -33,9 +34,10 @@ namespace
auto set_stopped() && noexcept -> void {}
};

template <typename Concept = test_std::receiver_t>
struct multi_receiver
{
using receiver_concept = test_std::receiver_t;
using receiver_concept = Concept;

auto set_value(int) && noexcept -> void {}
auto set_value(int, arg) && noexcept -> void {}
Expand Down Expand Up @@ -120,7 +122,7 @@ namespace

static_assert(test_std::receiver<stopped_receiver>);
static_assert(test_detail::has_completions<
multi_receiver,
multi_receiver<>,
test_std::completion_signatures<
test_std::set_value_t(int),
test_std::set_value_t(int, arg),
Expand All @@ -130,7 +132,7 @@ namespace
>
>);
static_assert(not test_detail::has_completions<
multi_receiver,
multi_receiver<>,
test_std::completion_signatures<
test_std::set_value_t(int),
test_std::set_value_t(arg, int),
Expand All @@ -140,7 +142,86 @@ namespace
>
>);
static_assert(not test_detail::has_completions<
multi_receiver,
multi_receiver<>,
test_std::completion_signatures<
test_std::set_value_t(int),
test_std::set_value_t(int, arg),
test_std::set_value_t(arg, arg),
test_std::set_error_t(int),
test_std::set_stopped_t()
>
>);
}

auto test_receiver_of() -> void
{
static_assert(test_std::receiver_of<
value_receiver<int>,
test_std::completion_signatures<>
>);
static_assert(test_std::receiver_of<
value_receiver<int>,
test_std::completion_signatures<test_std::set_value_t(int)>
>);
static_assert(not test_std::receiver_of<
value_receiver<int>,
test_std::completion_signatures<test_std::set_value_t(int, int)>
>);

static_assert(test_std::receiver_of<
error_receiver<int>,
test_std::completion_signatures<test_std::set_error_t(int)>
>);
static_assert(not test_std::receiver_of<
error_receiver<int>,
test_std::completion_signatures<test_std::set_error_t(error)>
>);
static_assert(test_std::receiver_of<
error_receiver<error>,
test_std::completion_signatures<test_std::set_error_t(error)>
>);

static_assert(not test_std::receiver_of<
error_receiver<error>,
test_std::completion_signatures<test_std::set_stopped_t()>
>);
static_assert(test_std::receiver_of<
stopped_receiver,
test_std::completion_signatures<test_std::set_stopped_t()>
>);

static_assert(test_std::receiver_of<
multi_receiver<>,
test_std::completion_signatures<
test_std::set_value_t(int),
test_std::set_value_t(int, arg),
test_std::set_value_t(arg, arg),
test_std::set_error_t(error),
test_std::set_stopped_t()
>
>);
static_assert(not test_std::receiver_of<
multi_receiver<int>,
test_std::completion_signatures<
test_std::set_value_t(int),
test_std::set_value_t(int, arg),
test_std::set_value_t(arg, arg),
test_std::set_error_t(error),
test_std::set_stopped_t()
>
>);
static_assert(not test_std::receiver_of<
multi_receiver<>,
test_std::completion_signatures<
test_std::set_value_t(int),
test_std::set_value_t(arg, int),
test_std::set_value_t(arg, arg),
test_std::set_error_t(error),
test_std::set_stopped_t()
>
>);
static_assert(not test_std::receiver_of<
multi_receiver<>,
test_std::completion_signatures<
test_std::set_value_t(int),
test_std::set_value_t(int, arg),
Expand All @@ -156,4 +237,5 @@ auto main() -> int
{
test_valid_completion_for();
test_has_completions();
test_receiver_of();
}
Loading