diff --git a/libcaf_net/CMakeLists.txt b/libcaf_net/CMakeLists.txt index cd8d8994..ded818c4 100644 --- a/libcaf_net/CMakeLists.txt +++ b/libcaf_net/CMakeLists.txt @@ -5,28 +5,28 @@ file(GLOB_RECURSE CAF_NET_HEADERS "caf/*.hpp") # -- add consistency checks for enum to_string implementations ----------------- caf_incubator_add_enum_consistency_check("caf/net/basp/ec.hpp" - "src/basp/ec_strings.cpp") + "src/basp/ec_strings.cpp") caf_incubator_add_enum_consistency_check("caf/net/basp/message_type.hpp" - "src/basp/message_type_strings.cpp") + "src/basp/message_type_strings.cpp") caf_incubator_add_enum_consistency_check("caf/net/operation.hpp" - "src/basp/operation_strings.cpp") + "src/basp/operation_strings.cpp") # -- utility function for setting default properties --------------------------- function(caf_net_set_default_properties) - foreach (target ${ARGN}) - caf_incubator_set_default_properties(${target}) - # Make sure we find our headers plus the the generated export header. - target_include_directories(${target} PRIVATE - "${CMAKE_CURRENT_SOURCE_DIR}" - "${CMAKE_BINARY_DIR}") - target_compile_definitions(${target} PRIVATE libcaf_net_EXPORTS) - # Pull in public dependencies. - target_link_libraries(${target} PUBLIC CAF::core) - if (MSVC) - target_link_libraries(${target} PUBLIC ws2_32 iphlpapi) - endif () - endforeach () + foreach(target ${ARGN}) + caf_incubator_set_default_properties(${target}) + # Make sure we find our headers plus the the generated export header. + target_include_directories(${target} PRIVATE + "${CMAKE_CURRENT_SOURCE_DIR}" + "${CMAKE_BINARY_DIR}") + target_compile_definitions(${target} PRIVATE libcaf_net_EXPORTS) + # Pull in public dependencies. + target_link_libraries(${target} PUBLIC CAF::core) + if(MSVC) + target_link_libraries(${target} PUBLIC ws2_32 iphlpapi) + endif() + endforeach() endfunction() # -- add library targets ------------------------------------------------------- @@ -66,27 +66,27 @@ add_library(libcaf_net_obj OBJECT ${CAF_NET_HEADERS} ) add_library(libcaf_net "${PROJECT_SOURCE_DIR}/cmake/dummy.cpp" - $) + $) generate_export_header(libcaf_net - EXPORT_MACRO_NAME CAF_NET_EXPORT - EXPORT_FILE_NAME "${CMAKE_BINARY_DIR}/caf/detail/net_export.hpp") + EXPORT_MACRO_NAME CAF_NET_EXPORT + EXPORT_FILE_NAME "${CMAKE_BINARY_DIR}/caf/detail/net_export.hpp") set_property(TARGET libcaf_net_obj PROPERTY POSITION_INDEPENDENT_CODE ON) caf_net_set_default_properties(libcaf_net_obj libcaf_net) target_include_directories(libcaf_net INTERFACE - $ - $) + $ + $) add_library(CAF::net ALIAS libcaf_net) set_target_properties(libcaf_net PROPERTIES - EXPORT_NAME net - SOVERSION ${CAF_VERSION} - VERSION ${CAF_LIB_VERSION} - OUTPUT_NAME caf_net) + EXPORT_NAME net + SOVERSION ${CAF_VERSION} + VERSION ${CAF_LIB_VERSION} + OUTPUT_NAME caf_net) # -- install library and header files ------------------------------------------ @@ -106,13 +106,13 @@ install(DIRECTORY "${CMAKE_CURRENT_SOURCE_DIR}/caf" # -- build unit tests ---------------------------------------------------------- -if (NOT CAF_INC_ENABLE_TESTING) - return() -endif () +if(NOT CAF_INC_ENABLE_TESTING) + return() +endif() add_executable(caf-net-test - test/net-test.cpp - $) + test/net-test.cpp + $) caf_net_set_default_properties(caf-net-test) diff --git a/libcaf_net/caf/net/actor_proxy_impl.hpp b/libcaf_net/caf/net/actor_proxy_impl.hpp index 6b9f3135..75e2d346 100644 --- a/libcaf_net/caf/net/actor_proxy_impl.hpp +++ b/libcaf_net/caf/net/actor_proxy_impl.hpp @@ -20,7 +20,6 @@ #include "caf/actor_proxy.hpp" #include "caf/net/consumer.hpp" -#include "caf/net/socket_manager.hpp" namespace caf::net { diff --git a/libcaf_net/caf/net/basp/application.hpp b/libcaf_net/caf/net/basp/application.hpp index 6de970f3..c7727929 100644 --- a/libcaf_net/caf/net/basp/application.hpp +++ b/libcaf_net/caf/net/basp/application.hpp @@ -19,7 +19,6 @@ #pragma once #include -#include #include #include #include @@ -28,7 +27,6 @@ #include "caf/actor.hpp" #include "caf/actor_addr.hpp" -#include "caf/actor_clock.hpp" #include "caf/actor_system.hpp" #include "caf/actor_system_config.hpp" #include "caf/binary_deserializer.hpp" @@ -40,9 +38,6 @@ #include "caf/detail/worker_hub.hpp" #include "caf/error.hpp" #include "caf/fwd.hpp" -#include "caf/intrusive/drr_queue.hpp" -#include "caf/intrusive/fifo_inbox.hpp" -#include "caf/intrusive/singly_linked.hpp" #include "caf/mailbox_element.hpp" #include "caf/net/actor_proxy_impl.hpp" #include "caf/net/basp/constants.hpp" @@ -53,18 +48,15 @@ #include "caf/net/basp/worker.hpp" #include "caf/net/consumer.hpp" #include "caf/net/consumer_queue.hpp" -#include "caf/net/multiplexer.hpp" #include "caf/net/receive_policy.hpp" #include "caf/net/socket_manager.hpp" #include "caf/node_id.hpp" -#include "caf/policy/normal_messages.hpp" #include "caf/proxy_registry.hpp" #include "caf/response_promise.hpp" #include "caf/scoped_execution_unit.hpp" #include "caf/send.hpp" #include "caf/tag/message_oriented.hpp" #include "caf/unit.hpp" -#include "caf/variant.hpp" namespace caf::net::basp { @@ -96,34 +88,35 @@ class CAF_NET_EXPORT application : public consumer { error init(socket_manager* owner, LowerLayerPtr down, const settings& cfg) { // Initialize member variables. owner_ = owner; - system_ = &owner->mpx().system(); + system_ = &owner->system(); executor_.system_ptr(system_); executor_.proxy_registry_ptr(&proxies_); + max_throughput_ = get_or(cfg, "caf.scheduler.max-throughput", + defaults::scheduler::max_throughput); auto workers = get_or( cfg, "caf.middleman.workers", std::min(3u, std::thread::hardware_concurrency() / 4u) + 1); - max_throughput_ = get_or(system().config(), "caf.scheduler.max-throughput", - defaults::scheduler::max_throughput); for (size_t i = 0; i < workers; ++i) hub_->add_new_worker(*queue_, proxies_); // Write handshake. - return write_message( - down, header{message_type::handshake, version}, system().node(), - get_or(system().config(), "caf.middleman.app-identifiers", - application::default_app_ids())); + auto app_ids = get_or(cfg, "caf.middleman.app-identifiers", + application::default_app_ids()); + return write_message(down, header{message_type::handshake, version}, + system().node(), app_ids); } template bool prepare_send(LowerLayerPtr& down) { + CAF_LOG_TRACE(""); if (!handshake_complete()) return true; if (auto err = dequeue_events(down)) { - CAF_LOG_ERROR("handle_events failed: " << CAF_ARG(err)); + CAF_LOG_ERROR("dequeue_events failed: " << CAF_ARG(err)); down->abort_reason(err); return false; } if (auto err = dequeue_messages(down)) { - CAF_LOG_ERROR("handle_messages failed: " << CAF_ARG(err)); + CAF_LOG_ERROR("dequeue_messages failed: " << CAF_ARG(err)); down->abort_reason(err); return false; } @@ -132,6 +125,7 @@ class CAF_NET_EXPORT application : public consumer { template ptrdiff_t consume(LowerLayerPtr& down, byte_span buffer) { + CAF_LOG_TRACE(CAF_ARG2("buffer.size", buffer.size())); if (auto err = handle(down, buffer)) { CAF_LOG_ERROR("could not handle message: " << CAF_ARG(err)); down->abort_reason(err); @@ -142,6 +136,7 @@ class CAF_NET_EXPORT application : public consumer { template bool done_sending(LowerLayerPtr&) { + CAF_LOG_TRACE(""); if (mailbox_.blocked()) return true; return (mailbox_.empty() && mailbox_.try_block()); @@ -149,6 +144,7 @@ class CAF_NET_EXPORT application : public consumer { template void abort(LowerLayerPtr&, const error&) { + CAF_LOG_TRACE(""); // nop } @@ -163,15 +159,12 @@ class CAF_NET_EXPORT application : public consumer { /// Writes a message to the message buffer of `down`. template error write_message(LowerLayerPtr& down, header hdr, Ts&&... xs) { + CAF_LOG_TRACE(CAF_ARG(hdr)); down->begin_message(); auto& buf = down->message_buffer(); binary_serializer sink{&executor_, buf}; - if (!sink.apply_object(hdr)) + if (!sink.apply_objects(hdr, xs...)) return sink.get_error(); - if constexpr (sizeof...(xs) >= 1) { - if (!sink.apply_objects(xs...)) - return sink.get_error(); - } down->end_message(); return none; } @@ -212,6 +205,7 @@ class CAF_NET_EXPORT application : public consumer { template error dequeue_events(LowerLayerPtr& down) { + CAF_LOG_TRACE(""); if (!mailbox_.blocked()) { mailbox_.fetch_more(); auto& q = std::get<0>(mailbox_.queue().queues()); @@ -220,14 +214,14 @@ class CAF_NET_EXPORT application : public consumer { for (auto ptr = q.next(); ptr != nullptr; ptr = q.next()) { auto f = detail::make_overload( [&](consumer_queue::event::resolve_request& x) { - write_resolve_request(down, x.locator, x.listener); + write_resolve_request(down, x.path, x.listener); }, [&](consumer_queue::event::new_proxy& x) { new_proxy(down, x.id); }, [&](consumer_queue::event::local_actor_down& x) { local_actor_down(down, x.id, std::move(x.reason)); }, [&](consumer_queue::event::timeout& x) { - timeout(down, x.type, x.id); + timeout(down, std::move(x.type), x.id); }); visit(f, ptr->value); } @@ -250,27 +244,31 @@ class CAF_NET_EXPORT application : public consumer { } template - void new_proxy(LowerLayerPtr& down, actor_id id) { + void new_proxy(LowerLayerPtr& down, actor_id aid) { + CAF_LOG_TRACE(CAF_ARG(aid)); if (auto err = write_message(down, header{message_type::monitor_message, - static_cast(id)})) + static_cast(aid)})) down->abort_reason(err); } template - void local_actor_down(LowerLayerPtr& down, actor_id id, error reason) { + void local_actor_down(LowerLayerPtr& down, actor_id aid, error reason) { + CAF_LOG_TRACE(CAF_ARG(aid) << CAF_ARG(reason)); if (auto err = write_message( - down, header{message_type::down_message, static_cast(id)}, + down, header{message_type::down_message, static_cast(aid)}, reason)) down->abort_reason(err); } template void timeout(LowerLayerPtr& down, std::string type, uint64_t id) { + CAF_LOG_TRACE(CAF_ARG(type) << CAF_ARG(id)); down->timeout(std::move(type), id); } template error dequeue_messages(LowerLayerPtr& down) { + CAF_LOG_TRACE(""); for (size_t count = 0; count < max_throughput_; ++count) { auto ptr = next_message(); if (ptr == nullptr) @@ -376,6 +374,7 @@ class CAF_NET_EXPORT application : public consumer { template error handle_actor_message(LowerLayerPtr&, header hdr, byte_span payload) { + CAF_LOG_TRACE(CAF_ARG(hdr) << CAF_ARG2("payload.size", payload.size())); auto worker = hub_->pop(); if (worker != nullptr) { CAF_LOG_DEBUG("launch BASP worker for deserializing an actor_message"); diff --git a/libcaf_net/caf/net/consumer_queue.hpp b/libcaf_net/caf/net/consumer_queue.hpp index 2ffec320..9a11c7b6 100644 --- a/libcaf_net/caf/net/consumer_queue.hpp +++ b/libcaf_net/caf/net/consumer_queue.hpp @@ -61,7 +61,7 @@ class CAF_NET_EXPORT consumer_queue { class event final : public element { public: struct resolve_request { - std::string locator; + std::string path; actor listener; }; @@ -79,7 +79,7 @@ class CAF_NET_EXPORT consumer_queue { uint64_t id; }; - event(std::string locator, actor listener); + event(std::string path, actor listener); event(actor_id proxy_id); diff --git a/libcaf_net/caf/net/endpoint_manager_impl.hpp b/libcaf_net/caf/net/endpoint_manager_impl.hpp index f2582ec9..24be8775 100644 --- a/libcaf_net/caf/net/endpoint_manager_impl.hpp +++ b/libcaf_net/caf/net/endpoint_manager_impl.hpp @@ -91,7 +91,7 @@ class endpoint_manager_impl : public endpoint_manager { for (auto ptr = q.next(); ptr != nullptr; ptr = q.next()) { auto f = detail::make_overload( [&](consumer_queue::event::resolve_request& x) { - transport_.resolve(*this, x.locator, x.listener); + transport_.resolve(*this, x.path, x.listener); }, [&](consumer_queue::event::new_proxy& x) { transport_.new_proxy(*this, x.id); diff --git a/libcaf_net/caf/net/length_prefix_framing.hpp b/libcaf_net/caf/net/length_prefix_framing.hpp index 3ad830c2..cc046ba8 100644 --- a/libcaf_net/caf/net/length_prefix_framing.hpp +++ b/libcaf_net/caf/net/length_prefix_framing.hpp @@ -59,23 +59,18 @@ class length_prefix_framing { // -- constructors, destructors, and assignment operators -------------------- template - length_prefix_framing(Ts&&... xs) - : upper_layer_(std::forward(xs)...), - message_offset_(0), - awaiting_header_(true), - msg_size_(0) { + length_prefix_framing(Ts&&... xs) : upper_layer_(std::forward(xs)...) { // nop } - virtual ~length_prefix_framing() { - // nop - } + ~length_prefix_framing() = default; // -- initialization --------------------------------------------------------- template error init(socket_manager* owner, LowerLayerPtr& down, const settings& config) { + CAF_LOG_TRACE(""); down->configure_read(receive_policy::exactly(header_length)); auto this_layer_ptr = make_message_oriented_layer_ptr(this, down); return upper_layer_.init(owner, this_layer_ptr, config); @@ -85,6 +80,7 @@ class length_prefix_framing { template void begin_message(LowerLayerPtr& down) { + CAF_LOG_TRACE(""); down->begin_output(); auto& buf = down->output_buffer(); message_offset_ = buf.size(); @@ -98,6 +94,7 @@ class length_prefix_framing { template void end_message(LowerLayerPtr& down) { + CAF_LOG_TRACE(""); using detail::to_network_order; auto& buf = down->output_buffer(); auto msg_begin = buf.begin() + message_offset_; @@ -107,9 +104,11 @@ class length_prefix_framing { memcpy(std::addressof(*msg_begin), &u32_size, 4); down->end_output(); } else { - down->abort_reason(make_error( - sec::runtime_error, msg_size == 0 ? "logic error: message of size 0" - : "maximum message size exceeded")); + auto err = make_error(sec::runtime_error, + msg_size == 0 ? "logic error: message of size 0" + : "maximum message size exceeded"); + CAF_LOG_ERROR(err); + down->abort_reason(err); } } @@ -165,10 +164,10 @@ class length_prefix_framing { template ptrdiff_t consume(LowerLayerPtr& down, byte_span buffer, byte_span) { + CAF_LOG_TRACE(CAF_ARG2("buffer.size", buffer.size())); if (awaiting_header_) { using detail::from_network_order; - if (buffer.size() < header_length) - return 0; + CAF_ASSERT(buffer.size() == header_length); uint32_t u32_size = 0; memcpy(&u32_size, buffer.data(), header_length); msg_size_ = static_cast(from_network_order(u32_size)); @@ -176,10 +175,9 @@ class length_prefix_framing { awaiting_header_ = false; return header_length; } else { - if (buffer.size() < msg_size_) - return 0; + CAF_ASSERT(buffer.size() == msg_size_); auto this_layer_ptr = make_message_oriented_layer_ptr(this, down); - upper_layer_.consume(this_layer_ptr, make_span(buffer.data(), msg_size_)); + upper_layer_.consume(this_layer_ptr, buffer); down->configure_read(receive_policy::exactly(header_length)); awaiting_header_ = true; return msg_size_; @@ -187,10 +185,17 @@ class length_prefix_framing { } private: + /// Holds the upper layer. UpperLayer upper_layer_; - size_t message_offset_; - bool awaiting_header_; - size_t msg_size_; + + /// Holds the offset within the message buffer for writing the header. + size_t message_offset_ = 0; + + /// Holds the size of the next message. + size_t msg_size_ = 0; + + /// Signals wether a header or payload is expected with the next `consume`. + bool awaiting_header_ = true; }; } // namespace caf::net diff --git a/libcaf_net/caf/net/socket_manager.hpp b/libcaf_net/caf/net/socket_manager.hpp index ccc299ae..159b0bc5 100644 --- a/libcaf_net/caf/net/socket_manager.hpp +++ b/libcaf_net/caf/net/socket_manager.hpp @@ -25,6 +25,7 @@ #include "caf/make_counted.hpp" #include "caf/net/actor_shell.hpp" #include "caf/net/fwd.hpp" +#include "caf/net/multiplexer.hpp" #include "caf/net/operation.hpp" #include "caf/net/socket.hpp" #include "caf/ref_counted.hpp" @@ -78,6 +79,10 @@ class CAF_NET_EXPORT socket_manager : public ref_counted { return parent_; } + actor_system& system() noexcept { + return mpx().system(); + } + /// Returns registered operations (read, write, or both). operation mask() const noexcept { return mask_; @@ -201,7 +206,6 @@ class socket_manager_impl : public socket_manager { CAF_LOG_ERROR("failed to set nonblocking flag in socket:" << err); return err; } - register_reading(); return protocol_.init(static_cast(this), this, config); } diff --git a/libcaf_net/caf/net/stream_transport.hpp b/libcaf_net/caf/net/stream_transport.hpp index 3163aa8e..862ef613 100644 --- a/libcaf_net/caf/net/stream_transport.hpp +++ b/libcaf_net/caf/net/stream_transport.hpp @@ -26,6 +26,7 @@ #include "caf/logger.hpp" #include "caf/net/fwd.hpp" #include "caf/net/receive_policy.hpp" +#include "caf/net/socket_manager.hpp" #include "caf/net/stream_oriented_layer_ptr.hpp" #include "caf/net/stream_socket.hpp" #include "caf/sec.hpp" @@ -56,9 +57,7 @@ class stream_transport { // nop } - virtual ~stream_transport() { - // nop - } + ~stream_transport() = default; // -- interface for stream_oriented_layer_ptr -------------------------------- @@ -160,11 +159,9 @@ class stream_transport { CAF_LOG_ERROR("send_buffer_size: " << socket_buf_size.error()); return std::move(socket_buf_size.error()); } + owner->register_reading(); auto this_layer_ptr = make_stream_oriented_layer_ptr(this, parent); - if (auto err = upper_layer_.init(owner, this_layer_ptr, config)) - return err; - read_buf_.resize(max_read_size_); - return none; + return upper_layer_.init(owner, this_layer_ptr, config); } // -- event callbacks -------------------------------------------------------- diff --git a/libcaf_net/src/actor_proxy_impl.cpp b/libcaf_net/src/actor_proxy_impl.cpp index fd194daa..a114bca3 100644 --- a/libcaf_net/src/actor_proxy_impl.cpp +++ b/libcaf_net/src/actor_proxy_impl.cpp @@ -21,7 +21,6 @@ #include "caf/actor_system.hpp" #include "caf/expected.hpp" #include "caf/logger.hpp" -#include "caf/net/multiplexer.hpp" namespace caf::net { diff --git a/libcaf_net/src/basp/application.cpp b/libcaf_net/src/basp/application.cpp index 60589653..1ecb8714 100644 --- a/libcaf_net/src/basp/application.cpp +++ b/libcaf_net/src/basp/application.cpp @@ -37,11 +37,13 @@ application::application(proxy_registry& proxies) } void application::resolve(string_view path, const actor& listener) { + CAF_LOG_TRACE(CAF_ARG(path) << CAF_ARG(listener)); enqueue_event(to_string(path), listener); } strong_actor_ptr application::make_proxy(const node_id& nid, const actor_id& aid) { + CAF_LOG_TRACE(CAF_ARG(nid) << CAF_ARG(aid)); using impl_type = actor_proxy_impl; using handle_type = strong_actor_ptr; actor_config cfg; @@ -71,12 +73,14 @@ strong_actor_ptr application::resolve_local_path(string_view path) { } void application::enqueue(mailbox_element_ptr msg, strong_actor_ptr receiver) { + CAF_LOG_TRACE(CAF_ARG(msg) << CAF_ARG(receiver)); using message_type = consumer_queue::message; auto ptr = new message_type(std::move(msg), std::move(receiver)); enqueue(ptr); } bool application::enqueue(consumer_queue::element* ptr) { + CAF_LOG_TRACE(""); switch (mailbox_.push_back(ptr)) { case intrusive::inbox_result::success: return true; diff --git a/libcaf_net/src/net/consumer_queue.cpp b/libcaf_net/src/net/consumer_queue.cpp index 20e2a57a..bd27c9f7 100644 --- a/libcaf_net/src/net/consumer_queue.cpp +++ b/libcaf_net/src/net/consumer_queue.cpp @@ -24,9 +24,9 @@ consumer_queue::element::~element() { // nop } -consumer_queue::event::event(std::string locator, actor listener) +consumer_queue::event::event(std::string path, actor listener) : element(element_type::event), - value(resolve_request{std::move(locator), std::move(listener)}) { + value(resolve_request{std::move(path), std::move(listener)}) { // nop } diff --git a/libcaf_net/test/application.cpp b/libcaf_net/test/application.cpp index 68b3fbc8..7568612f 100644 --- a/libcaf_net/test/application.cpp +++ b/libcaf_net/test/application.cpp @@ -37,16 +37,17 @@ using namespace caf; using namespace caf::net; #define REQUIRE_OK(statement) \ - if (auto err = statement) \ - CAF_FAIL("failed to serialize data: " << err) + do { \ + if (auto err = statement) \ + CAF_FAIL("failed to serialize data: " << err); \ + } while (false) namespace { struct dummy_socket_manager : public socket_manager { dummy_socket_manager(socket handle, multiplexer* mpx) : socket_manager(handle, mpx) { - mask_add(operation::read); - mask_add(operation::write); + // nop } error init(const settings&) override { diff --git a/libcaf_net/test/net/actor_shell.cpp b/libcaf_net/test/net/actor_shell.cpp index accdf7a6..27237efd 100644 --- a/libcaf_net/test/net/actor_shell.cpp +++ b/libcaf_net/test/net/actor_shell.cpp @@ -218,7 +218,7 @@ constexpr std::string_view input = R"__( } // namespace CAF_TEST_FIXTURE_SCOPE(actor_shell_tests, fixture) -/* + CAF_TEST(actor shells expose their mailbox to their owners) { auto sck = testee_socket_guard.release(); auto mgr = net::make_socket_manager(sck, &mpx); @@ -231,7 +231,7 @@ CAF_TEST(actor shells expose their mailbox to their owners) { anon_send(hdl, "line 3"); run_while([&] { return app.lines.size() != 3; }); CAF_CHECK_EQUAL(app.lines, svec({"line 1", "line 2", "line 3"})); -}*/ +} CAF_TEST(actor shells can send requests and receive responses) { auto worker = sys.spawn([] { diff --git a/libcaf_net/test/stream_transport.cpp b/libcaf_net/test/stream_transport.cpp index bb205bc1..eb21e274 100644 --- a/libcaf_net/test/stream_transport.cpp +++ b/libcaf_net/test/stream_transport.cpp @@ -34,9 +34,6 @@ #include "caf/net/stream_socket.hpp" #include "caf/span.hpp" -#include "caf/net/basp/application.hpp" -#include "caf/net/length_prefix_framing.hpp" - using namespace caf; using namespace caf::net; @@ -123,7 +120,7 @@ class dummy_application { // nop } - strong_actor_ptr make_proxy(node_id, actor_id) { + strong_actor_ptr make_proxy(const node_id&, const actor_id&) { return nullptr; }