diff --git a/yasio/config.hpp b/yasio/config.hpp index 82c291c6..c0ffb82f 100644 --- a/yasio/config.hpp +++ b/yasio/config.hpp @@ -216,9 +216,6 @@ SOFTWARE. // The default ttl of multicast #define YASIO_DEFAULT_MULTICAST_TTL (int)128 -// The max internet buffer size -#define YASIO_INET_BUFFER_SIZE 65536 - // The max pdu buffer length, avoid large memory allocation when application decode a huge length. #define YASIO_MAX_PDU_BUFFER_SIZE static_cast(1 * 1024 * 1024) diff --git a/yasio/io_service.cpp b/yasio/io_service.cpp index 51a507d3..3ac4653b 100644 --- a/yasio/io_service.cpp +++ b/yasio/io_service.cpp @@ -47,7 +47,6 @@ SOFTWARE. #include "yasio/wtimer_hres.hpp" #if defined(YASIO_ENABLE_KCP) -# include "kcp/ikcp.h" struct yasio_kcp_options { int kcp_conv_ = 0; @@ -139,10 +138,10 @@ namespace { // By default we will wait no longer than 5 minutes. This will ensure that // any changes to the system clock are detected after no longer than this. -static const highp_time_t yasio__max_wait_usec = 5 * 60 * 1000 * 1000LL; +static const int yasio__max_wait_usec = 5 * 60 * 1000 * 1000LL; // the max transport alloc size static const size_t yasio__max_tsize = (std::max)({sizeof(io_transport_tcp), sizeof(io_transport_udp), sizeof(io_transport_ssl), sizeof(io_transport_kcp)}); -static const int yasio_udp_mss = static_cast((std::numeric_limits::max)() - (sizeof(yasio::ip::ip_hdr_st) + sizeof(yasio::ip::udp_hdr_st))); +static const int yasio__udp_mss = static_cast((std::numeric_limits::max)() - (sizeof(yasio::ip::ip_hdr_st) + sizeof(yasio::ip::udp_hdr_st))); } // namespace struct yasio__global_state { enum @@ -207,7 +206,7 @@ void highp_timer::cancel() std::chrono::microseconds highp_timer::wait_duration() const { - return std::chrono::duration_cast(this->expire_time_ - service_.time_); + return std::chrono::duration_cast(this->expire_time_ - service_.current_time_); } /// io_send_op @@ -486,7 +485,7 @@ void io_transport::set_primitives() else // UDP { this->write_cb_ = [this](const void* data, int len, const ip::endpoint*, int& error) { - int n = socket_->send(data, (std::min)(len, yasio_udp_mss), YASIO_MSG_FLAG); + int n = socket_->send(data, (std::min)(len, yasio__udp_mss), YASIO_MSG_FLAG); if (n < 0) error = xxsocket::get_last_errno(); return n; @@ -635,7 +634,7 @@ void io_transport_udp::set_primitives() { this->write_cb_ = [this](const void* data, int len, const ip::endpoint* destination, int& error) { assert(destination); - int n = socket_->sendto(data, (std::min)(len, yasio_udp_mss), *destination); + int n = socket_->sendto(data, (std::min)(len, yasio__udp_mss), *destination); if (n < 0) { error = xxsocket::get_last_errno(); @@ -668,30 +667,22 @@ int io_transport_udp::handle_input(const char* data, int bytes_transferred, int& #if defined(YASIO_ENABLE_KCP) // ----------------------- io_transport_kcp ------------------ -io_transport_kcp::io_transport_kcp(io_channel* ctx, xxsocket_ptr&& s) - : io_transport_udp(ctx, std::forward(s)), timer_for_update_(ctx->get_service()) +io_transport_kcp::io_transport_kcp(io_channel* ctx, xxsocket_ptr&& s) : io_transport_udp(ctx, std::forward(s)) { auto& kopts = ctx->kcp_options(); this->kcp_ = ::ikcp_create(static_cast(kopts.kcp_conv_), this); - ::ikcp_nodelay(this->kcp_, kopts.kcp_nodelay_, kopts.kcp_interval_ /*kcp max interval is 5000(ms)*/, kopts.kcp_resend_, kopts.kcp_ncwnd_); + ::ikcp_nodelay(this->kcp_, kopts.kcp_nodelay_, kopts.kcp_interval_, kopts.kcp_resend_, kopts.kcp_ncwnd_); ::ikcp_wndsize(this->kcp_, kopts.kcp_sndwnd_, kopts.kcp_rcvwnd_); ::ikcp_setmtu(this->kcp_, kopts.kcp_mtu_); // Because of nodelaying config will change the value. so setting RTO min after call ikcp_nodely. this->kcp_->rx_minrto = kopts.kcp_minrto_; - this->rawbuf_.resize(YASIO_INET_BUFFER_SIZE); + this->rawbuf_.resize(yasio__max_rcvbuf); ::ikcp_setoutput(this->kcp_, [](const char* buf, int len, ::ikcpcb* /*kcp*/, void* user) { auto t = (io_transport_kcp*)user; int ignored_ec = 0; return t->underlaying_write_cb_(buf, len, std::addressof(t->ensure_destination()), ignored_ec); }); - - // schedule a update timer - timer_for_update_.expires_from_now(std::chrono::milliseconds(kopts.kcp_interval_)); - timer_for_update_.async_wait([=](io_service&) { - ::ikcp_update(kcp_, yasio::clock()); - return false; - }); } io_transport_kcp::~io_transport_kcp() { ::ikcp_release(this->kcp_); } @@ -704,6 +695,7 @@ void io_transport_kcp::set_primitives() if (nsent > 0) { ::ikcp_flush(kcp_); + expire_time_ = 0; get_service().wakeup(); } else if (nsent == -2) @@ -713,7 +705,19 @@ void io_transport_kcp::set_primitives() return nsent; }; } +bool io_transport_kcp::do_write(highp_time_t& wait_duration) +{ + bool ret = io_transport_udp::do_write(wait_duration); + + const auto current = static_cast(std::chrono::duration_cast(get_service().current_time_.time_since_epoch()).count()); + if (((IINT32)(current - expire_time_)) >= 0) + { + ::ikcp_update(kcp_, current); + expire_time_ = ::ikcp_check(kcp_, current); + } + return ret; +} int io_transport_kcp::do_read(int revent, int& error, highp_time_t& wait_duration) { int n = this->call_read(&rawbuf_.front(), static_cast(rawbuf_.size()), revent, error); @@ -733,8 +737,10 @@ int io_transport_kcp::handle_input(const char* buf, int len, int& error, highp_t { // ikcp in event always in service thread, so no need to lock if (0 == ::ikcp_input(kcp_, buf, len)) + { + expire_time_ = 0; return len; - + } // simply regards -1,-2,-3 as error and trigger connection lost event. error = yasio::errc::invalid_packet; return -1; @@ -831,7 +837,7 @@ void io_service::initialize(const io_hostent* channel_eps, int channel_count) ssl_roles_[YSSL_CLIENT] = ssl_roles_[YSSL_SERVER] = nullptr; #endif - this->wait_duration_ = yasio__max_wait_usec; + this->wait_duration_ = this->sched_freq_; // at least one channel if (channel_count < 1) @@ -945,11 +951,10 @@ void io_service::run() ares_socket_t ares_socks[ARES_GETSOCK_MAXNUM] = {0}; #endif - // Update time for 1st loop - this->update_time(); - do { + this->current_time_ = yasio::steady_clock_t::now(); + auto waitd_usec = get_timeout(this->wait_duration_); // Gets current wait duration #if defined(YASIO_USE_CARES) /** @@ -1133,6 +1138,11 @@ void io_service::handle_close(transport_handle_t thandle) auto error = thandle->error_; const bool client = yasio__testbits(ctx->properties_, YCM_CLIENT); + if (yasio__testbits(ctx->properties_, YCM_KCP)) + { + if (--nsched_ <= 0) // if no sched transport, reset sched_freq to max wait 5mins + sched_freq_ = yasio__max_wait_usec; + } if (thandle->state_ == io_base::state::OPENED) { // @Because we can't retrive peer endpoint when connect reset by peer, so use id to trace. YASIO_KLOGD("[index: %d] the connection #%u is lost, ec=%d, where=%d, detail:%s", ctx->index_, thandle->id_, error, (int)thandle->error_stage_, @@ -1492,7 +1502,7 @@ void io_service::do_accept(io_channel* ctx) { if (yasio__testbits(ctx->properties_, YCPF_MCAST)) ctx->join_multicast_group(); - ctx->buffer_.resize(YASIO_INET_BUFFER_SIZE); + ctx->buffer_.resize(yasio__max_rcvbuf); } io_watcher_.mod_event(ctx->socket_->native_handle(), socket_event::read, 0); YASIO_KLOGI("[index: %d] open server succeed, socket.fd=%d listening at %s...", ctx->index_, (int)ctx->socket_->native_handle(), ep.to_string().c_str()); @@ -1651,6 +1661,13 @@ void io_service::active_transport(transport_handle_t t) auto ctx = t->ctx_; auto& s = t->socket_; this->transports_.push_back(t); + if (yasio__testbits(ctx->properties_, YCM_KCP)) + { + ++this->nsched_; + auto interval = static_cast(t)->interval(); + if (this->sched_freq_ > interval) + this->sched_freq_ = interval; + } if (!yasio__testbits(ctx->properties_, YCM_SSL)) { YASIO__UNUSED_PARAM(s); @@ -1871,8 +1888,6 @@ bool io_service::close_internal(io_channel* ctx) } void io_service::process_timers() { - this->update_time(); - if (this->timer_queue_.empty()) return; @@ -1908,7 +1923,7 @@ void io_service::process_deferred_events() } highp_time_t io_service::get_timeout(highp_time_t usec) { - this->wait_duration_ = yasio__max_wait_usec; // Reset next wait duration per frame + this->wait_duration_ = this->sched_freq_; // Reset next wait duration per frame if (this->timer_queue_.empty()) return usec; diff --git a/yasio/io_service.hpp b/yasio/io_service.hpp index de1b679b..98155d39 100644 --- a/yasio/io_service.hpp +++ b/yasio/io_service.hpp @@ -54,7 +54,7 @@ SOFTWARE. #endif #if defined(YASIO_ENABLE_KCP) -typedef struct IKCPCB ikcpcb; +# include "kcp/ikcp.h" struct yasio_kcp_options; #endif @@ -399,6 +399,11 @@ typedef highp_timer_ptr deadline_timer_ptr; typedef event_cb_t io_event_cb_t; typedef completion_cb_t io_completion_cb_t; +namespace +{ +static const int yasio__max_rcvbuf = YASIO_SZ(64, k); +} // namespace + // the ssl role enum ssl_role { @@ -774,8 +779,8 @@ class io_transport : public io_base { bool is_valid() const { return ctx_ != nullptr; } - char buffer_[YASIO_INET_BUFFER_SIZE]; // recv buffer, 64K - int offset_ = 0; // recv buffer offset + char buffer_[yasio__max_rcvbuf]; // recv buffer, 64K + int offset_ = 0; // recv buffer offset int expected_size_ = -1; sbyte_buffer expected_packet_; @@ -842,22 +847,26 @@ class YASIO_API io_transport_udp : public io_transport { }; #if defined(YASIO_ENABLE_KCP) class io_transport_kcp : public io_transport_udp { + friend class io_service; public: YASIO__DECL io_transport_kcp(io_channel* ctx, xxsocket_ptr&& s); YASIO__DECL ~io_transport_kcp(); ikcpcb* internal_object() { return kcp_; } protected: - YASIO__DECL void set_primitives() override; YASIO__DECL int do_read(int revent, int& error, highp_time_t& wait_duration) override; + YASIO__DECL bool do_write(highp_time_t& wait_duration) override; + YASIO__DECL int handle_input(const char* buf, int len, int& error, highp_time_t& wait_duration) override; + + int interval() const { return kcp_->interval * std::milli::den; } sbyte_buffer rawbuf_; // the low level raw buffer - ikcpcb* kcp_; - highp_timer timer_for_update_; + ikcpcb* kcp_{nullptr}; + IUINT32 expire_time_{0}; // the next expire time(ms) to call ikcp_update std::function underlaying_write_cb_; }; #else @@ -1233,15 +1242,13 @@ class YASIO_API io_service // lgtm [cpp/class-many-fields] /* For log macro only */ inline const print_fn2_t& __get_cprint() const { return options_.print_; } - void update_time() { this->time_ = yasio::steady_clock_t::now(); } - private: state state_ = state::UNINITIALIZED; // The service state std::thread worker_; std::thread::id worker_id_; /* The current time according to the event loop. in msecs. */ - std::chrono::time_point time_; + std::chrono::time_point current_time_; privacy::concurrent_queue events_; @@ -1262,6 +1269,9 @@ class YASIO_API io_service // lgtm [cpp/class-many-fields] io_watcher io_watcher_; + int nsched_ = 0; + int sched_freq_ = 5 * 60 * 1000 * 1000; // 5mins in us + // options struct __unnamed_options { highp_time_t connect_timeout_ = 10LL * std::micro::den;