Skip to content

Commit

Permalink
Add support in OpenrCtrl server for longPollAdj API (25/N)
Browse files Browse the repository at this point in the history
Summary:
As per discussion, Ebb will use longPoll API to mimick the server side "PUSH" behavior for any "adj:" key change.

This diff contains:
1). thrift API definition;
2). server side handler logic to handle client requests and return when kvStore updates received;
3). lazy cleanup when there is KvStore publication happened. Cleanup will be based on timestamp.
4). UT to mimick client side receive/timeout cases;

Reviewed By: saifhhasan

Differential Revision: D17817739

fbshipit-source-id: 3d19134ed97ed0f8d648098a95902150455fd2b6
  • Loading branch information
xiangxu1121 authored and facebook-github-bot committed Oct 19, 2019
1 parent 93649bf commit afa42e3
Show file tree
Hide file tree
Showing 6 changed files with 384 additions and 2 deletions.
1 change: 1 addition & 0 deletions openr/common/Constants.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ constexpr std::chrono::milliseconds Constants::kServiceProcTimeout;
constexpr std::chrono::milliseconds Constants::kTtlDecrement;
constexpr std::chrono::milliseconds Constants::kTtlInfInterval;
constexpr std::chrono::milliseconds Constants::kTtlThreshold;
constexpr std::chrono::milliseconds Constants::kLongPollReqHoldTime;
constexpr std::chrono::seconds Constants::kConvergenceMaxDuration;
constexpr std::chrono::seconds Constants::kKeepAliveIntvl;
constexpr std::chrono::seconds Constants::kKeepAliveTime;
Expand Down
3 changes: 3 additions & 0 deletions openr/common/Constants.h
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,9 @@ class Constants {
static constexpr uint16_t kPerfBufferSize{10};
static constexpr std::chrono::seconds kConvergenceMaxDuration{3s};

// hold time for longPoll requests in openrCtrl thrift server
static constexpr std::chrono::milliseconds kLongPollReqHoldTime{20000};

//
// Prefix manager specific
//
Expand Down
100 changes: 100 additions & 0 deletions openr/ctrl-server/OpenrCtrlHandler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#include <thrift/lib/cpp2/server/ThriftServer.h>

#include <openr/common/Constants.h>
#include <openr/common/Util.h>
#include <openr/if/gen-cpp2/PersistentStore_types.h>
#include <openr/if/gen-cpp2/PrefixManager_types.h>

Expand Down Expand Up @@ -76,6 +77,62 @@ OpenrCtrlHandler::OpenrCtrlHandler(
kv.second.next(maybePublication.value());
}
}

bool isAdjChanged = false;
// check if any of KeyVal has 'adj' update
for (auto& kv : maybePublication.value().keyVals) {
auto& key = kv.first;
auto& val = kv.second;
// check if we have any value update.
// Ttl refreshing won't update any value.
if (!val.value.hasValue()) {
continue;
}

// "adj:*" key has changed. Update local collection
if (key.find(Constants::kAdjDbMarker.toString()) == 0) {
VLOG(3) << "Adj key: " << key << " change received";
isAdjChanged = true;
break;
}
}

if (isAdjChanged) {
// thrift::Publication contains "adj:*" key change.
// Clean ALL pending promises
longPollReqs_.withWLock([&](auto& longPollReqs) {
for (auto& kv : longPollReqs) {
auto& p = kv.second.first;
p.setValue(true);
}
longPollReqs.clear();
});
} else {
longPollReqs_.withWLock([&](auto& longPollReqs) {
auto now = getUnixTimeStampMs();
std::vector<int64_t> reqsToClean;
for (auto& kv : longPollReqs) {
auto& clientId = kv.first;
auto& req = kv.second;

auto& p = req.first;
auto& timeStamp = req.second;
if (now - timeStamp >=
Constants::kLongPollReqHoldTime.count()) {
LOG(INFO) << "Elapsed time: " << now - timeStamp
<< " is over hold limit: "
<< Constants::kLongPollReqHoldTime.count();
reqsToClean.emplace_back(clientId);
p.setException(thrift::OpenrError("Request timed out"));
}
}

// cleanup expired requests since no ADJ change observed
for (auto& clientId : reqsToClean) {
longPollReqs.erase(clientId);
}
});
}
});
});

Expand Down Expand Up @@ -122,6 +179,9 @@ OpenrCtrlHandler::~OpenrCtrlHandler() {
for (auto& publisher : publishers) {
std::move(publisher).complete();
}

LOG(INFO) << "Cleanup all pending request(s).";
longPollReqs_.withWLock([&](auto& longPollReqs) { longPollReqs.clear(); });
}

void
Expand Down Expand Up @@ -611,6 +671,46 @@ OpenrCtrlHandler::semifuture_getKvStoreHashFiltered(
return p.getSemiFuture();
}

folly::SemiFuture<bool>
OpenrCtrlHandler::semifuture_longPollKvStoreAdj(
std::unique_ptr<thrift::KeyVals> snapshot) {
folly::Promise<bool> p;
folly::SemiFuture<bool> sf = p.getSemiFuture();

auto timeStamp = getUnixTimeStampMs();
auto requestId = pendingRequestId_++;

thrift::KvStoreRequest request;
thrift::KeyDumpParams params;

// Only care about "adj:" key
params.prefix = Constants::kAdjDbMarker;
// Only dump difference between KvStore and client snapshot
params.keyValHashes = std::move(*snapshot);
request.cmd = thrift::Command::KEY_DUMP;
request.keyDumpParams = std::move(params);

auto reply = requestReplyThrift<thrift::Publication>(
thrift::OpenrModuleType::KVSTORE, std::move(request));
if (reply.hasError()) {
p.setException(thrift::OpenrError(reply.error().errString));
} else if (
reply->keyVals.size() or
(reply->tobeUpdatedKeys.hasValue() and
reply->tobeUpdatedKeys.value().size())) {
VLOG(3) << "AdjKey hash change. Notify immediately";
p.setValue(true);
} else {
// Client provided data is consistent with KvStore.
// Store req for future processing when there is publication
// from KvStore.
longPollReqs_.withWLock([&](auto& longPollReq) {
longPollReq.emplace(requestId, std::make_pair(std::move(p), timeStamp));
});
}
return sf;
}

folly::SemiFuture<folly::Unit>
OpenrCtrlHandler::semifuture_setKvStoreKeyVals(
std::unique_ptr<thrift::KeySetParams> setParams,
Expand Down
27 changes: 26 additions & 1 deletion openr/ctrl-server/OpenrCtrlHandler.h
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,10 @@ class OpenrCtrlHandler final : public thrift::OpenrCtrlCppSvIf,
thrift::Publication>>
semifuture_subscribeAndGetKvStore() override;

// Long poll support
folly::SemiFuture<bool> semifuture_longPollKvStoreAdj(
std::unique_ptr<thrift::KeyVals> snapshot) override;

//
// LinkMonitor APIs
//
Expand Down Expand Up @@ -231,11 +235,24 @@ class OpenrCtrlHandler final : public thrift::OpenrCtrlCppSvIf,
//
// APIs to expose state of private variables
//
size_t
inline size_t
getNumKvStorePublishers() {
return kvStorePublishers_->size();
}

inline size_t
getNumPendingLongPollReqs() {
return longPollReqs_->size();
}

//
// API to cleanup private variables
//
inline void
cleanupPendingLongPollReqs() {
longPollReqs_->clear();
}

private:
// For oneway requests, empty message will be returned immediately
folly::Expected<fbzmq::Message, fbzmq::Error> requestReplyMessage(
Expand All @@ -250,6 +267,7 @@ class OpenrCtrlHandler final : public thrift::OpenrCtrlCppSvIf,
thrift::OpenrModuleType module, InputType&& request, bool oneway);

void authorizeConnection();

const std::string nodeName_;
const std::unordered_set<std::string> acceptablePeerCommonNames_;
std::unordered_map<thrift::OpenrModuleType, std::shared_ptr<OpenrEventLoop>>
Expand All @@ -275,5 +293,12 @@ class OpenrCtrlHandler final : public thrift::OpenrCtrlCppSvIf,
apache::thrift::StreamPublisher<thrift::Publication>>>
kvStorePublishers_;

// pending longPoll requests from clients, which consists of
// 1). promise; 2). timestamp when req received on server
std::atomic<int64_t> pendingRequestId_{0};
folly::Synchronized<
std::unordered_map<int64_t, std::pair<folly::Promise<bool>, int64_t>>>
longPollReqs_;

}; // class OpenrCtrlHandler
} // namespace openr
Loading

0 comments on commit afa42e3

Please sign in to comment.