diff --git a/src/hubclient.cpp b/src/hubclient.cpp index 5933b7d..a99b73a 100644 --- a/src/hubclient.cpp +++ b/src/hubclient.cpp @@ -1,6 +1,6 @@ #include "hubclient.h" -namespace msghublib { namespace detail { +namespace msghublib::detail { using boost::asio::ip::tcp; auto hubclient::bind(void (hubclient::*handler)(error_code)) { @@ -29,7 +29,7 @@ namespace msghublib { namespace detail { void hubclient::write(const hubmessage& msg) { - post(socket_.get_executor(), [this, msg, self=shared_from_this()]{ + post(socket_.get_executor(), [this, msg, self=shared_from_this()] () mutable { bool write_in_progress = !outmsg_queue_.empty(); outmsg_queue_.push_back(std::move(msg)); if (!write_in_progress) @@ -48,7 +48,7 @@ namespace msghublib { namespace detail { async_read(socket_, inmsg_.payload_area(), bind(&hubclient::handle_read_body)); } - // TODO handle invalid headers (connection reset?) + // TODO(sehe): handle invalid headers (connection reset?) } void hubclient::handle_read_body(error_code error) @@ -59,7 +59,7 @@ namespace msghublib { namespace detail { // Get next async_read(socket_, inmsg_.header_buf(), bind(&hubclient::handle_read_header)); } - // TODO handle IO failure + // TODO(sehe): handle IO failure } void hubclient::handle_write(error_code error) @@ -68,7 +68,8 @@ namespace msghublib { namespace detail { outmsg_queue_.pop_front(); if (!outmsg_queue_.empty()) { - // Write next from queue // TODO remove duplication + // Write next from queue + // TODO(sehe) remove duplication async_write(socket_, outmsg_queue_.front().on_the_wire(), bind(&hubclient::handle_write)); @@ -77,4 +78,4 @@ namespace msghublib { namespace detail { // error TODO handling } -} } +} // namespace msghublib::detail diff --git a/src/hubclient.h b/src/hubclient.h index 12d6346..a226c13 100644 --- a/src/hubclient.h +++ b/src/hubclient.h @@ -27,16 +27,17 @@ class hubclient : public std::enable_shared_from_this void write(const hubmessage& msg); private: using error_code = boost::system::error_code; - void handle_read_header(error_code); - void handle_read_body(error_code); - void handle_write(error_code); + void handle_read_header(error_code /*error*/); + void handle_read_body(error_code /*error*/); + void handle_write(error_code /*error*/); - auto bind(void (hubclient::*)(error_code)); - private: + auto bind(void (hubclient::* /*handler*/)(error_code)); + tcp::socket socket_; ihub& distributor_; hubmessage inmsg_; hubmessage_queue outmsg_queue_; }; -} } +} // namespace detail +} // namespace msghublib diff --git a/src/hubconnection.cpp b/src/hubconnection.cpp index ba55b4d..2a5f6b1 100644 --- a/src/hubconnection.cpp +++ b/src/hubconnection.cpp @@ -1,6 +1,6 @@ #include "hubconnection.h" -namespace msghublib { namespace detail { +namespace msghublib::detail { using boost::asio::ip::tcp; auto hubconnection::bind(void (hubconnection::*handler)(error_code)) { @@ -42,7 +42,7 @@ namespace msghublib { namespace detail { else { #pragma GCC diagnostic ignored "-Wdeprecated" // implicit this-capture - post(socket_.get_executor(), [=, self=shared_from_this()] + post(socket_.get_executor(), [=, self=shared_from_this()] () mutable { do_write(std::move(msg)); }); } return true; @@ -118,7 +118,7 @@ namespace msghublib { namespace detail { { is_closing = true; // atomic - // TODO: Unsubscribe? + // TODO(sehe): Unsubscribe? if (forced || outmsg_queue_.empty()) { if (socket_.is_open()) { @@ -128,4 +128,4 @@ namespace msghublib { namespace detail { } } -} } +} // namespace msghublib::detail diff --git a/src/hubconnection.h b/src/hubconnection.h index 51f0ab1..9c7a969 100644 --- a/src/hubconnection.h +++ b/src/hubconnection.h @@ -29,7 +29,7 @@ class hubconnection : public std::enable_shared_from_this private: using error_code = boost::system::error_code; - auto bind(void (hubconnection::*)(error_code)); + auto bind(void (hubconnection::* /*handler*/)(error_code)); void handle_read_header(error_code error); void handle_read_body(error_code error); @@ -37,7 +37,6 @@ class hubconnection : public std::enable_shared_from_this void handle_write(error_code error); void do_close(bool forced); -private: tcp::socket socket_; ihub& courier_; hubmessage inmsg_; @@ -45,4 +44,5 @@ class hubconnection : public std::enable_shared_from_this boost::atomic_bool is_closing; }; -} } +} // namespace detail +} // namespace msghublib diff --git a/src/hubmessage.cpp b/src/hubmessage.cpp index 0d864bb..0243d79 100644 --- a/src/hubmessage.cpp +++ b/src/hubmessage.cpp @@ -15,7 +15,7 @@ namespace msghublib { headers_.msgaction = action_; headers_.magic = cookie; - auto out = payload_.data(); + auto *out = payload_.data(); out = std::copy_n(topic.data(), topic.size(), out); out = std::copy_n(msg.data(), msg.size(), out); } @@ -41,4 +41,4 @@ namespace msghublib { .substr(headers_.topiclen, headers_.bodylen); } -} +} // namespace msghublib diff --git a/src/hubmessage.h b/src/hubmessage.h index fd7f130..4c97aa5 100644 --- a/src/hubmessage.h +++ b/src/hubmessage.h @@ -1,5 +1,6 @@ #pragma once +#include #include #include @@ -12,18 +13,20 @@ namespace msghublib { class hubmessage { public: + // the following affect on-the-wire compatiblity enum action : char { subscribe, unsubscribe, publish }; enum { version = 0x1 }; enum { cookie = 0xF00D ^ (version << 8) }; enum { messagesize = 0x2000 }; + // the following does NOT affect on-the-wire compatiblity + enum { preallocated = 196 }; hubmessage(action a={}, std::string_view topic={}, span msg = {}); - bool verify() const; - - action get_action() const; - std::string_view topic() const; - span body() const; + [[nodiscard]] bool verify() const; + [[nodiscard]] action get_action() const; + [[nodiscard]] std::string_view topic() const; + [[nodiscard]] span body() const; private: #pragma pack(push, 1) @@ -36,7 +39,7 @@ class hubmessage #pragma pack(pop) headers_t headers_; - boost::container::small_vector payload_; + boost::container::small_vector payload_; public: // input buffer views @@ -50,14 +53,14 @@ class hubmessage } // output buffer views - auto on_the_wire() const { - return std::vector { + [[nodiscard]] auto on_the_wire() const { + return std::array { boost::asio::buffer(&headers_, sizeof(headers_)), boost::asio::buffer(payload_.data(), payload_.size()) }; } }; -typedef std::deque hubmessage_queue; +using hubmessage_queue = std::deque; -} +} // namespace msghublib diff --git a/src/ihub.h b/src/ihub.h index a525460..77e2cda 100644 --- a/src/ihub.h +++ b/src/ihub.h @@ -15,4 +15,4 @@ namespace msghublib { virtual void deliver(hubmessage const& msg) = 0; }; } -} +} // namespace msghublib diff --git a/src/msghub.cpp b/src/msghub.cpp index 8596de2..c59bb5e 100644 --- a/src/msghub.cpp +++ b/src/msghub.cpp @@ -1,5 +1,7 @@ #include "msghub.h" +#include + #include "hubclient.h" #include "hubconnection.h" #include "ihub.h" @@ -28,7 +30,7 @@ namespace msghublib { std::multimap> remote_subs_; public: - impl(boost::asio::any_io_executor executor) + explicit impl(boost::asio::any_io_executor const& executor) : executor_(executor) , acceptor_(make_strand(executor)) {} @@ -36,25 +38,20 @@ namespace msghublib { void stop() { { std::shared_ptr rhub; - if (auto p = std::atomic_exchange(&remote_hub_, rhub)) + if (auto p = std::atomic_exchange(&remote_hub_, rhub)) { p->close(false); + } } if (!weak_from_this().expired()) { - post(acceptor_.get_executor(), [this, self = shared_from_this()] { - if (acceptor_.is_open()) - acceptor_.cancel(); - }); - } else { - if (acceptor_.is_open()) - acceptor_.cancel(); - } - - if (0) { - std::lock_guard lk(mutex_); - for (auto& [_, client] : remote_subs_) - if (auto alive = client.lock()) - alive->stop(); + post(acceptor_.get_executor(), + [this, self = shared_from_this()] { + if (acceptor_.is_open()) { + acceptor_.cancel(); + } + }); + } else if (acceptor_.is_open()) { + acceptor_.cancel(); } work_.reset(); @@ -114,13 +111,13 @@ namespace msghublib { return false; } - bool subscribe(const std::string& topic, msghub::onmessage handler) { + bool subscribe(const std::string& topic, const msghub::onmessage& handler) { std::unique_lock lk(mutex_); if (auto [it, ins] = local_subs_.emplace(topic, handler); ins) { lk.unlock(); if (auto p = atomic_load(&remote_hub_)) { return p->write({ hubmessage::action::subscribe, topic }, true); - // TODO: wait feedback from server here? + // TODO(sehe): wait feedback from server here? } } else { // just update the handler @@ -133,7 +130,7 @@ namespace msghublib { private: msghub::onmessage const& lookup_handler(std::string_view topic) const { - static const msghub::onmessage no_handler = [](auto...) {}; + static const msghub::onmessage no_handler = [](auto... /*unused*/) {}; std::lock_guard lk(mutex_); auto it = local_subs_.find(std::string(topic)); @@ -141,7 +138,7 @@ namespace msghublib { } void distribute(std::shared_ptr const& subscriber, - hubmessage const& msg) { + hubmessage const& msg) override { std::string topic(msg.topic()); std::unique_lock lk(mutex_); auto range = remote_subs_.equal_range(topic); @@ -152,8 +149,9 @@ namespace msghublib { if (auto alive = it->second.lock()) { alive->write(msg); ++it; - } else + } else { it = remote_subs_.erase(it); + } } break; @@ -168,10 +166,11 @@ namespace msghublib { case hubmessage::action::unsubscribe: for (auto it = range.first; it != range.second;) { if (auto alive = it->second.lock(); - !alive || alive == subscriber) + !alive || alive == subscriber) { it = remote_subs_.erase(it); - else + } else { ++it; + } } break; @@ -180,7 +179,7 @@ namespace msghublib { } } - void deliver(hubmessage const& msg) { + void deliver(hubmessage const& msg) override { lookup_handler(msg.topic())(msg.topic(), msg.body()); } @@ -221,7 +220,7 @@ namespace msghublib { bool msghub::connect(const std::string& hostip, uint16_t port) { return pimpl->connect(hostip, port); } bool msghub::create(uint16_t port) { return pimpl->create(port); } bool msghub::unsubscribe(const std::string& topic) { return pimpl->unsubscribe(topic); } - bool msghub::subscribe(const std::string& topic, onmessage handler) { return pimpl->subscribe(topic, handler); } + bool msghub::subscribe(const std::string& topic, onmessage handler) { return pimpl->subscribe(topic, std::move(handler)); } bool msghub::publish(std::string_view topic, span message) { return pimpl->publish(topic, message); } } // namespace msghublib diff --git a/test/main.cpp b/test/main.cpp index 78bd280..baf95bc 100644 --- a/test/main.cpp +++ b/test/main.cpp @@ -15,9 +15,9 @@ void test_server_oncleintfailure(); void test_toobigmsg(); void test_emptymsg(); -test_suite* init_unit_test_suite(int, char**) +test_suite* init_unit_test_suite(int /*unused*/, char** /*unused*/) { - test_suite *test = BOOST_TEST_SUITE("messagehub test"); + auto *test = BOOST_TEST_SUITE("messagehub test"); test->add(BOOST_TEST_CASE(&test_create)); test->add(BOOST_TEST_CASE(&test_connect)); diff --git a/test/subscribe.cpp b/test/subscribe.cpp index 3cc39b8..e476bc1 100644 --- a/test/subscribe.cpp +++ b/test/subscribe.cpp @@ -28,7 +28,7 @@ namespace { BOOST_TEST(expected == message, boost::test_tools::per_element()); newmessage.notify_one(); } -} +} // namespace void test_subscribe() {