Skip to content

Commit

Permalink
Improve kcp implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
halx99 committed Jan 31, 2024
1 parent a22302d commit 78c6892
Show file tree
Hide file tree
Showing 3 changed files with 60 additions and 38 deletions.
3 changes: 0 additions & 3 deletions yasio/config.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -216,9 +216,6 @@ SOFTWARE.
// The default ttl of multicast
#define YASIO_DEFAULT_MULTICAST_TTL (int)128

// The max internet buffer size
#define YASIO_INET_BUFFER_SIZE 65536

// The max pdu buffer length, avoid large memory allocation when application decode a huge length.
#define YASIO_MAX_PDU_BUFFER_SIZE static_cast<int>(1 * 1024 * 1024)

Expand Down
67 changes: 41 additions & 26 deletions yasio/io_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@ SOFTWARE.
#include "yasio/wtimer_hres.hpp"

#if defined(YASIO_ENABLE_KCP)
# include "kcp/ikcp.h"
struct yasio_kcp_options {
int kcp_conv_ = 0;

Expand Down Expand Up @@ -139,10 +138,10 @@ namespace
{
// By default we will wait no longer than 5 minutes. This will ensure that
// any changes to the system clock are detected after no longer than this.
static const highp_time_t yasio__max_wait_usec = 5 * 60 * 1000 * 1000LL;
static const int yasio__max_wait_usec = 5 * 60 * 1000 * 1000LL;
// the max transport alloc size
static const size_t yasio__max_tsize = (std::max)({sizeof(io_transport_tcp), sizeof(io_transport_udp), sizeof(io_transport_ssl), sizeof(io_transport_kcp)});
static const int yasio_udp_mss = static_cast<int>((std::numeric_limits<uint16_t>::max)() - (sizeof(yasio::ip::ip_hdr_st) + sizeof(yasio::ip::udp_hdr_st)));
static const int yasio__udp_mss = static_cast<int>((std::numeric_limits<uint16_t>::max)() - (sizeof(yasio::ip::ip_hdr_st) + sizeof(yasio::ip::udp_hdr_st)));
} // namespace
struct yasio__global_state {
enum
Expand Down Expand Up @@ -207,7 +206,7 @@ void highp_timer::cancel()

std::chrono::microseconds highp_timer::wait_duration() const
{
return std::chrono::duration_cast<std::chrono::microseconds>(this->expire_time_ - service_.time_);
return std::chrono::duration_cast<std::chrono::microseconds>(this->expire_time_ - service_.current_time_);
}

/// io_send_op
Expand Down Expand Up @@ -486,7 +485,7 @@ void io_transport::set_primitives()
else // UDP
{
this->write_cb_ = [this](const void* data, int len, const ip::endpoint*, int& error) {
int n = socket_->send(data, (std::min)(len, yasio_udp_mss), YASIO_MSG_FLAG);
int n = socket_->send(data, (std::min)(len, yasio__udp_mss), YASIO_MSG_FLAG);
if (n < 0)
error = xxsocket::get_last_errno();
return n;
Expand Down Expand Up @@ -635,7 +634,7 @@ void io_transport_udp::set_primitives()
{
this->write_cb_ = [this](const void* data, int len, const ip::endpoint* destination, int& error) {
assert(destination);
int n = socket_->sendto(data, (std::min)(len, yasio_udp_mss), *destination);
int n = socket_->sendto(data, (std::min)(len, yasio__udp_mss), *destination);
if (n < 0)
{
error = xxsocket::get_last_errno();
Expand Down Expand Up @@ -668,30 +667,22 @@ int io_transport_udp::handle_input(const char* data, int bytes_transferred, int&

#if defined(YASIO_ENABLE_KCP)
// ----------------------- io_transport_kcp ------------------
io_transport_kcp::io_transport_kcp(io_channel* ctx, xxsocket_ptr&& s)
: io_transport_udp(ctx, std::forward<xxsocket_ptr>(s)), timer_for_update_(ctx->get_service())
io_transport_kcp::io_transport_kcp(io_channel* ctx, xxsocket_ptr&& s) : io_transport_udp(ctx, std::forward<xxsocket_ptr>(s))
{
auto& kopts = ctx->kcp_options();
this->kcp_ = ::ikcp_create(static_cast<IUINT32>(kopts.kcp_conv_), this);
::ikcp_nodelay(this->kcp_, kopts.kcp_nodelay_, kopts.kcp_interval_ /*kcp max interval is 5000(ms)*/, kopts.kcp_resend_, kopts.kcp_ncwnd_);
::ikcp_nodelay(this->kcp_, kopts.kcp_nodelay_, kopts.kcp_interval_, kopts.kcp_resend_, kopts.kcp_ncwnd_);
::ikcp_wndsize(this->kcp_, kopts.kcp_sndwnd_, kopts.kcp_rcvwnd_);
::ikcp_setmtu(this->kcp_, kopts.kcp_mtu_);
// Because of nodelaying config will change the value. so setting RTO min after call ikcp_nodely.
this->kcp_->rx_minrto = kopts.kcp_minrto_;

this->rawbuf_.resize(YASIO_INET_BUFFER_SIZE);
this->rawbuf_.resize(yasio__max_rcvbuf);
::ikcp_setoutput(this->kcp_, [](const char* buf, int len, ::ikcpcb* /*kcp*/, void* user) {
auto t = (io_transport_kcp*)user;
int ignored_ec = 0;
return t->underlaying_write_cb_(buf, len, std::addressof(t->ensure_destination()), ignored_ec);
});

// schedule a update timer
timer_for_update_.expires_from_now(std::chrono::milliseconds(kopts.kcp_interval_));
timer_for_update_.async_wait([=](io_service&) {
::ikcp_update(kcp_, yasio::clock());
return false;
});
}
io_transport_kcp::~io_transport_kcp() { ::ikcp_release(this->kcp_); }

Expand All @@ -704,6 +695,7 @@ void io_transport_kcp::set_primitives()
if (nsent > 0)
{
::ikcp_flush(kcp_);
expire_time_ = 0;
get_service().wakeup();
}
else if (nsent == -2)
Expand All @@ -713,7 +705,19 @@ void io_transport_kcp::set_primitives()
return nsent;
};
}
bool io_transport_kcp::do_write(highp_time_t& wait_duration)
{
bool ret = io_transport_udp::do_write(wait_duration);

const auto current = static_cast<IUINT32>(std::chrono::duration_cast<std::chrono::milliseconds>(get_service().current_time_.time_since_epoch()).count());
if (((IINT32)(current - expire_time_)) >= 0)
{
::ikcp_update(kcp_, current);
expire_time_ = ::ikcp_check(kcp_, current);
}

return ret;
}
int io_transport_kcp::do_read(int revent, int& error, highp_time_t& wait_duration)
{
int n = this->call_read(&rawbuf_.front(), static_cast<int>(rawbuf_.size()), revent, error);
Expand All @@ -733,8 +737,10 @@ int io_transport_kcp::handle_input(const char* buf, int len, int& error, highp_t
{
// ikcp in event always in service thread, so no need to lock
if (0 == ::ikcp_input(kcp_, buf, len))
{
expire_time_ = 0;
return len;

}
// simply regards -1,-2,-3 as error and trigger connection lost event.
error = yasio::errc::invalid_packet;
return -1;
Expand Down Expand Up @@ -831,7 +837,7 @@ void io_service::initialize(const io_hostent* channel_eps, int channel_count)
ssl_roles_[YSSL_CLIENT] = ssl_roles_[YSSL_SERVER] = nullptr;
#endif

this->wait_duration_ = yasio__max_wait_usec;
this->wait_duration_ = this->sched_freq_;

// at least one channel
if (channel_count < 1)
Expand Down Expand Up @@ -945,11 +951,10 @@ void io_service::run()
ares_socket_t ares_socks[ARES_GETSOCK_MAXNUM] = {0};
#endif

// Update time for 1st loop
this->update_time();

do
{
this->current_time_ = yasio::steady_clock_t::now();

auto waitd_usec = get_timeout(this->wait_duration_); // Gets current wait duration
#if defined(YASIO_USE_CARES)
/**
Expand Down Expand Up @@ -1133,6 +1138,11 @@ void io_service::handle_close(transport_handle_t thandle)
auto error = thandle->error_;
const bool client = yasio__testbits(ctx->properties_, YCM_CLIENT);

if (yasio__testbits(ctx->properties_, YCM_KCP))
{
if (--nsched_ <= 0) // if no sched transport, reset sched_freq to max wait 5mins
sched_freq_ = yasio__max_wait_usec;
}
if (thandle->state_ == io_base::state::OPENED)
{ // @Because we can't retrive peer endpoint when connect reset by peer, so use id to trace.
YASIO_KLOGD("[index: %d] the connection #%u is lost, ec=%d, where=%d, detail:%s", ctx->index_, thandle->id_, error, (int)thandle->error_stage_,
Expand Down Expand Up @@ -1492,7 +1502,7 @@ void io_service::do_accept(io_channel* ctx)
{
if (yasio__testbits(ctx->properties_, YCPF_MCAST))
ctx->join_multicast_group();
ctx->buffer_.resize(YASIO_INET_BUFFER_SIZE);
ctx->buffer_.resize(yasio__max_rcvbuf);
}
io_watcher_.mod_event(ctx->socket_->native_handle(), socket_event::read, 0);
YASIO_KLOGI("[index: %d] open server succeed, socket.fd=%d listening at %s...", ctx->index_, (int)ctx->socket_->native_handle(), ep.to_string().c_str());
Expand Down Expand Up @@ -1651,6 +1661,13 @@ void io_service::active_transport(transport_handle_t t)
auto ctx = t->ctx_;
auto& s = t->socket_;
this->transports_.push_back(t);
if (yasio__testbits(ctx->properties_, YCM_KCP))
{
++this->nsched_;
auto interval = static_cast<io_transport_kcp*>(t)->interval();
if (this->sched_freq_ > interval)
this->sched_freq_ = interval;
}
if (!yasio__testbits(ctx->properties_, YCM_SSL))
{
YASIO__UNUSED_PARAM(s);
Expand Down Expand Up @@ -1871,8 +1888,6 @@ bool io_service::close_internal(io_channel* ctx)
}
void io_service::process_timers()
{
this->update_time();

if (this->timer_queue_.empty())
return;

Expand Down Expand Up @@ -1908,7 +1923,7 @@ void io_service::process_deferred_events()
}
highp_time_t io_service::get_timeout(highp_time_t usec)
{
this->wait_duration_ = yasio__max_wait_usec; // Reset next wait duration per frame
this->wait_duration_ = this->sched_freq_; // Reset next wait duration per frame

if (this->timer_queue_.empty())
return usec;
Expand Down
28 changes: 19 additions & 9 deletions yasio/io_service.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ SOFTWARE.
#endif

#if defined(YASIO_ENABLE_KCP)
typedef struct IKCPCB ikcpcb;
# include "kcp/ikcp.h"
struct yasio_kcp_options;
#endif

Expand Down Expand Up @@ -399,6 +399,11 @@ typedef highp_timer_ptr deadline_timer_ptr;
typedef event_cb_t io_event_cb_t;
typedef completion_cb_t io_completion_cb_t;

namespace
{
static const int yasio__max_rcvbuf = YASIO_SZ(64, k);
} // namespace

// the ssl role
enum ssl_role
{
Expand Down Expand Up @@ -774,8 +779,8 @@ class io_transport : public io_base {

bool is_valid() const { return ctx_ != nullptr; }

char buffer_[YASIO_INET_BUFFER_SIZE]; // recv buffer, 64K
int offset_ = 0; // recv buffer offset
char buffer_[yasio__max_rcvbuf]; // recv buffer, 64K
int offset_ = 0; // recv buffer offset

int expected_size_ = -1;
sbyte_buffer expected_packet_;
Expand Down Expand Up @@ -842,22 +847,26 @@ class YASIO_API io_transport_udp : public io_transport {
};
#if defined(YASIO_ENABLE_KCP)
class io_transport_kcp : public io_transport_udp {
friend class io_service;
public:
YASIO__DECL io_transport_kcp(io_channel* ctx, xxsocket_ptr&& s);
YASIO__DECL ~io_transport_kcp();
ikcpcb* internal_object() { return kcp_; }

protected:

YASIO__DECL void set_primitives() override;

YASIO__DECL int do_read(int revent, int& error, highp_time_t& wait_duration) override;

YASIO__DECL bool do_write(highp_time_t& wait_duration) override;

YASIO__DECL int handle_input(const char* buf, int len, int& error, highp_time_t& wait_duration) override;

int interval() const { return kcp_->interval * std::milli::den; }

sbyte_buffer rawbuf_; // the low level raw buffer
ikcpcb* kcp_;
highp_timer timer_for_update_;
ikcpcb* kcp_{nullptr};
IUINT32 expire_time_{0}; // the next expire time(ms) to call ikcp_update
std::function<int(const void*, int, const ip::endpoint*, int&)> underlaying_write_cb_;
};
#else
Expand Down Expand Up @@ -1233,15 +1242,13 @@ class YASIO_API io_service // lgtm [cpp/class-many-fields]
/* For log macro only */
inline const print_fn2_t& __get_cprint() const { return options_.print_; }

void update_time() { this->time_ = yasio::steady_clock_t::now(); }

private:
state state_ = state::UNINITIALIZED; // The service state
std::thread worker_;
std::thread::id worker_id_;

/* The current time according to the event loop. in msecs. */
std::chrono::time_point<yasio::steady_clock_t> time_;
std::chrono::time_point<yasio::steady_clock_t> current_time_;

privacy::concurrent_queue<event_ptr, true> events_;

Expand All @@ -1262,6 +1269,9 @@ class YASIO_API io_service // lgtm [cpp/class-many-fields]

io_watcher io_watcher_;

int nsched_ = 0;
int sched_freq_ = 5 * 60 * 1000 * 1000; // 5mins in us

// options
struct __unnamed_options {
highp_time_t connect_timeout_ = 10LL * std::micro::den;
Expand Down

0 comments on commit 78c6892

Please sign in to comment.