From de06145b2b2eb7f5da6e362593fcc440a0ca240b Mon Sep 17 00:00:00 2001 From: halx99 Date: Wed, 31 Jan 2024 19:34:38 +0800 Subject: [PATCH 01/11] Refactor kcp implementation - slice data to kcp mss - schedule timer to invoke ikcp_update --- tests/speed/main.cpp | 64 ++++++++++--------------- yasio/io_service.cpp | 112 ++++++++++++------------------------------- yasio/io_service.hpp | 9 ++-- 3 files changed, 59 insertions(+), 126 deletions(-) diff --git a/tests/speed/main.cpp b/tests/speed/main.cpp index a5c88c3e..0074b27e 100644 --- a/tests/speed/main.cpp +++ b/tests/speed/main.cpp @@ -43,11 +43,14 @@ Test detail, please see: https://github.com/yasio/yasio/blob/master/benchmark.md #define SPEEDTEST_TRANSFER_PROTOCOL SPEEDTEST_PROTO_KCP #if SPEEDTEST_TRANSFER_PROTOCOL == SPEEDTEST_PROTO_TCP -# define SPEEDTEST_DEFAULT_KIND YCK_TCP_CLIENT | SPEEDTEST_SSL_MASK +# define SPEEDTEST_SERVER_KIND YCK_TCP_SERVER +# define SPEEDTEST_CLIENT_KIND YCK_TCP_CLIENT | SPEEDTEST_SSL_MASK #elif SPEEDTEST_TRANSFER_PROTOCOL == SPEEDTEST_PROTO_UDP -# define SPEEDTEST_DEFAULT_KIND YCK_UDP_CLIENT +# define SPEEDTEST_SERVER_KIND YCK_UDP_SERVER +# define SPEEDTEST_CLIENT_KIND YCK_UDP_CLIENT #elif SPEEDTEST_TRANSFER_PROTOCOL == SPEEDTEST_PROTO_KCP -# define SPEEDTEST_DEFAULT_KIND YCK_KCP_CLIENT +# define SPEEDTEST_SERVER_KIND YCK_KCP_SERVER +# define SPEEDTEST_CLIENT_KIND YCK_KCP_CLIENT #else # error "please define SPEEDTEST_TRANSFER_PROTOCOL to one of SPEEDTEST_PROTO_TCP, SPEEDTEST_PROTO_UDP, SPEEDTEST_PROTO_KCP" #endif @@ -56,40 +59,29 @@ Test detail, please see: https://github.com/yasio/yasio/blob/master/benchmark.md #define SPEEDTEST_KCP_MTU 65507 #define SPEEDTEST_KCP_MSS (SPEEDTEST_KCP_MTU - 24) -#if SPEEDTEST_TRANSFER_PROTOCOL == SPEEDTEST_PROTO_TCP namespace speedtest { enum { RECEIVER_PORT = 3002, SENDER_PORT = RECEIVER_PORT, -# if !SPEEDTEST_VIA_UDS - RECEIVER_CHANNEL_KIND = YCK_TCP_SERVER | SPEEDTEST_SSL_MASK, - SENDER_CHANNEL_KIND = SPEEDTEST_DEFAULT_KIND, -# else - RECEIVER_CHANNEL_KIND = YCK_TCP_SERVER | YCM_UDS, - SENDER_CHANNEL_KIND = SPEEDTEST_DEFAULT_KIND | YCM_UDS, -# endif -}; -} // namespace speedtest -# if !SPEEDTEST_VIA_UDS -# define SPEEDTEST_SOCKET_NAME "127.0.0.1" -# else -# define SPEEDTEST_SOCKET_NAME "speedtest.socket" -# endif +#if !SPEEDTEST_VIA_UDS + RECEIVER_CHANNEL_KIND = SPEEDTEST_SERVER_KIND | SPEEDTEST_SSL_MASK, + SENDER_CHANNEL_KIND = SPEEDTEST_CLIENT_KIND, #else -namespace speedtest -{ -enum -{ - RECEIVER_PORT = 3001, - SENDER_PORT = 3002, - RECEIVER_CHANNEL_KIND = SPEEDTEST_DEFAULT_KIND, - SENDER_CHANNEL_KIND = SPEEDTEST_DEFAULT_KIND, + RECEIVER_CHANNEL_KIND = SPEEDTEST_SERVER_KIND | YCM_UDS, + SENDER_CHANNEL_KIND = SPEEDTEST_CLIENT_KIND | YCM_UDS, +#endif }; +#if !SPEEDTEST_VIA_UDS # define SPEEDTEST_SOCKET_NAME "127.0.0.1" -} // namespace speedtest +# define SPEEDTEST_LISTEN_NAME "0.0.0.0" +#else + +# define SPEEDTEST_SOCKET_NAME "speedtest.socket" +# define SPEEDTEST_LISTEN_NAME SPEEDTEST_SOCKET_NAME #endif +} // namespace speedtest static const double s_send_limit_time = 10; // max send time in seconds @@ -99,7 +91,7 @@ static long long s_recv_total_bytes = 0; static double s_send_speed = 0; // bytes/s static double s_recv_speed = 0; -static const long long s_kcp_send_interval = 10; // (us) in microseconds +static const long long s_kcp_send_interval = 1000; // (ms) in milliseconds static const uint32_t s_kcp_conv = 8633; // can be any, but must same with two endpoint static const char* proto_name(int myproto) @@ -143,9 +135,9 @@ static void print_speed_detail(double interval, double time_elapsed) #if defined(YASIO_ENABLE_KCP) void setup_kcp_transfer(transport_handle_t handle) { - auto kcp_handle = static_cast(handle)->internal_object(); - ::ikcp_setmtu(kcp_handle, SPEEDTEST_KCP_MTU); - ::ikcp_wndsize(kcp_handle, 256, 1024); + // auto kcp_handle = static_cast(handle)->internal_object(); + //::ikcp_setmtu(kcp_handle, SPEEDTEST_KCP_MTU); + //::ikcp_wndsize(kcp_handle, 256, 1024); } #endif @@ -174,7 +166,7 @@ void kcp_send_repeated(io_service* service, transport_handle_t thandle, obstream static double time_elapsed = 0; static double last_print_time = 0; - highp_timer_ptr ignored_ret = service->schedule(std::chrono::microseconds(s_kcp_send_interval), [=](io_service&) { + highp_timer_ptr ignored_ret = service->schedule(std::chrono::milliseconds(s_kcp_send_interval), [=](io_service&) { s_send_total_bytes += service->write(thandle, obs->buffer()); time_elapsed = (yasio::highp_clock<>() - time_start) / 1000000.0; s_send_speed = s_send_total_bytes / time_elapsed; @@ -241,10 +233,6 @@ void start_sender(io_service& service) printf("Start trasnfer test via %s after 170ms...\n", proto_name(SPEEDTEST_TRANSFER_PROTOCOL)); std::this_thread::sleep_for(std::chrono::milliseconds(170)); -#if SPEEDTEST_TRANSFER_PROTOCOL != SPEEDTEST_PROTO_TCP - service.set_option(YOPT_C_LOCAL_PORT, 0, speedtest::RECEIVER_PORT); -#endif - #if SPEEDTEST_TRANSFER_PROTOCOL == SPEEDTEST_PROTO_KCP service.set_option(YOPT_C_KCP_CONV, 0, s_kcp_conv); #endif @@ -297,10 +285,6 @@ void start_receiver(io_service& service) } }); -#if SPEEDTEST_TRANSFER_PROTOCOL != SPEEDTEST_PROTO_TCP - service.set_option(YOPT_C_LOCAL_PORT, 0, speedtest::SENDER_PORT); -#endif - #if SPEEDTEST_TRANSFER_PROTOCOL == SPEEDTEST_PROTO_KCP service.set_option(YOPT_C_KCP_CONV, 0, s_kcp_conv); #endif diff --git a/yasio/io_service.cpp b/yasio/io_service.cpp index 973af4e3..51a507d3 100644 --- a/yasio/io_service.cpp +++ b/yasio/io_service.cpp @@ -142,8 +142,7 @@ namespace static const highp_time_t yasio__max_wait_usec = 5 * 60 * 1000 * 1000LL; // the max transport alloc size static const size_t yasio__max_tsize = (std::max)({sizeof(io_transport_tcp), sizeof(io_transport_udp), sizeof(io_transport_ssl), sizeof(io_transport_kcp)}); -static const int yasio_max_udp_data_mtu = - static_cast((std::numeric_limits::max)() - (sizeof(yasio::ip::ip_hdr_st) + sizeof(yasio::ip::udp_hdr_st))); +static const int yasio_udp_mss = static_cast((std::numeric_limits::max)() - (sizeof(yasio::ip::ip_hdr_st) + sizeof(yasio::ip::udp_hdr_st))); } // namespace struct yasio__global_state { enum @@ -487,7 +486,7 @@ void io_transport::set_primitives() else // UDP { this->write_cb_ = [this](const void* data, int len, const ip::endpoint*, int& error) { - int n = socket_->send(data, (std::min)(len, yasio_max_udp_data_mtu), YASIO_MSG_FLAG); + int n = socket_->send(data, (std::min)(len, yasio_udp_mss), YASIO_MSG_FLAG); if (n < 0) error = xxsocket::get_last_errno(); return n; @@ -636,7 +635,7 @@ void io_transport_udp::set_primitives() { this->write_cb_ = [this](const void* data, int len, const ip::endpoint* destination, int& error) { assert(destination); - int n = socket_->sendto(data, (std::min)(len, yasio_max_udp_data_mtu), *destination); + int n = socket_->sendto(data, (std::min)(len, yasio_udp_mss), *destination); if (n < 0) { error = xxsocket::get_last_errno(); @@ -669,7 +668,8 @@ int io_transport_udp::handle_input(const char* data, int bytes_transferred, int& #if defined(YASIO_ENABLE_KCP) // ----------------------- io_transport_kcp ------------------ -io_transport_kcp::io_transport_kcp(io_channel* ctx, xxsocket_ptr&& s) : io_transport_udp(ctx, std::forward(s)) +io_transport_kcp::io_transport_kcp(io_channel* ctx, xxsocket_ptr&& s) + : io_transport_udp(ctx, std::forward(s)), timer_for_update_(ctx->get_service()) { auto& kopts = ctx->kcp_options(); this->kcp_ = ::ikcp_create(static_cast(kopts.kcp_conv_), this); @@ -683,21 +683,37 @@ io_transport_kcp::io_transport_kcp(io_channel* ctx, xxsocket_ptr&& s) : io_trans ::ikcp_setoutput(this->kcp_, [](const char* buf, int len, ::ikcpcb* /*kcp*/, void* user) { auto t = (io_transport_kcp*)user; int ignored_ec = 0; - return t->write_cb_(buf, len, std::addressof(t->ensure_destination()), ignored_ec); + return t->underlaying_write_cb_(buf, len, std::addressof(t->ensure_destination()), ignored_ec); + }); + + // schedule a update timer + timer_for_update_.expires_from_now(std::chrono::milliseconds(kopts.kcp_interval_)); + timer_for_update_.async_wait([=](io_service&) { + ::ikcp_update(kcp_, yasio::clock()); + return false; }); } io_transport_kcp::~io_transport_kcp() { ::ikcp_release(this->kcp_); } -int io_transport_kcp::write(io_send_buffer&& buffer, completion_cb_t&& handler) +void io_transport_kcp::set_primitives() { - std::lock_guard lck(send_mtx_); - int nsent = ::ikcp_send(kcp_, buffer.data(), static_cast(buffer.size())); - assert(nsent > 0); - if (handler) - handler(nsent > 0 ? 0 : nsent, nsent); - get_service().wakeup(); - return nsent; + io_transport_udp::set_primitives(); + underlaying_write_cb_ = write_cb_; + write_cb_ = [this](const void* data, int len, const ip::endpoint*, int& error) { + int nsent = ::ikcp_send(kcp_, static_cast(data), (std::min)(static_cast(kcp_->mss), len)); + if (nsent > 0) + { + ::ikcp_flush(kcp_); + get_service().wakeup(); + } + else if (nsent == -2) + error = EWOULDBLOCK; + else + error = yasio::errc::invalid_packet; + return nsent; + }; } + int io_transport_kcp::do_read(int revent, int& error, highp_time_t& wait_duration) { int n = this->call_read(&rawbuf_.front(), static_cast(rawbuf_.size()), revent, error); @@ -717,78 +733,12 @@ int io_transport_kcp::handle_input(const char* buf, int len, int& error, highp_t { // ikcp in event always in service thread, so no need to lock if (0 == ::ikcp_input(kcp_, buf, len)) - { - this->check_timeout(wait_duration); // call ikcp_check return len; - } // simply regards -1,-2,-3 as error and trigger connection lost event. error = yasio::errc::invalid_packet; return -1; } -bool io_transport_kcp::do_write(highp_time_t& wait_duration) -{ - std::lock_guard lck(send_mtx_); - - ::ikcp_update(kcp_, static_cast(::yasio::clock())); - ::ikcp_flush(kcp_); - this->check_timeout(wait_duration); // call ikcp_check - return true; -} -static IINT32 yasio_itimediff(IUINT32 later, IUINT32 earlier) { return static_cast(later - earlier); } -static IUINT32 yasio_ikcp_check(const ikcpcb* kcp, IUINT32 current, IUINT32 waitd_ms) -{ - IUINT32 ts_flush = kcp->ts_flush; - IINT32 tm_flush = 0x7fffffff; - IINT32 tm_packet = 0x7fffffff; - IUINT32 minimal = 0; - struct IQUEUEHEAD* p; - - if (kcp->updated == 0) - return current; - - if (yasio_itimediff(current, ts_flush) < -10000) - ts_flush = current; - - if (yasio_itimediff(current, ts_flush) >= 0) - return current; - - if (kcp->nsnd_que) - return current; - if (kcp->probe) - return current; - - if (kcp->rmt_wnd == 0 && yasio_itimediff(kcp->current, kcp->ts_probe) >= 0) - return current; - - tm_flush = yasio_itimediff(ts_flush, current); - - for (p = kcp->snd_buf.next; p != &kcp->snd_buf; p = p->next) - { - const IKCPSEG* seg = iqueue_entry(p, const IKCPSEG, node); - IINT32 diff = yasio_itimediff(seg->resendts, current); - if (diff <= 0) - { - return current; - } - if (diff < tm_packet) - tm_packet = diff; - } - - minimal = kcp->nsnd_buf ? static_cast(tm_packet < tm_flush ? tm_packet : tm_flush) : waitd_ms; - - return current + minimal; -} -void io_transport_kcp::check_timeout(highp_time_t& wait_duration) const -{ - auto current = static_cast(::yasio::clock()); - auto expire_time = yasio_ikcp_check(kcp_, current, static_cast(wait_duration / std::milli::den)); - highp_time_t duration = static_cast(expire_time - current) * std::milli::den; - if (duration < 0) - duration = 0; - if (wait_duration > duration) - wait_duration = duration; -} #endif // ------------------------ io_service ------------------------ void io_service::init_globals(const yasio::inet::print_fn2_t& prt) { yasio__shared_globals(prt).cprint_ = prt; } @@ -1690,7 +1640,7 @@ void io_service::handle_connect_succeed(transport_handle_t transport) if (yasio__testbits(ctx->properties_, YCM_UDP)) { constexpr int max_ip_mtu = static_cast((std::numeric_limits::max)()); - transport->socket_->set_optval(SOL_SOCKET, SO_SNDBUF, max_ip_mtu); + transport->socket_->set_optval(SOL_SOCKET, SO_SNDBUF, max_ip_mtu + 1); } #endif diff --git a/yasio/io_service.hpp b/yasio/io_service.hpp index 19fb4210..de1b679b 100644 --- a/yasio/io_service.hpp +++ b/yasio/io_service.hpp @@ -848,18 +848,17 @@ class io_transport_kcp : public io_transport_udp { ikcpcb* internal_object() { return kcp_; } protected: - YASIO__DECL int write(io_send_buffer&&, completion_cb_t&&) override; + + YASIO__DECL void set_primitives() override; YASIO__DECL int do_read(int revent, int& error, highp_time_t& wait_duration) override; - YASIO__DECL bool do_write(highp_time_t& wait_duration) override; YASIO__DECL int handle_input(const char* buf, int len, int& error, highp_time_t& wait_duration) override; - YASIO__DECL void check_timeout(highp_time_t& wait_duration) const; - sbyte_buffer rawbuf_; // the low level raw buffer ikcpcb* kcp_; - std::recursive_mutex send_mtx_; + highp_timer timer_for_update_; + std::function underlaying_write_cb_; }; #else class io_transport_kcp {}; From a22302ddcbe6b51f16bf9ec899a553ffe07a8c0c Mon Sep 17 00:00:00 2001 From: halx99 Date: Wed, 31 Jan 2024 19:39:15 +0800 Subject: [PATCH 02/11] Improve speedtest --- tests/speed/main.cpp | 21 +++++++++++++++------ 1 file changed, 15 insertions(+), 6 deletions(-) diff --git a/tests/speed/main.cpp b/tests/speed/main.cpp index 0074b27e..2a16404b 100644 --- a/tests/speed/main.cpp +++ b/tests/speed/main.cpp @@ -240,7 +240,6 @@ void start_sender(io_service& service) service.open(0, speedtest::SENDER_CHANNEL_KIND); } -static io_service* s_sender; void start_receiver(io_service& service) { static long long time_start = yasio::highp_clock<>(); @@ -292,14 +291,24 @@ void start_receiver(io_service& service) service.open(0, speedtest::RECEIVER_CHANNEL_KIND); } -int main(int, char**) +int main(int argc, char** argv) { - io_hostent receiver_ep(SPEEDTEST_SOCKET_NAME, speedtest::RECEIVER_PORT), sender_ep(SPEEDTEST_SOCKET_NAME, speedtest::SENDER_PORT); + io_hostent receiver_ep(SPEEDTEST_LISTEN_NAME, speedtest::RECEIVER_PORT), sender_ep(SPEEDTEST_SOCKET_NAME, speedtest::SENDER_PORT); io_service receiver(&receiver_ep, 1), sender(&sender_ep, 1); - s_sender = &sender; - start_receiver(receiver); - start_sender(sender); + const char* mode = "host"; + if (argc > 1) + mode = argv[1]; + + if (cxx20::ic::iequals(mode, "server")) + start_receiver(receiver); + else if (cxx20::ic::iequals(mode, "client")) + start_sender(sender); + else + { + start_receiver(receiver); + start_sender(sender); + } static long long time_start = yasio::highp_clock<>(); while (true) From 78c68929273a11ab68af8e124b71ae9a304c6c6b Mon Sep 17 00:00:00 2001 From: halx99 Date: Thu, 1 Feb 2024 01:18:59 +0800 Subject: [PATCH 03/11] Improve kcp implementation --- yasio/config.hpp | 3 -- yasio/io_service.cpp | 67 +++++++++++++++++++++++++++----------------- yasio/io_service.hpp | 28 ++++++++++++------ 3 files changed, 60 insertions(+), 38 deletions(-) diff --git a/yasio/config.hpp b/yasio/config.hpp index 82c291c6..c0ffb82f 100644 --- a/yasio/config.hpp +++ b/yasio/config.hpp @@ -216,9 +216,6 @@ SOFTWARE. // The default ttl of multicast #define YASIO_DEFAULT_MULTICAST_TTL (int)128 -// The max internet buffer size -#define YASIO_INET_BUFFER_SIZE 65536 - // The max pdu buffer length, avoid large memory allocation when application decode a huge length. #define YASIO_MAX_PDU_BUFFER_SIZE static_cast(1 * 1024 * 1024) diff --git a/yasio/io_service.cpp b/yasio/io_service.cpp index 51a507d3..3ac4653b 100644 --- a/yasio/io_service.cpp +++ b/yasio/io_service.cpp @@ -47,7 +47,6 @@ SOFTWARE. #include "yasio/wtimer_hres.hpp" #if defined(YASIO_ENABLE_KCP) -# include "kcp/ikcp.h" struct yasio_kcp_options { int kcp_conv_ = 0; @@ -139,10 +138,10 @@ namespace { // By default we will wait no longer than 5 minutes. This will ensure that // any changes to the system clock are detected after no longer than this. -static const highp_time_t yasio__max_wait_usec = 5 * 60 * 1000 * 1000LL; +static const int yasio__max_wait_usec = 5 * 60 * 1000 * 1000LL; // the max transport alloc size static const size_t yasio__max_tsize = (std::max)({sizeof(io_transport_tcp), sizeof(io_transport_udp), sizeof(io_transport_ssl), sizeof(io_transport_kcp)}); -static const int yasio_udp_mss = static_cast((std::numeric_limits::max)() - (sizeof(yasio::ip::ip_hdr_st) + sizeof(yasio::ip::udp_hdr_st))); +static const int yasio__udp_mss = static_cast((std::numeric_limits::max)() - (sizeof(yasio::ip::ip_hdr_st) + sizeof(yasio::ip::udp_hdr_st))); } // namespace struct yasio__global_state { enum @@ -207,7 +206,7 @@ void highp_timer::cancel() std::chrono::microseconds highp_timer::wait_duration() const { - return std::chrono::duration_cast(this->expire_time_ - service_.time_); + return std::chrono::duration_cast(this->expire_time_ - service_.current_time_); } /// io_send_op @@ -486,7 +485,7 @@ void io_transport::set_primitives() else // UDP { this->write_cb_ = [this](const void* data, int len, const ip::endpoint*, int& error) { - int n = socket_->send(data, (std::min)(len, yasio_udp_mss), YASIO_MSG_FLAG); + int n = socket_->send(data, (std::min)(len, yasio__udp_mss), YASIO_MSG_FLAG); if (n < 0) error = xxsocket::get_last_errno(); return n; @@ -635,7 +634,7 @@ void io_transport_udp::set_primitives() { this->write_cb_ = [this](const void* data, int len, const ip::endpoint* destination, int& error) { assert(destination); - int n = socket_->sendto(data, (std::min)(len, yasio_udp_mss), *destination); + int n = socket_->sendto(data, (std::min)(len, yasio__udp_mss), *destination); if (n < 0) { error = xxsocket::get_last_errno(); @@ -668,30 +667,22 @@ int io_transport_udp::handle_input(const char* data, int bytes_transferred, int& #if defined(YASIO_ENABLE_KCP) // ----------------------- io_transport_kcp ------------------ -io_transport_kcp::io_transport_kcp(io_channel* ctx, xxsocket_ptr&& s) - : io_transport_udp(ctx, std::forward(s)), timer_for_update_(ctx->get_service()) +io_transport_kcp::io_transport_kcp(io_channel* ctx, xxsocket_ptr&& s) : io_transport_udp(ctx, std::forward(s)) { auto& kopts = ctx->kcp_options(); this->kcp_ = ::ikcp_create(static_cast(kopts.kcp_conv_), this); - ::ikcp_nodelay(this->kcp_, kopts.kcp_nodelay_, kopts.kcp_interval_ /*kcp max interval is 5000(ms)*/, kopts.kcp_resend_, kopts.kcp_ncwnd_); + ::ikcp_nodelay(this->kcp_, kopts.kcp_nodelay_, kopts.kcp_interval_, kopts.kcp_resend_, kopts.kcp_ncwnd_); ::ikcp_wndsize(this->kcp_, kopts.kcp_sndwnd_, kopts.kcp_rcvwnd_); ::ikcp_setmtu(this->kcp_, kopts.kcp_mtu_); // Because of nodelaying config will change the value. so setting RTO min after call ikcp_nodely. this->kcp_->rx_minrto = kopts.kcp_minrto_; - this->rawbuf_.resize(YASIO_INET_BUFFER_SIZE); + this->rawbuf_.resize(yasio__max_rcvbuf); ::ikcp_setoutput(this->kcp_, [](const char* buf, int len, ::ikcpcb* /*kcp*/, void* user) { auto t = (io_transport_kcp*)user; int ignored_ec = 0; return t->underlaying_write_cb_(buf, len, std::addressof(t->ensure_destination()), ignored_ec); }); - - // schedule a update timer - timer_for_update_.expires_from_now(std::chrono::milliseconds(kopts.kcp_interval_)); - timer_for_update_.async_wait([=](io_service&) { - ::ikcp_update(kcp_, yasio::clock()); - return false; - }); } io_transport_kcp::~io_transport_kcp() { ::ikcp_release(this->kcp_); } @@ -704,6 +695,7 @@ void io_transport_kcp::set_primitives() if (nsent > 0) { ::ikcp_flush(kcp_); + expire_time_ = 0; get_service().wakeup(); } else if (nsent == -2) @@ -713,7 +705,19 @@ void io_transport_kcp::set_primitives() return nsent; }; } +bool io_transport_kcp::do_write(highp_time_t& wait_duration) +{ + bool ret = io_transport_udp::do_write(wait_duration); + + const auto current = static_cast(std::chrono::duration_cast(get_service().current_time_.time_since_epoch()).count()); + if (((IINT32)(current - expire_time_)) >= 0) + { + ::ikcp_update(kcp_, current); + expire_time_ = ::ikcp_check(kcp_, current); + } + return ret; +} int io_transport_kcp::do_read(int revent, int& error, highp_time_t& wait_duration) { int n = this->call_read(&rawbuf_.front(), static_cast(rawbuf_.size()), revent, error); @@ -733,8 +737,10 @@ int io_transport_kcp::handle_input(const char* buf, int len, int& error, highp_t { // ikcp in event always in service thread, so no need to lock if (0 == ::ikcp_input(kcp_, buf, len)) + { + expire_time_ = 0; return len; - + } // simply regards -1,-2,-3 as error and trigger connection lost event. error = yasio::errc::invalid_packet; return -1; @@ -831,7 +837,7 @@ void io_service::initialize(const io_hostent* channel_eps, int channel_count) ssl_roles_[YSSL_CLIENT] = ssl_roles_[YSSL_SERVER] = nullptr; #endif - this->wait_duration_ = yasio__max_wait_usec; + this->wait_duration_ = this->sched_freq_; // at least one channel if (channel_count < 1) @@ -945,11 +951,10 @@ void io_service::run() ares_socket_t ares_socks[ARES_GETSOCK_MAXNUM] = {0}; #endif - // Update time for 1st loop - this->update_time(); - do { + this->current_time_ = yasio::steady_clock_t::now(); + auto waitd_usec = get_timeout(this->wait_duration_); // Gets current wait duration #if defined(YASIO_USE_CARES) /** @@ -1133,6 +1138,11 @@ void io_service::handle_close(transport_handle_t thandle) auto error = thandle->error_; const bool client = yasio__testbits(ctx->properties_, YCM_CLIENT); + if (yasio__testbits(ctx->properties_, YCM_KCP)) + { + if (--nsched_ <= 0) // if no sched transport, reset sched_freq to max wait 5mins + sched_freq_ = yasio__max_wait_usec; + } if (thandle->state_ == io_base::state::OPENED) { // @Because we can't retrive peer endpoint when connect reset by peer, so use id to trace. YASIO_KLOGD("[index: %d] the connection #%u is lost, ec=%d, where=%d, detail:%s", ctx->index_, thandle->id_, error, (int)thandle->error_stage_, @@ -1492,7 +1502,7 @@ void io_service::do_accept(io_channel* ctx) { if (yasio__testbits(ctx->properties_, YCPF_MCAST)) ctx->join_multicast_group(); - ctx->buffer_.resize(YASIO_INET_BUFFER_SIZE); + ctx->buffer_.resize(yasio__max_rcvbuf); } io_watcher_.mod_event(ctx->socket_->native_handle(), socket_event::read, 0); YASIO_KLOGI("[index: %d] open server succeed, socket.fd=%d listening at %s...", ctx->index_, (int)ctx->socket_->native_handle(), ep.to_string().c_str()); @@ -1651,6 +1661,13 @@ void io_service::active_transport(transport_handle_t t) auto ctx = t->ctx_; auto& s = t->socket_; this->transports_.push_back(t); + if (yasio__testbits(ctx->properties_, YCM_KCP)) + { + ++this->nsched_; + auto interval = static_cast(t)->interval(); + if (this->sched_freq_ > interval) + this->sched_freq_ = interval; + } if (!yasio__testbits(ctx->properties_, YCM_SSL)) { YASIO__UNUSED_PARAM(s); @@ -1871,8 +1888,6 @@ bool io_service::close_internal(io_channel* ctx) } void io_service::process_timers() { - this->update_time(); - if (this->timer_queue_.empty()) return; @@ -1908,7 +1923,7 @@ void io_service::process_deferred_events() } highp_time_t io_service::get_timeout(highp_time_t usec) { - this->wait_duration_ = yasio__max_wait_usec; // Reset next wait duration per frame + this->wait_duration_ = this->sched_freq_; // Reset next wait duration per frame if (this->timer_queue_.empty()) return usec; diff --git a/yasio/io_service.hpp b/yasio/io_service.hpp index de1b679b..98155d39 100644 --- a/yasio/io_service.hpp +++ b/yasio/io_service.hpp @@ -54,7 +54,7 @@ SOFTWARE. #endif #if defined(YASIO_ENABLE_KCP) -typedef struct IKCPCB ikcpcb; +# include "kcp/ikcp.h" struct yasio_kcp_options; #endif @@ -399,6 +399,11 @@ typedef highp_timer_ptr deadline_timer_ptr; typedef event_cb_t io_event_cb_t; typedef completion_cb_t io_completion_cb_t; +namespace +{ +static const int yasio__max_rcvbuf = YASIO_SZ(64, k); +} // namespace + // the ssl role enum ssl_role { @@ -774,8 +779,8 @@ class io_transport : public io_base { bool is_valid() const { return ctx_ != nullptr; } - char buffer_[YASIO_INET_BUFFER_SIZE]; // recv buffer, 64K - int offset_ = 0; // recv buffer offset + char buffer_[yasio__max_rcvbuf]; // recv buffer, 64K + int offset_ = 0; // recv buffer offset int expected_size_ = -1; sbyte_buffer expected_packet_; @@ -842,22 +847,26 @@ class YASIO_API io_transport_udp : public io_transport { }; #if defined(YASIO_ENABLE_KCP) class io_transport_kcp : public io_transport_udp { + friend class io_service; public: YASIO__DECL io_transport_kcp(io_channel* ctx, xxsocket_ptr&& s); YASIO__DECL ~io_transport_kcp(); ikcpcb* internal_object() { return kcp_; } protected: - YASIO__DECL void set_primitives() override; YASIO__DECL int do_read(int revent, int& error, highp_time_t& wait_duration) override; + YASIO__DECL bool do_write(highp_time_t& wait_duration) override; + YASIO__DECL int handle_input(const char* buf, int len, int& error, highp_time_t& wait_duration) override; + + int interval() const { return kcp_->interval * std::milli::den; } sbyte_buffer rawbuf_; // the low level raw buffer - ikcpcb* kcp_; - highp_timer timer_for_update_; + ikcpcb* kcp_{nullptr}; + IUINT32 expire_time_{0}; // the next expire time(ms) to call ikcp_update std::function underlaying_write_cb_; }; #else @@ -1233,15 +1242,13 @@ class YASIO_API io_service // lgtm [cpp/class-many-fields] /* For log macro only */ inline const print_fn2_t& __get_cprint() const { return options_.print_; } - void update_time() { this->time_ = yasio::steady_clock_t::now(); } - private: state state_ = state::UNINITIALIZED; // The service state std::thread worker_; std::thread::id worker_id_; /* The current time according to the event loop. in msecs. */ - std::chrono::time_point time_; + std::chrono::time_point current_time_; privacy::concurrent_queue events_; @@ -1262,6 +1269,9 @@ class YASIO_API io_service // lgtm [cpp/class-many-fields] io_watcher io_watcher_; + int nsched_ = 0; + int sched_freq_ = 5 * 60 * 1000 * 1000; // 5mins in us + // options struct __unnamed_options { highp_time_t connect_timeout_ = 10LL * std::micro::den; From 5b818d71b9981d980c32e8232875782564576319 Mon Sep 17 00:00:00 2001 From: halx99 Date: Thu, 1 Feb 2024 01:22:03 +0800 Subject: [PATCH 04/11] Fix ci --- yasio/io_service.cpp | 2 ++ 1 file changed, 2 insertions(+) diff --git a/yasio/io_service.cpp b/yasio/io_service.cpp index 3ac4653b..44d67e98 100644 --- a/yasio/io_service.cpp +++ b/yasio/io_service.cpp @@ -1664,9 +1664,11 @@ void io_service::active_transport(transport_handle_t t) if (yasio__testbits(ctx->properties_, YCM_KCP)) { ++this->nsched_; +#if defined(YASIO_ENABLE_KCP) auto interval = static_cast(t)->interval(); if (this->sched_freq_ > interval) this->sched_freq_ = interval; +#endif } if (!yasio__testbits(ctx->properties_, YCM_SSL)) { From 377264cd419addeae61488090ece723ef973c544 Mon Sep 17 00:00:00 2001 From: halx99 Date: Thu, 1 Feb 2024 01:30:34 +0800 Subject: [PATCH 05/11] Remove unnecessary wakeup --- yasio/io_service.cpp | 1 - 1 file changed, 1 deletion(-) diff --git a/yasio/io_service.cpp b/yasio/io_service.cpp index 44d67e98..9c6fa4d5 100644 --- a/yasio/io_service.cpp +++ b/yasio/io_service.cpp @@ -696,7 +696,6 @@ void io_transport_kcp::set_primitives() { ::ikcp_flush(kcp_); expire_time_ = 0; - get_service().wakeup(); } else if (nsent == -2) error = EWOULDBLOCK; From cdc76a4c2273c07fdb8188b3d15df5e7255fee99 Mon Sep 17 00:00:00 2001 From: halx99 Date: Thu, 1 Feb 2024 01:49:36 +0800 Subject: [PATCH 06/11] Both win32 and unix(like) should check does remote endpoint already assoc with a transport --- yasio/io_service.cpp | 21 +++++++++++---------- yasio/io_service.hpp | 2 ++ 2 files changed, 13 insertions(+), 10 deletions(-) diff --git a/yasio/io_service.cpp b/yasio/io_service.cpp index 9c6fa4d5..4e841d1a 100644 --- a/yasio/io_service.cpp +++ b/yasio/io_service.cpp @@ -896,6 +896,7 @@ void io_service::destroy_channels() } void io_service::clear_transports() { + transport_map_.clear(); for (auto transport : transports_) { cleanup_io(transport); @@ -1137,6 +1138,9 @@ void io_service::handle_close(transport_handle_t thandle) auto error = thandle->error_; const bool client = yasio__testbits(ctx->properties_, YCM_CLIENT); + if (yasio__testbits(ctx->properties_, YCM_UDP)) + transport_map_.erase(thandle->remote_endpoint()); + if (yasio__testbits(ctx->properties_, YCM_KCP)) { if (--nsched_ <= 0) // if no sched transport, reset sched_freq to max wait 5mins @@ -1588,16 +1592,10 @@ transport_handle_t io_service::do_dgram_accept(io_channel* ctx, const ip::endpoi b. for non-win32 multicast: same with win32, because the kernel can't route same udp peer as 1 transport when the peer always sendto multicast address. */ - const bool user_route = !YASIO__UDP_KROUTE || yasio__testbits(ctx->properties_, YCPF_MCAST); - if (user_route) - { - auto it = yasio__find_if(this->transports_, [&peer](const io_transport* transport) { - using namespace std; - return yasio__testbits(transport->ctx_->properties_, YCM_UDP) && static_cast(transport)->remote_endpoint() == peer; - }); - if (it != this->transports_.end()) - return *it; - } + // both win32 and unix(like) should check does remote endpoint already assoc with a transport + auto it = this->transport_map_.find(peer); + if (it != this->transport_map_.end()) + return it->second; auto new_sock = std::make_shared(); if (new_sock->popen(peer.af(), SOCK_DGRAM)) @@ -1611,10 +1609,13 @@ transport_handle_t io_service::do_dgram_accept(io_channel* ctx, const ip::endpoi auto transport = static_cast(allocate_transport(ctx, std::move(new_sock))); // We always establish 4 tuple with clients transport->confgure_remote(peer); + const bool user_route = !YASIO__UDP_KROUTE || yasio__testbits(ctx->properties_, YCPF_MCAST); if (user_route) active_transport(transport); else handle_connect_succeed(transport); + + this->transport_map_.emplace(peer, transport); return transport; } } diff --git a/yasio/io_service.hpp b/yasio/io_service.hpp index 98155d39..8deb6e21 100644 --- a/yasio/io_service.hpp +++ b/yasio/io_service.hpp @@ -36,6 +36,7 @@ SOFTWARE. #include #include #include +#include #include "yasio/sz.hpp" #include "yasio/config.hpp" #include "yasio/singleton.hpp" @@ -1259,6 +1260,7 @@ class YASIO_API io_service // lgtm [cpp/class-many-fields] std::vector transports_; std::vector tpool_; + std::map transport_map_; // timer support timer_pair, back is earliest expire timer std::vector timer_queue_; From 5797b8a61b67c91e50db897277aaa50f80a64744 Mon Sep 17 00:00:00 2001 From: halx99 Date: Thu, 1 Feb 2024 02:12:47 +0800 Subject: [PATCH 07/11] Let kcp internal to slice message --- tests/echo_client/main.cpp | 12 ++++++------ tests/speed/main.cpp | 13 +++++++------ yasio/io_service.cpp | 20 +++++++++----------- 3 files changed, 22 insertions(+), 23 deletions(-) diff --git a/tests/echo_client/main.cpp b/tests/echo_client/main.cpp index c7005b7a..e1f525f5 100644 --- a/tests/echo_client/main.cpp +++ b/tests/echo_client/main.cpp @@ -34,8 +34,9 @@ void run_echo_client(const char* ip, int port, const char* protocol) auto packet = std::move(event->packet()); total_bytes_transferred += static_cast(packet.size()); + printf("[latency: %lf(ms), bytes=%zu]", diff / 1000.0, packet.size()); fwrite(packet.data(), packet.size(), 1, stdout); - printf("latency: %lf(ms)\n", diff / 1000.0); + printf("\n"); fflush(stdout); break; } @@ -50,13 +51,12 @@ void run_echo_client(const char* ip, int port, const char* protocol) send_timer.async_wait([transport, index, protocol](io_service& service) -> bool { obstream obs; obs.write_byte('['); - obs.write_bytes(protocol, strlen(protocol)); + obs.write_bytes(protocol, static_cast(strlen(protocol))); obs.write_bytes("] Hello, "); - auto n = 4096 - obs.length(); - obs.fill_bytes(n - 1, '1'); - obs.write_byte(','); - printf("Sending %zu bytes ...\n", obs.length()); + auto n = 4096 - static_cast(obs.length()); + obs.fill_bytes(n, 'a'); service.write(transport, std::move(obs.buffer())); + printf("sent %zu bytes ...\n", obs.length()); s_last_send_time[index] = highp_clock(); return false; }); diff --git a/tests/speed/main.cpp b/tests/speed/main.cpp index 2a16404b..07199fe6 100644 --- a/tests/speed/main.cpp +++ b/tests/speed/main.cpp @@ -56,7 +56,8 @@ Test detail, please see: https://github.com/yasio/yasio/blob/master/benchmark.md #endif // speedtest kcp mtu to max mss of udp (65535 - 20(ip_hdr) - 8(udp_hdr)) -#define SPEEDTEST_KCP_MTU 65507 +#define SPEEDTEST_UDP_MSS 65507 +#define SPEEDTEST_KCP_MTU SPEEDTEST_UDP_MSS #define SPEEDTEST_KCP_MSS (SPEEDTEST_KCP_MTU - 24) namespace speedtest @@ -91,7 +92,7 @@ static long long s_recv_total_bytes = 0; static double s_send_speed = 0; // bytes/s static double s_recv_speed = 0; -static const long long s_kcp_send_interval = 1000; // (ms) in milliseconds +static const long long s_kcp_send_interval = 100; // (us) in milliseconds static const uint32_t s_kcp_conv = 8633; // can be any, but must same with two endpoint static const char* proto_name(int myproto) @@ -135,9 +136,9 @@ static void print_speed_detail(double interval, double time_elapsed) #if defined(YASIO_ENABLE_KCP) void setup_kcp_transfer(transport_handle_t handle) { - // auto kcp_handle = static_cast(handle)->internal_object(); - //::ikcp_setmtu(kcp_handle, SPEEDTEST_KCP_MTU); - //::ikcp_wndsize(kcp_handle, 256, 1024); + auto kcp_handle = static_cast(handle)->internal_object(); + ::ikcp_setmtu(kcp_handle, SPEEDTEST_KCP_MTU); + ::ikcp_wndsize(kcp_handle, 256, 1024); } #endif @@ -166,7 +167,7 @@ void kcp_send_repeated(io_service* service, transport_handle_t thandle, obstream static double time_elapsed = 0; static double last_print_time = 0; - highp_timer_ptr ignored_ret = service->schedule(std::chrono::milliseconds(s_kcp_send_interval), [=](io_service&) { + highp_timer_ptr ignored_ret = service->schedule(std::chrono::microseconds(s_kcp_send_interval), [=](io_service&) { s_send_total_bytes += service->write(thandle, obs->buffer()); time_elapsed = (yasio::highp_clock<>() - time_start) / 1000000.0; s_send_speed = s_send_total_bytes / time_elapsed; diff --git a/yasio/io_service.cpp b/yasio/io_service.cpp index 4e841d1a..d9954939 100644 --- a/yasio/io_service.cpp +++ b/yasio/io_service.cpp @@ -691,17 +691,15 @@ void io_transport_kcp::set_primitives() io_transport_udp::set_primitives(); underlaying_write_cb_ = write_cb_; write_cb_ = [this](const void* data, int len, const ip::endpoint*, int& error) { - int nsent = ::ikcp_send(kcp_, static_cast(data), (std::min)(static_cast(kcp_->mss), len)); - if (nsent > 0) - { - ::ikcp_flush(kcp_); - expire_time_ = 0; - } - else if (nsent == -2) - error = EWOULDBLOCK; - else - error = yasio::errc::invalid_packet; - return nsent; + int nsent = ::ikcp_send(kcp_, static_cast(data), len /*(std::min)(static_cast(kcp_->mss), len)*/); + if (nsent > 0) + { + ::ikcp_flush(kcp_); + expire_time_ = 0; + } + else + error = EMSGSIZE; // emit message too long + return nsent; }; } bool io_transport_kcp::do_write(highp_time_t& wait_duration) From edf58ab00437ea728777b732e1f416c1342c0a77 Mon Sep 17 00:00:00 2001 From: halx99 Date: Thu, 1 Feb 2024 02:18:28 +0800 Subject: [PATCH 08/11] Fix ci --- yasio/xxsocket.hpp | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/yasio/xxsocket.hpp b/yasio/xxsocket.hpp index 832e3d8c..2494810d 100644 --- a/yasio/xxsocket.hpp +++ b/yasio/xxsocket.hpp @@ -542,7 +542,7 @@ struct endpoint final { auto offst = s.find(fmt); if (offst != std::string::npos) { - snprintf(snum,sizeof(snum), "%u", addr_bytes[idx]); + snprintf(snum, sizeof(snum), "%u", addr_bytes[idx]); s.replace(offst, _N0, snum); } } @@ -1135,8 +1135,10 @@ using namespace yasio::inet; #endif } // namespace yasio +#if defined(YASIO__HAS_CXX11) namespace std { // VS2013 the operator must be at namespace std +#endif inline bool operator<(const yasio::inet::ip::endpoint& lhs, const yasio::inet::ip::endpoint& rhs) { // apply operator < to operands if (lhs.af() == AF_INET) @@ -1147,8 +1149,9 @@ inline bool operator==(const yasio::inet::ip::endpoint& lhs, const yasio::inet:: { // apply operator == to operands return !(lhs < rhs) && !(rhs < lhs); } +#if defined(YASIO__HAS_CXX11) } // namespace std - +#endif #if defined(YASIO_HEADER_ONLY) # include "yasio/xxsocket.cpp" // lgtm [cpp/include-non-header] #endif From 16f8cc2d31ef1d2cbaa6dc4eaa566984dacdd51e Mon Sep 17 00:00:00 2001 From: halx99 Date: Thu, 1 Feb 2024 02:47:00 +0800 Subject: [PATCH 09/11] Fix ci --- yasio/xxsocket.hpp | 28 +++++++++++----------------- 1 file changed, 11 insertions(+), 17 deletions(-) diff --git a/yasio/xxsocket.hpp b/yasio/xxsocket.hpp index 2494810d..fb2eb34f 100644 --- a/yasio/xxsocket.hpp +++ b/yasio/xxsocket.hpp @@ -218,6 +218,17 @@ struct endpoint final { explicit operator bool() const { return this->af() != AF_UNSPEC; } + friend inline bool operator<(const endpoint& lhs, const endpoint& rhs) + { // apply operator < to operands + if (lhs.af() == AF_INET) + return (static_cast(lhs.in4_.sin_addr.s_addr) + lhs.in4_.sin_port) < (static_cast(rhs.in4_.sin_addr.s_addr) + rhs.in4_.sin_port); + return ::memcmp(&lhs, &rhs, sizeof(rhs)) < 0; + } + friend inline bool operator==(const endpoint& lhs, const endpoint& rhs) + { // apply operator == to operands + return !(lhs < rhs) && !(rhs < lhs); + } + endpoint& operator=(const endpoint& rhs) { return as_is(rhs); } endpoint& as_is(const endpoint& rhs) { @@ -1135,23 +1146,6 @@ using namespace yasio::inet; #endif } // namespace yasio -#if defined(YASIO__HAS_CXX11) -namespace std -{ // VS2013 the operator must be at namespace std -#endif -inline bool operator<(const yasio::inet::ip::endpoint& lhs, const yasio::inet::ip::endpoint& rhs) -{ // apply operator < to operands - if (lhs.af() == AF_INET) - return (static_cast(lhs.in4_.sin_addr.s_addr) + lhs.in4_.sin_port) < (static_cast(rhs.in4_.sin_addr.s_addr) + rhs.in4_.sin_port); - return ::memcmp(&lhs, &rhs, sizeof(rhs)) < 0; -} -inline bool operator==(const yasio::inet::ip::endpoint& lhs, const yasio::inet::ip::endpoint& rhs) -{ // apply operator == to operands - return !(lhs < rhs) && !(rhs < lhs); -} -#if defined(YASIO__HAS_CXX11) -} // namespace std -#endif #if defined(YASIO_HEADER_ONLY) # include "yasio/xxsocket.cpp" // lgtm [cpp/include-non-header] #endif From e705fe79ec414735df98eb80dc9d893722092338 Mon Sep 17 00:00:00 2001 From: halx99 Date: Thu, 1 Feb 2024 03:00:12 +0800 Subject: [PATCH 10/11] Fix ci --- thirdparty | 2 +- yasio/xxsocket.hpp | 12 ++++++++++++ 2 files changed, 13 insertions(+), 1 deletion(-) diff --git a/thirdparty b/thirdparty index 39bd4c34..9e2ab5bc 160000 --- a/thirdparty +++ b/thirdparty @@ -1 +1 @@ -Subproject commit 39bd4c34a4a88494633b5fdf49c7fb096f7502ab +Subproject commit 9e2ab5bca290b0482d5bfd1c30b114f961c24b4e diff --git a/yasio/xxsocket.hpp b/yasio/xxsocket.hpp index fb2eb34f..dfae08e0 100644 --- a/yasio/xxsocket.hpp +++ b/yasio/xxsocket.hpp @@ -40,6 +40,7 @@ SOFTWARE. #include #include "yasio/impl/socket.hpp" #include "yasio/logging.hpp" +#include "yasio/string_view.hpp" namespace yasio { @@ -1146,6 +1147,17 @@ using namespace yasio::inet; #endif } // namespace yasio +namespace std +{ +template <> +struct hash { + std::size_t operator()(const yasio::ip::endpoint& ep) const YASIO__NOEXCEPT + { + return std::hash()(cxx17::string_view{reinterpret_cast(&ep), static_cast(ep.len())}); + } +}; +} // namespace std + #if defined(YASIO_HEADER_ONLY) # include "yasio/xxsocket.cpp" // lgtm [cpp/include-non-header] #endif From c1853e9df7524099f35197d426001453cc57f668 Mon Sep 17 00:00:00 2001 From: Deal Date: Thu, 1 Feb 2024 03:02:33 +0800 Subject: [PATCH 11/11] Improve code style --- yasio/xxsocket.hpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/yasio/xxsocket.hpp b/yasio/xxsocket.hpp index dfae08e0..0d7ae515 100644 --- a/yasio/xxsocket.hpp +++ b/yasio/xxsocket.hpp @@ -219,13 +219,13 @@ struct endpoint final { explicit operator bool() const { return this->af() != AF_UNSPEC; } - friend inline bool operator<(const endpoint& lhs, const endpoint& rhs) + friend bool operator<(const endpoint& lhs, const endpoint& rhs) { // apply operator < to operands if (lhs.af() == AF_INET) return (static_cast(lhs.in4_.sin_addr.s_addr) + lhs.in4_.sin_port) < (static_cast(rhs.in4_.sin_addr.s_addr) + rhs.in4_.sin_port); return ::memcmp(&lhs, &rhs, sizeof(rhs)) < 0; } - friend inline bool operator==(const endpoint& lhs, const endpoint& rhs) + friend bool operator==(const endpoint& lhs, const endpoint& rhs) { // apply operator == to operands return !(lhs < rhs) && !(rhs < lhs); }