From c8b7fde8784aa6464403c4edaaa4067da02a70eb Mon Sep 17 00:00:00 2001 From: Balazs Racz Date: Sat, 23 Dec 2023 13:51:24 +0100 Subject: [PATCH 01/38] Adds the skeleton of a hub test application which will verify the performance of a hub including packet loss. --- applications/hub_test/Makefile | 3 + applications/hub_test/README.md | 36 ++++ applications/hub_test/config.mk | 1 + applications/hub_test/subdirs | 2 + applications/hub_test/targets/Makefile | 4 + .../hub_test/targets/linux.x86/.gitignore | 1 + .../hub_test/targets/linux.x86/Makefile | 2 + .../hub_test/targets/linux.x86/main.cxx | 204 ++++++++++++++++++ src/utils/ClientConnection.hxx | 1 + 9 files changed, 254 insertions(+) create mode 100644 applications/hub_test/Makefile create mode 100644 applications/hub_test/README.md create mode 120000 applications/hub_test/config.mk create mode 100644 applications/hub_test/subdirs create mode 100644 applications/hub_test/targets/Makefile create mode 100644 applications/hub_test/targets/linux.x86/.gitignore create mode 100644 applications/hub_test/targets/linux.x86/Makefile create mode 100644 applications/hub_test/targets/linux.x86/main.cxx diff --git a/applications/hub_test/Makefile b/applications/hub_test/Makefile new file mode 100644 index 000000000..f5cedde67 --- /dev/null +++ b/applications/hub_test/Makefile @@ -0,0 +1,3 @@ +SUBDIRS = targets +-include config.mk +include $(OPENMRNPATH)/etc/recurse.mk diff --git a/applications/hub_test/README.md b/applications/hub_test/README.md new file mode 100644 index 000000000..781d7e1e5 --- /dev/null +++ b/applications/hub_test/README.md @@ -0,0 +1,36 @@ +Hub throughput testing application {#hub_test_application} +================================== + +[TOC] + +# Goal + +The purpose of Hub testing is to verify an OpenLCB link or bus for qualitative +and quantitative performance characteristics. These are as follows. + +Qualitative: +- The link has to be lossless. +- There is only limited reordering allowed within the link. +- There has to be proper throttling for ingress of packets to the link. + +Quantitative: +- What is the maximum throughut of packets that can be accepted on the link. +- What is the round-trip-time of the link. +- What is the jitter on the timing of the packet round-trips. + +For hubs, a relevant question is also how do these metrics change as the number +of clients on the hub are increased. + +## Testing methodology + +The application contains a load generator, and a receiver. For a given test, +one generator and one or more receivers have to be attached to the same bus. + +The generator supplies a given load to the bus. This load may be a fixes +traffic, or the maximum traffic that the bus is willing to accept. + +The receiver watches for the traffic coming from the generator, verifies the +qualitative attributes, and measures the quantitative attributes. Statistics +about this are printed to the screen. + + diff --git a/applications/hub_test/config.mk b/applications/hub_test/config.mk new file mode 120000 index 000000000..e270c0389 --- /dev/null +++ b/applications/hub_test/config.mk @@ -0,0 +1 @@ +../default_config.mk \ No newline at end of file diff --git a/applications/hub_test/subdirs b/applications/hub_test/subdirs new file mode 100644 index 000000000..4e0254829 --- /dev/null +++ b/applications/hub_test/subdirs @@ -0,0 +1,2 @@ +SUBDIRS = \ + diff --git a/applications/hub_test/targets/Makefile b/applications/hub_test/targets/Makefile new file mode 100644 index 000000000..2eb855acc --- /dev/null +++ b/applications/hub_test/targets/Makefile @@ -0,0 +1,4 @@ +SUBDIRS = \ + linux.86 \ + +include $(OPENMRNPATH)/etc/recurse.mk diff --git a/applications/hub_test/targets/linux.x86/.gitignore b/applications/hub_test/targets/linux.x86/.gitignore new file mode 100644 index 000000000..67effd68d --- /dev/null +++ b/applications/hub_test/targets/linux.x86/.gitignore @@ -0,0 +1 @@ +hub_test diff --git a/applications/hub_test/targets/linux.x86/Makefile b/applications/hub_test/targets/linux.x86/Makefile new file mode 100644 index 000000000..c6386dac3 --- /dev/null +++ b/applications/hub_test/targets/linux.x86/Makefile @@ -0,0 +1,2 @@ +-include ../../config.mk +include $(OPENMRNPATH)/etc/prog.mk diff --git a/applications/hub_test/targets/linux.x86/main.cxx b/applications/hub_test/targets/linux.x86/main.cxx new file mode 100644 index 000000000..d2e4d63fa --- /dev/null +++ b/applications/hub_test/targets/linux.x86/main.cxx @@ -0,0 +1,204 @@ +/** \copyright + * Copyright (c) 2023, Balazs Racz + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * - Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * + * - Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE + * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. + * + * \file main.cxx + * + * Main file for the hub tester application. + * + * @author Balazs Racz + * @date 23 Dec 2023 + */ + +#include "nmranet_config.h" +#include "os/os.h" + +#include "utils/ClientConnection.hxx" +#include "utils/StringPrintf.hxx" + +OVERRIDE_CONST(gc_generate_newlines, 1); + +Executor<1> g_executor("g_executor", 0, 1024); +Service g_service(&g_executor); + +struct Link +{ + const char *device_path = nullptr; + int upstream_port = 12021; + const char *upstream_host = nullptr; + std::unique_ptr can_hub {new CanHubFlow(&g_service)}; +}; + +/// We generate packets to this interface. +Link output_port; +/// Traffic to generate, default is to saturate the link. +int pkt_per_sec = -1; + +/// Temporary variable used during args parsing, to hold the -Q value until the +/// next -U comes. +int in_upstream_port = 12021; +/// All the input ports to listen to for incoming traffic. +std::vector input_ports; + +/// All network connections (both input and outputs). +vector> connections; + +/// Used for error printing the usage. +const char* arg0 = "hub_test"; + +void usage(const char *e) +{ + fprintf(stderr, + "Usage: %s (-d device_path | [-q upstream_port] -u upstream_host) [-s " + "speed]\n\t(-D device_path | [-Q upstream_port] -U " + "upstream_host)...\n\n", + e); + fprintf(stderr, + "\tdevice_path is a path to a physical device doing " + "serial-CAN or USB-CAN.\n"); + fprintf(stderr, + "\tupstream_host is the host name for an upstream " + "hub.\n"); + fprintf( + stderr, "\tupstream_port is the port number for the upstream hub.\n"); + fprintf(stderr, + "\t-d -q -u specifies the output port, -D -Q -U specifies input ports. " + "-Q must be before -U. Multiple input ports can be specified.\n"); + fprintf(stderr, + "\t-s speed is the packets/sec to generate. Set to -1 for utomatic " + "(saturation).\n"); + exit(1); +} + +void parse_args(int argc, char *argv[]) +{ + int opt; + while ((opt = getopt(argc, argv, "hd:u:q:s:D:U:Q:")) >= 0) + { + switch (opt) + { + case 'h': + usage(argv[0]); + break; + case 'd': + output_port.device_path = optarg; + break; + case 'u': + output_port.upstream_host = optarg; + break; + case 'q': + output_port.upstream_port = atoi(optarg); + break; + case 'D': + input_ports.emplace_back(); + input_ports.back().device_path = optarg; + break; + case 'U': + input_ports.emplace_back(); + input_ports.back().upstream_host = optarg; + input_ports.back().upstream_port = in_upstream_port; + break; + case 'Q': + in_upstream_port = atoi(optarg); + break; + case 's': + pkt_per_sec = atoi(optarg); + break; + default: + fprintf(stderr, "Unknown option %c\n", opt); + usage(argv[0]); + } + } +} + +/// Establishes a new connection. +void add_link(Link *link) +{ + static int id = 0; + if (link->upstream_host) + { + connections.emplace_back( + new UpstreamConnectionClient(StringPrintf("upstream%d", id), + link->can_hub.get(), link->upstream_host, link->upstream_port)); + } + else + + if (link->device_path) + { + connections.emplace_back( + new DeviceConnectionClient(StringPrintf("device%d", id), + link->can_hub.get(), link->device_path)); + } + else + { + usage(arg0); + } + ++id; +} + +class PacketGenTimer : public ::Timer +{ +public: + PacketGenTimer() + : Timer(g_executor.active_timers()) + { } + + long long timeout() override + { + /// @todo send output + // stack.send_event(0x0501010114DD1234); + return RESTART; + } +} pkt_gen_timer; + +/** Entry point to application. + * @param argc number of command line arguments + * @param argv array of command line arguments + * @return 0, should never return + */ +int appl_main(int argc, char *argv[]) +{ + arg0 = argv[0]; + parse_args(argc, argv); + + g_executor.start_thread("executor_thread", 0, 5000); + + if (pkt_per_sec > 0) + { + long long diff = 1000000000ULL / pkt_per_sec; + pkt_gen_timer.start(diff); + } + + while (1) + { + for (const auto &p : connections) + { + p->ping(); + } + sleep(1); + } + + return 0; +} diff --git a/src/utils/ClientConnection.hxx b/src/utils/ClientConnection.hxx index 35121c2bd..b29ef0928 100644 --- a/src/utils/ClientConnection.hxx +++ b/src/utils/ClientConnection.hxx @@ -38,6 +38,7 @@ #include #include /* tc* functions */ #include +#include #include "utils/GridConnectHub.hxx" #include "utils/socket_listener.hxx" From 8cbfb2a8c753a926aac257d1302a0cad986c8bbd Mon Sep 17 00:00:00 2001 From: Balazs Racz Date: Sat, 23 Dec 2023 16:58:42 +0100 Subject: [PATCH 02/38] Adds an accessor to the last expiry time of a timer. --- src/executor/Timer.hxx | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/src/executor/Timer.hxx b/src/executor/Timer.hxx index 61047d2b1..04ab1eeb1 100644 --- a/src/executor/Timer.hxx +++ b/src/executor/Timer.hxx @@ -168,6 +168,15 @@ public: * @returns the new timer period, or one of the above special values. */ virtual long long timeout() = 0; + /// @return the time when this timer is triggered. This may be in the past, + /// if the timer has just woken up and we are in the timeout() function. It + /// may be in the future if this timer is scheduled. It may be zero if this + /// timer was never scheduled. + long long schedule_time() + { + return when_; + } + /** Starts a timer. The timer must not be active, and neither expired at * the time of call. * @param period period in nanoseconds before expiration. If not specified, From 822bad702dbe7ed43a8ad60e7ec77741a5736a39 Mon Sep 17 00:00:00 2001 From: Balazs Racz Date: Sat, 23 Dec 2023 16:58:52 +0100 Subject: [PATCH 03/38] Starts adding the send flow. --- .../hub_test/targets/linux.x86/main.cxx | 32 +++++++++++++++++-- 1 file changed, 29 insertions(+), 3 deletions(-) diff --git a/applications/hub_test/targets/linux.x86/main.cxx b/applications/hub_test/targets/linux.x86/main.cxx index d2e4d63fa..08f80c191 100644 --- a/applications/hub_test/targets/linux.x86/main.cxx +++ b/applications/hub_test/targets/linux.x86/main.cxx @@ -68,6 +68,11 @@ vector> connections; /// Used for error printing the usage. const char* arg0 = "hub_test"; +// Dynamic variables tracking the traffic. + +/// Index of the next packet to generate. +unsigned g_next_packet = 1; + void usage(const char *e) { fprintf(stderr, @@ -158,6 +163,25 @@ void add_link(Link *link) ++id; } +struct SendPacketInfo { + /// Packet number to output (sequence number). + unsigned index; + /// When was this packet generated by the timer. + uint64_t generateTsNsec; +}; + +class SendFlow : public StateFlow, QList<1> > { +public: + SendFlow() + : StateFlow, QList<1>>(&g_service) + { } + + Action entry() override { + + } + +} g_send_flow; + class PacketGenTimer : public ::Timer { public: @@ -167,8 +191,10 @@ class PacketGenTimer : public ::Timer long long timeout() override { - /// @todo send output - // stack.send_event(0x0501010114DD1234); + auto* b = g_send_flow.alloc(); + b->data()->index = g_next_packet++; + b->data()->generateTsNsec = this->schedule_time(); + g_send_flow->send(b); return RESTART; } } pkt_gen_timer; @@ -187,7 +213,7 @@ int appl_main(int argc, char *argv[]) if (pkt_per_sec > 0) { - long long diff = 1000000000ULL / pkt_per_sec; + long long diff = SEC_TO_NSEC(1) / pkt_per_sec; pkt_gen_timer.start(diff); } From 0b088d22ac8c48c86bd710d5b53ed3934f4493a4 Mon Sep 17 00:00:00 2001 From: Balazs Racz Date: Mon, 25 Dec 2023 09:49:21 +0100 Subject: [PATCH 04/38] Adds code for filling in the output frames. Adds a structure for recording timestamps per packet. --- .../hub_test/targets/linux.x86/main.cxx | 89 ++++++++++++++++--- 1 file changed, 79 insertions(+), 10 deletions(-) diff --git a/applications/hub_test/targets/linux.x86/main.cxx b/applications/hub_test/targets/linux.x86/main.cxx index 08f80c191..57f178bec 100644 --- a/applications/hub_test/targets/linux.x86/main.cxx +++ b/applications/hub_test/targets/linux.x86/main.cxx @@ -37,6 +37,7 @@ #include "utils/ClientConnection.hxx" #include "utils/StringPrintf.hxx" +#include "utils/LimitedPool.hxx" OVERRIDE_CONST(gc_generate_newlines, 1); @@ -56,6 +57,15 @@ Link output_port; /// Traffic to generate, default is to saturate the link. int pkt_per_sec = -1; +/// How many pending buffers we can allow for the send flow. +static constexpr unsigned SEND_PARALLELISM = 10; + +/// The output frames will go wioth this CAN ID. +static constexpr uint32_t SEND_HEADER = 0x195b4ffe; +/// The output frames will go with this CAN data bytes. This is NMRA ID 1, +/// which is not assigned. Lower four bytes are the id. +static constexpr uint8_t SEND_PAYLOAD[8] = {0x9, 0x0, 0x01, 0x39, 0, 0, 0, 0}; + /// Temporary variable used during args parsing, to hold the -Q value until the /// next -U comes. int in_upstream_port = 12021; @@ -66,13 +76,21 @@ std::vector input_ports; vector> connections; /// Used for error printing the usage. -const char* arg0 = "hub_test"; +const char *arg0 = "hub_test"; // Dynamic variables tracking the traffic. /// Index of the next packet to generate. unsigned g_next_packet = 1; +/// How many buffers have been sent and waiting for the notification on them. +unsigned g_pending_buffers = 0; + +/// How many packets we have sent to the output iface. +unsigned g_num_packets_sent = 0; +/// How many sent packets got their barrier notified. +unsigned g_num_packets_accepted = 0; + void usage(const char *e) { fprintf(stderr, @@ -148,9 +166,7 @@ void add_link(Link *link) new UpstreamConnectionClient(StringPrintf("upstream%d", id), link->can_hub.get(), link->upstream_host, link->upstream_port)); } - else - - if (link->device_path) + else if (link->device_path) { connections.emplace_back( new DeviceConnectionClient(StringPrintf("device%d", id), @@ -163,23 +179,76 @@ void add_link(Link *link) ++id; } -struct SendPacketInfo { +struct PacketInfo : private Notifiable { + unsigned index_; + + /// Timestamp when the timer ticked to send this packet. + long long timerTs_{0}; + /// Timestamp when the flow started working on this packet. + long long flowTs_{0}; + /// Timestamp when the buffer was handed over to the output hub. + long long sendTs_{0}; + /// Timestamp when the notifiable on the buffer triggered. + long long confirmTs_{0}; + + /// Send frame notification barrier. + BarrierNotifiable bn_{this}; +private: + void notify() override { + confirmTs_ = os_get_time_monotonic(); + g_pending_buffers--; + g_num_packets_accepted++; + } +}; + +std::map g_packet_data; + +struct SendPacketRequest +{ /// Packet number to output (sequence number). unsigned index; /// When was this packet generated by the timer. uint64_t generateTsNsec; }; -class SendFlow : public StateFlow, QList<1> > { +class SendFlow : public StateFlow, QList<1>> +{ public: SendFlow() - : StateFlow, QList<1>>(&g_service) + : StateFlow, QList<1>>(&g_service) { } - Action entry() override { + Action entry() override + { + pinfo_ = &g_packet_data[message()->data()->index]; + pinfo_->index_ = message()->data()->index; + pinfo_->flowTs_ = os_get_time_monotonic(); + pinfo_->timerTs_ = message()->data()->generateTsNsec; + return allocate_and_call( + output_port.can_hub.get(), STATE(have_buffer), &pool_); + } + Action have_buffer() + { + BufferType *b = get_allocation_result(output_port.can_hub.get()); + b->set_done(&pinfo_->bn_); + auto &f = *b->data()->mutable_frame(); + SET_CAN_FRAME_ID_EFF(f, SEND_HEADER); + f.can_dlc = 8; + memcpy(f.data, SEND_PAYLOAD, 8); + uint32_t idx_be = htobe32(pinfo_->index_); + memcpy(f.data + 4, &idx_be, 4); + b->data()->skipMember_ = nullptr; + output_port.can_hub->send(b); + pinfo_->sendTs_ = os_get_time_monotonic(); + g_num_packets_sent++; + return release_and_exit(); } +private: + using BufferType = CanHubFlow::buffer_type; + LimitedPool pool_{sizeof(BufferType), SEND_PARALLELISM}; + PacketInfo* pinfo_; } g_send_flow; class PacketGenTimer : public ::Timer @@ -191,10 +260,10 @@ class PacketGenTimer : public ::Timer long long timeout() override { - auto* b = g_send_flow.alloc(); + auto *b = g_send_flow.alloc(); b->data()->index = g_next_packet++; b->data()->generateTsNsec = this->schedule_time(); - g_send_flow->send(b); + g_send_flow.send(b); return RESTART; } } pkt_gen_timer; From e0ef33673b10a1bed64d1cd6b9d2770d6517212f Mon Sep 17 00:00:00 2001 From: Balazs Racz Date: Mon, 25 Dec 2023 10:53:31 +0100 Subject: [PATCH 05/38] Releases packets as the receive notifications are coming in. Adds stats printing timer. --- .../hub_test/targets/linux.x86/main.cxx | 132 ++++++++++++++++-- 1 file changed, 118 insertions(+), 14 deletions(-) diff --git a/applications/hub_test/targets/linux.x86/main.cxx b/applications/hub_test/targets/linux.x86/main.cxx index 57f178bec..9675fa122 100644 --- a/applications/hub_test/targets/linux.x86/main.cxx +++ b/applications/hub_test/targets/linux.x86/main.cxx @@ -36,8 +36,8 @@ #include "os/os.h" #include "utils/ClientConnection.hxx" -#include "utils/StringPrintf.hxx" #include "utils/LimitedPool.hxx" +#include "utils/StringPrintf.hxx" OVERRIDE_CONST(gc_generate_newlines, 1); @@ -80,6 +80,9 @@ const char *arg0 = "hub_test"; // Dynamic variables tracking the traffic. +/// How many live links are there (upstream + downstream). +int g_num_live_links = 1; + /// Index of the next packet to generate. unsigned g_next_packet = 1; @@ -88,8 +91,11 @@ unsigned g_pending_buffers = 0; /// How many packets we have sent to the output iface. unsigned g_num_packets_sent = 0; -/// How many sent packets got their barrier notified. +/// How many sent packets got their (send) barrier notified. unsigned g_num_packets_accepted = 0; +/// How many sent packets got their (receive) barrier notified and removed from +/// storage. +unsigned g_num_packets_all_received = 0; void usage(const char *e) { @@ -157,7 +163,7 @@ void parse_args(int argc, char *argv[]) } /// Establishes a new connection. -void add_link(Link *link) +void add_link(Link *link, bool is_receiver) { static int id = 0; if (link->upstream_host) @@ -179,30 +185,55 @@ void add_link(Link *link) ++id; } -struct PacketInfo : private Notifiable { +/// Call this function when a packet has all its receivers confirmed. +/// @param index packet number. +void packet_complete(unsigned index); + +struct PacketInfo : private Notifiable +{ unsigned index_; /// Timestamp when the timer ticked to send this packet. - long long timerTs_{0}; + long long timerTs_ {0}; /// Timestamp when the flow started working on this packet. - long long flowTs_{0}; + long long flowTs_ {0}; /// Timestamp when the buffer was handed over to the output hub. - long long sendTs_{0}; + long long sendTs_ {0}; /// Timestamp when the notifiable on the buffer triggered. - long long confirmTs_{0}; - + long long confirmTs_ {0}; + /// Send frame notification barrier. - BarrierNotifiable bn_{this}; + BarrierNotifiable bn_ {this}; + + /// Number of receivers that have not yet gotten this message. + unsigned pendingReceivers_ {0}; + private: - void notify() override { + void notify() override + { confirmTs_ = os_get_time_monotonic(); g_pending_buffers--; g_num_packets_accepted++; + if (pendingReceivers_) + { + --pendingReceivers_; + } + if (!pendingReceivers_) + { + // Confirmed by all receivers. Removes from storage. + g_num_packets_all_received++; + packet_complete(index_); + } } }; std::map g_packet_data; +void packet_complete(unsigned index) +{ + g_packet_data.erase(index); +} + struct SendPacketRequest { /// Packet number to output (sequence number). @@ -211,6 +242,8 @@ struct SendPacketRequest uint64_t generateTsNsec; }; +/// Implements the state machine to acquire a buffer and send an outgoing +/// packet. class SendFlow : public StateFlow, QList<1>> { public: @@ -241,16 +274,19 @@ class SendFlow : public StateFlow, QList<1>> b->data()->skipMember_ = nullptr; output_port.can_hub->send(b); pinfo_->sendTs_ = os_get_time_monotonic(); + pinfo_->pendingReceivers_ = std::max(0, g_num_live_links - 1); g_num_packets_sent++; return release_and_exit(); } private: using BufferType = CanHubFlow::buffer_type; - LimitedPool pool_{sizeof(BufferType), SEND_PARALLELISM}; - PacketInfo* pinfo_; + LimitedPool pool_ {sizeof(BufferType), SEND_PARALLELISM}; + PacketInfo *pinfo_; } g_send_flow; +/// This timer triggers packets to be sent by sending messages to the +/// SendFlow. class PacketGenTimer : public ::Timer { public: @@ -268,6 +304,69 @@ class PacketGenTimer : public ::Timer } } pkt_gen_timer; +/// This timer prints stats every second. It is responsible for generating +/// deltas from absolute counters. +class StatsTimer : public ::Timer +{ +public: + StatsTimer() + : Timer(g_executor.active_timers()) + { + start(SEC_TO_NSEC(1)); + } + + long long timeout() override + { + print_stats(); + return RESTART; + } + + void print_stats() + { + string ret; + ret += "Send: "; + unsigned d = update_diff(g_next_packet, &next_packet); + ret += StringPrintf("+%d gen", d); + d = update_diff(g_num_packets_accepted, &num_packets_accepted); + ret += StringPrintf(" +%d send complete", d); + d = update_diff(g_num_packets_all_received, &num_packets_accepted); + ret += StringPrintf(" +%d recv complete", d); + // @todo add stats about send queues. + // @todo add stats about receivers. + // @todo print using hubdeviceselect instead of blocking output. + printf("%s\n", ret.c_str()); + } + +private: + /// Creates a delta from a counter. + /// @param global_value current value of the global counter. + /// @param local_value a local shadow variable for the same counter. It + /// will be updated with the global value every time the function gets + /// called. + /// @return diff (number of counts elapsed since last call). + unsigned update_diff(unsigned global_value, unsigned *local_value) + { + unsigned diff = global_value - *local_value; + *local_value = global_value; + return diff; + } + + /// Index of the next packet to generate. + unsigned next_packet = 1; + /// How many buffers have been sent and waiting for the notification on + /// them. + //unsigned pending_buffers = 0; + + /// How many packets we have sent to the output iface. + //unsigned num_packets_sent = 0; + /// How many sent packets got their (send) barrier notified. + unsigned num_packets_accepted = 0; + /// How many sent packets got their (receive) barrier notified and removed + /// from storage. + unsigned num_packets_all_received = 0; + +} stats_timer; + /** Entry point to application. * @param argc number of command line arguments * @param argv array of command line arguments @@ -288,10 +387,15 @@ int appl_main(int argc, char *argv[]) while (1) { + int num_links = 0; for (const auto &p : connections) { - p->ping(); + if (p->ping()) + { + num_links++; + } } + g_num_live_links = num_links; sleep(1); } From b05236bee37e6930f2445ea742a7388e86cb07f8 Mon Sep 17 00:00:00 2001 From: Balazs Racz Date: Wed, 27 Dec 2023 10:29:43 +0100 Subject: [PATCH 06/38] Adds receiver class to hub_test. --- .../hub_test/targets/linux.x86/main.cxx | 134 ++++++++++++++++-- 1 file changed, 120 insertions(+), 14 deletions(-) diff --git a/applications/hub_test/targets/linux.x86/main.cxx b/applications/hub_test/targets/linux.x86/main.cxx index 9675fa122..c138ab99e 100644 --- a/applications/hub_test/targets/linux.x86/main.cxx +++ b/applications/hub_test/targets/linux.x86/main.cxx @@ -44,12 +44,23 @@ OVERRIDE_CONST(gc_generate_newlines, 1); Executor<1> g_executor("g_executor", 0, 1024); Service g_service(&g_executor); +class Receiver; + struct Link { + /// Config (cmdline): device file to use const char *device_path = nullptr; + /// Config (cmdline) TCP target port number int upstream_port = 12021; + /// Config (cmdline) TCP target hostname const char *upstream_host = nullptr; + /// The CAN hub for this link. std::unique_ptr can_hub {new CanHubFlow(&g_service)}; + /// The receiver object keeping track of the inputs and stats. Will be + /// created for every link, but only used for the input ports. + std::unique_ptr receiver; + + Link(); }; /// We generate packets to this interface. @@ -84,7 +95,7 @@ const char *arg0 = "hub_test"; int g_num_live_links = 1; /// Index of the next packet to generate. -unsigned g_next_packet = 1; +uint32_t g_next_packet = 1; /// How many buffers have been sent and waiting for the notification on them. unsigned g_pending_buffers = 0; @@ -162,6 +173,92 @@ void parse_args(int argc, char *argv[]) } } + +class Receiver : public CanHubPortInterface +{ +public: + Receiver(CanHubFlow *hub) + { } + + void send(message_type *buf, unsigned prio) override + { + auto rb = get_buffer_deleter(buf); + const struct can_frame &f = buf->data()->frame(); + if (f.can_dlc != 8 || !IS_CAN_FRAME_EFF(f)) + { + // not interesting frame + ++numUnknownFrames_; + return; + } + auto id = GET_CAN_FRAME_ID_EFF(f); + if (id != SEND_HEADER) + { + // not interesting frame + ++numUnknownFrames_; + return; + } + if (memcmp(f.data, SEND_PAYLOAD, 4) != 0) + { + // not interesting frame + ++numUnknownFrames_; + return; + } + ++numFrames_; + uint32_t cnt = 0; + memcpy(&cnt, f.data + 4, 4); + cnt = be32toh(cnt); + if (cnt == nextPacket_) + { + ++numInOrder_; + ++nextPacket_; + } + else if (cnt > nextPacket_) + { + numMissed_ = cnt - nextPacket_; + nextPacket_ = cnt + 1; + } + else + { + numOutOfOrder_++; + // we don't adjust next packet here + } + } + + /// Gets a stats line for this input. + string get_stats() + { + string ret; + ret += StringPrintf("\tReceiver: +%d recv, +%d unk, +%d OK, +%d " + "out-of-order, +%d missing\n", + numFrames_, numUnknownFrames_, numInOrder_, numOutOfOrder_, + numMissed_); + numFrames_ = numUnknownFrames_ = numInOrder_ = numOutOfOrder_ = + numMissed_ = 0; + return ret; + } + +public: + /// Number of incoming frames that we don't recognize. + unsigned numUnknownFrames_ = 0; + /// Number of successful frames received. + unsigned numFrames_ = 0; + /// Number of in-order frames. + unsigned numInOrder_ = 0; + /// Number of frames missed. These may be lost or will arrive later out of + /// order. + unsigned numMissed_ = 0; + /// Number of frames that arrived late / out of order. + unsigned numOutOfOrder_ = 0; + +private: + /// Which packet index are we expecting next. + uint32_t nextPacket_ = 1; +}; + +Link::Link() + : receiver {new Receiver(can_hub.get())} +{ } + /// Establishes a new connection. void add_link(Link *link, bool is_receiver) { @@ -208,12 +305,9 @@ struct PacketInfo : private Notifiable /// Number of receivers that have not yet gotten this message. unsigned pendingReceivers_ {0}; -private: - void notify() override + /// Notifies that this packet was received by a receiver link. + void notify_reception() { - confirmTs_ = os_get_time_monotonic(); - g_pending_buffers--; - g_num_packets_accepted++; if (pendingReceivers_) { --pendingReceivers_; @@ -225,6 +319,14 @@ struct PacketInfo : private Notifiable packet_complete(index_); } } + +private: + void notify() override + { + confirmTs_ = os_get_time_monotonic(); + g_pending_buffers--; + g_num_packets_accepted++; + } }; std::map g_packet_data; @@ -326,13 +428,17 @@ class StatsTimer : public ::Timer string ret; ret += "Send: "; unsigned d = update_diff(g_next_packet, &next_packet); - ret += StringPrintf("+%d gen", d); + ret += StringPrintf("+%" PRIu32 " gen", d); d = update_diff(g_num_packets_accepted, &num_packets_accepted); ret += StringPrintf(" +%d send complete", d); - d = update_diff(g_num_packets_all_received, &num_packets_accepted); + d = update_diff(g_num_packets_all_received, &num_packets_all_received); ret += StringPrintf(" +%d recv complete", d); // @todo add stats about send queues. - // @todo add stats about receivers. + ret += "\n"; + for (const auto &lnk : input_ports) + { + ret += lnk.receiver->get_stats(); + } // @todo print using hubdeviceselect instead of blocking output. printf("%s\n", ret.c_str()); } @@ -344,21 +450,21 @@ class StatsTimer : public ::Timer /// will be updated with the global value every time the function gets /// called. /// @return diff (number of counts elapsed since last call). - unsigned update_diff(unsigned global_value, unsigned *local_value) + template T update_diff(T global_value, T *local_value) { - unsigned diff = global_value - *local_value; + T diff = global_value - *local_value; *local_value = global_value; return diff; } /// Index of the next packet to generate. - unsigned next_packet = 1; + uint32_t next_packet = 1; /// How many buffers have been sent and waiting for the notification on /// them. - //unsigned pending_buffers = 0; + // unsigned pending_buffers = 0; /// How many packets we have sent to the output iface. - //unsigned num_packets_sent = 0; + // unsigned num_packets_sent = 0; /// How many sent packets got their (send) barrier notified. unsigned num_packets_accepted = 0; /// How many sent packets got their (receive) barrier notified and removed From 9130af0d27111123f44dd0308116d04f01f13ebb Mon Sep 17 00:00:00 2001 From: Balazs Racz Date: Fri, 29 Dec 2023 10:09:47 +0100 Subject: [PATCH 07/38] Adds constants for TCP socket options fo SO_SNDBUF and SO_RCVBUF --- include/nmranet_config.h | 8 ++++++++ src/utils/constants.cxx | 6 +++++- 2 files changed, 13 insertions(+), 1 deletion(-) diff --git a/include/nmranet_config.h b/include/nmranet_config.h index c39a176dd..5f09d9008 100644 --- a/include/nmranet_config.h +++ b/include/nmranet_config.h @@ -104,6 +104,14 @@ DECLARE_CONST(gridconnect_bridge_max_incoming_packets); /// output socket cannot send the data fast enough. DECLARE_CONST(gridconnect_bridge_max_outgoing_packets); +/// TCP receive buffer size in bytes for gridconnect hubs. Used via +/// setsockopt(SO_RCVBUF). Set to 1 (default) to not bound it. +DECLARE_CONST(gridconnect_tcp_rcv_buffer_size); + +/// TCP send buffer size in bytes for gridconnect hubs. Used via +/// setsockopt(SO_SENDBUF). Set to 1 (default) to not bound it. +DECLARE_CONST(gridconnect_tcp_snd_buffer_size); + /** Number of bytes of gridconnect data to buffer before sending off the * lowlevel system (such as TCP socket). */ DECLARE_CONST(gridconnect_buffer_size); diff --git a/src/utils/constants.cxx b/src/utils/constants.cxx index d25f90ca1..bef6b033f 100644 --- a/src/utils/constants.cxx +++ b/src/utils/constants.cxx @@ -150,6 +150,10 @@ DEFAULT_CONST(gridconnect_port_max_incoming_packets, 6); DEFAULT_CONST(gridconnect_bridge_max_incoming_packets, 1); /// 1 = infinite DEFAULT_CONST(gridconnect_bridge_max_outgoing_packets, 1); +/// 1 = don't set +DEFAULT_CONST(gridconnect_tcp_rcv_buffer_size, 1); +/// 1 = don't set +DEFAULT_CONST(gridconnect_tcp_snd_buffer_size, 1); DEFAULT_CONST_FALSE(gridconnect_tcp_use_select); @@ -163,4 +167,4 @@ DEFAULT_CONST(socket_listener_backlog, 1); DEFAULT_CONST(socket_listener_stack_size, 1000); /// Allow up to five sockets to be pending for accept() in SocketListener. DEFAULT_CONST(socket_listener_backlog, 5); -#endif \ No newline at end of file +#endif From 649844199d9d9155265040d1f62571abd34b8253 Mon Sep 17 00:00:00 2001 From: Balazs Racz Date: Fri, 29 Dec 2023 10:10:17 +0100 Subject: [PATCH 08/38] Optimizes the memory and buffer use of the hub application. --- applications/hub/main.cxx | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/applications/hub/main.cxx b/applications/hub/main.cxx index 67315b3ea..ab2c12f1c 100644 --- a/applications/hub/main.cxx +++ b/applications/hub/main.cxx @@ -56,6 +56,12 @@ CanHubFlow can_hub0(&g_service); OVERRIDE_CONST(gc_generate_newlines, 1); OVERRIDE_CONST(gridconnect_buffer_size, 1300); OVERRIDE_CONST(gridconnect_buffer_delay_usec, 2000); +OVERRIDE_CONST(gridconnect_bridge_max_incoming_packets, 5); +OVERRIDE_CONST(gridconnect_bridge_max_outgoing_packets, 5); +OVERRIDE_CONST(gridconnect_tcp_snd_buffer_size, 8192); +OVERRIDE_CONST(gridconnect_tcp_rcv_buffer_size, 3100); + +OVERRIDE_CONST_TRUE(gridconnect_tcp_use_select); int port = 12021; From 84b480833e4b18e3f5e0194183a28c1fd9a2b578 Mon Sep 17 00:00:00 2001 From: Balazs Racz Date: Fri, 29 Dec 2023 10:10:49 +0100 Subject: [PATCH 09/38] Ensures that an executor thread can not be created twice. --- src/os/OS.hxx | 1 + 1 file changed, 1 insertion(+) diff --git a/src/os/OS.hxx b/src/os/OS.hxx index e4cb385d8..50edeb6cd 100644 --- a/src/os/OS.hxx +++ b/src/os/OS.hxx @@ -77,6 +77,7 @@ public: */ void start(const char *name, int priority, size_t stack_size) { + HASSERT(!is_created()); os_thread_create(&handle, name, priority, stack_size, start, this); } From df9671407f97db7a130e3444b52dd7dd2ae6c1b4 Mon Sep 17 00:00:00 2001 From: Balazs Racz Date: Fri, 29 Dec 2023 10:16:22 +0100 Subject: [PATCH 10/38] hub-test: Implements links and prints statistics about sent data. --- .../hub_test/targets/linux.x86/main.cxx | 192 +++++++++++------- 1 file changed, 120 insertions(+), 72 deletions(-) diff --git a/applications/hub_test/targets/linux.x86/main.cxx b/applications/hub_test/targets/linux.x86/main.cxx index c138ab99e..27d31d440 100644 --- a/applications/hub_test/targets/linux.x86/main.cxx +++ b/applications/hub_test/targets/linux.x86/main.cxx @@ -32,14 +32,23 @@ * @date 23 Dec 2023 */ +#include + #include "nmranet_config.h" #include "os/os.h" +#include "os/sleep.h" +#include "utils/Buffer.hxx" #include "utils/ClientConnection.hxx" #include "utils/LimitedPool.hxx" #include "utils/StringPrintf.hxx" +#include "utils/Stats.hxx" OVERRIDE_CONST(gc_generate_newlines, 1); +OVERRIDE_CONST(gridconnect_bridge_max_outgoing_packets, 2); +OVERRIDE_CONST(gridconnect_tcp_snd_buffer_size, 1024); +OVERRIDE_CONST(gridconnect_tcp_rcv_buffer_size, 8192); + Executor<1> g_executor("g_executor", 0, 1024); Service g_service(&g_executor); @@ -52,15 +61,13 @@ struct Link const char *device_path = nullptr; /// Config (cmdline) TCP target port number int upstream_port = 12021; - /// Config (cmdline) TCP target hostname + /// Config (cmdline) TCP target hostname const char *upstream_host = nullptr; /// The CAN hub for this link. std::unique_ptr can_hub {new CanHubFlow(&g_service)}; /// The receiver object keeping track of the inputs and stats. Will be /// created for every link, but only used for the input ports. std::unique_ptr receiver; - - Link(); }; /// We generate packets to this interface. @@ -173,15 +180,66 @@ void parse_args(int argc, char *argv[]) } } +/// Call this function when a packet has all its receivers confirmed. +/// @param index packet number. +void packet_complete(unsigned index); + +struct PacketInfo : private Notifiable +{ + unsigned index_; + + /// Timestamp when the timer ticked to send this packet. + long long timerTs_ {0}; + /// Timestamp when the flow started working on this packet. + long long flowTs_ {0}; + /// Timestamp when the buffer was handed over to the output hub. + long long sendTs_ {0}; + /// Timestamp when the notifiable on the buffer triggered. + long long confirmTs_ {0}; + + /// Send frame notification barrier. + BarrierNotifiable bn_ {this}; + + /// Number of receivers that have not yet gotten this message. + unsigned pendingReceivers_ {0}; + + /// Notifies that this packet was received by a receiver link. + void notify_reception() + { + if (pendingReceivers_) + { + --pendingReceivers_; + } + if (!pendingReceivers_) + { + // Confirmed by all receivers. Removes from storage. + g_num_packets_all_received++; + packet_complete(index_); + } + } + +private: + void notify() override + { + confirmTs_ = os_get_time_monotonic(); + g_pending_buffers--; + g_num_packets_accepted++; + } +}; + +std::map g_packet_data; class Receiver : public CanHubPortInterface { public: Receiver(CanHubFlow *hub) - { } + { + hub->register_port(this); + } void send(message_type *buf, unsigned prio) override { + auto ts = os_get_time_monotonic(); auto rb = get_buffer_deleter(buf); const struct can_frame &f = buf->data()->frame(); if (f.can_dlc != 8 || !IS_CAN_FRAME_EFF(f)) @@ -222,6 +280,13 @@ class Receiver : public CanHubPortInterface numOutOfOrder_++; // we don't adjust next packet here } + auto it = g_packet_data.find(cnt); + if (it != g_packet_data.end()) + { + long long rtt_usec = NSEC_TO_USEC(ts - it->second.confirmTs_); + rttUsec_.add(rtt_usec); + it->second.notify_reception(); + } } /// Gets a stats line for this input. @@ -229,11 +294,16 @@ class Receiver : public CanHubPortInterface { string ret; ret += StringPrintf("\tReceiver: +%d recv, +%d unk, +%d OK, +%d " - "out-of-order, +%d missing\n", + "out-of-order, +%d missing", numFrames_, numUnknownFrames_, numInOrder_, numOutOfOrder_, numMissed_); numFrames_ = numUnknownFrames_ = numInOrder_ = numOutOfOrder_ = numMissed_ = 0; + + ret += StringPrintf("|RTT %.1f msec +- %.1f\n", + rttUsec_.favg()/1000, rttUsec_.stddev() / 1000); + + rttUsec_.clear(); return ret; } @@ -253,12 +323,11 @@ class Receiver : public CanHubPortInterface private: /// Which packet index are we expecting next. uint32_t nextPacket_ = 1; + /// Statistics about the RTT of the packet (from send-confirm to + /// receive). + Stats rttUsec_; }; -Link::Link() - : receiver {new Receiver(can_hub.get())} -{ } - /// Establishes a new connection. void add_link(Link *link, bool is_receiver) { @@ -279,57 +348,12 @@ void add_link(Link *link, bool is_receiver) { usage(arg0); } - ++id; -} - -/// Call this function when a packet has all its receivers confirmed. -/// @param index packet number. -void packet_complete(unsigned index); - -struct PacketInfo : private Notifiable -{ - unsigned index_; - - /// Timestamp when the timer ticked to send this packet. - long long timerTs_ {0}; - /// Timestamp when the flow started working on this packet. - long long flowTs_ {0}; - /// Timestamp when the buffer was handed over to the output hub. - long long sendTs_ {0}; - /// Timestamp when the notifiable on the buffer triggered. - long long confirmTs_ {0}; - - /// Send frame notification barrier. - BarrierNotifiable bn_ {this}; - - /// Number of receivers that have not yet gotten this message. - unsigned pendingReceivers_ {0}; - - /// Notifies that this packet was received by a receiver link. - void notify_reception() - { - if (pendingReceivers_) - { - --pendingReceivers_; - } - if (!pendingReceivers_) - { - // Confirmed by all receivers. Removes from storage. - g_num_packets_all_received++; - packet_complete(index_); - } - } - -private: - void notify() override + if (is_receiver) { - confirmTs_ = os_get_time_monotonic(); - g_pending_buffers--; - g_num_packets_accepted++; + link->receiver.reset(new Receiver(link->can_hub.get())); } -}; - -std::map g_packet_data; + ++id; +} void packet_complete(unsigned index) { @@ -339,9 +363,9 @@ void packet_complete(unsigned index) struct SendPacketRequest { /// Packet number to output (sequence number). - unsigned index; + unsigned index = 0; /// When was this packet generated by the timer. - uint64_t generateTsNsec; + uint64_t generateTsNsec = 0; }; /// Implements the state machine to acquire a buffer and send an outgoing @@ -355,6 +379,17 @@ class SendFlow : public StateFlow, QList<1>> Action entry() override { + auto it = g_packet_data.find(message()->data()->index); + if (it != g_packet_data.end()) + { + LOG(FATAL, + "duplicate packet index %d, %d, old send Ts %lld, new send ts " + "%" PRId64, + message()->data()->index, it->second.index_, + it->second.timerTs_, message()->data()->generateTsNsec); + DIE("duplicate packet"); + } + HASSERT(it == g_packet_data.end()); pinfo_ = &g_packet_data[message()->data()->index]; pinfo_->index_ = message()->data()->index; pinfo_->flowTs_ = os_get_time_monotonic(); @@ -365,7 +400,8 @@ class SendFlow : public StateFlow, QList<1>> Action have_buffer() { - BufferType *b = get_allocation_result(output_port.can_hub.get()); + g_pending_buffers++; + auto *b = get_allocation_result(output_port.can_hub.get()); b->set_done(&pinfo_->bn_); auto &f = *b->data()->mutable_frame(); SET_CAN_FRAME_ID_EFF(f, SEND_HEADER); @@ -374,16 +410,16 @@ class SendFlow : public StateFlow, QList<1>> uint32_t idx_be = htobe32(pinfo_->index_); memcpy(f.data + 4, &idx_be, 4); b->data()->skipMember_ = nullptr; - output_port.can_hub->send(b); - pinfo_->sendTs_ = os_get_time_monotonic(); pinfo_->pendingReceivers_ = std::max(0, g_num_live_links - 1); g_num_packets_sent++; + pinfo_->sendTs_ = os_get_time_monotonic(); + output_port.can_hub->send(b); return release_and_exit(); } private: - using BufferType = CanHubFlow::buffer_type; - LimitedPool pool_ {sizeof(BufferType), SEND_PARALLELISM}; + LimitedPool pool_ { + sizeof(Buffer>), SEND_PARALLELISM}; PacketInfo *pinfo_; } g_send_flow; @@ -399,6 +435,7 @@ class PacketGenTimer : public ::Timer long long timeout() override { auto *b = g_send_flow.alloc(); + HASSERT(b); b->data()->index = g_next_packet++; b->data()->generateTsNsec = this->schedule_time(); g_send_flow.send(b); @@ -413,9 +450,7 @@ class StatsTimer : public ::Timer public: StatsTimer() : Timer(g_executor.active_timers()) - { - start(SEC_TO_NSEC(1)); - } + { } long long timeout() override { @@ -428,12 +463,18 @@ class StatsTimer : public ::Timer string ret; ret += "Send: "; unsigned d = update_diff(g_next_packet, &next_packet); - ret += StringPrintf("+%" PRIu32 " gen", d); + ret += StringPrintf("|+%" PRIu32 " gen", d); d = update_diff(g_num_packets_accepted, &num_packets_accepted); - ret += StringPrintf(" +%d send complete", d); + ret += StringPrintf("|+%d send complete", d); d = update_diff(g_num_packets_all_received, &num_packets_all_received); - ret += StringPrintf(" +%d recv complete", d); + ret += StringPrintf("|+%d recv complete", d); + int send_q = -2; + ::ioctl(connections[0]->fd(), TIOCOUTQ, &send_q); // @todo add stats about send queues. + ret += StringPrintf( + "||sendq %d|pending buf %u| sent>acc %u| acc>recv %d|", send_q, + g_pending_buffers, g_num_packets_sent - g_num_packets_accepted, + g_num_packets_accepted - g_num_packets_all_received); ret += "\n"; for (const auto &lnk : input_ports) { @@ -482,14 +523,21 @@ int appl_main(int argc, char *argv[]) { arg0 = argv[0]; parse_args(argc, argv); + add_link(&output_port, false); + for (auto &l : input_ports) + { + add_link(&l, true); + } - g_executor.start_thread("executor_thread", 0, 5000); + // g_executor.start_thread("executor_thread", 0, 5000); if (pkt_per_sec > 0) { long long diff = SEC_TO_NSEC(1) / pkt_per_sec; pkt_gen_timer.start(diff); } + microsleep(100); + stats_timer.start(SEC_TO_NSEC(1)); while (1) { From 954956d85625772a50a5882ed2bfe176a7b03a7b Mon Sep 17 00:00:00 2001 From: Balazs Racz Date: Fri, 29 Dec 2023 10:17:24 +0100 Subject: [PATCH 11/38] gc-tcp-hub: applies SO_SNDBUF and SO_RCVBUF options to sockets. --- src/utils/GcTcpHub.cxx | 15 +++++++++++++-- 1 file changed, 13 insertions(+), 2 deletions(-) diff --git a/src/utils/GcTcpHub.cxx b/src/utils/GcTcpHub.cxx index fe87501a0..043b9f216 100644 --- a/src/utils/GcTcpHub.cxx +++ b/src/utils/GcTcpHub.cxx @@ -31,10 +31,11 @@ * @date 26 Apr 2014 */ -#include - #include "utils/GcTcpHub.hxx" +#include +#include + #include "nmranet_config.h" #include "utils/GridConnectHub.hxx" @@ -46,6 +47,16 @@ void GcTcpHub::on_new_connection(int fd) AtomicHolder h(this); numClients_++; } + const int rcvbuf = config_gridconnect_tcp_rcv_buffer_size(); + if (rcvbuf > 1) + { + ::setsockopt(fd, SOL_SOCKET, SO_RCVBUF, &rcvbuf, sizeof(rcvbuf)); + } + const int sndbuf = config_gridconnect_tcp_snd_buffer_size(); + if (sndbuf > 1) + { + ::setsockopt(fd, SOL_SOCKET, SO_SNDBUF, &sndbuf, sizeof(sndbuf)); + } create_gc_port_for_can_hub(canHub_, fd, this, use_select); } From a85ca9f384273470a89f67e979e1eaa79955e442 Mon Sep 17 00:00:00 2001 From: Balazs Racz Date: Fri, 29 Dec 2023 10:18:16 +0100 Subject: [PATCH 12/38] client connection util: exposes the file descriptor when available. Applies the gridconnect use select link option. Applies the SO_SNDBUF/RCVBUF link option. --- src/utils/ClientConnection.cxx | 77 ++++++++++++++++++++++++++++++++++ src/utils/ClientConnection.hxx | 15 ++++--- 2 files changed, 87 insertions(+), 5 deletions(-) create mode 100644 src/utils/ClientConnection.cxx diff --git a/src/utils/ClientConnection.cxx b/src/utils/ClientConnection.cxx new file mode 100644 index 000000000..9f002ac27 --- /dev/null +++ b/src/utils/ClientConnection.cxx @@ -0,0 +1,77 @@ +/** \copyright + * Copyright (c) 2023, Balazs Racz + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * - Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * + * - Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE + * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. + * + * \file ClientConnection.cxx + * + * Utilities for managing can-hub connections as a client application. + * + * @author Balazs Racz + * @date 27 Dec 2023 + */ + +#include "utils/ClientConnection.hxx" + +#include "netinet/in.h" +#include "netinet/tcp.h" +#include "nmranet_config.h" + +/** Callback from try_connect to donate the file descriptor. @param fd is + * the file destriptor of the connection freshly opened. */ +void GCFdConnectionClient::connection_complete(int fd) +{ + struct stat statbuf; + fstat(fd, &statbuf); + + const bool use_select = + (config_gridconnect_tcp_use_select() == CONSTANT_TRUE); + fd_ = fd; + const int rcvbuf = config_gridconnect_tcp_rcv_buffer_size(); + if (rcvbuf > 1) + { + ::setsockopt(fd, SOL_SOCKET, SO_RCVBUF, &rcvbuf, sizeof(rcvbuf)); + } + const int sndbuf = config_gridconnect_tcp_snd_buffer_size(); + if (sndbuf > 1) + { + ::setsockopt(fd, SOL_SOCKET, SO_SNDBUF, &sndbuf, sizeof(sndbuf)); + int ret = 0; + socklen_t retsize = sizeof(ret); + ::getsockopt(fd, SOL_SOCKET, SO_SNDBUF, &ret, &retsize); + LOG(ALWAYS, "fd %d sndbuf %d", fd, ret); + } + const int lowat = 4096; + if (lowat > 1 && S_ISSOCK(statbuf.st_mode)) + { + ERRNOCHECK("tcp lowat", + ::setsockopt( + fd, IPPROTO_TCP, TCP_NOTSENT_LOWAT, &lowat, sizeof(lowat))); + int ret = 0; + socklen_t retsize = sizeof(ret); + ::getsockopt(fd, IPPROTO_TCP, TCP_NOTSENT_LOWAT, &ret, &retsize); + LOG(ALWAYS, "fd %d lowat %d", fd, ret); + } + create_gc_port_for_can_hub(hub_, fd, &closedNotify_, use_select); +} diff --git a/src/utils/ClientConnection.hxx b/src/utils/ClientConnection.hxx index b29ef0928..8e9753520 100644 --- a/src/utils/ClientConnection.hxx +++ b/src/utils/ClientConnection.hxx @@ -51,6 +51,10 @@ public: * is dead. @returns true if there is a live connection. */ virtual bool ping() = 0; + /// @return the file descriptor, if this connection has one, or -1 if the + /// connection is down or doesn't have an fd. + virtual int fd() { return -1; } + virtual ~ConnectionClient() { } @@ -114,6 +118,11 @@ public: return fd_ >= 0; } + int fd() override + { + return fd_; + } + protected: /// Abstrct base function to attempt to connect (or open device) to the /// destination. @@ -121,11 +130,7 @@ protected: /** Callback from try_connect to donate the file descriptor. @param fd is * the file destriptor of the connection freshly opened. */ - void connection_complete(int fd) - { - fd_ = fd; - create_gc_port_for_can_hub(hub_, fd, &closedNotify_); - } + void connection_complete(int fd); private: /// Will be called when the descriptor experiences an error (typivcally From 8ee55a09d520f24633bb70e073f513970ea3ab6e Mon Sep 17 00:00:00 2001 From: Balazs Racz Date: Fri, 29 Dec 2023 10:20:24 +0100 Subject: [PATCH 13/38] Limits end to end buffering in gridconnect bridge. When a dispatcher was used to send traffic to a gridconnect bridge, an infinite number of packets were allocated in the dispatcher when more than one client was connected. By adding a limitedpool to the bridge object, we can ensure that the dispatcher is blocked when the output queue pushes back. --- src/utils/GridConnectHub.cxx | 35 ++++++++++++++++++++++++++++++++--- 1 file changed, 32 insertions(+), 3 deletions(-) diff --git a/src/utils/GridConnectHub.cxx b/src/utils/GridConnectHub.cxx index b148d06ae..79408b756 100644 --- a/src/utils/GridConnectHub.cxx +++ b/src/utils/GridConnectHub.cxx @@ -135,6 +135,14 @@ class GCAdapter : public GCAdapterBase , skipMember_(skip_member) , double_bytes_(double_bytes) { + const int cnt = config_gridconnect_bridge_max_outgoing_packets(); + if (cnt > 1) { + ownedPool_.reset( + new LimitedPool(sizeof(Buffer), cnt)); + pool_ = ownedPool_.get(); + } else { + pool_ = mainBufferPool; + } } /// @return where to write the packets to. @@ -143,10 +151,27 @@ class GCAdapter : public GCAdapterBase return destination_; } - bool shutdown() { - return delayPort_.shutdown(); + /// @return Triggers releasing all memory after a close. Returns true + /// if it's safe to delete this. + bool shutdown() + { + const int cnt = config_gridconnect_bridge_max_outgoing_packets(); + bool state_delay = delayPort_.shutdown(); + bool state_pool = true; + if (ownedPool_) + { + state_pool = (int(ownedPool_->free_items()) == cnt); + } + return state_delay && state_pool; } - + + /// The dispatcher will be using this pool to allocate frames when a + /// hub needs to make a copy for an outgoing queue. + Pool *pool() override + { + return pool_; + } + Action entry() override { LOG(VERBOSE, "can packet arrived: %" PRIx32, @@ -185,6 +210,10 @@ class GCAdapter : public GCAdapterBase /// Helper class that assembles larger outgoing packets from the /// individual packets by delaying data a little bit. BufferPort delayPort_; + /// If we want frame limits, this pool can do that for us. + std::unique_ptr ownedPool_; + /// The allocation buffer pool to use for outgoing frames. + Pool* pool_; /// Destination buffer (characters). char dbuf_[56]; /// Pipe to send data to. From 0d0cefa2719e60b22378c7a6221a0d073a9f8a98 Mon Sep 17 00:00:00 2001 From: Balazs Racz Date: Fri, 29 Dec 2023 10:20:43 +0100 Subject: [PATCH 14/38] Adds a utility class for computing simple statistics. --- src/utils/Stats.hxx | 98 +++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 98 insertions(+) create mode 100644 src/utils/Stats.hxx diff --git a/src/utils/Stats.hxx b/src/utils/Stats.hxx new file mode 100644 index 000000000..36f9fbb9c --- /dev/null +++ b/src/utils/Stats.hxx @@ -0,0 +1,98 @@ +/** \copyright + * Copyright (c) 2023, Balazs Racz + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * - Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * + * - Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE + * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. + * + * \file Stats.hxx + * + * Utility class for collecting statistics. + * + * @author Balazs Racz + * @date 28 Dec 2023 + */ + +#ifndef _UTILS_STATS_HXX_ +#define _UTILS_STATS_HXX_ + +#include +#include + +class Stats +{ +public: + using FloatType = double; + + Stats() + { + clear(); + } + + // Clear the statistics (erase all data points). + void clear() + { + sum_ = 0; + count_ = 0; + qsum_ = 0; + } + + /// Appends a data point to the statistics. + /// @param value the data point. + void add(int32_t value) + { + ++count_; + sum_ += value; + FloatType fval = value; + qsum_ += fval * fval; + } + + /// @return the average + FloatType favg() + { + return FloatType(sum_) / count_; + } + + /// @return the average (rounded down to nearest integer). + int32_t avg() + { + return sum_ / count_; + } + + /// @return the sample standard deviation (uncorrected). + FloatType stddev() + { + // Formula: sqrt(N*qsum - sum^2) / N + FloatType sum(sum_); + return sqrt(qsum_ * count_ - sum * sum) / count_; + } + +private: + /// Number of samples added. + uint32_t count_; + /// Sum of sample values added. + int64_t sum_; + /// Sum of squares of sample values added. + FloatType qsum_; +}; + +#endif // _UTILS_STATS_HXX_ From 97552790a86540fc279b87fc26715bbfd07fa2fa Mon Sep 17 00:00:00 2001 From: Balazs Racz Date: Fri, 29 Dec 2023 10:21:11 +0100 Subject: [PATCH 15/38] Ensures that the ClientCOnnection.cxx is built and linked. --- src/utils/sources | 1 + 1 file changed, 1 insertion(+) diff --git a/src/utils/sources b/src/utils/sources index 8e7a00a40..86c2ea8e4 100644 --- a/src/utils/sources +++ b/src/utils/sources @@ -7,6 +7,7 @@ CXXSRCS += \ Base64.cxx \ Blinker.cxx \ CanIf.cxx \ + ClientConnection.cxx \ Crc.cxx \ StringPrintf.cxx \ Buffer.cxx \ From e2b97692137fb71a83f0490e6fc81265e9dd7bae Mon Sep 17 00:00:00 2001 From: Balazs Racz Date: Fri, 29 Dec 2023 10:21:26 +0100 Subject: [PATCH 16/38] Sorts lines in sources. --- src/utils/sources | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/utils/sources b/src/utils/sources index 86c2ea8e4..5ab782c84 100644 --- a/src/utils/sources +++ b/src/utils/sources @@ -18,19 +18,19 @@ CXXSRCS += \ GcTcpHub.cxx \ GridConnect.cxx \ GridConnectHub.cxx \ - format_utils.cxx \ HubDevice.cxx \ HubDeviceSelect.cxx \ - Queue.cxx \ JSHubPort.cxx \ + Queue.cxx \ ReflashBootloader.cxx \ + ServiceLocator.cxx \ SocketCan.cxx \ + SocketClient.cxx \ constants.cxx \ + format_utils.cxx \ gc_format.cxx \ logging.cxx \ - SocketClient.cxx \ socket_listener.cxx \ - ServiceLocator.cxx \ CXXTESTSRCS += BufferQueue.cxxtest \ From 47cb9810f16afbf7726a959463e0d4ac3f4a09d3 Mon Sep 17 00:00:00 2001 From: Balazs Racz Date: Fri, 29 Dec 2023 10:10:49 +0100 Subject: [PATCH 17/38] Ensures that an executor thread can not be created twice. --- src/os/OS.hxx | 1 + 1 file changed, 1 insertion(+) diff --git a/src/os/OS.hxx b/src/os/OS.hxx index e4cb385d8..50edeb6cd 100644 --- a/src/os/OS.hxx +++ b/src/os/OS.hxx @@ -77,6 +77,7 @@ public: */ void start(const char *name, int priority, size_t stack_size) { + HASSERT(!is_created()); os_thread_create(&handle, name, priority, stack_size, start, this); } From 4bd04c6c81b987a82f5f399bd104fb0eeb9b2f7b Mon Sep 17 00:00:00 2001 From: Balazs Racz Date: Fri, 29 Dec 2023 10:20:24 +0100 Subject: [PATCH 18/38] Limits end to end buffering in gridconnect bridge. When a dispatcher was used to send traffic to a gridconnect bridge, an infinite number of packets were allocated in the dispatcher when more than one client was connected. By adding a limitedpool to the bridge object, we can ensure that the dispatcher is blocked when the output queue pushes back. --- src/utils/GridConnectHub.cxx | 35 ++++++++++++++++++++++++++++++++--- 1 file changed, 32 insertions(+), 3 deletions(-) diff --git a/src/utils/GridConnectHub.cxx b/src/utils/GridConnectHub.cxx index b148d06ae..79408b756 100644 --- a/src/utils/GridConnectHub.cxx +++ b/src/utils/GridConnectHub.cxx @@ -135,6 +135,14 @@ class GCAdapter : public GCAdapterBase , skipMember_(skip_member) , double_bytes_(double_bytes) { + const int cnt = config_gridconnect_bridge_max_outgoing_packets(); + if (cnt > 1) { + ownedPool_.reset( + new LimitedPool(sizeof(Buffer), cnt)); + pool_ = ownedPool_.get(); + } else { + pool_ = mainBufferPool; + } } /// @return where to write the packets to. @@ -143,10 +151,27 @@ class GCAdapter : public GCAdapterBase return destination_; } - bool shutdown() { - return delayPort_.shutdown(); + /// @return Triggers releasing all memory after a close. Returns true + /// if it's safe to delete this. + bool shutdown() + { + const int cnt = config_gridconnect_bridge_max_outgoing_packets(); + bool state_delay = delayPort_.shutdown(); + bool state_pool = true; + if (ownedPool_) + { + state_pool = (int(ownedPool_->free_items()) == cnt); + } + return state_delay && state_pool; } - + + /// The dispatcher will be using this pool to allocate frames when a + /// hub needs to make a copy for an outgoing queue. + Pool *pool() override + { + return pool_; + } + Action entry() override { LOG(VERBOSE, "can packet arrived: %" PRIx32, @@ -185,6 +210,10 @@ class GCAdapter : public GCAdapterBase /// Helper class that assembles larger outgoing packets from the /// individual packets by delaying data a little bit. BufferPort delayPort_; + /// If we want frame limits, this pool can do that for us. + std::unique_ptr ownedPool_; + /// The allocation buffer pool to use for outgoing frames. + Pool* pool_; /// Destination buffer (characters). char dbuf_[56]; /// Pipe to send data to. From 46cfb1cf3da06780bea262b07293d9dde8297cbb Mon Sep 17 00:00:00 2001 From: Balazs Racz Date: Fri, 29 Dec 2023 10:56:55 +0100 Subject: [PATCH 19/38] fix whitespace --- src/utils/GridConnectHub.cxx | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/src/utils/GridConnectHub.cxx b/src/utils/GridConnectHub.cxx index 79408b756..b23bdb4f1 100644 --- a/src/utils/GridConnectHub.cxx +++ b/src/utils/GridConnectHub.cxx @@ -136,11 +136,14 @@ class GCAdapter : public GCAdapterBase , double_bytes_(double_bytes) { const int cnt = config_gridconnect_bridge_max_outgoing_packets(); - if (cnt > 1) { + if (cnt > 1) + { ownedPool_.reset( new LimitedPool(sizeof(Buffer), cnt)); pool_ = ownedPool_.get(); - } else { + } + else + { pool_ = mainBufferPool; } } @@ -213,7 +216,7 @@ class GCAdapter : public GCAdapterBase /// If we want frame limits, this pool can do that for us. std::unique_ptr ownedPool_; /// The allocation buffer pool to use for outgoing frames. - Pool* pool_; + Pool *pool_; /// Destination buffer (characters). char dbuf_[56]; /// Pipe to send data to. From 567021afb189a7646681d1eaf4fe31b1e016d088 Mon Sep 17 00:00:00 2001 From: Balazs Racz Date: Fri, 29 Dec 2023 11:49:30 +0100 Subject: [PATCH 20/38] Refactors the fd optimization code to FdUtils class. --- src/utils/ClientConnection.cxx | 38 +++------- src/utils/ClientConnection.hxx | 13 +--- src/utils/FdUtils.cxx | 126 +++++++++++++++++++++++++++++++++ src/utils/FdUtils.hxx | 16 +++++ src/utils/GcTcpHub.cxx | 13 +--- src/utils/sources | 1 + 6 files changed, 156 insertions(+), 51 deletions(-) create mode 100644 src/utils/FdUtils.cxx diff --git a/src/utils/ClientConnection.cxx b/src/utils/ClientConnection.cxx index 9f002ac27..b73345e9b 100644 --- a/src/utils/ClientConnection.cxx +++ b/src/utils/ClientConnection.cxx @@ -38,40 +38,18 @@ #include "netinet/tcp.h" #include "nmranet_config.h" -/** Callback from try_connect to donate the file descriptor. @param fd is - * the file destriptor of the connection freshly opened. */ +#include "utils/FdUtils.hxx" + +/// Callback from try_connect to donate the file descriptor. +/// @param fd is the file destriptor of the connection freshly opened. void GCFdConnectionClient::connection_complete(int fd) { - struct stat statbuf; - fstat(fd, &statbuf); - const bool use_select = (config_gridconnect_tcp_use_select() == CONSTANT_TRUE); + + // Applies kernel parameters like socket options. + FdUtils::optimize_fd(fd); + fd_ = fd; - const int rcvbuf = config_gridconnect_tcp_rcv_buffer_size(); - if (rcvbuf > 1) - { - ::setsockopt(fd, SOL_SOCKET, SO_RCVBUF, &rcvbuf, sizeof(rcvbuf)); - } - const int sndbuf = config_gridconnect_tcp_snd_buffer_size(); - if (sndbuf > 1) - { - ::setsockopt(fd, SOL_SOCKET, SO_SNDBUF, &sndbuf, sizeof(sndbuf)); - int ret = 0; - socklen_t retsize = sizeof(ret); - ::getsockopt(fd, SOL_SOCKET, SO_SNDBUF, &ret, &retsize); - LOG(ALWAYS, "fd %d sndbuf %d", fd, ret); - } - const int lowat = 4096; - if (lowat > 1 && S_ISSOCK(statbuf.st_mode)) - { - ERRNOCHECK("tcp lowat", - ::setsockopt( - fd, IPPROTO_TCP, TCP_NOTSENT_LOWAT, &lowat, sizeof(lowat))); - int ret = 0; - socklen_t retsize = sizeof(ret); - ::getsockopt(fd, IPPROTO_TCP, TCP_NOTSENT_LOWAT, &ret, &retsize); - LOG(ALWAYS, "fd %d lowat %d", fd, ret); - } create_gc_port_for_can_hub(hub_, fd, &closedNotify_, use_select); } diff --git a/src/utils/ClientConnection.hxx b/src/utils/ClientConnection.hxx index 8e9753520..553e7fa72 100644 --- a/src/utils/ClientConnection.hxx +++ b/src/utils/ClientConnection.hxx @@ -36,12 +36,12 @@ #define _UTILS_CLIENTCONNECTION_HXX_ #include -#include /* tc* functions */ #include #include #include "utils/GridConnectHub.hxx" #include "utils/socket_listener.hxx" +#include "utils/FdUtils.hxx" /// Abstract base class for the Hub's connections. class ConnectionClient @@ -171,18 +171,9 @@ private: int fd = ::open(dev_.c_str(), O_RDWR); if (fd >= 0) { - // Sets up the terminal in raw mode. Otherwise linux might echo - // characters coming in from the device and that will make - // packets go back to where they came from. - HASSERT(!tcflush(fd, TCIOFLUSH)); - struct termios settings; - HASSERT(!tcgetattr(fd, &settings)); - cfmakeraw(&settings); - cfsetspeed(&settings, B115200); - HASSERT(!tcsetattr(fd, TCSANOW, &settings)); + FdUtils::optimize_tty_fd(fd); LOG(INFO, "Opened device %s.\n", dev_.c_str()); connection_complete(fd); - // } else { diff --git a/src/utils/FdUtils.cxx b/src/utils/FdUtils.cxx new file mode 100644 index 000000000..c1f62f251 --- /dev/null +++ b/src/utils/FdUtils.cxx @@ -0,0 +1,126 @@ +/** \copyright + * Copyright (c) 2023, Balazs Racz + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * - Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * + * - Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE + * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. + * + * \file FdUtils.cxx + * + * Helper functions for dealing with posix fds. + * + * @author Balazs Racz + * @date 29 Dec 2023 + */ + +#include "FdUtils.hxx" + +#include +#include +#include +#include /* tc* functions */ + +#include "nmranet_config.h" + +#define PCALL_LOGERR(where, callfn, fd, args...) \ + do \ + { \ + int ret = callfn(fd, args); \ + if (ret < 0) \ + { \ + char buf[256]; \ + strerror_r(errno, buf, sizeof(buf)); \ + LOG_ERROR("fd %d %s: %s", fd, where, buf); \ + } \ + } while (0) + +/// Optimizes the kernel settings like socket and TCP options for an fd +/// that is an outgoing TCP socket. +/// @param fd socket file descriptor. +void FdUtils::optimize_socket_fd(int fd) +{ +#ifdef __linux__ + const int rcvbuf = config_gridconnect_tcp_rcv_buffer_size(); + if (rcvbuf > 1) + { + PCALL_LOGERR("setsockopt SO_RCVBUF", ::setsockopt, fd, SOL_SOCKET, + SO_RCVBUF, &rcvbuf, sizeof(rcvbuf)); + } + const int sndbuf = config_gridconnect_tcp_snd_buffer_size(); + if (sndbuf > 1) + { + PCALL_LOGERR("setsockopt SO_SNDBUF", ::setsockopt, fd, SOL_SOCKET, + SO_SNDBUF, &sndbuf, sizeof(sndbuf)); + int ret = 0; + socklen_t retsize = sizeof(ret); + ::getsockopt(fd, SOL_SOCKET, SO_SNDBUF, &ret, &retsize); + LOG(ALWAYS, "fd %d sndbuf %d", fd, ret); + } + const int lowat = 4096; + if (lowat > 1) + { + PCALL_LOGERR("setsockopt tcp_notsent_lowat", ::setsockopt, fd, + IPPROTO_TCP, TCP_NOTSENT_LOWAT, &lowat, sizeof(lowat)); + int ret = 0; + socklen_t retsize = sizeof(ret); + ::getsockopt(fd, IPPROTO_TCP, TCP_NOTSENT_LOWAT, &ret, &retsize); + LOG(ALWAYS, "fd %d lowat %d", fd, ret); + } +#endif +} + +/// Sets the kernel settings like queuing and terminal settings for an fd +/// that is an outgoing tty. +/// @param fd tty file descriptor. +void FdUtils::optimize_tty_fd(int fd) +{ +#ifdef __linux__ + // Sets up the terminal in raw mode. Otherwise linux might echo + // characters coming in from the device and that will make + // packets go back to where they came from. + HASSERT(!tcflush(fd, TCIOFLUSH)); + struct termios settings; + HASSERT(!tcgetattr(fd, &settings)); + cfmakeraw(&settings); + cfsetspeed(&settings, B115200); + HASSERT(!tcsetattr(fd, TCSANOW, &settings)); +#endif +} + +/// For an fd that is an outgoing link, detects what kind of file +/// descriptor this is and calls the appropriate optimize call for it. +void FdUtils::optimize_fd(int fd) +{ +#ifdef __linux__ + struct stat statbuf; + fstat(fd, &statbuf); + + if (S_ISSOCK(statbuf.st_mode)) + { + optimize_socket_fd(fd); + } + else if (isatty(fd)) + { + optimize_tty_fd(fd); + } +#endif +} diff --git a/src/utils/FdUtils.hxx b/src/utils/FdUtils.hxx index 452894f7e..6ed4b6dc9 100644 --- a/src/utils/FdUtils.hxx +++ b/src/utils/FdUtils.hxx @@ -35,6 +35,8 @@ #ifndef _UTILS_FD_UTILS_HXX_ #define _UTILS_FD_UTILS_HXX_ +#include + #include "utils/logging.h" #include "utils/macros.h" @@ -81,6 +83,20 @@ struct FdUtils dst += ret; } } + + /// Optimizes the kernel settings like socket and TCP options for an fd + /// that is an outgoing TCP socket. + /// @param fd socket file descriptor. + static void optimize_socket_fd(int fd); + + /// Sets the kernel settings like queuing and terminal settings for an fd + /// that is an outgoing tty. + /// @param fd tty file descriptor. + static void optimize_tty_fd(int fd); + + /// For an fd that is an outgoing link, detects what kind of file + /// descriptor this is and calls the appropriate optimize call for it. + static void optimize_fd(int fd); }; #endif // _UTILS_FD_UTILS_HXX_ diff --git a/src/utils/GcTcpHub.cxx b/src/utils/GcTcpHub.cxx index 043b9f216..acded59ad 100644 --- a/src/utils/GcTcpHub.cxx +++ b/src/utils/GcTcpHub.cxx @@ -38,6 +38,7 @@ #include "nmranet_config.h" #include "utils/GridConnectHub.hxx" +#include "utils/FdUtils.hxx" void GcTcpHub::on_new_connection(int fd) { @@ -47,16 +48,8 @@ void GcTcpHub::on_new_connection(int fd) AtomicHolder h(this); numClients_++; } - const int rcvbuf = config_gridconnect_tcp_rcv_buffer_size(); - if (rcvbuf > 1) - { - ::setsockopt(fd, SOL_SOCKET, SO_RCVBUF, &rcvbuf, sizeof(rcvbuf)); - } - const int sndbuf = config_gridconnect_tcp_snd_buffer_size(); - if (sndbuf > 1) - { - ::setsockopt(fd, SOL_SOCKET, SO_SNDBUF, &sndbuf, sizeof(sndbuf)); - } + // Applies kernel parameters like socket options. + FdUtils::optimize_socket_fd(fd); create_gc_port_for_can_hub(canHub_, fd, this, use_select); } diff --git a/src/utils/sources b/src/utils/sources index 5ab782c84..e9fed0849 100644 --- a/src/utils/sources +++ b/src/utils/sources @@ -12,6 +12,7 @@ CXXSRCS += \ StringPrintf.cxx \ Buffer.cxx \ ConfigUpdateListener.cxx \ + FdUtils.cxx \ FileUtils.cxx \ ForwardAllocator.cxx \ GcStreamParser.cxx \ From 52bd450b12203405a0c7b4c1f4b4bb6c455c181b Mon Sep 17 00:00:00 2001 From: Balazs Racz Date: Fri, 29 Dec 2023 10:09:47 +0100 Subject: [PATCH 21/38] Adds constants for TCP socket options fo SO_SNDBUF and SO_RCVBUF --- include/nmranet_config.h | 8 ++++++++ src/utils/constants.cxx | 6 +++++- 2 files changed, 13 insertions(+), 1 deletion(-) diff --git a/include/nmranet_config.h b/include/nmranet_config.h index c39a176dd..5f09d9008 100644 --- a/include/nmranet_config.h +++ b/include/nmranet_config.h @@ -104,6 +104,14 @@ DECLARE_CONST(gridconnect_bridge_max_incoming_packets); /// output socket cannot send the data fast enough. DECLARE_CONST(gridconnect_bridge_max_outgoing_packets); +/// TCP receive buffer size in bytes for gridconnect hubs. Used via +/// setsockopt(SO_RCVBUF). Set to 1 (default) to not bound it. +DECLARE_CONST(gridconnect_tcp_rcv_buffer_size); + +/// TCP send buffer size in bytes for gridconnect hubs. Used via +/// setsockopt(SO_SENDBUF). Set to 1 (default) to not bound it. +DECLARE_CONST(gridconnect_tcp_snd_buffer_size); + /** Number of bytes of gridconnect data to buffer before sending off the * lowlevel system (such as TCP socket). */ DECLARE_CONST(gridconnect_buffer_size); diff --git a/src/utils/constants.cxx b/src/utils/constants.cxx index d25f90ca1..bef6b033f 100644 --- a/src/utils/constants.cxx +++ b/src/utils/constants.cxx @@ -150,6 +150,10 @@ DEFAULT_CONST(gridconnect_port_max_incoming_packets, 6); DEFAULT_CONST(gridconnect_bridge_max_incoming_packets, 1); /// 1 = infinite DEFAULT_CONST(gridconnect_bridge_max_outgoing_packets, 1); +/// 1 = don't set +DEFAULT_CONST(gridconnect_tcp_rcv_buffer_size, 1); +/// 1 = don't set +DEFAULT_CONST(gridconnect_tcp_snd_buffer_size, 1); DEFAULT_CONST_FALSE(gridconnect_tcp_use_select); @@ -163,4 +167,4 @@ DEFAULT_CONST(socket_listener_backlog, 1); DEFAULT_CONST(socket_listener_stack_size, 1000); /// Allow up to five sockets to be pending for accept() in SocketListener. DEFAULT_CONST(socket_listener_backlog, 5); -#endif \ No newline at end of file +#endif From cf52f646682c3d1fbb30d06e5344ffbc90f51789 Mon Sep 17 00:00:00 2001 From: Balazs Racz Date: Fri, 29 Dec 2023 10:10:17 +0100 Subject: [PATCH 22/38] Optimizes the memory and buffer use of the hub application. --- applications/hub/main.cxx | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/applications/hub/main.cxx b/applications/hub/main.cxx index 67315b3ea..ab2c12f1c 100644 --- a/applications/hub/main.cxx +++ b/applications/hub/main.cxx @@ -56,6 +56,12 @@ CanHubFlow can_hub0(&g_service); OVERRIDE_CONST(gc_generate_newlines, 1); OVERRIDE_CONST(gridconnect_buffer_size, 1300); OVERRIDE_CONST(gridconnect_buffer_delay_usec, 2000); +OVERRIDE_CONST(gridconnect_bridge_max_incoming_packets, 5); +OVERRIDE_CONST(gridconnect_bridge_max_outgoing_packets, 5); +OVERRIDE_CONST(gridconnect_tcp_snd_buffer_size, 8192); +OVERRIDE_CONST(gridconnect_tcp_rcv_buffer_size, 3100); + +OVERRIDE_CONST_TRUE(gridconnect_tcp_use_select); int port = 12021; From c664b910479d0f485b026fb13d333b304da0157c Mon Sep 17 00:00:00 2001 From: Balazs Racz Date: Fri, 29 Dec 2023 10:17:24 +0100 Subject: [PATCH 23/38] gc-tcp-hub: applies SO_SNDBUF and SO_RCVBUF options to sockets. --- src/utils/GcTcpHub.cxx | 15 +++++++++++++-- 1 file changed, 13 insertions(+), 2 deletions(-) diff --git a/src/utils/GcTcpHub.cxx b/src/utils/GcTcpHub.cxx index fe87501a0..043b9f216 100644 --- a/src/utils/GcTcpHub.cxx +++ b/src/utils/GcTcpHub.cxx @@ -31,10 +31,11 @@ * @date 26 Apr 2014 */ -#include - #include "utils/GcTcpHub.hxx" +#include +#include + #include "nmranet_config.h" #include "utils/GridConnectHub.hxx" @@ -46,6 +47,16 @@ void GcTcpHub::on_new_connection(int fd) AtomicHolder h(this); numClients_++; } + const int rcvbuf = config_gridconnect_tcp_rcv_buffer_size(); + if (rcvbuf > 1) + { + ::setsockopt(fd, SOL_SOCKET, SO_RCVBUF, &rcvbuf, sizeof(rcvbuf)); + } + const int sndbuf = config_gridconnect_tcp_snd_buffer_size(); + if (sndbuf > 1) + { + ::setsockopt(fd, SOL_SOCKET, SO_SNDBUF, &sndbuf, sizeof(sndbuf)); + } create_gc_port_for_can_hub(canHub_, fd, this, use_select); } From 06c6bb44103d28ed97cc6ba9ecc1a0018b7a1968 Mon Sep 17 00:00:00 2001 From: Balazs Racz Date: Fri, 29 Dec 2023 10:18:16 +0100 Subject: [PATCH 24/38] client connection util: exposes the file descriptor when available. Applies the gridconnect use select link option. Applies the SO_SNDBUF/RCVBUF link option. --- src/utils/ClientConnection.cxx | 77 ++++++++++++++++++++++++++++++++++ src/utils/ClientConnection.hxx | 15 ++++--- 2 files changed, 87 insertions(+), 5 deletions(-) create mode 100644 src/utils/ClientConnection.cxx diff --git a/src/utils/ClientConnection.cxx b/src/utils/ClientConnection.cxx new file mode 100644 index 000000000..9f002ac27 --- /dev/null +++ b/src/utils/ClientConnection.cxx @@ -0,0 +1,77 @@ +/** \copyright + * Copyright (c) 2023, Balazs Racz + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * - Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * + * - Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE + * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. + * + * \file ClientConnection.cxx + * + * Utilities for managing can-hub connections as a client application. + * + * @author Balazs Racz + * @date 27 Dec 2023 + */ + +#include "utils/ClientConnection.hxx" + +#include "netinet/in.h" +#include "netinet/tcp.h" +#include "nmranet_config.h" + +/** Callback from try_connect to donate the file descriptor. @param fd is + * the file destriptor of the connection freshly opened. */ +void GCFdConnectionClient::connection_complete(int fd) +{ + struct stat statbuf; + fstat(fd, &statbuf); + + const bool use_select = + (config_gridconnect_tcp_use_select() == CONSTANT_TRUE); + fd_ = fd; + const int rcvbuf = config_gridconnect_tcp_rcv_buffer_size(); + if (rcvbuf > 1) + { + ::setsockopt(fd, SOL_SOCKET, SO_RCVBUF, &rcvbuf, sizeof(rcvbuf)); + } + const int sndbuf = config_gridconnect_tcp_snd_buffer_size(); + if (sndbuf > 1) + { + ::setsockopt(fd, SOL_SOCKET, SO_SNDBUF, &sndbuf, sizeof(sndbuf)); + int ret = 0; + socklen_t retsize = sizeof(ret); + ::getsockopt(fd, SOL_SOCKET, SO_SNDBUF, &ret, &retsize); + LOG(ALWAYS, "fd %d sndbuf %d", fd, ret); + } + const int lowat = 4096; + if (lowat > 1 && S_ISSOCK(statbuf.st_mode)) + { + ERRNOCHECK("tcp lowat", + ::setsockopt( + fd, IPPROTO_TCP, TCP_NOTSENT_LOWAT, &lowat, sizeof(lowat))); + int ret = 0; + socklen_t retsize = sizeof(ret); + ::getsockopt(fd, IPPROTO_TCP, TCP_NOTSENT_LOWAT, &ret, &retsize); + LOG(ALWAYS, "fd %d lowat %d", fd, ret); + } + create_gc_port_for_can_hub(hub_, fd, &closedNotify_, use_select); +} diff --git a/src/utils/ClientConnection.hxx b/src/utils/ClientConnection.hxx index 35121c2bd..84802dd78 100644 --- a/src/utils/ClientConnection.hxx +++ b/src/utils/ClientConnection.hxx @@ -50,6 +50,10 @@ public: * is dead. @returns true if there is a live connection. */ virtual bool ping() = 0; + /// @return the file descriptor, if this connection has one, or -1 if the + /// connection is down or doesn't have an fd. + virtual int fd() { return -1; } + virtual ~ConnectionClient() { } @@ -113,6 +117,11 @@ public: return fd_ >= 0; } + int fd() override + { + return fd_; + } + protected: /// Abstrct base function to attempt to connect (or open device) to the /// destination. @@ -120,11 +129,7 @@ protected: /** Callback from try_connect to donate the file descriptor. @param fd is * the file destriptor of the connection freshly opened. */ - void connection_complete(int fd) - { - fd_ = fd; - create_gc_port_for_can_hub(hub_, fd, &closedNotify_); - } + void connection_complete(int fd); private: /// Will be called when the descriptor experiences an error (typivcally From f8db2a0adf9b3fcffbb3773f564bbf8a4c899666 Mon Sep 17 00:00:00 2001 From: Balazs Racz Date: Fri, 29 Dec 2023 10:21:11 +0100 Subject: [PATCH 25/38] Ensures that the ClientCOnnection.cxx is built and linked. --- src/utils/sources | 1 + 1 file changed, 1 insertion(+) diff --git a/src/utils/sources b/src/utils/sources index 8e7a00a40..86c2ea8e4 100644 --- a/src/utils/sources +++ b/src/utils/sources @@ -7,6 +7,7 @@ CXXSRCS += \ Base64.cxx \ Blinker.cxx \ CanIf.cxx \ + ClientConnection.cxx \ Crc.cxx \ StringPrintf.cxx \ Buffer.cxx \ From c573f41ff469e88bb26f0c6dcf88a30e8caf9c69 Mon Sep 17 00:00:00 2001 From: Balazs Racz Date: Fri, 29 Dec 2023 10:21:26 +0100 Subject: [PATCH 26/38] Sorts lines in sources. --- src/utils/sources | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/utils/sources b/src/utils/sources index 86c2ea8e4..5ab782c84 100644 --- a/src/utils/sources +++ b/src/utils/sources @@ -18,19 +18,19 @@ CXXSRCS += \ GcTcpHub.cxx \ GridConnect.cxx \ GridConnectHub.cxx \ - format_utils.cxx \ HubDevice.cxx \ HubDeviceSelect.cxx \ - Queue.cxx \ JSHubPort.cxx \ + Queue.cxx \ ReflashBootloader.cxx \ + ServiceLocator.cxx \ SocketCan.cxx \ + SocketClient.cxx \ constants.cxx \ + format_utils.cxx \ gc_format.cxx \ logging.cxx \ - SocketClient.cxx \ socket_listener.cxx \ - ServiceLocator.cxx \ CXXTESTSRCS += BufferQueue.cxxtest \ From ea41047b1bc27c2cc29e4b5ba303b9bb10096009 Mon Sep 17 00:00:00 2001 From: Balazs Racz Date: Fri, 29 Dec 2023 11:49:30 +0100 Subject: [PATCH 27/38] Refactors the fd optimization code to FdUtils class. --- src/utils/ClientConnection.cxx | 38 +++------- src/utils/ClientConnection.hxx | 13 +--- src/utils/FdUtils.cxx | 126 +++++++++++++++++++++++++++++++++ src/utils/FdUtils.hxx | 16 +++++ src/utils/GcTcpHub.cxx | 13 +--- src/utils/sources | 1 + 6 files changed, 156 insertions(+), 51 deletions(-) create mode 100644 src/utils/FdUtils.cxx diff --git a/src/utils/ClientConnection.cxx b/src/utils/ClientConnection.cxx index 9f002ac27..b73345e9b 100644 --- a/src/utils/ClientConnection.cxx +++ b/src/utils/ClientConnection.cxx @@ -38,40 +38,18 @@ #include "netinet/tcp.h" #include "nmranet_config.h" -/** Callback from try_connect to donate the file descriptor. @param fd is - * the file destriptor of the connection freshly opened. */ +#include "utils/FdUtils.hxx" + +/// Callback from try_connect to donate the file descriptor. +/// @param fd is the file destriptor of the connection freshly opened. void GCFdConnectionClient::connection_complete(int fd) { - struct stat statbuf; - fstat(fd, &statbuf); - const bool use_select = (config_gridconnect_tcp_use_select() == CONSTANT_TRUE); + + // Applies kernel parameters like socket options. + FdUtils::optimize_fd(fd); + fd_ = fd; - const int rcvbuf = config_gridconnect_tcp_rcv_buffer_size(); - if (rcvbuf > 1) - { - ::setsockopt(fd, SOL_SOCKET, SO_RCVBUF, &rcvbuf, sizeof(rcvbuf)); - } - const int sndbuf = config_gridconnect_tcp_snd_buffer_size(); - if (sndbuf > 1) - { - ::setsockopt(fd, SOL_SOCKET, SO_SNDBUF, &sndbuf, sizeof(sndbuf)); - int ret = 0; - socklen_t retsize = sizeof(ret); - ::getsockopt(fd, SOL_SOCKET, SO_SNDBUF, &ret, &retsize); - LOG(ALWAYS, "fd %d sndbuf %d", fd, ret); - } - const int lowat = 4096; - if (lowat > 1 && S_ISSOCK(statbuf.st_mode)) - { - ERRNOCHECK("tcp lowat", - ::setsockopt( - fd, IPPROTO_TCP, TCP_NOTSENT_LOWAT, &lowat, sizeof(lowat))); - int ret = 0; - socklen_t retsize = sizeof(ret); - ::getsockopt(fd, IPPROTO_TCP, TCP_NOTSENT_LOWAT, &ret, &retsize); - LOG(ALWAYS, "fd %d lowat %d", fd, ret); - } create_gc_port_for_can_hub(hub_, fd, &closedNotify_, use_select); } diff --git a/src/utils/ClientConnection.hxx b/src/utils/ClientConnection.hxx index 84802dd78..2641ee2de 100644 --- a/src/utils/ClientConnection.hxx +++ b/src/utils/ClientConnection.hxx @@ -36,11 +36,11 @@ #define _UTILS_CLIENTCONNECTION_HXX_ #include -#include /* tc* functions */ #include #include "utils/GridConnectHub.hxx" #include "utils/socket_listener.hxx" +#include "utils/FdUtils.hxx" /// Abstract base class for the Hub's connections. class ConnectionClient @@ -170,18 +170,9 @@ private: int fd = ::open(dev_.c_str(), O_RDWR); if (fd >= 0) { - // Sets up the terminal in raw mode. Otherwise linux might echo - // characters coming in from the device and that will make - // packets go back to where they came from. - HASSERT(!tcflush(fd, TCIOFLUSH)); - struct termios settings; - HASSERT(!tcgetattr(fd, &settings)); - cfmakeraw(&settings); - cfsetspeed(&settings, B115200); - HASSERT(!tcsetattr(fd, TCSANOW, &settings)); + FdUtils::optimize_tty_fd(fd); LOG(INFO, "Opened device %s.\n", dev_.c_str()); connection_complete(fd); - // } else { diff --git a/src/utils/FdUtils.cxx b/src/utils/FdUtils.cxx new file mode 100644 index 000000000..c1f62f251 --- /dev/null +++ b/src/utils/FdUtils.cxx @@ -0,0 +1,126 @@ +/** \copyright + * Copyright (c) 2023, Balazs Racz + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * - Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * + * - Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE + * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. + * + * \file FdUtils.cxx + * + * Helper functions for dealing with posix fds. + * + * @author Balazs Racz + * @date 29 Dec 2023 + */ + +#include "FdUtils.hxx" + +#include +#include +#include +#include /* tc* functions */ + +#include "nmranet_config.h" + +#define PCALL_LOGERR(where, callfn, fd, args...) \ + do \ + { \ + int ret = callfn(fd, args); \ + if (ret < 0) \ + { \ + char buf[256]; \ + strerror_r(errno, buf, sizeof(buf)); \ + LOG_ERROR("fd %d %s: %s", fd, where, buf); \ + } \ + } while (0) + +/// Optimizes the kernel settings like socket and TCP options for an fd +/// that is an outgoing TCP socket. +/// @param fd socket file descriptor. +void FdUtils::optimize_socket_fd(int fd) +{ +#ifdef __linux__ + const int rcvbuf = config_gridconnect_tcp_rcv_buffer_size(); + if (rcvbuf > 1) + { + PCALL_LOGERR("setsockopt SO_RCVBUF", ::setsockopt, fd, SOL_SOCKET, + SO_RCVBUF, &rcvbuf, sizeof(rcvbuf)); + } + const int sndbuf = config_gridconnect_tcp_snd_buffer_size(); + if (sndbuf > 1) + { + PCALL_LOGERR("setsockopt SO_SNDBUF", ::setsockopt, fd, SOL_SOCKET, + SO_SNDBUF, &sndbuf, sizeof(sndbuf)); + int ret = 0; + socklen_t retsize = sizeof(ret); + ::getsockopt(fd, SOL_SOCKET, SO_SNDBUF, &ret, &retsize); + LOG(ALWAYS, "fd %d sndbuf %d", fd, ret); + } + const int lowat = 4096; + if (lowat > 1) + { + PCALL_LOGERR("setsockopt tcp_notsent_lowat", ::setsockopt, fd, + IPPROTO_TCP, TCP_NOTSENT_LOWAT, &lowat, sizeof(lowat)); + int ret = 0; + socklen_t retsize = sizeof(ret); + ::getsockopt(fd, IPPROTO_TCP, TCP_NOTSENT_LOWAT, &ret, &retsize); + LOG(ALWAYS, "fd %d lowat %d", fd, ret); + } +#endif +} + +/// Sets the kernel settings like queuing and terminal settings for an fd +/// that is an outgoing tty. +/// @param fd tty file descriptor. +void FdUtils::optimize_tty_fd(int fd) +{ +#ifdef __linux__ + // Sets up the terminal in raw mode. Otherwise linux might echo + // characters coming in from the device and that will make + // packets go back to where they came from. + HASSERT(!tcflush(fd, TCIOFLUSH)); + struct termios settings; + HASSERT(!tcgetattr(fd, &settings)); + cfmakeraw(&settings); + cfsetspeed(&settings, B115200); + HASSERT(!tcsetattr(fd, TCSANOW, &settings)); +#endif +} + +/// For an fd that is an outgoing link, detects what kind of file +/// descriptor this is and calls the appropriate optimize call for it. +void FdUtils::optimize_fd(int fd) +{ +#ifdef __linux__ + struct stat statbuf; + fstat(fd, &statbuf); + + if (S_ISSOCK(statbuf.st_mode)) + { + optimize_socket_fd(fd); + } + else if (isatty(fd)) + { + optimize_tty_fd(fd); + } +#endif +} diff --git a/src/utils/FdUtils.hxx b/src/utils/FdUtils.hxx index 452894f7e..6ed4b6dc9 100644 --- a/src/utils/FdUtils.hxx +++ b/src/utils/FdUtils.hxx @@ -35,6 +35,8 @@ #ifndef _UTILS_FD_UTILS_HXX_ #define _UTILS_FD_UTILS_HXX_ +#include + #include "utils/logging.h" #include "utils/macros.h" @@ -81,6 +83,20 @@ struct FdUtils dst += ret; } } + + /// Optimizes the kernel settings like socket and TCP options for an fd + /// that is an outgoing TCP socket. + /// @param fd socket file descriptor. + static void optimize_socket_fd(int fd); + + /// Sets the kernel settings like queuing and terminal settings for an fd + /// that is an outgoing tty. + /// @param fd tty file descriptor. + static void optimize_tty_fd(int fd); + + /// For an fd that is an outgoing link, detects what kind of file + /// descriptor this is and calls the appropriate optimize call for it. + static void optimize_fd(int fd); }; #endif // _UTILS_FD_UTILS_HXX_ diff --git a/src/utils/GcTcpHub.cxx b/src/utils/GcTcpHub.cxx index 043b9f216..acded59ad 100644 --- a/src/utils/GcTcpHub.cxx +++ b/src/utils/GcTcpHub.cxx @@ -38,6 +38,7 @@ #include "nmranet_config.h" #include "utils/GridConnectHub.hxx" +#include "utils/FdUtils.hxx" void GcTcpHub::on_new_connection(int fd) { @@ -47,16 +48,8 @@ void GcTcpHub::on_new_connection(int fd) AtomicHolder h(this); numClients_++; } - const int rcvbuf = config_gridconnect_tcp_rcv_buffer_size(); - if (rcvbuf > 1) - { - ::setsockopt(fd, SOL_SOCKET, SO_RCVBUF, &rcvbuf, sizeof(rcvbuf)); - } - const int sndbuf = config_gridconnect_tcp_snd_buffer_size(); - if (sndbuf > 1) - { - ::setsockopt(fd, SOL_SOCKET, SO_SNDBUF, &sndbuf, sizeof(sndbuf)); - } + // Applies kernel parameters like socket options. + FdUtils::optimize_socket_fd(fd); create_gc_port_for_can_hub(canHub_, fd, this, use_select); } diff --git a/src/utils/sources b/src/utils/sources index 5ab782c84..e9fed0849 100644 --- a/src/utils/sources +++ b/src/utils/sources @@ -12,6 +12,7 @@ CXXSRCS += \ StringPrintf.cxx \ Buffer.cxx \ ConfigUpdateListener.cxx \ + FdUtils.cxx \ FileUtils.cxx \ ForwardAllocator.cxx \ GcStreamParser.cxx \ From 976af9e3a2e6c1efdb0df571ef0853345acf30db Mon Sep 17 00:00:00 2001 From: Balazs Racz Date: Fri, 29 Dec 2023 11:59:53 +0100 Subject: [PATCH 28/38] Makes tcp_lowat also configurable. --- include/nmranet_config.h | 4 ++++ src/utils/FdUtils.cxx | 2 +- src/utils/constants.cxx | 2 ++ 3 files changed, 7 insertions(+), 1 deletion(-) diff --git a/include/nmranet_config.h b/include/nmranet_config.h index 5f09d9008..58207fa4d 100644 --- a/include/nmranet_config.h +++ b/include/nmranet_config.h @@ -112,6 +112,10 @@ DECLARE_CONST(gridconnect_tcp_rcv_buffer_size); /// setsockopt(SO_SENDBUF). Set to 1 (default) to not bound it. DECLARE_CONST(gridconnect_tcp_snd_buffer_size); +/// TCP_NOTSENT_LOWAT kernel parameter (in bytes) for TCP links. Used via +/// setsockopt. Set to 1 (default) to not bound it. +DECLARE_CONST(gridconnect_tcp_notsent_lowat_buffer_size); + /** Number of bytes of gridconnect data to buffer before sending off the * lowlevel system (such as TCP socket). */ DECLARE_CONST(gridconnect_buffer_size); diff --git a/src/utils/FdUtils.cxx b/src/utils/FdUtils.cxx index c1f62f251..b380193b0 100644 --- a/src/utils/FdUtils.cxx +++ b/src/utils/FdUtils.cxx @@ -75,7 +75,7 @@ void FdUtils::optimize_socket_fd(int fd) ::getsockopt(fd, SOL_SOCKET, SO_SNDBUF, &ret, &retsize); LOG(ALWAYS, "fd %d sndbuf %d", fd, ret); } - const int lowat = 4096; + const int lowat = config_gridconnect_tcp_notsent_lowat_buffer_size(); if (lowat > 1) { PCALL_LOGERR("setsockopt tcp_notsent_lowat", ::setsockopt, fd, diff --git a/src/utils/constants.cxx b/src/utils/constants.cxx index bef6b033f..1d70bbd49 100644 --- a/src/utils/constants.cxx +++ b/src/utils/constants.cxx @@ -154,6 +154,8 @@ DEFAULT_CONST(gridconnect_bridge_max_outgoing_packets, 1); DEFAULT_CONST(gridconnect_tcp_rcv_buffer_size, 1); /// 1 = don't set DEFAULT_CONST(gridconnect_tcp_snd_buffer_size, 1); +/// 1 = don't set +DEFAULT_CONST(gridconnect_tcp_notsent_lowat_buffer_size, 1); DEFAULT_CONST_FALSE(gridconnect_tcp_use_select); From f3842150a1f62c56b210bf5fb86cd7f139738d04 Mon Sep 17 00:00:00 2001 From: Balazs Racz Date: Fri, 29 Dec 2023 11:59:58 +0100 Subject: [PATCH 29/38] Fix compile error. --- src/utils/ClientConnection.hxx | 1 + 1 file changed, 1 insertion(+) diff --git a/src/utils/ClientConnection.hxx b/src/utils/ClientConnection.hxx index 2641ee2de..553e7fa72 100644 --- a/src/utils/ClientConnection.hxx +++ b/src/utils/ClientConnection.hxx @@ -37,6 +37,7 @@ #include #include +#include #include "utils/GridConnectHub.hxx" #include "utils/socket_listener.hxx" From 7bbc54558622a4f0fa98054f1cf9085d870ed013 Mon Sep 17 00:00:00 2001 From: Balazs Racz Date: Fri, 29 Dec 2023 12:06:43 +0100 Subject: [PATCH 30/38] Adds comment. --- src/utils/FdUtils.cxx | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/src/utils/FdUtils.cxx b/src/utils/FdUtils.cxx index b380193b0..e9dc07dcf 100644 --- a/src/utils/FdUtils.cxx +++ b/src/utils/FdUtils.cxx @@ -41,6 +41,12 @@ #include "nmranet_config.h" +/// Performs a system call on an fd. If an error is returned, prints the error +/// using the log mechanism, but otherwise ignores it. +/// @param where user-readable text printed with the error, e.g. "setsockopt" +/// @param callfn system call, like ::setsockopt +/// @param fd file descriptor (int) +/// @param args... all other arguments to callfn #define PCALL_LOGERR(where, callfn, fd, args...) \ do \ { \ From 6ffb38e1729a60435677c4268b3455fb49b44068 Mon Sep 17 00:00:00 2001 From: Balazs Racz Date: Fri, 29 Dec 2023 14:09:12 +0100 Subject: [PATCH 31/38] Switch gridconnect hub to LimitedPool instead of FixedPool. This needs no dedicated memory, but uses the regular mainBufferPool. Add a check that all outgoing packets are released before deleting the pool. This solves hub crashing when a client disconnects. --- src/utils/GridConnectHub.cxx | 24 +++++++++++++++++++++--- 1 file changed, 21 insertions(+), 3 deletions(-) diff --git a/src/utils/GridConnectHub.cxx b/src/utils/GridConnectHub.cxx index b23bdb4f1..094dc3de8 100644 --- a/src/utils/GridConnectHub.cxx +++ b/src/utils/GridConnectHub.cxx @@ -109,7 +109,8 @@ class GCAdapter : public GCAdapterBase bool shutdown() OVERRIDE { unregister(); - return formatter_.shutdown() && parser_.is_waiting() && formatter_.is_waiting(); + return formatter_.shutdown() && parser_.shutdown_ready() && + formatter_.is_waiting(); } /// HubPort (on a CAN-typed hub) that turns a binary CAN packet into a @@ -250,11 +251,28 @@ class GCAdapter : public GCAdapterBase int max_frames_to_parse = config_gridconnect_bridge_max_incoming_packets(); if (max_frames_to_parse > 1) { - frameAllocator_.reset(new FixedPool( + frameAllocator_.reset(new LimitedPool( sizeof(CanHubFlow::buffer_type), max_frames_to_parse)); } } + /// @return true when this object can be deleted. This is typically + /// once all outgoing packets are released back to the pool, and there + /// is no incoming data processing happening. + bool shutdown_ready() + { + int max_frames_to_parse = + config_gridconnect_bridge_max_incoming_packets(); + if (max_frames_to_parse > 1) + { + if (frameAllocator_->free_items() < (size_t)max_frames_to_parse) + { + return false; + } + } + return is_waiting(); + } + /// @return the destination to write data to. CanHubFlow *destination() { @@ -317,7 +335,7 @@ class GCAdapter : public GCAdapterBase // Allocator to get the frame from. If NULL, the target's default // buffer pool will be used. - std::unique_ptr frameAllocator_; + std::unique_ptr frameAllocator_; // ==== static data ==== From b2eb101b4592d136192023170c5376714f673769 Mon Sep 17 00:00:00 2001 From: Balazs Racz Date: Fri, 29 Dec 2023 14:09:34 +0100 Subject: [PATCH 32/38] Tune TCP options. --- applications/hub/main.cxx | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/applications/hub/main.cxx b/applications/hub/main.cxx index ab2c12f1c..43b57cd81 100644 --- a/applications/hub/main.cxx +++ b/applications/hub/main.cxx @@ -59,7 +59,8 @@ OVERRIDE_CONST(gridconnect_buffer_delay_usec, 2000); OVERRIDE_CONST(gridconnect_bridge_max_incoming_packets, 5); OVERRIDE_CONST(gridconnect_bridge_max_outgoing_packets, 5); OVERRIDE_CONST(gridconnect_tcp_snd_buffer_size, 8192); -OVERRIDE_CONST(gridconnect_tcp_rcv_buffer_size, 3100); +OVERRIDE_CONST(gridconnect_tcp_rcv_buffer_size, 8192); +OVERRIDE_CONST(gridconnect_tcp_notsent_lowat_buffer_size, 1024); OVERRIDE_CONST_TRUE(gridconnect_tcp_use_select); From 993c2b03e61db3e580a2a8e927dfef9c0fc949bb Mon Sep 17 00:00:00 2001 From: Balazs Racz Date: Fri, 29 Dec 2023 14:09:43 +0100 Subject: [PATCH 33/38] hubtest: tune TCP options. --- applications/hub_test/targets/linux.x86/main.cxx | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/applications/hub_test/targets/linux.x86/main.cxx b/applications/hub_test/targets/linux.x86/main.cxx index 27d31d440..49bb6484c 100644 --- a/applications/hub_test/targets/linux.x86/main.cxx +++ b/applications/hub_test/targets/linux.x86/main.cxx @@ -46,8 +46,9 @@ OVERRIDE_CONST(gc_generate_newlines, 1); OVERRIDE_CONST(gridconnect_bridge_max_outgoing_packets, 2); -OVERRIDE_CONST(gridconnect_tcp_snd_buffer_size, 1024); +OVERRIDE_CONST(gridconnect_tcp_snd_buffer_size, 8192); OVERRIDE_CONST(gridconnect_tcp_rcv_buffer_size, 8192); +OVERRIDE_CONST(gridconnect_tcp_notsent_lowat_buffer_size, 1024); Executor<1> g_executor("g_executor", 0, 1024); From f52ca25c526df373bed23b0840b6733efe273d7d Mon Sep 17 00:00:00 2001 From: Balazs Racz Date: Fri, 29 Dec 2023 14:09:12 +0100 Subject: [PATCH 34/38] Switch gridconnect hub to LimitedPool instead of FixedPool. This needs no dedicated memory, but uses the regular mainBufferPool. Add a check that all outgoing packets are released before deleting the pool. This solves hub crashing when a client disconnects. --- src/utils/GridConnectHub.cxx | 24 +++++++++++++++++++++--- 1 file changed, 21 insertions(+), 3 deletions(-) diff --git a/src/utils/GridConnectHub.cxx b/src/utils/GridConnectHub.cxx index b148d06ae..3f99a62ee 100644 --- a/src/utils/GridConnectHub.cxx +++ b/src/utils/GridConnectHub.cxx @@ -109,7 +109,8 @@ class GCAdapter : public GCAdapterBase bool shutdown() OVERRIDE { unregister(); - return formatter_.shutdown() && parser_.is_waiting() && formatter_.is_waiting(); + return formatter_.shutdown() && parser_.shutdown_ready() && + formatter_.is_waiting(); } /// HubPort (on a CAN-typed hub) that turns a binary CAN packet into a @@ -218,11 +219,28 @@ class GCAdapter : public GCAdapterBase int max_frames_to_parse = config_gridconnect_bridge_max_incoming_packets(); if (max_frames_to_parse > 1) { - frameAllocator_.reset(new FixedPool( + frameAllocator_.reset(new LimitedPool( sizeof(CanHubFlow::buffer_type), max_frames_to_parse)); } } + /// @return true when this object can be deleted. This is typically + /// once all outgoing packets are released back to the pool, and there + /// is no incoming data processing happening. + bool shutdown_ready() + { + int max_frames_to_parse = + config_gridconnect_bridge_max_incoming_packets(); + if (max_frames_to_parse > 1) + { + if (frameAllocator_->free_items() < (size_t)max_frames_to_parse) + { + return false; + } + } + return is_waiting(); + } + /// @return the destination to write data to. CanHubFlow *destination() { @@ -285,7 +303,7 @@ class GCAdapter : public GCAdapterBase // Allocator to get the frame from. If NULL, the target's default // buffer pool will be used. - std::unique_ptr frameAllocator_; + std::unique_ptr frameAllocator_; // ==== static data ==== From 3e6e3ac57ea5ed21a8040fd04f8f88ed097d0a64 Mon Sep 17 00:00:00 2001 From: Balazs Racz Date: Fri, 29 Dec 2023 14:09:34 +0100 Subject: [PATCH 35/38] Tune TCP options. --- applications/hub/main.cxx | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/applications/hub/main.cxx b/applications/hub/main.cxx index ab2c12f1c..43b57cd81 100644 --- a/applications/hub/main.cxx +++ b/applications/hub/main.cxx @@ -59,7 +59,8 @@ OVERRIDE_CONST(gridconnect_buffer_delay_usec, 2000); OVERRIDE_CONST(gridconnect_bridge_max_incoming_packets, 5); OVERRIDE_CONST(gridconnect_bridge_max_outgoing_packets, 5); OVERRIDE_CONST(gridconnect_tcp_snd_buffer_size, 8192); -OVERRIDE_CONST(gridconnect_tcp_rcv_buffer_size, 3100); +OVERRIDE_CONST(gridconnect_tcp_rcv_buffer_size, 8192); +OVERRIDE_CONST(gridconnect_tcp_notsent_lowat_buffer_size, 1024); OVERRIDE_CONST_TRUE(gridconnect_tcp_use_select); From 6e31c7e2d0a2ea5c28fc7c67d4f5306fa39ded4d Mon Sep 17 00:00:00 2001 From: Balazs Racz Date: Mon, 1 Jan 2024 18:08:15 +0100 Subject: [PATCH 36/38] Changes TCP tuning params: no snd/recv buf limitation, but notsent_lowat with 1460. --- applications/hub_test/targets/linux.x86/main.cxx | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/applications/hub_test/targets/linux.x86/main.cxx b/applications/hub_test/targets/linux.x86/main.cxx index 49bb6484c..20eae00b3 100644 --- a/applications/hub_test/targets/linux.x86/main.cxx +++ b/applications/hub_test/targets/linux.x86/main.cxx @@ -46,10 +46,14 @@ OVERRIDE_CONST(gc_generate_newlines, 1); OVERRIDE_CONST(gridconnect_bridge_max_outgoing_packets, 2); -OVERRIDE_CONST(gridconnect_tcp_snd_buffer_size, 8192); -OVERRIDE_CONST(gridconnect_tcp_rcv_buffer_size, 8192); -OVERRIDE_CONST(gridconnect_tcp_notsent_lowat_buffer_size, 1024); - +//OVERRIDE_CONST(gridconnect_tcp_snd_buffer_size, 8192); +//OVERRIDE_CONST(gridconnect_tcp_rcv_buffer_size, 8192); +OVERRIDE_CONST(gridconnect_tcp_notsent_lowat_buffer_size, 1460); + +// Maximum 100 TCP packets per second. +OVERRIDE_CONST(gridconnect_buffer_delay_usec, 10000); +// Or one full packet. +OVERRIDE_CONST(gridconnect_buffer_size, 1460); Executor<1> g_executor("g_executor", 0, 1024); Service g_service(&g_executor); From a10430c1171c7e3aad04ac4da9a62d7a98b23faf Mon Sep 17 00:00:00 2001 From: Balazs Racz Date: Fri, 26 Jan 2024 21:09:09 +0100 Subject: [PATCH 37/38] Adds new features to the hub_test: - support creating multiple TCP clients with -N number - support sending different packets, like identify producer and verify node ID which allows measuring response latency of a node. --- .../hub_test/targets/linux.x86/main.cxx | 165 ++++++++++++++++-- 1 file changed, 148 insertions(+), 17 deletions(-) diff --git a/applications/hub_test/targets/linux.x86/main.cxx b/applications/hub_test/targets/linux.x86/main.cxx index 20eae00b3..e626c2537 100644 --- a/applications/hub_test/targets/linux.x86/main.cxx +++ b/applications/hub_test/targets/linux.x86/main.cxx @@ -43,6 +43,8 @@ #include "utils/LimitedPool.hxx" #include "utils/StringPrintf.hxx" #include "utils/Stats.hxx" +#include "openlcb/Convert.hxx" + OVERRIDE_CONST(gc_generate_newlines, 1); OVERRIDE_CONST(gridconnect_bridge_max_outgoing_packets, 2); @@ -75,16 +77,43 @@ struct Link std::unique_ptr receiver; }; +/// Defines which packet type we should send to the output interface. +enum class PacketType { + /// Send an event report, listen for the same message coming back. + EVENT_REPORT, + /// Send an Identify Consumer, listen for Consumer Identified + /// valid/invalid/unknown. + EVENT_IDENTIFY, + /// Ping: send an addressed Verify Node ID, listen for Node ID Verified. + NODE_ID_VERIFY, +}; + /// We generate packets to this interface. Link output_port; /// Traffic to generate, default is to saturate the link. int pkt_per_sec = -1; +/// Which packet to send/expect. +PacketType pkt_type = PacketType::EVENT_REPORT; +/// If we are using an addressed ping, which node should we ping. +openlcb::NodeID destination_nodeid; + +/// We use this special value in the "upstream port" argument to denote a +/// receiver link that should be a loopback. +static constexpr int LOOPBACK_PORT_NUMBER = -999; /// How many pending buffers we can allow for the send flow. static constexpr unsigned SEND_PARALLELISM = 10; /// The output frames will go wioth this CAN ID. -static constexpr uint32_t SEND_HEADER = 0x195b4ffe; +static constexpr uint32_t SEND_HEADER_EVENT = 0x195b4ffe; +static constexpr uint32_t SEND_HEADER_IDENT = 0x198f4ffe; +static constexpr uint32_t SEND_HEADER_NODEID = 0x19490ffe; +/// The response frames come with the CAN ID (masked). +static constexpr uint32_t RECV_HEADER_IDENT = 0x194c7fff; +static constexpr uint32_t RECV_MASK_IDENT = ~0x00003fff; +static constexpr uint32_t RECV_HEADER_NODEID = 0x19170fff; +static constexpr uint32_t RECV_MASK_NODEID = ~0x00000fff; + /// The output frames will go with this CAN data bytes. This is NMRA ID 1, /// which is not assigned. Lower four bytes are the id. static constexpr uint8_t SEND_PAYLOAD[8] = {0x9, 0x0, 0x01, 0x39, 0, 0, 0, 0}; @@ -125,7 +154,7 @@ void usage(const char *e) fprintf(stderr, "Usage: %s (-d device_path | [-q upstream_port] -u upstream_host) [-s " "speed]\n\t(-D device_path | [-Q upstream_port] -U " - "upstream_host)...\n\n", + "upstream_host) [-N repeat] [-i | -n node_id]...\n\n", e); fprintf(stderr, "\tdevice_path is a path to a physical device doing " @@ -139,15 +168,23 @@ void usage(const char *e) "\t-d -q -u specifies the output port, -D -Q -U specifies input ports. " "-Q must be before -U. Multiple input ports can be specified.\n"); fprintf(stderr, - "\t-s speed is the packets/sec to generate. Set to -1 for utomatic " + "\t-s speed is the packets/sec to generate. Set to -1 for automatic " "(saturation).\n"); + fprintf(stderr, + "\t-N repeat will open this many input ports with the last settings.\n"); + fprintf(stderr, + "\t-i selects using event identify packets to test node response latency.\n"); + fprintf(stderr, + "\t-n node_id selects using verify node id global packets to test node response latency. These packets are not ordered. Node id should be like 0x0501010118FF\n"); + fprintf(stderr, + "\t-l adds a loopback receiver. This makes sense with -i or -n packet types.\n"); exit(1); } void parse_args(int argc, char *argv[]) { int opt; - while ((opt = getopt(argc, argv, "hd:u:q:s:D:U:Q:")) >= 0) + while ((opt = getopt(argc, argv, "hd:u:q:s:D:U:Q:in:N:l")) >= 0) { switch (opt) { @@ -172,12 +209,40 @@ void parse_args(int argc, char *argv[]) input_ports.back().upstream_host = optarg; input_ports.back().upstream_port = in_upstream_port; break; + case 'l': + input_ports.emplace_back(); + input_ports.back().upstream_port = LOOPBACK_PORT_NUMBER; + break; + case 'N': + { + int rept = strtol(optarg, nullptr, 0); + unsigned ref = input_ports.size() - 1; + for (int idx = 0; idx < rept - 1; ++idx) + { + if (!input_ports[ref].upstream_host) + { + DIE("-N multiplicity can not be used with device link " + "(-D /dev/ttyXXX)"); + } + input_ports.emplace_back(); + input_ports.back().upstream_host = input_ports[ref].upstream_host; + input_ports.back().upstream_port = input_ports[ref].upstream_port; + } + break; + } case 'Q': in_upstream_port = atoi(optarg); break; case 's': pkt_per_sec = atoi(optarg); break; + case 'i': + pkt_type = PacketType::EVENT_IDENTIFY; + break; + case 'n': + pkt_type = PacketType::NODE_ID_VERIFY; + destination_nodeid = strtoll(optarg, nullptr, 16); + break; default: fprintf(stderr, "Unknown option %c\n", opt); usage(argv[0]); @@ -242,34 +307,77 @@ class Receiver : public CanHubPortInterface hub->register_port(this); } + // This function handles an incoming CAN frame. void send(message_type *buf, unsigned prio) override { auto ts = os_get_time_monotonic(); auto rb = get_buffer_deleter(buf); const struct can_frame &f = buf->data()->frame(); - if (f.can_dlc != 8 || !IS_CAN_FRAME_EFF(f)) + if (!IS_CAN_FRAME_EFF(f)) { // not interesting frame ++numUnknownFrames_; return; } auto id = GET_CAN_FRAME_ID_EFF(f); - if (id != SEND_HEADER) - { - // not interesting frame - ++numUnknownFrames_; - return; + uint32_t cnt = 0; + bool skip = true; + switch(pkt_type) { + case PacketType::EVENT_REPORT: { + if (id != SEND_HEADER_EVENT) { + break; + } + if (f.can_dlc != 8 || memcmp(f.data, SEND_PAYLOAD, 4) != 0) + { + break; + } + memcpy(&cnt, f.data + 4, 4); + cnt = be32toh(cnt); + skip = false; + break; + } + case PacketType::EVENT_IDENTIFY: { + if ((id & RECV_MASK_IDENT) != + (RECV_HEADER_IDENT & RECV_MASK_IDENT)) + { + break; + } + if (f.can_dlc != 8 || memcmp(f.data, SEND_PAYLOAD, 4) != 0) + { + break; + } + memcpy(&cnt, f.data + 4, 4); + cnt = be32toh(cnt); + skip = false; + break; + } + case PacketType::NODE_ID_VERIFY: { + if ((id & RECV_MASK_NODEID) != + (RECV_HEADER_NODEID & RECV_MASK_NODEID)) + { + LOG(VERBOSE, "response with wrong header %x", id); + break; + } + openlcb::NodeID actual = openlcb::data_to_node_id(f.data); + if (actual != destination_nodeid) { + + LOG(INFO, "response with wrong node id %012" PRIx64 " vs %012" PRIx64, actual, destination_nodeid); + break; + } + // This packet type does not carry a counter. We just assume + // this is the next correct packet. + cnt = nextPacket_; + skip = false; + break; + } } - if (memcmp(f.data, SEND_PAYLOAD, 4) != 0) + if (skip) { // not interesting frame ++numUnknownFrames_; return; } ++numFrames_; - uint32_t cnt = 0; - memcpy(&cnt, f.data + 4, 4); - cnt = be32toh(cnt); if (cnt == nextPacket_) { ++numInOrder_; @@ -349,6 +457,12 @@ void add_link(Link *link, bool is_receiver) new DeviceConnectionClient(StringPrintf("device%d", id), link->can_hub.get(), link->device_path)); } + else if (link->upstream_port == LOOPBACK_PORT_NUMBER) + { + // Loopback port is connected to the can hub of the output. + link->receiver.reset(new Receiver(output_port.can_hub.get())); + is_receiver = false; + } else { usage(arg0); @@ -409,11 +523,28 @@ class SendFlow : public StateFlow, QList<1>> auto *b = get_allocation_result(output_port.can_hub.get()); b->set_done(&pinfo_->bn_); auto &f = *b->data()->mutable_frame(); - SET_CAN_FRAME_ID_EFF(f, SEND_HEADER); f.can_dlc = 8; - memcpy(f.data, SEND_PAYLOAD, 8); uint32_t idx_be = htobe32(pinfo_->index_); - memcpy(f.data + 4, &idx_be, 4); + switch(pkt_type) { + case PacketType::EVENT_REPORT: { + SET_CAN_FRAME_ID_EFF(f, SEND_HEADER_EVENT); + memcpy(f.data, SEND_PAYLOAD, 8); + memcpy(f.data + 4, &idx_be, 4); + break; + } + case PacketType::EVENT_IDENTIFY: { + SET_CAN_FRAME_ID_EFF(f, SEND_HEADER_IDENT); + memcpy(f.data, SEND_PAYLOAD, 8); + memcpy(f.data + 4, &idx_be, 4); + break; + } + case PacketType::NODE_ID_VERIFY: { + SET_CAN_FRAME_ID_EFF(f, SEND_HEADER_NODEID); + f.can_dlc = 6; + openlcb::node_id_to_data(destination_nodeid, f.data); + break; + } + } b->data()->skipMember_ = nullptr; pinfo_->pendingReceivers_ = std::max(0, g_num_live_links - 1); g_num_packets_sent++; From 5786c59d3a818a943568beb9f106b95d24496817 Mon Sep 17 00:00:00 2001 From: Balazs Racz Date: Fri, 26 Jan 2024 21:09:40 +0100 Subject: [PATCH 38/38] Removes output buffer delay. --- applications/hub_test/targets/linux.x86/main.cxx | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/applications/hub_test/targets/linux.x86/main.cxx b/applications/hub_test/targets/linux.x86/main.cxx index e626c2537..bd1dbc89f 100644 --- a/applications/hub_test/targets/linux.x86/main.cxx +++ b/applications/hub_test/targets/linux.x86/main.cxx @@ -53,7 +53,8 @@ OVERRIDE_CONST(gridconnect_bridge_max_outgoing_packets, 2); OVERRIDE_CONST(gridconnect_tcp_notsent_lowat_buffer_size, 1460); // Maximum 100 TCP packets per second. -OVERRIDE_CONST(gridconnect_buffer_delay_usec, 10000); +//OVERRIDE_CONST(gridconnect_buffer_delay_usec, 10000); +OVERRIDE_CONST(gridconnect_buffer_delay_usec, 100); // Or one full packet. OVERRIDE_CONST(gridconnect_buffer_size, 1460);