Skip to content

Commit

Permalink
Add support for immediate executors
Browse files Browse the repository at this point in the history
Summary: related to T13767

Reviewers: ivica

Reviewed By: ivica

Subscribers: iljazovic, miljen

Differential Revision: https://repo.mireo.local/D30809
  • Loading branch information
ksimicevic committed Aug 8, 2024
1 parent 7bc1ccf commit b55ec67
Show file tree
Hide file tree
Showing 7 changed files with 225 additions and 89 deletions.
30 changes: 20 additions & 10 deletions include/async_mqtt5/detail/cancellable_handler.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,9 @@
#include <boost/asio/associated_allocator.hpp>
#include <boost/asio/associated_executor.hpp>
#include <boost/asio/associated_cancellation_slot.hpp>
#include <boost/asio/associated_immediate_executor.hpp>
#include <boost/asio/cancellation_state.hpp>
#include <boost/asio/post.hpp>
#include <boost/asio/dispatch.hpp>
#include <boost/asio/prepend.hpp>

#include <async_mqtt5/detail/async_traits.hpp>
Expand Down Expand Up @@ -41,11 +42,6 @@ class cancellable_handler {
cancellable_handler(cancellable_handler&&) = default;
cancellable_handler(const cancellable_handler&) = delete;

using executor_type = tracking_type<Handler, Executor>;
executor_type get_executor() const noexcept {
return _handler_ex;
}

using allocator_type = asio::associated_allocator_t<Handler>;
allocator_type get_allocator() const noexcept {
return asio::get_associated_allocator(_handler);
Expand All @@ -56,7 +52,20 @@ class cancellable_handler {
return _cancellation_state.slot();
}

asio::cancellation_type_t cancelled() const noexcept {
using executor_type = tracking_type<Handler, Executor>;
executor_type get_executor() const noexcept {
return _handler_ex;
}

using immediate_executor_type =
asio::associated_immediate_executor_t<Handler, Executor>;
immediate_executor_type get_immediate_executor() const noexcept {
// get_associated_immediate_executor will require asio::execution::blocking.never
// on the default executor.
return asio::get_associated_immediate_executor(_handler, _executor);
}

asio::cancellation_type_t cancelled() const {
return _cancellation_state.cancelled();
}

Expand All @@ -67,10 +76,11 @@ class cancellable_handler {
}

template <typename... Args>
void complete_post(Args&&... args) {
void complete_immediate(Args&&... args) {
asio::get_associated_cancellation_slot(_handler).clear();
asio::post(
_executor,
auto ex = get_immediate_executor();
asio::dispatch(
ex,
asio::prepend(std::move(_handler), std::forward<Args>(args)...)
);
}
Expand Down
31 changes: 20 additions & 11 deletions include/async_mqtt5/impl/disconnect_op.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@
#ifndef ASYNC_MQTT5_DISCONNECT_OP_HPP
#define ASYNC_MQTT5_DISCONNECT_OP_HPP

#include <boost/asio/associated_allocator.hpp>
#include <boost/asio/associated_executor.hpp>
#include <boost/asio/associated_cancellation_slot.hpp>
#include <boost/asio/consign.hpp>
#include <boost/asio/deferred.hpp>
#include <boost/asio/prepend.hpp>
Expand Down Expand Up @@ -65,20 +68,23 @@ class disconnect_op {
disconnect_op(disconnect_op&&) = default;
disconnect_op(const disconnect_op&) = delete;

using executor_type = asio::associated_executor_t<handler_type>;
executor_type get_executor() const noexcept {
return asio::get_associated_executor(_handler);
}
disconnect_op& operator=(disconnect_op&&) noexcept = default;
disconnect_op& operator=(const disconnect_op&) = delete;

using allocator_type = asio::associated_allocator_t<handler_type>;
allocator_type get_allocator() const noexcept {
return asio::get_associated_allocator(_handler);
}

using executor_type = asio::associated_executor_t<handler_type>;
executor_type get_executor() const noexcept {
return asio::get_associated_executor(_handler);
}

void perform() {
error_code ec = validate_disconnect(_context.props);
if (ec)
return complete_post(ec);
return complete_immediate(ec);

auto disconnect = control_packet<allocator_type>::of(
no_pid, get_allocator(),
Expand Down Expand Up @@ -162,8 +168,8 @@ class disconnect_op {
_handler.complete(ec);
}

void complete_post(error_code ec) {
_handler.complete_post(ec);
void complete_immediate(error_code ec) {
_handler.complete_immediate(ec);
}
};

Expand Down Expand Up @@ -194,10 +200,8 @@ class terminal_disconnect_op {
terminal_disconnect_op(terminal_disconnect_op&&) = default;
terminal_disconnect_op(const terminal_disconnect_op&) = delete;

using executor_type = asio::associated_executor_t<handler_type>;
executor_type get_executor() const noexcept {
return asio::get_associated_executor(_handler);
}
terminal_disconnect_op& operator=(terminal_disconnect_op&&) noexcept = default;
terminal_disconnect_op& operator=(const terminal_disconnect_op&) = delete;

using allocator_type = asio::associated_allocator_t<handler_type>;
allocator_type get_allocator() const noexcept {
Expand All @@ -209,6 +213,11 @@ class terminal_disconnect_op {
return asio::get_associated_cancellation_slot(_handler);
}

using executor_type = asio::associated_executor_t<handler_type>;
executor_type get_executor() const noexcept {
return asio::get_associated_executor(_handler);
}

template <typename DisconnectContext>
void perform(DisconnectContext&& context) {
namespace asioex = boost::asio::experimental;
Expand Down
29 changes: 17 additions & 12 deletions include/async_mqtt5/impl/publish_send_op.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
#ifndef ASYNC_MQTT5_PUBLISH_SEND_OP_HPP
#define ASYNC_MQTT5_PUBLISH_SEND_OP_HPP

#include <boost/asio/associated_allocator.hpp>
#include <boost/asio/associated_executor.hpp>
#include <boost/asio/detached.hpp>
#include <boost/asio/prepend.hpp>

Expand Down Expand Up @@ -86,19 +88,22 @@ class publish_send_op {
});
}

publish_send_op(publish_send_op&& other) = default;
publish_send_op(publish_send_op&&) = default;
publish_send_op(const publish_send_op&) = delete;

using executor_type = asio::associated_executor_t<handler_type>;
executor_type get_executor() const noexcept {
return asio::get_associated_executor(_handler);
}
publish_send_op& operator=(publish_send_op&&) noexcept = default;
publish_send_op& operator=(const publish_send_op&) = delete;

using allocator_type = asio::associated_allocator_t<handler_type>;
allocator_type get_allocator() const noexcept {
return asio::get_associated_allocator(_handler);
}

using executor_type = asio::associated_executor_t<handler_type>;
executor_type get_executor() const noexcept {
return asio::get_associated_executor(_handler);
}

void perform(
std::string topic, std::string payload,
retain_e retain, const publish_props& props
Expand All @@ -107,12 +112,12 @@ class publish_send_op {
if constexpr (qos_type != qos_e::at_most_once) {
packet_id = _svc_ptr->allocate_pid();
if (packet_id == 0)
return complete_post(client::error::pid_overrun, packet_id);
return complete_immediate(client::error::pid_overrun, packet_id);
}

auto ec = validate_publish(topic, payload, retain, props);
if (ec)
return complete_post(ec, packet_id);
return complete_immediate(ec, packet_id);

_serial_num = _svc_ptr->next_serial_num();

Expand All @@ -126,7 +131,7 @@ class publish_send_op {
auto max_packet_size = _svc_ptr->connack_property(prop::maximum_packet_size)
.value_or(default_max_send_size);
if (publish.size() > max_packet_size)
return complete_post(client::error::packet_too_large, packet_id);
return complete_immediate(client::error::packet_too_large, packet_id);

send_publish(std::move(publish));
}
Expand Down Expand Up @@ -429,8 +434,8 @@ class publish_send_op {
qos_e q = qos_type,
std::enable_if_t<q == qos_e::at_most_once, bool> = true
>
void complete_post(error_code ec, uint16_t) {
_handler.complete_post(ec);
void complete_immediate(error_code ec, uint16_t) {
_handler.complete_immediate(ec);
}

template <
Expand All @@ -457,10 +462,10 @@ class publish_send_op {
bool
> = true
>
void complete_post(error_code ec, uint16_t packet_id) {
void complete_immediate(error_code ec, uint16_t packet_id) {
if (packet_id != 0)
_svc_ptr->free_pid(packet_id, false);
_handler.complete_post(ec, reason_codes::empty, Props {});
_handler.complete_immediate(ec, reason_codes::empty, Props {});
}
};

Expand Down
25 changes: 15 additions & 10 deletions include/async_mqtt5/impl/subscribe_op.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
#include <algorithm>
#include <cstdint>

#include <boost/asio/associated_allocator.hpp>
#include <boost/asio/associated_executor.hpp>
#include <boost/asio/detached.hpp>

#include <async_mqtt5/error.hpp>
Expand Down Expand Up @@ -66,16 +68,19 @@ class subscribe_op {
subscribe_op(subscribe_op&&) = default;
subscribe_op(const subscribe_op&) = delete;

using executor_type = asio::associated_executor_t<handler_type>;
executor_type get_executor() const noexcept {
return asio::get_associated_executor(_handler);
}
subscribe_op& operator=(subscribe_op&&) noexcept = default;
subscribe_op& operator=(const subscribe_op&) = delete;

using allocator_type = asio::associated_allocator_t<handler_type>;
allocator_type get_allocator() const noexcept {
return asio::get_associated_allocator(_handler);
}

using executor_type = asio::associated_executor_t<handler_type>;
executor_type get_executor() const noexcept {
return asio::get_associated_executor(_handler);
}

void perform(
const std::vector<subscribe_topic>& topics,
const subscribe_props& props
Expand All @@ -84,14 +89,14 @@ class subscribe_op {

uint16_t packet_id = _svc_ptr->allocate_pid();
if (packet_id == 0)
return complete_post(client::error::pid_overrun, packet_id);
return complete_immediate(client::error::pid_overrun, packet_id);

if (_num_topics == 0)
return complete_post(client::error::invalid_topic, packet_id);
return complete_immediate(client::error::invalid_topic, packet_id);

auto ec = validate_subscribe(topics, props);
if (ec)
return complete_post(ec, packet_id);
return complete_immediate(ec, packet_id);

auto subscribe = control_packet<allocator_type>::of(
with_pid, get_allocator(),
Expand All @@ -102,7 +107,7 @@ class subscribe_op {
auto max_packet_size = _svc_ptr->connack_property(prop::maximum_packet_size)
.value_or(default_max_send_size);
if (subscribe.size() > max_packet_size)
return complete_post(client::error::packet_too_large, packet_id);
return complete_immediate(client::error::packet_too_large, packet_id);

send_subscribe(std::move(subscribe));
}
Expand Down Expand Up @@ -267,10 +272,10 @@ class subscribe_op {
);
}

void complete_post(error_code ec, uint16_t packet_id) {
void complete_immediate(error_code ec, uint16_t packet_id) {
if (packet_id != 0)
_svc_ptr->free_pid(packet_id);
_handler.complete_post(
_handler.complete_immediate(
ec, std::vector<reason_code>(_num_topics, reason_codes::empty),
suback_props {}
);
Expand Down
24 changes: 14 additions & 10 deletions include/async_mqtt5/impl/unsubscribe_op.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
#define ASYNC_MQTT5_UNSUBSCRIBE_OP_HPP

#include <boost/asio/associated_allocator.hpp>
#include <boost/asio/associated_executor.hpp>
#include <boost/asio/detached.hpp>

#include <async_mqtt5/error.hpp>
Expand Down Expand Up @@ -61,16 +62,19 @@ class unsubscribe_op {
unsubscribe_op(unsubscribe_op&&) = default;
unsubscribe_op(const unsubscribe_op&) = delete;

using executor_type = asio::associated_executor_t<handler_type>;
executor_type get_executor() const noexcept {
return asio::get_associated_executor(_handler);
}
unsubscribe_op& operator=(unsubscribe_op&&) noexcept = default;
unsubscribe_op& operator=(const unsubscribe_op&) = delete;

using allocator_type = asio::associated_allocator_t<handler_type>;
allocator_type get_allocator() const noexcept {
return asio::get_associated_allocator(_handler);
}

using executor_type = asio::associated_executor_t<handler_type>;
executor_type get_executor() const noexcept {
return asio::get_associated_executor(_handler);
}

void perform(
const std::vector<std::string>& topics,
const unsubscribe_props& props
Expand All @@ -79,14 +83,14 @@ class unsubscribe_op {

uint16_t packet_id = _svc_ptr->allocate_pid();
if (packet_id == 0)
return complete_post(client::error::pid_overrun, packet_id);
return complete_immediate(client::error::pid_overrun, packet_id);

if (_num_topics == 0)
return complete_post(client::error::invalid_topic, packet_id);
return complete_immediate(client::error::invalid_topic, packet_id);

auto ec = validate_unsubscribe(topics, props);
if (ec)
return complete_post(ec, packet_id);
return complete_immediate(ec, packet_id);

auto unsubscribe = control_packet<allocator_type>::of(
with_pid, get_allocator(),
Expand All @@ -97,7 +101,7 @@ class unsubscribe_op {
auto max_packet_size = _svc_ptr->connack_property(prop::maximum_packet_size)
.value_or(default_max_send_size);
if (unsubscribe.size() > max_packet_size)
return complete_post(client::error::packet_too_large, packet_id);
return complete_immediate(client::error::packet_too_large, packet_id);

send_unsubscribe(std::move(unsubscribe));
}
Expand Down Expand Up @@ -212,10 +216,10 @@ class unsubscribe_op {
);
}

void complete_post(error_code ec, uint16_t packet_id) {
void complete_immediate(error_code ec, uint16_t packet_id) {
if (packet_id != 0)
_svc_ptr->free_pid(packet_id);
_handler.complete_post(
_handler.complete_immediate(
ec, std::vector<reason_code>(_num_topics, reason_codes::empty),
unsuback_props {}
);
Expand Down
Loading

0 comments on commit b55ec67

Please sign in to comment.