diff --git a/openr/common/Constants.cpp b/openr/common/Constants.cpp index d5867446236..72e19cf8f71 100644 --- a/openr/common/Constants.cpp +++ b/openr/common/Constants.cpp @@ -72,6 +72,7 @@ constexpr std::chrono::milliseconds Constants::kServiceProcTimeout; constexpr std::chrono::milliseconds Constants::kTtlDecrement; constexpr std::chrono::milliseconds Constants::kTtlInfInterval; constexpr std::chrono::milliseconds Constants::kTtlThreshold; +constexpr std::chrono::milliseconds Constants::kLongPollReqHoldTime; constexpr std::chrono::seconds Constants::kConvergenceMaxDuration; constexpr std::chrono::seconds Constants::kKeepAliveIntvl; constexpr std::chrono::seconds Constants::kKeepAliveTime; diff --git a/openr/common/Constants.h b/openr/common/Constants.h index 5da86ce56d3..8658b721d5d 100644 --- a/openr/common/Constants.h +++ b/openr/common/Constants.h @@ -235,6 +235,9 @@ class Constants { static constexpr uint16_t kPerfBufferSize{10}; static constexpr std::chrono::seconds kConvergenceMaxDuration{3s}; + // hold time for longPoll requests in openrCtrl thrift server + static constexpr std::chrono::milliseconds kLongPollReqHoldTime{20000}; + // // Prefix manager specific // diff --git a/openr/ctrl-server/OpenrCtrlHandler.cpp b/openr/ctrl-server/OpenrCtrlHandler.cpp index 6b20207cc51..66723586c20 100644 --- a/openr/ctrl-server/OpenrCtrlHandler.cpp +++ b/openr/ctrl-server/OpenrCtrlHandler.cpp @@ -15,6 +15,7 @@ #include #include +#include #include #include @@ -76,6 +77,62 @@ OpenrCtrlHandler::OpenrCtrlHandler( kv.second.next(maybePublication.value()); } } + + bool isAdjChanged = false; + // check if any of KeyVal has 'adj' update + for (auto& kv : maybePublication.value().keyVals) { + auto& key = kv.first; + auto& val = kv.second; + // check if we have any value update. + // Ttl refreshing won't update any value. + if (!val.value.hasValue()) { + continue; + } + + // "adj:*" key has changed. Update local collection + if (key.find(Constants::kAdjDbMarker.toString()) == 0) { + VLOG(3) << "Adj key: " << key << " change received"; + isAdjChanged = true; + break; + } + } + + if (isAdjChanged) { + // thrift::Publication contains "adj:*" key change. + // Clean ALL pending promises + longPollReqs_.withWLock([&](auto& longPollReqs) { + for (auto& kv : longPollReqs) { + auto& p = kv.second.first; + p.setValue(true); + } + longPollReqs.clear(); + }); + } else { + longPollReqs_.withWLock([&](auto& longPollReqs) { + auto now = getUnixTimeStampMs(); + std::vector reqsToClean; + for (auto& kv : longPollReqs) { + auto& clientId = kv.first; + auto& req = kv.second; + + auto& p = req.first; + auto& timeStamp = req.second; + if (now - timeStamp >= + Constants::kLongPollReqHoldTime.count()) { + LOG(INFO) << "Elapsed time: " << now - timeStamp + << " is over hold limit: " + << Constants::kLongPollReqHoldTime.count(); + reqsToClean.emplace_back(clientId); + p.setException(thrift::OpenrError("Request timed out")); + } + } + + // cleanup expired requests since no ADJ change observed + for (auto& clientId : reqsToClean) { + longPollReqs.erase(clientId); + } + }); + } }); }); @@ -122,6 +179,9 @@ OpenrCtrlHandler::~OpenrCtrlHandler() { for (auto& publisher : publishers) { std::move(publisher).complete(); } + + LOG(INFO) << "Cleanup all pending request(s)."; + longPollReqs_.withWLock([&](auto& longPollReqs) { longPollReqs.clear(); }); } void @@ -611,6 +671,46 @@ OpenrCtrlHandler::semifuture_getKvStoreHashFiltered( return p.getSemiFuture(); } +folly::SemiFuture +OpenrCtrlHandler::semifuture_longPollKvStoreAdj( + std::unique_ptr snapshot) { + folly::Promise p; + folly::SemiFuture sf = p.getSemiFuture(); + + auto timeStamp = getUnixTimeStampMs(); + auto requestId = pendingRequestId_++; + + thrift::KvStoreRequest request; + thrift::KeyDumpParams params; + + // Only care about "adj:" key + params.prefix = Constants::kAdjDbMarker; + // Only dump difference between KvStore and client snapshot + params.keyValHashes = std::move(*snapshot); + request.cmd = thrift::Command::KEY_DUMP; + request.keyDumpParams = std::move(params); + + auto reply = requestReplyThrift( + thrift::OpenrModuleType::KVSTORE, std::move(request)); + if (reply.hasError()) { + p.setException(thrift::OpenrError(reply.error().errString)); + } else if ( + reply->keyVals.size() or + (reply->tobeUpdatedKeys.hasValue() and + reply->tobeUpdatedKeys.value().size())) { + VLOG(3) << "AdjKey hash change. Notify immediately"; + p.setValue(true); + } else { + // Client provided data is consistent with KvStore. + // Store req for future processing when there is publication + // from KvStore. + longPollReqs_.withWLock([&](auto& longPollReq) { + longPollReq.emplace(requestId, std::make_pair(std::move(p), timeStamp)); + }); + } + return sf; +} + folly::SemiFuture OpenrCtrlHandler::semifuture_setKvStoreKeyVals( std::unique_ptr setParams, diff --git a/openr/ctrl-server/OpenrCtrlHandler.h b/openr/ctrl-server/OpenrCtrlHandler.h index 4db42986490..544aa29cb66 100644 --- a/openr/ctrl-server/OpenrCtrlHandler.h +++ b/openr/ctrl-server/OpenrCtrlHandler.h @@ -179,6 +179,10 @@ class OpenrCtrlHandler final : public thrift::OpenrCtrlCppSvIf, thrift::Publication>> semifuture_subscribeAndGetKvStore() override; + // Long poll support + folly::SemiFuture semifuture_longPollKvStoreAdj( + std::unique_ptr snapshot) override; + // // LinkMonitor APIs // @@ -231,11 +235,24 @@ class OpenrCtrlHandler final : public thrift::OpenrCtrlCppSvIf, // // APIs to expose state of private variables // - size_t + inline size_t getNumKvStorePublishers() { return kvStorePublishers_->size(); } + inline size_t + getNumPendingLongPollReqs() { + return longPollReqs_->size(); + } + + // + // API to cleanup private variables + // + inline void + cleanupPendingLongPollReqs() { + longPollReqs_->clear(); + } + private: // For oneway requests, empty message will be returned immediately folly::Expected requestReplyMessage( @@ -250,6 +267,7 @@ class OpenrCtrlHandler final : public thrift::OpenrCtrlCppSvIf, thrift::OpenrModuleType module, InputType&& request, bool oneway); void authorizeConnection(); + const std::string nodeName_; const std::unordered_set acceptablePeerCommonNames_; std::unordered_map> @@ -275,5 +293,12 @@ class OpenrCtrlHandler final : public thrift::OpenrCtrlCppSvIf, apache::thrift::StreamPublisher>> kvStorePublishers_; + // pending longPoll requests from clients, which consists of + // 1). promise; 2). timestamp when req received on server + std::atomic pendingRequestId_{0}; + folly::Synchronized< + std::unordered_map, int64_t>>> + longPollReqs_; + }; // class OpenrCtrlHandler } // namespace openr diff --git a/openr/ctrl-server/tests/OpenrCtrlLongPollTest.cpp b/openr/ctrl-server/tests/OpenrCtrlLongPollTest.cpp new file mode 100644 index 00000000000..2e14b33d855 --- /dev/null +++ b/openr/ctrl-server/tests/OpenrCtrlLongPollTest.cpp @@ -0,0 +1,246 @@ +/** + * Copyright (c) 2014-present, Facebook, Inc. + * + * This source code is licensed under the MIT license found in the + * LICENSE file in the root directory of this source tree. + */ + +#include + +#include +#include +#include +#include +#include + +#include +#include +#include + +using namespace openr; + +class LongPollFixture : public ::testing::Test { + void + SetUp() override { + // Create KvStore module + kvStoreWrapper_ = std::make_unique( + context_, + nodeName_, + std::chrono::seconds(60), + std::chrono::seconds(600), + std::unordered_map()); + kvStoreWrapper_->run(); + + // spin up an openrThriftServer + openrThriftServerWrapper_ = std::make_shared( + nodeName_, + MonitorSubmitUrl{"inproc://monitor-submit-url"}, + KvStoreLocalPubUrl{kvStoreWrapper_->localPubUrl}, + context_); + + // add module into thriftServer module map + openrThriftServerWrapper_->addModuleType( + thrift::OpenrModuleType::KVSTORE, kvStoreWrapper_->getKvStore()); + openrThriftServerWrapper_->run(); + + // initialize openrCtrlClient talking to server + openrCtrlThriftClient_ = + getOpenrCtrlPlainTextClient( + evb_, + folly::IPAddress("::1"), + openrThriftServerWrapper_->getOpenrCtrlThriftPort()); + } + + void + TearDown() override { + openrCtrlThriftClient_.reset(); + openrThriftServerWrapper_->stop(); + kvStoreWrapper_->stop(); + } + + private: + fbzmq::Context context_; + folly::EventBase evb_; + + public: + const std::string nodeName_{"Valar-Morghulis"}; + const std::string adjKey_ = folly::sformat("adj:{}", nodeName_); + const std::string prefixKey_ = folly::sformat("prefix:{}", nodeName_); + + fbzmq::ZmqEventLoop evl_; + std::unique_ptr kvStoreWrapper_; + std::shared_ptr openrThriftServerWrapper_{nullptr}; + std::unique_ptr + openrCtrlThriftClient_{nullptr}; +}; + +TEST_F(LongPollFixture, LongPollSuccess) { + // + // This UT mimicks the basic functionality of long poll API to make sure + // server will return to client if there is "adj:" key change received. + // + bool isAdjChanged = false; + bool isTimeout = false; + std::chrono::steady_clock::time_point startTime; + std::chrono::steady_clock::time_point endTime; + + // mimick there is a new publication from kvstore + evl_.scheduleTimeout(std::chrono::milliseconds(5000), [&]() noexcept { + LOG(INFO) << "AdjKey set..."; + // catch up the time + startTime = std::chrono::steady_clock::now(); + kvStoreWrapper_->setKey( + adjKey_, createThriftValue(1, nodeName_, std::string("value1"))); + + // stop the evl + evl_.stop(); + }); + + // start eventloop + std::thread evlThread([&]() { evl_.run(); }); + evl_.waitUntilRunning(); + + // client starts to do long-poll + try { + // By default, the processing timeout value for client is 10s. + LOG(INFO) << "Start long poll..."; + thrift::KeyVals snapshot; + isAdjChanged = openrCtrlThriftClient_->sync_longPollKvStoreAdj(snapshot); + endTime = std::chrono::steady_clock::now(); + LOG(INFO) << "Finished long poll..."; + } catch (std::exception& ex) { + LOG(INFO) << "Exception happened: " << folly::exceptionStr(ex); + isTimeout = true; + } + + // make sure we are receiving update and NOT timed out + ASSERT_FALSE(isTimeout); + // make sure when there is publication, processing delay is less than 50ms + ASSERT_LE(endTime - startTime, std::chrono::milliseconds(50)); + ASSERT_TRUE(isAdjChanged); + + // wait for evl before cleanup + evl_.waitUntilStopped(); + evlThread.join(); +} + +TEST_F(LongPollFixture, LongPollTimeout) { + // + // This UT mimicks the scenario there is a client side timeout since + // there is NOT "adj:" key change. + // + bool isTimeout = false; + bool isAdjChanged = false; + + // mimick there is a new publication from kvstore + evl_.scheduleTimeout(std::chrono::milliseconds(5000), [&]() noexcept { + LOG(INFO) << "Prefix key set..."; + kvStoreWrapper_->setKey( + prefixKey_, createThriftValue(1, nodeName_, std::string("value1"))); + + // stop the evl + evl_.stop(); + }); + + // start eventloop + std::thread evlThread([&]() { evl_.run(); }); + evl_.waitUntilRunning(); + + // client starts to do long-poll + try { + // By default, the processing timeout value for client is 10s. + LOG(INFO) << "Start long poll..."; + thrift::KeyVals snapshot; + isAdjChanged = openrCtrlThriftClient_->sync_longPollKvStoreAdj(snapshot); + } catch (std::exception& ex) { + LOG(INFO) << "Exception happened: " << folly::exceptionStr(ex); + isTimeout = true; + } + + // Client timeout and nothing received + ASSERT_TRUE(isTimeout); + ASSERT_FALSE(isAdjChanged); + + // Explicitly cleanup pending longPollReq + openrThriftServerWrapper_->getOpenrCtrlHandler() + ->cleanupPendingLongPollReqs(); + + // wait for evl before cleanup + evl_.waitUntilStopped(); + evlThread.join(); +} + +TEST_F(LongPollFixture, LongPollPendingAdj) { + // + // Test1: mimicks the scenario that before client send req. There is already + // "adj:" key published before client subscribe. Should push immediately. + // + bool isTimeout = false; + bool isAdjChanged = false; + std::chrono::steady_clock::time_point startTime; + std::chrono::steady_clock::time_point endTime; + + // inject key to kvstore and openrCtrlThriftServer should have adj key + kvStoreWrapper_->setKey( + adjKey_, createThriftValue(1, nodeName_, std::string("value1"))); + + try { + // mimicking scenario that server has different value for the same key + thrift::KeyVals snapshot; + snapshot.emplace( + adjKey_, createThriftValue(2, "Valar-Dohaeris", std::string("value1"))); + + // By default, the processing timeout value for client is 10s. + LOG(INFO) << "Start long poll..."; + startTime = std::chrono::steady_clock::now(); + isAdjChanged = openrCtrlThriftClient_->sync_longPollKvStoreAdj(snapshot); + endTime = std::chrono::steady_clock::now(); + LOG(INFO) << "Finished long poll..."; + } catch (std::exception& ex) { + LOG(INFO) << "Exception happened: " << folly::exceptionStr(ex); + isTimeout = true; + } + + ASSERT_TRUE(isAdjChanged); + ASSERT_FALSE(isTimeout); + + // + // Test2: mimicks the scenario that client already hold the same adj key. + // Server will NOT push notification since there is no diff. + // + isTimeout = false; + isAdjChanged = false; + try { + thrift::KeyVals snapshot; + snapshot.emplace( + adjKey_, createThriftValue(1, nodeName_, std::string("value1"))); + + // By default, the processing timeout value for client is 10s. + LOG(INFO) << "Start long poll..."; + startTime = std::chrono::steady_clock::now(); + isAdjChanged = openrCtrlThriftClient_->sync_longPollKvStoreAdj(snapshot); + endTime = std::chrono::steady_clock::now(); + LOG(INFO) << "Finished long poll..."; + } catch (std::exception& ex) { + LOG(INFO) << "Exception happened: " << folly::exceptionStr(ex); + isTimeout = true; + } + + ASSERT_FALSE(isAdjChanged); + ASSERT_TRUE(isTimeout); + + // Explicitly cleanup pending longPollReq + openrThriftServerWrapper_->getOpenrCtrlHandler() + ->cleanupPendingLongPollReqs(); +} + +int +main(int argc, char* argv[]) { + // Parse command line flags + testing::InitGoogleTest(&argc, argv); + folly::init(&argc, &argv); + FLAGS_logtostderr = true; + + // Run the tests + return RUN_ALL_TESTS(); +} diff --git a/openr/if/OpenrCtrl.thrift b/openr/if/OpenrCtrl.thrift index 5ab02357a5e..1e5d444ff64 100644 --- a/openr/if/OpenrCtrl.thrift +++ b/openr/if/OpenrCtrl.thrift @@ -172,7 +172,7 @@ service OpenrCtrl extends fb303.FacebookService { * Get raw key-values from KvStore with more control over filter */ KvStore.Publication getKvStoreKeyValsFiltered(1: KvStore.KeyDumpParams filter) - throws (1: OpenrError errror) + throws (1: OpenrError error) /** * Get kvstore metadata (no values) with filter @@ -193,6 +193,13 @@ service OpenrCtrl extends fb303.FacebookService { 2: string area ) + /** + * Long poll API to get KvStore + * Will return true/false with our own KeyVal snapshot provided + */ + bool longPollKvStoreAdj(1: KvStore.KeyVals snapshot) + throws (1: OpenrError error) + /** * Send Dual message */