Skip to content

Commit

Permalink
tcp socket connection keepalive (#13)
Browse files Browse the repository at this point in the history
  • Loading branch information
mingzc0508 authored Mar 13, 2019
1 parent efc54f6 commit 96e31b5
Show file tree
Hide file tree
Showing 21 changed files with 477 additions and 160 deletions.
1 change: 1 addition & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ set(flora_svc_SOURCES
src/poll.cc
src/sock-poll.h
src/sock-poll.cc
src/beep-sock-poll.h
src/disp.h
src/disp.cc
src/ser-helper.h
Expand Down
11 changes: 6 additions & 5 deletions examples/all-demos.cc
Original file line number Diff line number Diff line change
Expand Up @@ -389,11 +389,12 @@ void DemoAllInOne::test_invoke_in_callback() {
int32_t r = agent.call("not-exists", empty, "blah", resp);
KLOGI(TAG, "invoke 'call' in callback function, return %d, excepted %d", r,
FLORA_CLI_EDEADLOCK);
r = agent.call("not-exists", empty, "blah", [&agent](int32_t code, Response &) {
KLOGI(TAG, "invoke async 'call' in callback, return %d, excepted %d",
code, FLORA_CLI_ENEXISTS);
agent.close();
});
r = agent.call(
"not-exists", empty, "blah", [&agent](int32_t code, Response &) {
KLOGI(TAG, "invoke async 'call' in callback, return %d, excepted %d",
code, FLORA_CLI_ENEXISTS);
agent.close();
});
});
agent.start();
shared_ptr<Caps> empty;
Expand Down
6 changes: 6 additions & 0 deletions include/flora-agent.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,10 @@
// config(KEY, uint32_t flags, MonitorCallback *cb)
// flags: FLORA_CLI_FLAG_MONITOR_* (see flora-cli.h)
#define FLORA_AGENT_CONFIG_MONITOR 3
// config(KEY, uint32_t interval, uint32_t timeout)
// interval: interval of send beep packet
// timeout: timeout of flora service no response
#define FLORA_AGENT_CONFIG_KEEPALIVE 4

#ifdef __cplusplus

Expand Down Expand Up @@ -104,6 +108,8 @@ class Agent : public ClientCallback {
std::chrono::milliseconds(10000);
uint32_t flags = 0;
MonitorCallback *mon_callback = nullptr;
uint32_t beep_interval = FLORA_CLI_DEFAULT_BEEP_INTERVAL;
uint32_t noresp_timeout = FLORA_CLI_DEFAULT_NORESP_TIMEOUT;
};

Options options;
Expand Down
31 changes: 25 additions & 6 deletions include/flora-cli.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,10 @@
#define FLORA_CLI_FLAG_MONITOR_DETAIL_DECL 0x4
#define FLORA_CLI_FLAG_MONITOR_DETAIL_POST 0x8
#define FLORA_CLI_FLAG_MONITOR_DETAIL_CALL 0x10
#define FLORA_CLI_FLAG_KEEPALIVE 0x20

#define FLORA_CLI_DEFAULT_BEEP_INTERVAL 50000
#define FLORA_CLI_DEFAULT_NORESP_TIMEOUT 100000

namespace flora {

Expand Down Expand Up @@ -68,6 +72,15 @@ class Reply {
class ClientCallback;
class MonitorCallback;

class ClientOptions {
public:
uint32_t bufsize = 0;
uint32_t flags = 0;
// effective when FLORA_CLI_FLAG_KEEPALIVE is set
uint32_t beep_interval = FLORA_CLI_DEFAULT_BEEP_INTERVAL;
uint32_t noresp_timeout = FLORA_CLI_DEFAULT_NORESP_TIMEOUT;
};

class Client {
public:
virtual ~Client() = default;
Expand Down Expand Up @@ -101,13 +114,9 @@ class Client {
uint32_t msg_buf_size,
std::shared_ptr<Client> &result);

static int32_t connect(const char *uri, MonitorCallback *cb,
uint32_t msg_buf_size, uint32_t flags,
std::shared_ptr<Client> &result);

static int32_t connect(const char *uri, ClientCallback *ccb,
MonitorCallback *mcb, uint32_t msg_buf_size,
uint32_t flags, std::shared_ptr<Client> &result);
MonitorCallback *mcb, ClientOptions *opts,
std::shared_ptr<Client> &result);
};

class ClientCallback {
Expand Down Expand Up @@ -291,6 +300,16 @@ int32_t flora_cli_connect2(const char *uri,
flora_cli_monitor_callback_t *callback, void *arg,
uint32_t msg_buf_size, flora_cli_t *result);

typedef struct {
uint32_t bufsize;
uint32_t flags;
uint32_t beep_interval;
uint32_t noresp_timeout;
} flora_cli_options;
int32_t flora_cli_connect3(const char *uri, flora_cli_callback_t *ccb,
flora_cli_monitor_callback_t *mcb, void *arg,
flora_cli_options *opts, flora_cli_t *result);

void flora_cli_delete(flora_cli_t handle);

int32_t flora_cli_subscribe(flora_cli_t handle, const char *name);
Expand Down
5 changes: 5 additions & 0 deletions include/flora-svc.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@
#define FLORA_POLL_INVAL -3
#define FLORA_POLL_UNSUPP -4

// options of 'config'
#define FLORA_POLL_OPT_KEEPALIVE_TIMEOUT 1

#define FLORA_DISP_FLAG_MONITOR 1

#ifdef __cplusplus
Expand Down Expand Up @@ -35,6 +38,8 @@ class Poll {

virtual void stop() = 0;

virtual void config(uint32_t opt, ...) = 0;

static std::shared_ptr<Poll> new_instance(const char *uri);
};

Expand Down
167 changes: 167 additions & 0 deletions src/beep-sock-poll.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,167 @@
#pragma once

#include "defs.h"
#include "rlog.h"
#include "sock-poll.h"
#include <chrono>
#include <memory>
#include <errno.h>
#include <stdarg.h>
#include <string.h>
#include <sys/socket.h>
#include <unistd.h>

namespace flora {
namespace internal {

class ActiveAdapter {
public:
std::chrono::steady_clock::time_point lastest_action_tp;
std::shared_ptr<Adapter> adapter;
};
typedef std::list<ActiveAdapter> ActiveAdapterList;

class BeepSocketPoll : public SocketPoll {
public:
BeepSocketPoll(const std::string &host, int32_t port)
: SocketPoll(host, port) {}

void config(uint32_t opt, ...) {
va_list ap;
va_start(ap, opt);
switch (opt) {
case FLORA_POLL_OPT_KEEPALIVE_TIMEOUT:
options.beep_timeout = va_arg(ap, uint32_t);
break;
}
va_end(ap);
}

private:
int32_t do_poll(fd_set *rfds, int max_fd) {
int r;
struct timeval tv;
struct timeval *ptv;
std::chrono::steady_clock::time_point nowtp;

while (true) {
if (active_adapters.empty()) {
#ifdef SELECT_BLOCK_IF_FD_CLOSED
tv.tv_sec = 5;
tv.tv_usec = 0;
ptv = &tv;
#else
ptv = nullptr;
#endif
} else {
nowtp = std::chrono::steady_clock::now();
obtain_timeout(nowtp, &tv);
ptv = &tv;
}
r = select(max_fd, rfds, nullptr, nullptr, ptv);
if (r < 0) {
if (errno == EAGAIN) {
sleep(1);
continue;
}
KLOGE(TAG, "select failed: %s", strerror(errno));
}
nowtp = std::chrono::steady_clock::now();
shutdown_timeout_adapter(nowtp);
break;
}
return r;
}

std::shared_ptr<Adapter> do_accept(int lfd) {
auto adap = SocketPoll::do_accept(lfd);
if (adap != nullptr) {
auto it = active_adapters.emplace(active_adapters.end());
it->adapter = adap;
it->lastest_action_tp = std::chrono::steady_clock::now();
return adap;
}
return adap;
}

bool do_read(std::shared_ptr<Adapter> &adap) {
bool r = SocketPoll::do_read(adap);
auto it = active_adapters.begin();
while (it != active_adapters.end()) {
if (it->adapter == adap)
break;
++it;
}
if (it != active_adapters.end()) {
if (r) {
// move adapter to end of list
// adapter sorted by lastest_action_tp
it->lastest_action_tp = std::chrono::steady_clock::now();
if (it != active_adapters.rbegin().base())
active_adapters.splice(active_adapters.end(), active_adapters, it);
} else {
active_adapters.erase(it);
}
}
return r;
}

void obtain_timeout(std::chrono::steady_clock::time_point &nowtp,
struct timeval *tv) {
auto dur = std::chrono::duration_cast<std::chrono::milliseconds>(
std::chrono::milliseconds(options.beep_timeout) -
(nowtp - active_adapters.front().lastest_action_tp));
tv->tv_sec = dur.count() / 1000;
tv->tv_usec = (dur.count() % 1000) * 1000;
}

void shutdown_timeout_adapter(std::chrono::steady_clock::time_point &nowtp) {
#ifdef FLORA_DEBUG
std::chrono::steady_clock::time_point dnowtp = nowtp;
KLOGD(TAG, "before shutdown timeout adapters, timeout = %u",
options.beep_timeout);
print_active_adapters(dnowtp);
#endif
nowtp -= std::chrono::milliseconds(options.beep_timeout);
auto it = active_adapters.begin();
while (it != active_adapters.end()) {
if (nowtp >= it->lastest_action_tp) {
int fd = std::static_pointer_cast<SocketAdapter>(it->adapter)->socket();
KLOGW(TAG, "tcp socket %d keepalive timeout, shutdown!", fd);
::shutdown(fd, SHUT_RDWR);
it = active_adapters.erase(it);
continue;
}
break;
}
#ifdef FLORA_DEBUG
KLOGD(TAG, "after shutdown timeout adapters, timeout = %u",
options.beep_timeout);
print_active_adapters(dnowtp);
#endif
}

#ifdef FLORA_DEBUG
void print_active_adapters(std::chrono::steady_clock::time_point &nowtp) {
auto it = active_adapters.begin();
while (it != active_adapters.end()) {
KLOGD(TAG, "%p: %d", it->adapter.get(),
std::chrono::duration_cast<std::chrono::milliseconds>(
nowtp - it->lastest_action_tp)
.count());
++it;
}
}
#endif

private:
class Options {
public:
uint32_t beep_timeout = 60000;
};
ActiveAdapterList active_adapters;
Options options;
};

} // namespace internal
} // namespace flora
Loading

0 comments on commit 96e31b5

Please sign in to comment.