diff --git a/README.md b/README.md index e821b05..efdfe14 100644 --- a/README.md +++ b/README.md @@ -32,15 +32,13 @@ Examples boost::asio::io_context io; // Create hub to listen on 0xbee port mh::msghub hub(io.get_executor()); - if (!hub.create(0xbee)) { - std::cerr << "Couldn't create hub\n"; - } else { - // Subscribe on "any topic" - /* bool ok = */hub.subscribe("any topic", on_message); + hub.create(0xbee); - // Current or any another client - /* bool ok = */hub.publish("any topic", "new message"); - } + // Subscribe on "any topic" + hub.subscribe("any topic", on_message); + + // Current or any another client + hub.publish("any topic", "new message"); io.run(); // keep server active, if created } @@ -54,9 +52,8 @@ Examples { boost::asio::io_context io; mh::msghub hub(io.get_executor()); - if (hub.connect("localhost", 0xbee)) { - /*bool ok = */hub.publish("any topic", "new message"); - } + hub.connect("localhost", 0xbee); + hub.publish("any topic", "new message"); hub.stop(); io.run(); @@ -72,9 +69,8 @@ Examples { boost::asio::thread_pool io(5); // count optional mh::msghub hub(io.get_executor()); - if (hub.connect("localhost", 0xbee)) { - /*bool ok = */hub.publish("any topic", "new message"); - } + hub.connect("localhost", 0xbee); + hub.publish("any topic", "new message"); hub.stop(); io.join(); diff --git a/pub/hub_error.h b/pub/hub_error.h new file mode 100644 index 0000000..0fb0f22 --- /dev/null +++ b/pub/hub_error.h @@ -0,0 +1,45 @@ +#pragma once +#include +#include +#include +#include +#include + +namespace msghublib { + using boost::system::error_code; + using boost::system::error_category; + using boost::system::system_error; + + enum hub_errc { + hub_connection_failed = 1, + hub_creation_failed = 2, + hub_not_connected = 3, + }; +} + +template <> struct boost::system::is_error_code_enum + : std::true_type {}; + +namespace msghublib { + struct msghub_category : error_category { + virtual char const* name() const noexcept override { return "msghub"; } + virtual std::string message(int ev) const override { + switch (ev) { + case hub_errc::hub_connection_failed: return "hub_connection_failed"; + case hub_errc::hub_creation_failed: return "hub_creation_failed"; + case hub_errc::hub_not_connected: return "hub_not_connected"; + } + return "unknown"; + } + }; + + static inline error_category const& msghub_category() { + static constexpr struct msghub_category s_msghub_category_instance; + return s_msghub_category_instance; + }; + + [[maybe_unused]] static inline error_code make_error_code(hub_errc e) { + return {e, msghub_category()}; + } +} + diff --git a/pub/msghub.h b/pub/msghub.h index 00975e5..e072060 100644 --- a/pub/msghub.h +++ b/pub/msghub.h @@ -1,10 +1,13 @@ #pragma once +#include +#include #include #include #include #include #include "span.h" +#include "hub_error.h" namespace msghublib { @@ -17,21 +20,37 @@ namespace msghublib { explicit msghub(boost::asio::any_io_executor); ~msghub(); - bool connect(const std::string& hostip, uint16_t port); - bool create(uint16_t port); + //void connect(const std::string& hostip, uint16_t port); + void connect(const std::string& hostip, uint16_t port, error_code& ec); + void create(uint16_t port, error_code& ec); - bool unsubscribe(const std::string& topic); - bool subscribe(const std::string& topic, onmessage handler); - bool publish(std::string_view topic, span message); + void unsubscribe(const std::string& topic, error_code& ec); + void subscribe(const std::string& topic, onmessage handler, error_code& ec); + void publish(std::string_view topic, span message, error_code& ec); // Treat string literals specially, not including the terminating NUL template - bool publish(std::string_view topic, char const (&literal)[N]) { + void publish(std::string_view topic, char const (&literal)[N], error_code& ec) { static_assert(N>0); - return publish(topic, span{literal, N-1}); + publish(topic, span{literal, N-1}, ec); } void stop(); + + // convenience throwing wrappers + void connect(const std::string& hostip, uint16_t port); + void create(uint16_t port); + void unsubscribe(const std::string& topic); + void subscribe(const std::string& topic, onmessage handler); + void publish(std::string_view topic, span message); + + // Treat string literals specially, not including the terminating NUL + template + void publish(std::string_view topic, char const (&literal)[N]) { + error_code ec; + publish(topic, literal, ec); + if (ec) throw system_error(ec); + } private: class impl; diff --git a/src/hubclient.cpp b/src/hubclient.cpp index a99b73a..38f859f 100644 --- a/src/hubclient.cpp +++ b/src/hubclient.cpp @@ -27,7 +27,7 @@ namespace msghublib::detail { }); } - void hubclient::write(const hubmessage& msg) + void hubclient::send(const hubmessage& msg) { post(socket_.get_executor(), [this, msg, self=shared_from_this()] () mutable { bool write_in_progress = !outmsg_queue_.empty(); diff --git a/src/hubclient.h b/src/hubclient.h index a226c13..3fccde8 100644 --- a/src/hubclient.h +++ b/src/hubclient.h @@ -24,7 +24,7 @@ class hubclient : public std::enable_shared_from_this tcp::socket& socket(); void start(); void stop(); - void write(const hubmessage& msg); + void send(const hubmessage& msg); private: using error_code = boost::system::error_code; void handle_read_header(error_code /*error*/); diff --git a/src/hubconnection.cpp b/src/hubconnection.cpp index 2a5f6b1..0cd53da 100644 --- a/src/hubconnection.cpp +++ b/src/hubconnection.cpp @@ -1,7 +1,8 @@ #include "hubconnection.h" +#include +#include namespace msghublib::detail { - using boost::asio::ip::tcp; auto hubconnection::bind(void (hubconnection::*handler)(error_code)) { #pragma GCC diagnostic ignored "-Wdeprecated" // implicit this-capture @@ -10,92 +11,75 @@ namespace msghublib::detail { }; } - bool hubconnection::init(const std::string& host, uint16_t port) - { - try - { + void hubconnection::init(const std::string& host, uint16_t port, error_code& ec) { + try { + error_code ec; tcp::resolver resolver(socket_.get_executor()); - tcp::resolver::results_type results = resolver.resolve(host, std::to_string(port)); + tcp::resolver::results_type results = + resolver.resolve(host, std::to_string(port), ec); - // Do blocking connect (connection is more important than subscription here) - connect(socket_, results); + // Do blocking connect (connection is more important than + // subscription here) + if (!ec) + connect(socket_, results, ec); // Schedule packet read - async_read(socket_, inmsg_.header_buf(), bind(&hubconnection::handle_read_header)); - } - catch (std::exception&) - { - return false; + if (!ec) + async_read(socket_, inmsg_.header_buf(), + bind(&hubconnection::handle_read_header)); + } catch (system_error const& se) { + ec = se.code(); + } catch (...) { + ec = hub_errc::hub_connection_failed; } - - return true; } - bool hubconnection::write(const hubmessage& msg, bool wait) - { - try - { - if (wait) - { - boost::asio::write(socket_, msg.on_the_wire()); - } - else - { + void hubconnection::async_send(const hubmessage& msg) { #pragma GCC diagnostic ignored "-Wdeprecated" // implicit this-capture - post(socket_.get_executor(), [=, self=shared_from_this()] () mutable - { do_write(std::move(msg)); }); - } - return true; - } - catch (std::exception&) - { - return false; - } + post(socket_.get_executor(), [=, self = shared_from_this()]() mutable { + do_send(std::move(msg)); + }); } - void hubconnection::close(bool forced) - { + void hubconnection::send(const hubmessage& msg, error_code& ec) { + write(socket_, msg.on_the_wire(), ec); + } + + void hubconnection::close(bool forced) { #pragma GCC diagnostic ignored "-Wdeprecated" // implicit this-capture - post(socket_.get_executor(), [=, self=shared_from_this()] - { do_close(forced); }); + post(socket_.get_executor(), + [=, self = shared_from_this()] { do_close(forced); }); } - void hubconnection::handle_read_header(error_code error) - { - if (!error && inmsg_.verify()) - { - async_read(socket_, inmsg_.payload_area(), bind(&hubconnection::handle_read_body)); - } - else - { + void hubconnection::handle_read_header(error_code error) { + if (!error && inmsg_.verify()) { + async_read(socket_, inmsg_.payload_area(), + bind(&hubconnection::handle_read_body)); + } else { do_close(true); } } - void hubconnection::handle_read_body(error_code error) - { - if (!error) - { + void hubconnection::handle_read_body(error_code error) { + if (!error) { courier_.deliver(inmsg_); - async_read(socket_, inmsg_.header_buf(), bind(&hubconnection::handle_read_header)); - } - else - { + async_read(socket_, inmsg_.header_buf(), + bind(&hubconnection::handle_read_header)); + } else { do_close(true); } } - void hubconnection::do_write(hubmessage msg) - { - if (outmsg_queue_.push_back(std::move(msg)); 1 == outmsg_queue_.size()) + void hubconnection::do_send(hubmessage msg) { + if (outmsg_queue_.push_back(std::move(msg)); + 1 == outmsg_queue_.size()) { - async_write(socket_, - outmsg_queue_.front().on_the_wire(), - bind(&hubconnection::handle_write)); + async_write(socket_, outmsg_queue_.front().on_the_wire(), + bind(&hubconnection::handle_send)); } } - void hubconnection::handle_write(error_code error) + void hubconnection::handle_send(error_code error) { if (!error) { @@ -103,7 +87,7 @@ namespace msghublib::detail { { async_write(socket_, outmsg_queue_.front().on_the_wire(), - bind(&hubconnection::handle_write)); + bind(&hubconnection::handle_send)); } else if (is_closing) { do_close(false); } @@ -122,7 +106,7 @@ namespace msghublib::detail { if (forced || outmsg_queue_.empty()) { if (socket_.is_open()) { - boost::system::error_code ec; + error_code ec; socket_.close(ec); } } diff --git a/src/hubconnection.h b/src/hubconnection.h index 9c7a969..0801627 100644 --- a/src/hubconnection.h +++ b/src/hubconnection.h @@ -2,7 +2,9 @@ #include "ihub.h" #include "hubmessage.h" +#include "hub_error.h" +#include #include #include #include @@ -23,25 +25,25 @@ class hubconnection : public std::enable_shared_from_this , is_closing(false) {} - bool init(const std::string& host, uint16_t port); - bool write(const hubmessage& msg, bool wait = false); + void init(const std::string& host, uint16_t port, error_code& ec); + void async_send(const hubmessage& msg); + void send(const hubmessage& msg, error_code& ec); void close(bool forced); private: - using error_code = boost::system::error_code; auto bind(void (hubconnection::* /*handler*/)(error_code)); void handle_read_header(error_code error); void handle_read_body(error_code error); - void do_write(hubmessage msg); - void handle_write(error_code error); + void do_send(hubmessage msg); + void handle_send(error_code error); void do_close(bool forced); tcp::socket socket_; ihub& courier_; hubmessage inmsg_; hubmessage_queue outmsg_queue_; - boost::atomic_bool is_closing; + std::atomic_bool is_closing; }; } // namespace detail diff --git a/src/msghub.cpp b/src/msghub.cpp index c59bb5e..9c8f1f9 100644 --- a/src/msghub.cpp +++ b/src/msghub.cpp @@ -1,5 +1,7 @@ #include "msghub.h" +#include +#include #include #include "hubclient.h" @@ -30,7 +32,7 @@ namespace msghublib { std::multimap> remote_subs_; public: - explicit impl(boost::asio::any_io_executor const& executor) + explicit impl(any_io_executor const& executor) : executor_(executor) , acceptor_(make_strand(executor)) {} @@ -59,73 +61,76 @@ namespace msghublib { ~impl() { stop(); } - bool connect(const std::string& hostip, uint16_t port) { + void connect(const std::string& hostip, uint16_t port, error_code& ec) { + ec = {}; auto p = std::make_shared(executor_, *this); - if (p->init(hostip, port)) { + if (p->init(hostip, port, ec); !ec) { atomic_store(&remote_hub_, p); - return true; } - - return false; } - bool create(uint16_t port) { + void create(uint16_t port, error_code& ec) { + ec = {}; try { - acceptor_.open(tcp::v4()); - acceptor_.set_option(tcp::acceptor::reuse_address(true)); - acceptor_.bind({ {}, port }); - acceptor_.listen(); + acceptor_.open(tcp::v4(), ec); + if (!ec) acceptor_.set_option(tcp::acceptor::reuse_address(true), ec); + if (!ec) acceptor_.bind({ {}, port }, ec); + if (!ec) acceptor_.listen(acceptor_.max_listen_connections, ec); - accept_next(); + if (!ec) { + accept_next(); - auto p = std::make_shared(executor_, *this); + auto p = std::make_shared(executor_, *this); - if (p->init("localhost", port)) { - atomic_store(&remote_hub_, p); - return true; + if (p->init("localhost", port, ec); !ec) { + atomic_store(&remote_hub_, p); + } } - } catch (boost::system::system_error const&) { + } catch (system_error const& se) { + ec = se.code(); } - return false; } - bool publish(std::string_view topic, span message) { + void publish(std::string_view topic, span message, error_code& ec) { + ec = {}; if (auto p = atomic_load(&remote_hub_)) { - return p->write({ hubmessage::action::publish, topic, message }); + p->async_send({ hubmessage::action::publish, topic, message }); + } else { + ec = hub_errc::hub_not_connected; } - return false; } - bool unsubscribe(const std::string& topic) { + void unsubscribe(const std::string& topic, error_code& ec) { + ec = {}; std::unique_lock lk(mutex_); if (auto it = local_subs_.find(topic); it != local_subs_.end()) { /*it =*/local_subs_.erase(it); lk.unlock(); if (auto p = atomic_load(&remote_hub_)) { - return p->write({ hubmessage::action::unsubscribe, topic }, - true); + p->send({ hubmessage::action::unsubscribe, topic }, ec); + } else { + ec = hub_errc::hub_not_connected; } } - return false; } - bool subscribe(const std::string& topic, const msghub::onmessage& handler) { + void subscribe(const std::string& topic, const msghub::onmessage& handler, error_code& ec) { + ec = {}; std::unique_lock lk(mutex_); if (auto [it, ins] = local_subs_.emplace(topic, handler); ins) { lk.unlock(); if (auto p = atomic_load(&remote_hub_)) { - return p->write({ hubmessage::action::subscribe, topic }, true); + p->send({ hubmessage::action::subscribe, topic }, ec); // TODO(sehe): wait feedback from server here? + } else { + ec = hub_errc::hub_not_connected; } } else { // just update the handler it->second = handler; // overwrite - return true; } - - return false; } private: @@ -137,8 +142,7 @@ namespace msghublib { return (it == local_subs_.end()) ? no_handler : it->second; } - void distribute(std::shared_ptr const& subscriber, - hubmessage const& msg) override { + void distribute(std::shared_ptr const& subscriber, hubmessage const& msg) override { std::string topic(msg.topic()); std::unique_lock lk(mutex_); auto range = remote_subs_.equal_range(topic); @@ -147,7 +151,7 @@ namespace msghublib { case hubmessage::action::publish: for (auto it = range.first; it != range.second;) { if (auto alive = it->second.lock()) { - alive->write(msg); + alive->send(msg); ++it; } else { it = remote_subs_.erase(it); @@ -157,8 +161,7 @@ namespace msghublib { case hubmessage::action::subscribe: #if __cpp_lib_erase_if // allows us to write that in one go: - std::erase_if(remote_subs_, - [](auto& p) { return p.second.expired(); }); + std::erase_if(remote_subs_, [](auto& p) { return p.second.expired(); }); #endif remote_subs_.emplace(topic, subscriber); break; @@ -190,19 +193,17 @@ namespace msghublib { // Schedule next accept acceptor_.async_accept( subscriber->socket(), - [=, this, self = shared_from_this()](boost::system::error_code ec) { + [=, this, self = shared_from_this()](error_code ec) { handle_accept(subscriber, ec); }); } - void handle_accept(std::shared_ptr const& client, - const boost::system::error_code& error) { + void handle_accept(std::shared_ptr const& client, error_code error) { if (!error) { client->start(); accept_next(); } else { //// TODO: Handle IO error - on thread exit - // int e = error.value(); } } }; @@ -216,11 +217,49 @@ namespace msghublib { msghub::~msghub() = default; - void msghub::stop() { return pimpl->stop(); } - bool msghub::connect(const std::string& hostip, uint16_t port) { return pimpl->connect(hostip, port); } - bool msghub::create(uint16_t port) { return pimpl->create(port); } - bool msghub::unsubscribe(const std::string& topic) { return pimpl->unsubscribe(topic); } - bool msghub::subscribe(const std::string& topic, onmessage handler) { return pimpl->subscribe(topic, std::move(handler)); } - bool msghub::publish(std::string_view topic, span message) { return pimpl->publish(topic, message); } + // pimpl relays + void msghub::stop() + { return pimpl->stop(); } + void msghub::connect(const std::string& hostip, uint16_t port, error_code& ec) + { pimpl->connect(hostip, port, ec); } + void msghub::create(uint16_t port, error_code& ec) + { pimpl->create(port, ec); } + void msghub::unsubscribe(const std::string& topic, error_code& ec) + { pimpl->unsubscribe(topic, ec); } + void msghub::subscribe(const std::string& topic, onmessage handler, error_code& ec) + { pimpl->subscribe(topic, std::move(handler), ec); } + void msghub::publish(std::string_view topic, span message, error_code& ec) + { pimpl->publish(topic, message, ec); } + + // convenience throwing wrappers + void msghub::connect(const std::string& hostip, uint16_t port) { + error_code ec; + connect(hostip, port, ec); + if (ec) throw system_error(ec); + } + + void msghub::create(uint16_t port) { + error_code ec; + create(port, ec); + if (ec) throw system_error(ec); + } + + void msghub::unsubscribe(const std::string& topic) { + error_code ec; + unsubscribe(topic, ec); + if (ec) throw system_error(ec); + } + + void msghub::subscribe(const std::string& topic, onmessage handler) { + error_code ec; + subscribe(topic, handler, ec); + if (ec) throw system_error(ec); + } + + void msghub::publish(std::string_view topic, span message) { + error_code ec; + publish(topic, message, ec); + if (ec) throw system_error(ec); + } } // namespace msghublib diff --git a/test/connect.cpp b/test/connect.cpp index e4370d6..8be88cd 100644 --- a/test/connect.cpp +++ b/test/connect.cpp @@ -11,9 +11,9 @@ void test_connect() { boost::asio::io_context io; - mh::msghub msghub1(io.get_executor()); - BOOST_CHECK(msghub1.create(0xBEE)); + mh::msghub hub(io.get_executor()); - BOOST_CHECK(msghub1.connect("localhost", 0xBEE)); + BOOST_CHECK_NO_THROW(hub.create(0xBEE)); + BOOST_CHECK_NO_THROW(hub.connect("localhost", 0xBEE)); io.stop(); } diff --git a/test/create.cpp b/test/create.cpp index f510a25..c7b4ee2 100644 --- a/test/create.cpp +++ b/test/create.cpp @@ -1,8 +1,5 @@ #include "msghub.h" -#include -#include - #include #include @@ -11,14 +8,26 @@ void test_create() boost::asio::io_context io; msghublib::msghub msghub1(io.get_executor()); - BOOST_CHECK(msghub1.create(0xBEE)); - - msghublib::msghub msghub2(io.get_executor()); + BOOST_CHECK_NO_THROW(msghub1.create(0xBEE)); // Fail as port is in use by previous instance (-SO_REUSEPORT, issue on Windows) - //BOOST_CHECK(!msghub2.create(0xBEE)); + using SE = msghublib::system_error; + auto holds = [](auto code) { + return [=](SE const& se) { + return se.code() == code; + }; + }; + + { + msghublib::msghub msghub2(io.get_executor()); + BOOST_CHECK_EXCEPTION(msghub2.create(0xBEE), SE, holds(boost::asio::error::address_in_use)); + BOOST_CHECK_EXCEPTION(msghub2.create(0xB0B), SE, holds(boost::asio::error::already_open)); + } - BOOST_CHECK(msghub2.create(0xB0B)); + { + msghublib::msghub msghub3(io.get_executor()); + BOOST_CHECK_NO_THROW(msghub3.create(0xB0B)); + } //io.run(); //io_service2.run(); io.stop(); diff --git a/test/server_oncleintfailure.cpp b/test/server_onclientfailure.cpp similarity index 100% rename from test/server_oncleintfailure.cpp rename to test/server_onclientfailure.cpp diff --git a/test/subscribe.cpp b/test/subscribe.cpp index e476bc1..785415e 100644 --- a/test/subscribe.cpp +++ b/test/subscribe.cpp @@ -1,5 +1,7 @@ #include "msghub.h" +#include +#include #include #include @@ -35,10 +37,9 @@ void test_subscribe() boost::asio::thread_pool io(1); msghublib::msghub hub(io.get_executor()); - BOOST_CHECK(hub.create(0xBEE)); - - BOOST_CHECK(hub.subscribe("test_topic", test_create_on_message)); - BOOST_CHECK(hub.publish("test_topic", "$testmessage$")); + BOOST_CHECK_NO_THROW(hub.create(0xBEE)); + BOOST_CHECK_NO_THROW(hub.subscribe("test_topic", test_create_on_message)); + BOOST_CHECK_NO_THROW(hub.publish("test_topic", "$testmessage$")); { std::unique_lock lock(mx);