Skip to content

Commit

Permalink
Add option to mark CONNECTED only on initial sync
Browse files Browse the repository at this point in the history
Summary:
In Dsf we use SubscriptionState::CONNECTED to determine that we actually have all the state we need.

Previously, fsdb would serve heartbeats after serving initial sync so first chunk revieved would always be initial sync and we'd go into CONNECTED. But now heartbeats are served on a dedicated thread to avoid starvation, so we need to make sure the client doesnt prematurely mark CONNECTED.

Ideally we can always follow this behavior but actually switch agent relies on the reverse (CONNECTED to mean any chunk came through). For now introducing a flag to allow both behaviors, perhaps we can introduce a new state SYNCED to differentiate

Differential Revision: D63554533

fbshipit-source-id: d260d48b1d7dea4b02a7c5caed6fd77d11222456
  • Loading branch information
Peyman Gardideh authored and facebook-github-bot committed Oct 8, 2024
1 parent 8ceca8e commit f87e048
Show file tree
Hide file tree
Showing 4 changed files with 28 additions and 8 deletions.
8 changes: 6 additions & 2 deletions fboss/fsdb/client/FsdbDeltaSubscriber.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,13 +44,17 @@ FsdbDeltaSubscriberImpl<SubUnit, PathElement>::serveStream(StreamT&& stream) {
XLOG(DBG2) << " Detected cancellation: " << this->clientId();
break;
}
// even empty change/heartbeat indicates subscription is connected
if (this->getSubscriptionState() != SubscriptionState::CONNECTED) {
if (!this->subscriptionOptions().requireInitialSyncToMarkConnect_ &&
this->getSubscriptionState() != SubscriptionState::CONNECTED) {
BaseT::updateSubscriptionState(SubscriptionState::CONNECTED);
}
if (!delta->changes()->size()) {
continue;
}
if (this->subscriptionOptions().requireInitialSyncToMarkConnect_ &&
this->getSubscriptionState() != SubscriptionState::CONNECTED) {
BaseT::updateSubscriptionState(SubscriptionState::CONNECTED);
}
SubUnitT tmp(*delta);
this->operSubUnitUpdate_(std::move(tmp));
}
Expand Down
8 changes: 6 additions & 2 deletions fboss/fsdb/client/FsdbPatchSubscriber.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,12 +47,16 @@ FsdbPatchSubscriberImpl<MessageType, SubUnit, PathElement>::serveStream(
XLOG(DBG2) << " Detected cancellation: " << this->clientId();
break;
}
// even empty change/heartbeat indicates subscription is connected
if (this->getSubscriptionState() != SubscriptionState::CONNECTED) {
if (!this->subscriptionOptions().requireInitialSyncToMarkConnect_ &&
this->getSubscriptionState() != SubscriptionState::CONNECTED) {
BaseT::updateSubscriptionState(SubscriptionState::CONNECTED);
}
switch (message->getType()) {
case SubscriberMessage::Type::chunk:
if (this->subscriptionOptions().requireInitialSyncToMarkConnect_ &&
this->getSubscriptionState() != SubscriptionState::CONNECTED) {
BaseT::updateSubscriptionState(SubscriptionState::CONNECTED);
}
this->operSubUnitUpdate_(message->move_chunk());
break;
case SubscriberMessage::Type::heartbeat:
Expand Down
8 changes: 6 additions & 2 deletions fboss/fsdb/client/FsdbStateSubscriber.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,8 @@ FsdbStateSubscriberImpl<SubUnit, PathElement>::serveStream(StreamT&& stream) {
XLOG(DBG2) << " Detected cancellation: " << this->clientId();
break;
}
// even empty change/heartbeat indicates subscription is connected
if (this->getSubscriptionState() != SubscriptionState::CONNECTED) {
if (!this->subscriptionOptions().requireInitialSyncToMarkConnect_ &&
this->getSubscriptionState() != SubscriptionState::CONNECTED) {
BaseT::updateSubscriptionState(SubscriptionState::CONNECTED);
}
if constexpr (std::is_same_v<SubUnitT, OperState>) {
Expand All @@ -58,6 +58,10 @@ FsdbStateSubscriberImpl<SubUnit, PathElement>::serveStream(StreamT&& stream) {
continue;
}
}
if (this->subscriptionOptions().requireInitialSyncToMarkConnect_ &&
this->getSubscriptionState() != SubscriptionState::CONNECTED) {
BaseT::updateSubscriptionState(SubscriptionState::CONNECTED);
}
SubUnitT tmp(*state);
this->operSubUnitUpdate_(std::move(tmp));
}
Expand Down
12 changes: 10 additions & 2 deletions fboss/fsdb/client/FsdbSubscriber.h
Original file line number Diff line number Diff line change
Expand Up @@ -83,14 +83,18 @@ struct SubscriptionOptions {
explicit SubscriptionOptions(
const std::string& clientId,
bool subscribeStats = false,
uint32_t grHoldTimeSec = 0)
uint32_t grHoldTimeSec = 0,
// only mark subscription as CONNECTED on initial sync
bool requireInitialSyncToMarkConnect = false)
: clientId_(clientId),
subscribeStats_(subscribeStats),
grHoldTimeSec_(grHoldTimeSec) {}
grHoldTimeSec_(grHoldTimeSec),
requireInitialSyncToMarkConnect_(requireInitialSyncToMarkConnect) {}

const std::string clientId_;
bool subscribeStats_{false};
uint32_t grHoldTimeSec_{0};
bool requireInitialSyncToMarkConnect_{false};
};

struct SubscriptionInfo {
Expand Down Expand Up @@ -283,6 +287,10 @@ class FsdbSubscriber : public FsdbSubscriberBase {
return subscribePaths_;
}

const SubscriptionOptions& subscriptionOptions() const {
return subscriptionOptions_;
}

FsdbSubUnitUpdateCb operSubUnitUpdate_;

private:
Expand Down

0 comments on commit f87e048

Please sign in to comment.