Skip to content

Commit

Permalink
✨ Add periodic adapter
Browse files Browse the repository at this point in the history
Problem:
- There is no way to periodically run a sender without drift.

Solution:
- Introduce `periodic`, `periodic_n`, and `periodic_until`.

Notes:
- `periodic` interacts with the durationless time_scheduler. On first `start`,
  it schedules an expiry time of `now + duration`. On subsequence `start`s, it
  schedules an expiry time of `previous expiry + duration`. In this way it
  eliminates drift caused by the small amount of time taken to manage the
  timer interrupt.
- The `get_expiration` query returns an `expiration_provider` rather than a bare
  expiration time because computing an expiration time typically involves
  calling `HAL::now()`, which entails making sure that `HAL::enable`
  has been called.
  • Loading branch information
elbeno committed Feb 20, 2025
1 parent 79db143 commit 70d7011
Show file tree
Hide file tree
Showing 13 changed files with 792 additions and 62 deletions.
5 changes: 5 additions & 0 deletions docs/schedulers.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,11 @@ async::timer_mgr::service_task();
// x is now 42
----

It is also possible to create a `time_scheduler` without specifying a duration:
in this case it will use the connect receiver's environment to obtain an
`expiration_provider` in order to compute the expiration time. The xref:sender_adaptors.adoc#_periodic[`periodic`]
adapter uses such an environment to achieve drift-free periodic scheduling.

==== HAL interaction

The various HAL functions are called as follows:
Expand Down
69 changes: 66 additions & 3 deletions docs/sender_adaptors.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,64 @@ auto let_sndr = async::let_value(
This works: using the helper function `make_variant_sender`, `let_value` can
successfully make a runtime choice about which sender to proceed with.

=== `periodic`

Found in the header: `async/periodic.hpp`

`periodic` takes a sender and repeats it indefinitely according to the given
time period. When the sender completes with a value, it is reconnected and
restarted. This is useful for periodic tasks. A `periodic` sender can still be
stopped, or complete with an error.

[source,cpp]
----
auto s = time_scheduler{}.sender() | ... ;
auto p = s | async::periodic(1s);
// when p runs, s is scheduled for 1 second in the future. If s sends an error
// or is stopped, p reflects that. If s completes successfully, the result is
// discarded and s runs again, another second in the future.
----

NOTE: `periodic` works hand-in-glove with a
xref:schedulers.adoc#_time_scheduler[`time_scheduler`] sender.

=== `periodic_n`

Found in the header: `async/periodic.hpp`

`periodic_n` works the same way as `periodic`, but repeats a given number of times.

[source,cpp]
----
auto s = time_scheduler{}.sender() | ... ;
auto p = s | async::periodic_n(1s, 5);
// p repeats s 5 times
----

NOTE: `periodic_n` must always run at least once to be able to complete. So
`periodic_n(1)` repeats once, i.e. runs twice. `periodic_n(0)` runs once (thus
is redundant expect for the scheduling).

=== `periodic_until`

Found in the header: `async/periodic.hpp`

`periodic_until` works the same way as `periodic`, but repeats the sender until a
given predicate returns true.

[source,cpp]
----
auto s = time_scheduler{}.sender() | ... ;
auto p = s | async::periodic_until(1s, [] (auto&&...) { return true; });
----

NOTE: The arguments passed to the predicate are those in the value completion(s)
of the sender.

NOTE: `periodic` never completes other than by error or cancellation, but
`periodic_n` and `periodic_until` both complete successfully with the same
completion as the adapted sender.

=== `repeat`

Found in the header: `async/repeat.hpp`
Expand All @@ -191,9 +249,10 @@ auto s = some_sender | async::repeat();
// and some_sender runs again.
----

CAUTION: `repeat` can cause stack overflows if used with a scheduler that
doesn't break the callstack, like
xref:schedulers.adoc#_inline_scheduler[`inline_scheduler`].
NOTE: The difference between `periodic` and `repeat` is that `periodic`
interacts with the `time_scheduler` to eliminate any drift caused by
bookkeeping. `periodic` must work with a `time_scheduler` sender; `repeat` can
work with any sender.

=== `repeat_n`

Expand All @@ -220,6 +279,10 @@ auto s = some_sender | async::repeat_until([] (auto&&...) { return true; });
NOTE: The arguments passed to the predicate are those in the value completion(s)
of the sender.

NOTE: `repeat` never completes other than by error or cancellation, but
`repeat_n` and `repeat_until` both complete successfully with the same
completion as the adapted sender.

=== `retry`

Found in the header: `async/retry.hpp`
Expand Down
8 changes: 8 additions & 0 deletions docs/synopsis.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,11 @@ by `let_error.hpp`, `let_stopped.hpp`, and `let_value.hpp`
==== https://github.com/intel/cpp-baremetal-senders-and-receivers/blob/main/include/async/let_value.hpp[let_value.hpp]
* `let_value` - a xref:sender_adaptors.adoc#_let_value[sender adaptor] that can make runtime decisions on the value channel

==== https://github.com/intel/cpp-baremetal-senders-and-receivers/blob/main/include/async/periodic.hpp[periodic.hpp]
* `periodic` - a xref:sender_adaptors.adoc#_periodic[sender adaptor] that repeats a sender indefinitely, periodically without drift
* `periodic_n` - a xref:sender_adaptors.adoc#_periodic_n[sender adaptor] that repeats a sender a set number of times, periodically without drift
* `periodic_until` - a xref:sender_adaptors.adoc#_periodic_until[sender adaptor] that repeats a sender until a condition becomes true, periodically without drift

==== https://github.com/intel/cpp-baremetal-senders-and-receivers/blob/main/include/async/read_env.hpp[read_env.hpp]
* `get_scheduler` - a sender factory equivalent to `read_env(get_scheduler_t{})`
* `get_stop_token` - a sender factory equivalent to `read_env(get_stop_token_t{})`
Expand Down Expand Up @@ -247,6 +252,9 @@ contains traits and metaprogramming constructs used by many senders.
* `operation_state<O>` - https://github.com/intel/cpp-baremetal-senders-and-receivers/blob/main/include/async/concepts.hpp[`#include <async/concepts.hpp>`]
* `priority_t` - https://github.com/intel/cpp-baremetal-senders-and-receivers/blob/main/include/async/schedulers/task_manager_interface.hpp[`#include <async/schedulers/task_manager_interface.hpp>`]
* `priority_task_manager<HAL, NumPriorities>` - https://github.com/intel/cpp-baremetal-senders-and-receivers/blob/main/include/async/schedulers/task_manager.hpp[`#include <async/schedulers/task_manager.hpp>`]
* xref:sender_adaptors.adoc#_periodic[`periodic`] - https://github.com/intel/cpp-baremetal-senders-and-receivers/blob/main/include/async/periodic.hpp[`#include <async/periodic.hpp>`]
* xref:sender_adaptors.adoc#_periodic_n[`periodic_n`] - https://github.com/intel/cpp-baremetal-senders-and-receivers/blob/main/include/async/periodic.hpp[`#include <async/periodic.hpp>`]
* xref:sender_adaptors.adoc#_periodic_until[`periodic_until`] - https://github.com/intel/cpp-baremetal-senders-and-receivers/blob/main/include/async/periodic.hpp[`#include <async/periodic.hpp>`]
* xref:sender_factories.adoc#_read_env[`read_env`] - https://github.com/intel/cpp-baremetal-senders-and-receivers/blob/main/include/async/read_env.hpp[`#include <async/read_env.hpp>`]
* `receiver<R>` - https://github.com/intel/cpp-baremetal-senders-and-receivers/blob/main/include/async/concepts.hpp[`#include <async/concepts.hpp>`]
* `receiver_base` - https://github.com/intel/cpp-baremetal-senders-and-receivers/blob/main/include/async/concepts.hpp[`#include <async/concepts.hpp>`]
Expand Down
Loading

0 comments on commit 70d7011

Please sign in to comment.