Skip to content

Commit

Permalink
fix memory leak: collect subscription gabages
Browse files Browse the repository at this point in the history
  • Loading branch information
张晨 authored and 张晨 committed Aug 5, 2019
1 parent 4d0940e commit cbde4d6
Show file tree
Hide file tree
Showing 3 changed files with 80 additions and 20 deletions.
2 changes: 2 additions & 0 deletions src/defs.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@
#define MONITOR_SUBTYPE_NUM 11

#define DEFAULT_MSG_BUF_SIZE 32768
#define CLEAR_SUBSCRIPTION_TIME_THRESHOLD 10
#define CLEAR_SUBSCRIPTION_THRESHOLD 50

#ifdef __APPLE__
#define SELECT_BLOCK_IF_FD_CLOSED
Expand Down
85 changes: 66 additions & 19 deletions src/disp.cc
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,7 @@ void Dispatcher::handle_cmd(shared_ptr<Caps> &msg_caps,
}
if (!(this->*(msg_handlers[cmd]))(msg_caps, sender))
sender->close();
handle_cmd_tp = steady_clock::now();
}

void Dispatcher::pending_call_timeout(PendingCall &pc) {
Expand Down Expand Up @@ -240,6 +241,42 @@ void Dispatcher::do_erase_adapter(shared_ptr<Adapter> &sender) {
write_monitor_list_remove(sender->info->id);
adapter_infos.erase(reinterpret_cast<intptr_t>(sender.get()));
}
check_subscriptions();
}

void Dispatcher::check_subscriptions() {
auto nowtp = steady_clock::now();
if (duration_cast<seconds>(nowtp - handle_cmd_tp).count() >=
CLEAR_SUBSCRIPTION_TIME_THRESHOLD) {
clear_sub_gabages();
clear_sub_prio = 0;
handle_cmd_tp = nowtp;
return;
}
++clear_sub_prio;
if (clear_sub_prio >= CLEAR_SUBSCRIPTION_THRESHOLD) {
clear_sub_gabages();
clear_sub_prio = 0;
handle_cmd_tp = nowtp;
}
}

void Dispatcher::clear_sub_gabages() {
auto it = subscriptions.begin();
while (it != subscriptions.end()) {
auto ait = it->second.begin();
while (ait != it->second.end()) {
auto adap = ait->lock();
if (adap == nullptr)
ait = it->second.erase(ait);
else
++ait;
}
if (it->second.empty())
it = subscriptions.erase(it);
else
++it;
}
}

bool Dispatcher::handle_subscribe_req(shared_ptr<Caps> &msg_caps,
Expand All @@ -255,7 +292,8 @@ bool Dispatcher::handle_subscribe_req(shared_ptr<Caps> &msg_caps,
AdapterList &adapters = subscriptions[name];
AdapterList::iterator it;
for (it = adapters.begin(); it != adapters.end(); ++it) {
if ((*it).get() == sender.get())
auto adap = it->lock();
if (adap.get() == sender.get())
return true;
}
adapters.push_back(sender);
Expand Down Expand Up @@ -289,13 +327,18 @@ bool Dispatcher::handle_unsubscribe_req(shared_ptr<Caps> &msg_caps,
name.c_str());
if (name.length() == 0)
return false;
AdapterList &adapters = subscriptions[name];
AdapterList::iterator it;
for (it = adapters.begin(); it != adapters.end(); ++it) {
if ((*it).get() == sender.get()) {
adapters.erase(it);
break;
auto it = subscriptions.find(name);
if (it != subscriptions.end()) {
auto ait = it->second.begin();
while (ait != it->second.end()) {
auto adap = ait->lock();
if (adap == nullptr || adap.get() == sender.get())
ait = it->second.erase(ait);
else
++ait;
}
if (it->second.empty())
subscriptions.erase(it);
}
return true;
}
Expand Down Expand Up @@ -353,19 +396,18 @@ bool Dispatcher::post_msg(const string &name, uint32_t type,

SubscriptionMap::iterator sit;
sit = subscriptions.find(name);
if (sit != subscriptions.end() && !sit->second.empty()) {
if (sit != subscriptions.end()) {
AdapterList nobo_adapters; // no net byteorder
AdapterList bo_adapters; // net byteorder
AdapterList::iterator ait;
ait = sit->second.begin();
while (ait != sit->second.end()) {
if ((*ait)->closed()) {
AdapterList::iterator dit = ait;
++ait;
sit->second.erase(dit);
auto adap = ait->lock();
if (adap == nullptr || adap->closed()) {
ait = sit->second.erase(ait);
continue;
}
if ((*ait)->serialize_flags == CAPS_FLAG_NET_BYTEORDER) {
if (adap->serialize_flags == CAPS_FLAG_NET_BYTEORDER) {
bo_adapters.push_back(*ait);
} else {
nobo_adapters.push_back(*ait);
Expand All @@ -379,6 +421,8 @@ bool Dispatcher::post_msg(const string &name, uint32_t type,
write_post_msg_to_adapters(name, type, args, sender->tag,
CAPS_FLAG_NET_BYTEORDER, bo_adapters,
cli_name);
if (sit->second.empty())
subscriptions.erase(sit);
}

if (type == FLORA_MSGTYPE_PERSIST) {
Expand All @@ -398,12 +442,15 @@ void Dispatcher::write_post_msg_to_adapters(
AdapterList::iterator ait;
ait = adapters.begin();
while (ait != adapters.end()) {
KLOGI(TAG, "%s >>> %s: post %u..%s", sender_name,
(*ait)->info->name.c_str(), type, name.c_str());
if ((*ait)->write(buffer, c) == -2) {
KLOGW(FILE_TAG, "write timeout: post msg, [0x%llx]%s >>> [0x%llx]%s",
tag, sender_name, (*ait)->tag,
(*ait)->info ? (*ait)->info->name.c_str() : "");
auto adap = ait->lock();
if (adap != nullptr) {
KLOGI(TAG, "%s >>> %s: post %u..%s", sender_name,
adap->info->name.c_str(), type, name.c_str());
if (adap->write(buffer, c) == -2) {
KLOGW(FILE_TAG, "write timeout: post msg, [0x%llx]%s >>> [0x%llx]%s",
tag, sender_name, adap->tag,
adap->info ? adap->info->name.c_str() : "");
}
}
++ait;
}
Expand Down
13 changes: 12 additions & 1 deletion src/disp.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
namespace flora {
namespace internal {

typedef std::list<std::shared_ptr<Adapter>> AdapterList;
typedef std::list<std::weak_ptr<Adapter>> AdapterList;
typedef std::map<std::string, AdapterList> SubscriptionMap;
typedef struct {
std::shared_ptr<Caps> data;
Expand Down Expand Up @@ -121,6 +121,10 @@ class Dispatcher : public flora::Dispatcher {

void write_monitor_list_remove(uint32_t id);

void check_subscriptions();

void clear_sub_gabages();

private:
SubscriptionMap subscriptions;
PersistMsgMap persist_msgs;
Expand All @@ -136,6 +140,13 @@ class Dispatcher : public flora::Dispatcher {
AdapterInfoMap adapter_infos;
AdapterInfoMap monitors;
uint32_t flags;
// if now timepoint more than handle_cmd_tp 10 seconds when erase adapter
// traversal map of subscriptions, clear gabages
std::chrono::steady_clock::time_point handle_cmd_tp;
// the variable +1 for each erased adapter
// if the variable >= CLEAR_SUBSCRIPTION_THRESHOLD,
// traversal map of subscriptions, clear gabages
uint32_t clear_sub_prio{0};
bool working = false;

static bool (Dispatcher::*msg_handlers[MSG_HANDLER_COUNT])(
Expand Down

0 comments on commit cbde4d6

Please sign in to comment.