From 96e31b56e7eb1d31b1db16a2e41f10e7449a526f Mon Sep 17 00:00:00 2001 From: mingzc0508 Date: Wed, 13 Mar 2019 17:19:29 +0800 Subject: [PATCH] tcp socket connection keepalive (#13) --- CMakeLists.txt | 1 + examples/all-demos.cc | 11 +-- include/flora-agent.h | 6 ++ include/flora-cli.h | 31 ++++++-- include/flora-svc.h | 5 ++ src/beep-sock-poll.h | 167 ++++++++++++++++++++++++++++++++++++++++++ src/cli.cc | 142 +++++++++++++++++++++-------------- src/cli.h | 20 +++-- src/defs.h | 6 +- src/disp.cc | 30 ++++++-- src/disp.h | 5 +- src/flora-agent.cc | 13 +++- src/poll.cc | 4 +- src/ser-helper.cc | 20 +++++ src/ser-helper.h | 4 + src/sock-adap.cc | 15 ++-- src/sock-adap.h | 4 +- src/sock-conn.cc | 19 ++++- src/sock-conn.h | 7 +- src/sock-poll.cc | 100 ++++++++++++------------- src/sock-poll.h | 27 ++++--- 21 files changed, 477 insertions(+), 160 deletions(-) create mode 100644 src/beep-sock-poll.h diff --git a/CMakeLists.txt b/CMakeLists.txt index 2c82b43..9ac5af4 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -88,6 +88,7 @@ set(flora_svc_SOURCES src/poll.cc src/sock-poll.h src/sock-poll.cc + src/beep-sock-poll.h src/disp.h src/disp.cc src/ser-helper.h diff --git a/examples/all-demos.cc b/examples/all-demos.cc index 8bb624d..9b74d62 100644 --- a/examples/all-demos.cc +++ b/examples/all-demos.cc @@ -389,11 +389,12 @@ void DemoAllInOne::test_invoke_in_callback() { int32_t r = agent.call("not-exists", empty, "blah", resp); KLOGI(TAG, "invoke 'call' in callback function, return %d, excepted %d", r, FLORA_CLI_EDEADLOCK); - r = agent.call("not-exists", empty, "blah", [&agent](int32_t code, Response &) { - KLOGI(TAG, "invoke async 'call' in callback, return %d, excepted %d", - code, FLORA_CLI_ENEXISTS); - agent.close(); - }); + r = agent.call( + "not-exists", empty, "blah", [&agent](int32_t code, Response &) { + KLOGI(TAG, "invoke async 'call' in callback, return %d, excepted %d", + code, FLORA_CLI_ENEXISTS); + agent.close(); + }); }); agent.start(); shared_ptr empty; diff --git a/include/flora-agent.h b/include/flora-agent.h index 99f8b1b..ef53762 100644 --- a/include/flora-agent.h +++ b/include/flora-agent.h @@ -21,6 +21,10 @@ // config(KEY, uint32_t flags, MonitorCallback *cb) // flags: FLORA_CLI_FLAG_MONITOR_* (see flora-cli.h) #define FLORA_AGENT_CONFIG_MONITOR 3 +// config(KEY, uint32_t interval, uint32_t timeout) +// interval: interval of send beep packet +// timeout: timeout of flora service no response +#define FLORA_AGENT_CONFIG_KEEPALIVE 4 #ifdef __cplusplus @@ -104,6 +108,8 @@ class Agent : public ClientCallback { std::chrono::milliseconds(10000); uint32_t flags = 0; MonitorCallback *mon_callback = nullptr; + uint32_t beep_interval = FLORA_CLI_DEFAULT_BEEP_INTERVAL; + uint32_t noresp_timeout = FLORA_CLI_DEFAULT_NORESP_TIMEOUT; }; Options options; diff --git a/include/flora-cli.h b/include/flora-cli.h index 2048301..47082e8 100644 --- a/include/flora-cli.h +++ b/include/flora-cli.h @@ -40,6 +40,10 @@ #define FLORA_CLI_FLAG_MONITOR_DETAIL_DECL 0x4 #define FLORA_CLI_FLAG_MONITOR_DETAIL_POST 0x8 #define FLORA_CLI_FLAG_MONITOR_DETAIL_CALL 0x10 +#define FLORA_CLI_FLAG_KEEPALIVE 0x20 + +#define FLORA_CLI_DEFAULT_BEEP_INTERVAL 50000 +#define FLORA_CLI_DEFAULT_NORESP_TIMEOUT 100000 namespace flora { @@ -68,6 +72,15 @@ class Reply { class ClientCallback; class MonitorCallback; +class ClientOptions { +public: + uint32_t bufsize = 0; + uint32_t flags = 0; + // effective when FLORA_CLI_FLAG_KEEPALIVE is set + uint32_t beep_interval = FLORA_CLI_DEFAULT_BEEP_INTERVAL; + uint32_t noresp_timeout = FLORA_CLI_DEFAULT_NORESP_TIMEOUT; +}; + class Client { public: virtual ~Client() = default; @@ -101,13 +114,9 @@ class Client { uint32_t msg_buf_size, std::shared_ptr &result); - static int32_t connect(const char *uri, MonitorCallback *cb, - uint32_t msg_buf_size, uint32_t flags, - std::shared_ptr &result); - static int32_t connect(const char *uri, ClientCallback *ccb, - MonitorCallback *mcb, uint32_t msg_buf_size, - uint32_t flags, std::shared_ptr &result); + MonitorCallback *mcb, ClientOptions *opts, + std::shared_ptr &result); }; class ClientCallback { @@ -291,6 +300,16 @@ int32_t flora_cli_connect2(const char *uri, flora_cli_monitor_callback_t *callback, void *arg, uint32_t msg_buf_size, flora_cli_t *result); +typedef struct { + uint32_t bufsize; + uint32_t flags; + uint32_t beep_interval; + uint32_t noresp_timeout; +} flora_cli_options; +int32_t flora_cli_connect3(const char *uri, flora_cli_callback_t *ccb, + flora_cli_monitor_callback_t *mcb, void *arg, + flora_cli_options *opts, flora_cli_t *result); + void flora_cli_delete(flora_cli_t handle); int32_t flora_cli_subscribe(flora_cli_t handle, const char *name); diff --git a/include/flora-svc.h b/include/flora-svc.h index 7809544..1e55e8d 100644 --- a/include/flora-svc.h +++ b/include/flora-svc.h @@ -8,6 +8,9 @@ #define FLORA_POLL_INVAL -3 #define FLORA_POLL_UNSUPP -4 +// options of 'config' +#define FLORA_POLL_OPT_KEEPALIVE_TIMEOUT 1 + #define FLORA_DISP_FLAG_MONITOR 1 #ifdef __cplusplus @@ -35,6 +38,8 @@ class Poll { virtual void stop() = 0; + virtual void config(uint32_t opt, ...) = 0; + static std::shared_ptr new_instance(const char *uri); }; diff --git a/src/beep-sock-poll.h b/src/beep-sock-poll.h new file mode 100644 index 0000000..eebc091 --- /dev/null +++ b/src/beep-sock-poll.h @@ -0,0 +1,167 @@ +#pragma once + +#include "defs.h" +#include "rlog.h" +#include "sock-poll.h" +#include +#include +#include +#include +#include +#include +#include + +namespace flora { +namespace internal { + +class ActiveAdapter { +public: + std::chrono::steady_clock::time_point lastest_action_tp; + std::shared_ptr adapter; +}; +typedef std::list ActiveAdapterList; + +class BeepSocketPoll : public SocketPoll { +public: + BeepSocketPoll(const std::string &host, int32_t port) + : SocketPoll(host, port) {} + + void config(uint32_t opt, ...) { + va_list ap; + va_start(ap, opt); + switch (opt) { + case FLORA_POLL_OPT_KEEPALIVE_TIMEOUT: + options.beep_timeout = va_arg(ap, uint32_t); + break; + } + va_end(ap); + } + +private: + int32_t do_poll(fd_set *rfds, int max_fd) { + int r; + struct timeval tv; + struct timeval *ptv; + std::chrono::steady_clock::time_point nowtp; + + while (true) { + if (active_adapters.empty()) { +#ifdef SELECT_BLOCK_IF_FD_CLOSED + tv.tv_sec = 5; + tv.tv_usec = 0; + ptv = &tv; +#else + ptv = nullptr; +#endif + } else { + nowtp = std::chrono::steady_clock::now(); + obtain_timeout(nowtp, &tv); + ptv = &tv; + } + r = select(max_fd, rfds, nullptr, nullptr, ptv); + if (r < 0) { + if (errno == EAGAIN) { + sleep(1); + continue; + } + KLOGE(TAG, "select failed: %s", strerror(errno)); + } + nowtp = std::chrono::steady_clock::now(); + shutdown_timeout_adapter(nowtp); + break; + } + return r; + } + + std::shared_ptr do_accept(int lfd) { + auto adap = SocketPoll::do_accept(lfd); + if (adap != nullptr) { + auto it = active_adapters.emplace(active_adapters.end()); + it->adapter = adap; + it->lastest_action_tp = std::chrono::steady_clock::now(); + return adap; + } + return adap; + } + + bool do_read(std::shared_ptr &adap) { + bool r = SocketPoll::do_read(adap); + auto it = active_adapters.begin(); + while (it != active_adapters.end()) { + if (it->adapter == adap) + break; + ++it; + } + if (it != active_adapters.end()) { + if (r) { + // move adapter to end of list + // adapter sorted by lastest_action_tp + it->lastest_action_tp = std::chrono::steady_clock::now(); + if (it != active_adapters.rbegin().base()) + active_adapters.splice(active_adapters.end(), active_adapters, it); + } else { + active_adapters.erase(it); + } + } + return r; + } + + void obtain_timeout(std::chrono::steady_clock::time_point &nowtp, + struct timeval *tv) { + auto dur = std::chrono::duration_cast( + std::chrono::milliseconds(options.beep_timeout) - + (nowtp - active_adapters.front().lastest_action_tp)); + tv->tv_sec = dur.count() / 1000; + tv->tv_usec = (dur.count() % 1000) * 1000; + } + + void shutdown_timeout_adapter(std::chrono::steady_clock::time_point &nowtp) { +#ifdef FLORA_DEBUG + std::chrono::steady_clock::time_point dnowtp = nowtp; + KLOGD(TAG, "before shutdown timeout adapters, timeout = %u", + options.beep_timeout); + print_active_adapters(dnowtp); +#endif + nowtp -= std::chrono::milliseconds(options.beep_timeout); + auto it = active_adapters.begin(); + while (it != active_adapters.end()) { + if (nowtp >= it->lastest_action_tp) { + int fd = std::static_pointer_cast(it->adapter)->socket(); + KLOGW(TAG, "tcp socket %d keepalive timeout, shutdown!", fd); + ::shutdown(fd, SHUT_RDWR); + it = active_adapters.erase(it); + continue; + } + break; + } +#ifdef FLORA_DEBUG + KLOGD(TAG, "after shutdown timeout adapters, timeout = %u", + options.beep_timeout); + print_active_adapters(dnowtp); +#endif + } + +#ifdef FLORA_DEBUG + void print_active_adapters(std::chrono::steady_clock::time_point &nowtp) { + auto it = active_adapters.begin(); + while (it != active_adapters.end()) { + KLOGD(TAG, "%p: %d", it->adapter.get(), + std::chrono::duration_cast( + nowtp - it->lastest_action_tp) + .count()); + ++it; + } + } +#endif + +private: + class Options { + public: + uint32_t beep_timeout = 60000; + }; + ActiveAdapterList active_adapters; + Options options; +}; + +} // namespace internal +} // namespace flora diff --git a/src/cli.cc b/src/cli.cc index b7c27d1..c1949b3 100644 --- a/src/cli.cc +++ b/src/cli.cc @@ -13,23 +13,24 @@ using namespace std; using namespace std::chrono; using rokid::Uri; -#define TAG "flora.Client" - static bool ignore_sigpipe = false; int32_t flora::Client::connect(const char *uri, flora::ClientCallback *ccb, flora::MonitorCallback *mcb, - uint32_t msg_buf_size, uint32_t flags, + flora::ClientOptions *opts, shared_ptr &result) { if (!ignore_sigpipe) { signal(SIGPIPE, SIG_IGN); ignore_sigpipe = true; } - uint32_t bufsize = - msg_buf_size > DEFAULT_MSG_BUF_SIZE ? msg_buf_size : DEFAULT_MSG_BUF_SIZE; + ClientOptions defopts; + if (opts == nullptr) + opts = &defopts; + if (opts->bufsize < DEFAULT_MSG_BUF_SIZE) + opts->bufsize = DEFAULT_MSG_BUF_SIZE; shared_ptr cli = - make_shared(bufsize); - int32_t r = cli->connect(uri, flags, ccb, mcb); + make_shared(opts); + int32_t r = cli->connect(uri, ccb, mcb); if (r != FLORA_CLI_SUCCESS) return r; cli->set_weak_ptr(cli); @@ -40,14 +41,9 @@ int32_t flora::Client::connect(const char *uri, flora::ClientCallback *ccb, int32_t flora::Client::connect(const char *uri, ClientCallback *cb, uint32_t msg_buf_size, shared_ptr &result) { - return flora::Client::connect(uri, cb, nullptr, msg_buf_size, 0, result); -} - -int32_t flora::Client::connect(const char *uri, MonitorCallback *cb, - uint32_t msg_buf_size, uint32_t flags, - shared_ptr &result) { - flags |= FLORA_CLI_FLAG_MONITOR; - return flora::Client::connect(uri, nullptr, cb, msg_buf_size, flags, result); + flora::ClientOptions opts; + opts.bufsize = msg_buf_size; + return flora::Client::connect(uri, cb, nullptr, &opts, result); } namespace flora { @@ -61,16 +57,16 @@ Client::MonitorHandler Client::monitor_handlers[] = { &Client::handle_monitor_decl_remove, &Client::handle_monitor_post, &Client::handle_monitor_call}; -Client::Client(uint32_t bufsize) : buf_size(bufsize) { - sbuffer = (int8_t *)mmap(NULL, buf_size * 2, PROT_READ | PROT_WRITE, +Client::Client(flora::ClientOptions *opts) : options(*opts) { + sbuffer = (int8_t *)mmap(NULL, options.bufsize * 2, PROT_READ | PROT_WRITE, MAP_ANONYMOUS | MAP_PRIVATE, -1, 0); - rbuffer = sbuffer + buf_size; + rbuffer = sbuffer + options.bufsize; } Client::~Client() noexcept { close(false); - munmap(sbuffer, buf_size * 2); + munmap(sbuffer, options.bufsize * 2); #ifdef FLORA_DEBUG KLOGI(TAG, "client %s: post %u times, post %u bytes, " @@ -79,7 +75,7 @@ Client::~Client() noexcept { #endif } -int32_t Client::connect(const char *uri, uint32_t flags, ClientCallback *ccb, +int32_t Client::connect(const char *uri, ClientCallback *ccb, MonitorCallback *mcb) { if (uri == nullptr) return FLORA_CLI_EINVAL; @@ -92,8 +88,9 @@ int32_t Client::connect(const char *uri, uint32_t flags, ClientCallback *ccb, KLOGI(TAG, "uri parse: path = %s", urip.path.c_str()); KLOGI(TAG, "uri parse: fragment = %s", urip.fragment.c_str()); - SocketConn *conn = new SocketConn(); + SocketConn *conn = nullptr; if (urip.scheme == "unix") { + conn = new SocketConn(0); if (!conn->connect(urip.path)) { delete conn; return FLORA_CLI_ECONN; @@ -101,6 +98,7 @@ int32_t Client::connect(const char *uri, uint32_t flags, ClientCallback *ccb, connection.reset(conn); serialize_flags = 0; } else if (urip.scheme == "tcp") { + conn = new SocketConn(options.noresp_timeout); if (!conn->connect(urip.host, urip.port)) { delete conn; return FLORA_CLI_ECONN; @@ -109,27 +107,26 @@ int32_t Client::connect(const char *uri, uint32_t flags, ClientCallback *ccb, serialize_flags = CAPS_FLAG_NET_BYTEORDER; } else { KLOGE(TAG, "unsupported uri scheme %s", urip.scheme.c_str()); - delete conn; return FLORA_CLI_EINVAL; } mon_callback = mcb; recv_thread = thread([this]() { this->recv_loop(); }); - int32_t r = auth(urip.fragment, flags); + int32_t r = auth(urip.fragment, options.flags); if (r != FLORA_CLI_SUCCESS) { mon_callback = nullptr; close(false); return r; } + keepalive_thread = thread([this]() { this->keepalive_loop(); }); cli_callback = ccb; auth_extra = urip.fragment; - this->flags = flags; return FLORA_CLI_SUCCESS; } int32_t Client::auth(const string &extra, uint32_t flags) { - int32_t c = RequestSerializer::serialize_auth(FLORA_VERSION, extra.c_str(), - getpid(), flags, sbuffer, - buf_size, serialize_flags); + int32_t c = RequestSerializer::serialize_auth( + FLORA_VERSION, extra.c_str(), getpid(), flags, sbuffer, options.bufsize, + serialize_flags); if (c <= 0) return FLORA_CLI_EAUTH; AuthResult ares; @@ -153,12 +150,13 @@ void Client::recv_loop() { callback_thr_id = this_thread::get_id(); while (true) { - if (rbuf_off == buf_size) { - KLOGW(TAG, "recv buffer not enough, %u bytes", buf_size); + if (rbuf_off == options.bufsize) { + KLOGW(TAG, "recv buffer not enough, %u bytes", options.bufsize); err = FLORA_CLI_EINSUFF_BUF; break; } - int32_t c = connection->recv(rbuffer + rbuf_off, buf_size - rbuf_off); + int32_t c = + connection->recv(rbuffer + rbuf_off, options.bufsize - rbuf_off); if (c <= 0) { if (auth_result != nullptr) { err = FLORA_CLI_EAUTH; @@ -167,6 +165,9 @@ void Client::recv_loop() { auth_result->amutex.unlock(); auth_result = nullptr; } else { + if (c == -2) { + KLOGW(TAG, "wait flora service response timeout"); + } err = FLORA_CLI_ECONN; } break; @@ -306,6 +307,9 @@ bool Client::handle_cmd_after_auth(int32_t cmd, shared_ptr &resp) { return false; break; } + case CMD_PONG_RESP: + KLOGD(TAG, "recv pong"); + break; default: KLOGE(TAG, "client received invalid command %d", cmd); return false; @@ -313,6 +317,27 @@ bool Client::handle_cmd_after_auth(int32_t cmd, shared_ptr &resp) { return true; } +void Client::keepalive_loop() { + unique_lock locker(ka_mutex); + milliseconds inter(options.beep_interval); + + while (true) { + // connection CANNOT be nullptr here + if (connection->closed()) + break; + if (ka_cond.wait_for(locker, inter) == cv_status::timeout) + ping(); + } +} + +void Client::ping() { + int32_t c = RequestSerializer::serialize_ping(sbuffer, options.bufsize, + serialize_flags); + if (c <= 0) + return; + connection->send(sbuffer, c); +} + void Client::iclose(bool passive, int32_t err) { if (connection == nullptr || connection->closed()) return; @@ -336,6 +361,10 @@ void Client::iclose(bool passive, int32_t err) { } req_mutex.unlock(); + ka_mutex.lock(); + ka_cond.notify_one(); + ka_mutex.unlock(); + if (passive) { if (cli_callback) cli_callback->disconnected(); @@ -351,13 +380,15 @@ int32_t Client::close(bool passive) { if (recv_thread.joinable()) recv_thread.join(); cmd_handler = &Client::handle_cmd_before_auth; + if (keepalive_thread.joinable()) + keepalive_thread.join(); return FLORA_CLI_SUCCESS; } void Client::send_reply(int32_t callid, int32_t code, std::shared_ptr &data) { - int32_t c = RequestSerializer::serialize_reply(callid, code, data, sbuffer, - buf_size, serialize_flags); + int32_t c = RequestSerializer::serialize_reply( + callid, code, data, sbuffer, options.bufsize, serialize_flags); if (c <= 0) return; connection->send(sbuffer, c); @@ -366,10 +397,10 @@ void Client::send_reply(int32_t callid, int32_t code, int32_t Client::subscribe(const char *name) { if (name == nullptr) return FLORA_CLI_EINVAL; - if (flags & FLORA_CLI_FLAG_MONITOR) + if (options.flags & FLORA_CLI_FLAG_MONITOR) return FLORA_CLI_EMONITOR; - int32_t c = RequestSerializer::serialize_subscribe(name, sbuffer, buf_size, - serialize_flags); + int32_t c = RequestSerializer::serialize_subscribe( + name, sbuffer, options.bufsize, serialize_flags); if (c <= 0) return FLORA_CLI_EINVAL; if (!connection->send(sbuffer, c)) { @@ -385,10 +416,10 @@ int32_t Client::subscribe(const char *name) { int32_t Client::unsubscribe(const char *name) { if (name == nullptr) return FLORA_CLI_EINVAL; - if (flags & FLORA_CLI_FLAG_MONITOR) + if (options.flags & FLORA_CLI_FLAG_MONITOR) return FLORA_CLI_EMONITOR; - int32_t c = RequestSerializer::serialize_unsubscribe(name, sbuffer, buf_size, - serialize_flags); + int32_t c = RequestSerializer::serialize_unsubscribe( + name, sbuffer, options.bufsize, serialize_flags); if (c <= 0) return FLORA_CLI_EINVAL; if (!connection->send(sbuffer, c)) { @@ -404,10 +435,10 @@ int32_t Client::unsubscribe(const char *name) { int32_t Client::declare_method(const char *name) { if (name == nullptr) return FLORA_CLI_EINVAL; - if (flags & FLORA_CLI_FLAG_MONITOR) + if (options.flags & FLORA_CLI_FLAG_MONITOR) return FLORA_CLI_EMONITOR; int32_t c = RequestSerializer::serialize_declare_method( - name, sbuffer, buf_size, serialize_flags); + name, sbuffer, options.bufsize, serialize_flags); if (c <= 0) return FLORA_CLI_EINVAL; if (!connection->send(sbuffer, c)) { @@ -423,10 +454,10 @@ int32_t Client::declare_method(const char *name) { int32_t Client::remove_method(const char *name) { if (name == nullptr) return FLORA_CLI_EINVAL; - if (flags & FLORA_CLI_FLAG_MONITOR) + if (options.flags & FLORA_CLI_FLAG_MONITOR) return FLORA_CLI_EMONITOR; int32_t c = RequestSerializer::serialize_remove_method( - name, sbuffer, buf_size, serialize_flags); + name, sbuffer, options.bufsize, serialize_flags); if (c <= 0) return FLORA_CLI_EINVAL; if (!connection->send(sbuffer, c)) { @@ -443,10 +474,10 @@ int32_t Client::post(const char *name, shared_ptr &msg, uint32_t msgtype) { if (name == nullptr || !is_valid_msgtype(msgtype)) return FLORA_CLI_EINVAL; - if (flags & FLORA_CLI_FLAG_MONITOR) + if (options.flags & FLORA_CLI_FLAG_MONITOR) return FLORA_CLI_EMONITOR; - int32_t c = RequestSerializer::serialize_post(name, msgtype, msg, sbuffer, - buf_size, serialize_flags); + int32_t c = RequestSerializer::serialize_post( + name, msgtype, msg, sbuffer, options.bufsize, serialize_flags); if (c <= 0) return FLORA_CLI_EINVAL; if (!connection->send(sbuffer, c)) { @@ -465,12 +496,13 @@ int32_t Client::call(const char *name, shared_ptr &msg, const char *target, Response &reply, uint32_t timeout) { if (name == nullptr) return FLORA_CLI_EINVAL; - if (flags & FLORA_CLI_FLAG_MONITOR) + if (options.flags & FLORA_CLI_FLAG_MONITOR) return FLORA_CLI_EMONITOR; if (this_thread::get_id() == callback_thr_id) return FLORA_CLI_EDEADLOCK; int32_t c = RequestSerializer::serialize_call( - name, msg, target, ++reqseq, timeout, sbuffer, buf_size, serialize_flags); + name, msg, target, ++reqseq, timeout, sbuffer, options.bufsize, + serialize_flags); if (c <= 0) return FLORA_CLI_EINVAL; @@ -518,10 +550,11 @@ int32_t Client::call(const char *name, shared_ptr &msg, uint32_t timeout) { if (name == nullptr) return FLORA_CLI_EINVAL; - if (flags & FLORA_CLI_FLAG_MONITOR) + if (options.flags & FLORA_CLI_FLAG_MONITOR) return FLORA_CLI_EMONITOR; int32_t c = RequestSerializer::serialize_call( - name, msg, target, ++reqseq, timeout, sbuffer, buf_size, serialize_flags); + name, msg, target, ++reqseq, timeout, sbuffer, options.bufsize, + serialize_flags); if (c <= 0) return FLORA_CLI_EINVAL; @@ -744,9 +777,12 @@ int32_t flora_cli_connect(const char *uri, flora_cli_callback_t *cb, void *arg, if (result == nullptr) return FLORA_CLI_EINVAL; - uint32_t bufsize = - msg_buf_size > DEFAULT_MSG_BUF_SIZE ? msg_buf_size : DEFAULT_MSG_BUF_SIZE; - shared_ptr cli = make_shared(bufsize); + flora::ClientOptions opts; + if (msg_buf_size < DEFAULT_MSG_BUF_SIZE) + opts.bufsize = DEFAULT_MSG_BUF_SIZE; + else + opts.bufsize = msg_buf_size; + shared_ptr cli = make_shared(&opts); int32_t r; WrapClientCallback *wcb = nullptr; @@ -756,7 +792,7 @@ int32_t flora_cli_connect(const char *uri, flora_cli_callback_t *cb, void *arg, wcb->arg = arg; } - r = cli->connect(uri, 0, wcb, nullptr); + r = cli->connect(uri, wcb, nullptr); if (r != FLORA_CLI_SUCCESS) { return r; } diff --git a/src/cli.h b/src/cli.h index 392bdda..469669c 100644 --- a/src/cli.h +++ b/src/cli.h @@ -27,12 +27,11 @@ typedef std::list PendingRequestList; class Client : public flora::Client { public: - explicit Client(uint32_t bufsize); + explicit Client(flora::ClientOptions *opts); ~Client() noexcept; - int32_t connect(const char *uri, uint32_t flags, ClientCallback *ccb, - MonitorCallback *mcb); + int32_t connect(const char *uri, ClientCallback *ccb, MonitorCallback *mcb); int32_t close(bool passive); @@ -65,6 +64,8 @@ class Client : public flora::Client { void recv_loop(); + void keepalive_loop(); + bool handle_received(int32_t size); bool handle_cmd_before_auth(int32_t cmd, std::shared_ptr &resp); @@ -85,26 +86,29 @@ class Client : public flora::Client { bool handle_monitor_post(std::shared_ptr &resp); bool handle_monitor_call(std::shared_ptr &resp); + void ping(); + private: - uint32_t buf_size; int8_t *sbuffer = nullptr; int8_t *rbuffer = nullptr; uint32_t rbuf_off = 0; + ClientOptions options; std::shared_ptr connection; std::thread recv_thread; std::mutex req_mutex; std::condition_variable req_reply_cond; PendingRequestList pending_requests; + std::thread keepalive_thread; + std::mutex ka_mutex; + std::condition_variable ka_cond; int32_t reqseq = 0; uint32_t serialize_flags = 0; int32_t close_reason = 0; std::weak_ptr this_weak_ptr; std::thread::id callback_thr_id; - // client flags - // MONITOR - uint32_t flags = 0; MonitorCallback *mon_callback = nullptr; - typedef bool (flora::internal::Client::*CmdHandler)(int32_t cmd, std::shared_ptr &resp); + typedef bool (flora::internal::Client::*CmdHandler)( + int32_t cmd, std::shared_ptr &resp); CmdHandler cmd_handler = &Client::handle_cmd_before_auth; class AuthResult { public: diff --git a/src/defs.h b/src/defs.h index d40d34b..19e1eb1 100644 --- a/src/defs.h +++ b/src/defs.h @@ -11,14 +11,16 @@ #define CMD_DECLARE_METHOD_REQ 5 #define CMD_REMOVE_METHOD_REQ 6 #define CMD_CALL_REQ 7 +#define CMD_PING_REQ 8 // server --> client #define CMD_AUTH_RESP 101 #define CMD_POST_RESP 102 #define CMD_REPLY_RESP 103 #define CMD_CALL_RESP 104 #define CMD_MONITOR_RESP 105 +#define CMD_PONG_RESP 106 -#define MSG_HANDLER_COUNT 8 +#define MSG_HANDLER_COUNT 9 // subtype of CMD_MONITOR_RESP #define MONITOR_LIST_ALL 0 @@ -39,3 +41,5 @@ #ifdef __APPLE__ #define SELECT_BLOCK_IF_FD_CLOSED #endif + +#define TAG "flora" diff --git a/src/disp.cc b/src/disp.cc index 82cd2eb..1de8999 100644 --- a/src/disp.cc +++ b/src/disp.cc @@ -10,7 +10,6 @@ using namespace std; using namespace std::chrono; -#define TAG "flora.Dispatcher" #define DEFAULT_CALL_TIMEOUT 200 uint32_t AdapterInfo::idseq; @@ -23,7 +22,9 @@ bool (Dispatcher::*(Dispatcher::msg_handlers[MSG_HANDLER_COUNT]))( &Dispatcher::handle_auth_req, &Dispatcher::handle_subscribe_req, &Dispatcher::handle_unsubscribe_req, &Dispatcher::handle_post_req, &Dispatcher::handle_reply_req, &Dispatcher::handle_declare_method, - &Dispatcher::handle_remove_method, &Dispatcher::handle_call_req}; + &Dispatcher::handle_remove_method, &Dispatcher::handle_call_req, + &Dispatcher::handle_ping_req, +}; Dispatcher::Dispatcher(uint32_t f, uint32_t bufsize) : flags(f) { buf_size = bufsize > DEFAULT_MSG_BUF_SIZE ? bufsize : DEFAULT_MSG_BUF_SIZE; @@ -104,8 +105,7 @@ void Dispatcher::handle_cmd(shared_ptr &msg_caps, do_erase_adapter(sender); return; } - if (sender->info && - (sender->info->flags & FLORA_CLI_FLAG_MONITOR)) { + if (sender->info && (sender->info->flags & FLORA_CLI_FLAG_MONITOR)) { // monitor client should not send request sender->close(); return; @@ -155,7 +155,7 @@ void Dispatcher::close() { } } -void Dispatcher::erase_adapter(shared_ptr &&adapter) { +void Dispatcher::erase_adapter(shared_ptr &adapter) { if (adapter->info == nullptr) return; lock_guard locker(cmd_mutex); @@ -218,6 +218,8 @@ void Dispatcher::do_erase_adapter(shared_ptr &sender) { if (sender->info->flags & FLORA_CLI_FLAG_MONITOR) monitors.erase(reinterpret_cast(sender.get())); else { + KLOGI(TAG, "erase adapter <%d>:%s", sender->info->pid, + sender->info->name.c_str()); write_monitor_list_remove(sender->info->id); adapter_infos.erase(reinterpret_cast(sender.get())); } @@ -481,6 +483,20 @@ bool Dispatcher::handle_reply_req(shared_ptr &msg_caps, return true; } +bool Dispatcher::handle_ping_req(shared_ptr &msg_caps, + shared_ptr &sender) { + if (sender->info == nullptr) + return false; + KLOGD(TAG, "<<< %s: ping", sender->info->name.c_str()); + int32_t c = ResponseSerializer::serialize_pong(buffer, buf_size, + sender->serialize_flags); + if (c < 0) + return false; + KLOGD(TAG, ">>> %s: pong", sender->info->name.c_str()); + sender->write(buffer, c); + return true; +} + bool Dispatcher::add_adapter(const string &name, int32_t pid, uint32_t flags, shared_ptr &adapter) { if (name.length() > 0) { @@ -538,8 +554,8 @@ void Dispatcher::write_monitor_list_add(shared_ptr &newitem, *newitem->info, buffer, buf_size, monitor->serialize_flags); if (c < 0) return; - KLOGD(TAG, ">>> %s: monitor list add %s, %d bytes", monitor->info->name.c_str(), - newitem->info->name.c_str(), c); + KLOGD(TAG, ">>> %s: monitor list add %s, %d bytes", + monitor->info->name.c_str(), newitem->info->name.c_str(), c); monitor->write(buffer, c); } diff --git a/src/disp.h b/src/disp.h index 233d915..9ea84c1 100644 --- a/src/disp.h +++ b/src/disp.h @@ -49,7 +49,7 @@ class Dispatcher : public flora::Dispatcher { inline uint32_t max_msg_size() const { return buf_size; } - void erase_adapter(std::shared_ptr &&adapter); + void erase_adapter(std::shared_ptr &adapter); private: bool handle_auth_req(std::shared_ptr &msg_caps, @@ -76,6 +76,9 @@ class Dispatcher : public flora::Dispatcher { bool handle_reply_req(std::shared_ptr &msg_caps, std::shared_ptr &sender); + bool handle_ping_req(std::shared_ptr &msg_caps, + std::shared_ptr &sender); + bool add_adapter(const std::string &name, int32_t pid, uint32_t flags, std::shared_ptr &adapter); diff --git a/src/flora-agent.cc b/src/flora-agent.cc index 16bc5ab..a58a838 100644 --- a/src/flora-agent.cc +++ b/src/flora-agent.cc @@ -4,8 +4,6 @@ #include #include -#define TAG "flora.agent" - using namespace std; using namespace std::chrono; @@ -37,6 +35,10 @@ void Agent::config(uint32_t key, va_list ap) { options.mon_callback = va_arg(ap, MonitorCallback *); } break; + case FLORA_AGENT_CONFIG_KEEPALIVE: + options.beep_interval = va_arg(ap, uint32_t); + options.noresp_timeout = va_arg(ap, uint32_t); + break; } } @@ -116,10 +118,15 @@ void Agent::start(bool block) { void Agent::run() { unique_lock locker(conn_mutex); shared_ptr cli; + flora::ClientOptions cliopts; + cliopts.bufsize = options.bufsize; + cliopts.flags = options.flags; + cliopts.beep_interval = options.beep_interval; + cliopts.noresp_timeout = options.noresp_timeout; while (working) { int32_t r = Client::connect(options.uri.c_str(), this, options.mon_callback, - options.bufsize, options.flags, cli); + &cliopts, cli); if (r != FLORA_CLI_SUCCESS) { KLOGI(TAG, "connect to flora service %s failed, retry after %u milliseconds", diff --git a/src/poll.cc b/src/poll.cc index 4a77c20..fa7eaf8 100644 --- a/src/poll.cc +++ b/src/poll.cc @@ -1,5 +1,5 @@ +#include "beep-sock-poll.h" #include "flora-svc.h" -#include "sock-poll.h" #include "uri.h" using namespace std; @@ -14,7 +14,7 @@ shared_ptr flora::Poll::new_instance(const char *uri) { make_shared(urip.path)); } else if (urip.scheme == "tcp") { return static_pointer_cast( - make_shared(urip.host, urip.port)); + make_shared(urip.host, urip.port)); } return nullptr; } diff --git a/src/ser-helper.cc b/src/ser-helper.cc index b3de70d..df19424 100644 --- a/src/ser-helper.cc +++ b/src/ser-helper.cc @@ -131,6 +131,16 @@ int32_t RequestSerializer::serialize_reply(int32_t id, int32_t code, return r; } +int32_t RequestSerializer::serialize_ping(void *data, uint32_t size, + uint32_t flags) { + shared_ptr caps = Caps::new_instance(); + caps->write(CMD_PING_REQ); + int32_t r = caps->serialize(data, size, flags); + if (r < 0 || r > size) + return -1; + return r; +} + int32_t ResponseSerializer::serialize_auth(int32_t result, void *data, uint32_t size, uint32_t flags) { shared_ptr caps = Caps::new_instance(); @@ -322,6 +332,16 @@ int32_t ResponseSerializer::serialize_monitor_call( return -1; } +int32_t ResponseSerializer::serialize_pong(void *data, uint32_t size, + uint32_t flags) { + shared_ptr p = Caps::new_instance(); + p->write(CMD_PONG_RESP); + int32_t r = p->serialize(data, size, flags); + if (r < 0 || r > size) + return -1; + return r; +} + int32_t RequestParser::parse_auth(shared_ptr &caps, uint32_t &version, string &extra, int32_t &pid, uint32_t &flags) { diff --git a/src/ser-helper.h b/src/ser-helper.h index 92d3c34..0548667 100644 --- a/src/ser-helper.h +++ b/src/ser-helper.h @@ -40,6 +40,8 @@ class RequestSerializer { static int32_t serialize_reply(int32_t id, int32_t code, std::shared_ptr &values, void *data, uint32_t size, uint32_t flags); + + static int32_t serialize_ping(void *data, uint32_t size, uint32_t flags); }; class ResponseSerializer { @@ -99,6 +101,8 @@ class ResponseSerializer { const std::string &target, int32_t err, void *data, uint32_t size, uint32_t flags); + + static int32_t serialize_pong(void *data, uint32_t size, uint32_t flags); }; class RequestParser { diff --git a/src/sock-adap.cc b/src/sock-adap.cc index de17009..e1184e6 100644 --- a/src/sock-adap.cc +++ b/src/sock-adap.cc @@ -1,5 +1,6 @@ #include "sock-adap.h" #include "caps.h" +#include "defs.h" #include "rlog.h" #include #include @@ -7,32 +8,27 @@ #include #include -#define TAG "flora.SocketAdapter" - using namespace std; SocketAdapter::SocketAdapter(int sock, uint32_t bufsize, uint32_t flags) - : Adapter(flags), socket(sock) { + : Adapter(flags), socketfd(sock) { buffer = (int8_t *)mmap(NULL, bufsize, PROT_READ | PROT_WRITE, MAP_ANONYMOUS | MAP_PRIVATE, -1, 0); buf_size = bufsize; } -SocketAdapter::~SocketAdapter() { - close(); -} +SocketAdapter::~SocketAdapter() { close(); } int32_t SocketAdapter::read() { if (buffer == nullptr) return SOCK_ADAPTER_ECLOSED; - ssize_t c = ::read(socket, buffer + cur_size, buf_size - cur_size); + ssize_t c = ::read(socketfd, buffer + cur_size, buf_size - cur_size); if (c <= 0) { if (c == 0) { KLOGD(TAG, "socket closed by remote"); } else { KLOGE(TAG, "read socket failed: %s", strerror(errno)); } - close(); return SOCK_ADAPTER_ECLOSED; } cur_size += c; @@ -83,7 +79,6 @@ void SocketAdapter::close_nolock() { if (buffer) { munmap(buffer, buf_size); buffer = nullptr; - socket = -1; #ifdef FLORA_DEBUG KLOGI(TAG, "socket adapter %s: recv times = %u, recv bytes = %u", info ? info->name.c_str() : "", recv_times, recv_bytes); @@ -100,7 +95,7 @@ void SocketAdapter::write(const void *data, uint32_t size) { lock_guard locker(write_mutex); if (buffer == nullptr) return; - if (::write(socket, data, size) < 0) { + if (::write(socketfd, data, size) < 0) { KLOGE(TAG, "write to socket failed: %s", strerror(errno)); close_nolock(); } diff --git a/src/sock-adap.h b/src/sock-adap.h index 93e6ac0..40f6271 100644 --- a/src/sock-adap.h +++ b/src/sock-adap.h @@ -33,11 +33,13 @@ class SocketAdapter : public Adapter { bool closed() override; + int socket() const { return socketfd; } + private: void close_nolock(); private: - int socket; + int socketfd; int8_t *buffer = nullptr; uint32_t buf_size; uint32_t cur_size = 0; diff --git a/src/sock-conn.cc b/src/sock-conn.cc index 54fd3e0..9da235a 100644 --- a/src/sock-conn.cc +++ b/src/sock-conn.cc @@ -1,4 +1,5 @@ #include "sock-conn.h" +#include "defs.h" #include "rlog.h" #include #include @@ -13,7 +14,7 @@ using namespace std; -#define TAG "flora.SocketConn" +SocketConn::SocketConn(uint32_t rtmo) : recv_timeout(rtmo) {} SocketConn::~SocketConn() { if (sock >= 0) { @@ -27,6 +28,13 @@ bool SocketConn::connect(const std::string &name) { KLOGE(TAG, "socket create failed: %s", strerror(errno)); return false; } + // set socket recv timeout + if (recv_timeout > 0) { + struct timeval tv; + tv.tv_sec = recv_timeout / 1000; + tv.tv_usec = (recv_timeout % 1000) * 1000; + setsockopt(fd, SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(tv)); + } struct sockaddr_un addr; memset(&addr, 0, sizeof(addr)); addr.sun_family = AF_UNIX; @@ -47,6 +55,13 @@ bool SocketConn::connect(const std::string &host, int32_t port) { KLOGE(TAG, "socket create failed: %s", strerror(errno)); return false; } + // set socket recv timeout + if (recv_timeout > 0) { + struct timeval tv; + tv.tv_sec = recv_timeout / 1000; + tv.tv_usec = (recv_timeout % 1000) * 1000; + setsockopt(fd, SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(tv)); + } struct sockaddr_in addr; struct hostent *hp; hp = gethostbyname(host.c_str()); @@ -95,6 +110,8 @@ int32_t SocketConn::recv(void *data, uint32_t size) { do { c = ::read(sock, data, size); if (c < 0) { + if (errno == EAGAIN) + return -2; if (errno == EINTR) { KLOGE(TAG, "read socket failed: %s", strerror(errno)); continue; diff --git a/src/sock-conn.h b/src/sock-conn.h index 7f37c0a..63ebba2 100644 --- a/src/sock-conn.h +++ b/src/sock-conn.h @@ -6,6 +6,8 @@ class SocketConn : public Connection { public: + SocketConn(uint32_t rtmo); + ~SocketConn(); bool connect(const std::string &name); @@ -14,14 +16,17 @@ class SocketConn : public Connection { bool send(const void *data, uint32_t size) override; + // return: -1 socket error + // -2 read timeout int32_t recv(void *data, uint32_t size) override; void close() override; - bool closed() const override { return sock < 0; } + bool closed() const override { return !sock_ready; } private: int sock = -1; + uint32_t recv_timeout; bool sock_ready = false; std::mutex write_mutex; }; diff --git a/src/sock-poll.cc b/src/sock-poll.cc index 04ebd68..ff06828 100644 --- a/src/sock-poll.cc +++ b/src/sock-poll.cc @@ -13,7 +13,6 @@ using namespace std; -#define TAG "flora.UnixPoll" #define POLL_TYPE_UNIX 0 #define POLL_TYPE_TCP 1 @@ -104,13 +103,8 @@ static int tcp_accept(int lfd) { } void SocketPoll::run() { - fd_set all_fds; fd_set rfds; - int new_fd; - int max_fd; - AdapterMap::iterator adap_it; - vector pending_delete_adapters; - size_t i; + int ifd; start_mutex.lock(); FD_ZERO(&all_fds); @@ -133,48 +127,53 @@ void SocketPoll::run() { // select timeout, this Poll not closed, continue if (r == 0) continue; - // accept new connection - if (FD_ISSET(lfd, &rfds)) { - if (type == POLL_TYPE_TCP) - new_fd = tcp_accept(lfd); - else - new_fd = unix_accept(lfd); - if (new_fd < 0) { - KLOGE(TAG, "accept failed: %s", strerror(errno)); - continue; - } - if (new_fd >= max_fd) - max_fd = new_fd + 1; - FD_SET(new_fd, &all_fds); - KLOGI(TAG, "accept new connection %d", new_fd); - new_adapter(new_fd); - } - // read - for (adap_it = adapters.begin(); adap_it != adapters.end(); ++adap_it) { - if (FD_ISSET(adap_it->first, &rfds)) { - KLOGD(TAG, "read from fd %d", adap_it->first); - if (!read_from_client(adap_it->second)) { - adap_it->second->close(); - dispatcher->erase_adapter( - static_pointer_cast(adap_it->second)); - pending_delete_adapters.push_back(adap_it->first); - FD_CLR(adap_it->first, &all_fds); + for (ifd = 0; ifd < max_fd; ++ifd) { + if (FD_ISSET(ifd, &rfds)) { + if (ifd == lfd) { + auto new_adap = do_accept(lfd); + if (new_adap == nullptr) { + KLOGE(TAG, "accept failed: %s", strerror(errno)); + continue; + } + KLOGI(TAG, "accept new connection %d", + static_pointer_cast(new_adap)->socket()); + } else { + auto it = adapters.find(ifd); + if (it != adapters.end()) { + KLOGD(TAG, "read from fd %d", ifd); + if (!do_read(it->second)) { + delete_adapter(it->second); + adapters.erase(it); + } + } } } } - for (i = 0; i < pending_delete_adapters.size(); ++i) { - delete_adapter(pending_delete_adapters[i]); - } - pending_delete_adapters.clear(); } // release resources while (adapters.size() > 0) { - delete_adapter(adapters.begin()->first); + auto it = adapters.begin(); + delete_adapter(it->second); + adapters.erase(it); } KLOGI(TAG, "unix poll: run thread quit"); } +shared_ptr SocketPoll::do_accept(int lfd) { + int new_fd = -1; + + if (type == POLL_TYPE_TCP) + new_fd = tcp_accept(lfd); + else + new_fd = unix_accept(lfd); + if (new_fd < 0) + return nullptr; + auto adap = new_adapter(new_fd); + adapters.insert(make_pair(new_fd, adap)); + return adap; +} + bool SocketPoll::init_unix_socket() { int fd = socket(PF_UNIX, SOCK_STREAM, 0); if (fd < 0) @@ -223,22 +222,24 @@ bool SocketPoll::init_tcp_socket() { return true; } -void SocketPoll::new_adapter(int fd) { +shared_ptr SocketPoll::new_adapter(int fd) { shared_ptr adap = make_shared( fd, max_msg_size, type == POLL_TYPE_TCP ? CAPS_FLAG_NET_BYTEORDER : 0); - adapters.insert(make_pair(fd, adap)); + if (fd >= max_fd) + max_fd = fd + 1; + FD_SET(fd, &all_fds); + return static_pointer_cast(adap); } -void SocketPoll::delete_adapter(int fd) { - AdapterMap::iterator it = adapters.find(fd); - if (it != adapters.end()) { - ::close(fd); - it->second->close(); - adapters.erase(it); - } +void SocketPoll::delete_adapter(shared_ptr &adap) { + dispatcher->erase_adapter(adap); + int fd = static_pointer_cast(adap)->socket(); + adap->close(); + FD_CLR(fd, &all_fds); + ::close(fd); } -bool SocketPoll::read_from_client(shared_ptr &adap) { +bool SocketPoll::do_read(shared_ptr &adap) { int32_t r = adap->read(); if (r != SOCK_ADAPTER_SUCCESS) return false; @@ -247,8 +248,7 @@ bool SocketPoll::read_from_client(shared_ptr &adap) { while (true) { r = adap->next_frame(frame); if (r == SOCK_ADAPTER_SUCCESS) { - shared_ptr a = static_pointer_cast(adap); - if (!dispatcher->put(frame.data, frame.size, a)) { + if (!dispatcher->put(frame.data, frame.size, adap)) { KLOGE(TAG, "dispatcher put failed"); return false; } diff --git a/src/sock-poll.h b/src/sock-poll.h index de978c7..d0e208b 100644 --- a/src/sock-poll.h +++ b/src/sock-poll.h @@ -11,13 +11,11 @@ #include #include -typedef std::map> AdapterMap; -// typedef std::list > SessionList; -// typedef std::map SubscribeMap; - namespace flora { namespace internal { +typedef std::map> AdapterMap; + class SocketPoll : public flora::Poll { public: explicit SocketPoll(const std::string &name); @@ -30,6 +28,15 @@ class SocketPoll : public flora::Poll { void stop(); + virtual void config(uint32_t opt, ...) {} + +protected: + virtual int32_t do_poll(fd_set *rfds, int max_fd); + + virtual std::shared_ptr do_accept(int lfd); + + virtual bool do_read(std::shared_ptr &adap); + private: void run(); @@ -37,19 +44,17 @@ class SocketPoll : public flora::Poll { bool init_tcp_socket(); - void new_adapter(int fd); - - void delete_adapter(int fd); - - bool read_from_client(std::shared_ptr &adap); - int get_listen_fd(); - int32_t do_poll(fd_set *rfds, int max_fd); + std::shared_ptr new_adapter(int fd); + + void delete_adapter(std::shared_ptr &adap); private: std::shared_ptr dispatcher; int listen_fd = -1; + int max_fd = 0; + fd_set all_fds; uint32_t max_msg_size = 0; std::thread run_thread; std::mutex start_mutex;