Skip to content

Commit

Permalink
Fix VDisk replication token handling, add some extra checks and log p…
Browse files Browse the repository at this point in the history
  • Loading branch information
alexvru authored Oct 14, 2024
1 parent 39173fc commit 2414fb7
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 6 deletions.
25 changes: 24 additions & 1 deletion ydb/core/blobstorage/vdisk/repl/blobstorage_repl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,8 @@ namespace NKikimr {
TEvResumeForce *ResumeForceToken = nullptr;
TInstant ReplicationEndTime;
bool UnrecoveredNonphantomBlobs = false;
bool RequestedReplicationToken = false;
bool HoldingReplicationToken = false;

TWatchdogTimer<TEvReplCheckProgress> ReplProgressWatchdog;

Expand Down Expand Up @@ -288,6 +290,12 @@ namespace NKikimr {
case Plan:
// this is a first quantum of replication, so we have to register it in the broker
State = AwaitToken;
Y_DEBUG_ABORT_UNLESS(!RequestedReplicationToken);
if (RequestedReplicationToken) {
STLOG(PRI_CRIT, BS_REPL, BSVR38, ReplCtx->VCtx->VDiskLogPrefix << "excessive replication token requested");
break;
}
RequestedReplicationToken = true;
if (!Send(MakeBlobStorageReplBrokerID(), new TEvQueryReplToken(ReplCtx->VDiskCfg->BaseInfo.PDiskId))) {
HandleReplToken();
}
Expand All @@ -304,6 +312,10 @@ namespace NKikimr {
}

void HandleReplToken() {
Y_ABORT_UNLESS(RequestedReplicationToken);
RequestedReplicationToken = false;
HoldingReplicationToken = true;

// switch to replication state
Transition(AwaitToken, Replication);
if (!ResumeIfReady()) {
Expand Down Expand Up @@ -410,6 +422,9 @@ namespace NKikimr {
if (State == WaitQueues || State == Replication) {
// release token as we have finished replicating
Send(MakeBlobStorageReplBrokerID(), new TEvReleaseReplToken);
Y_DEBUG_ABORT_UNLESS(!RequestedReplicationToken);
Y_DEBUG_ABORT_UNLESS(HoldingReplicationToken);
HoldingReplicationToken = false;
}
ResetReplProgressTimer(true);

Expand Down Expand Up @@ -638,7 +653,15 @@ namespace NKikimr {

// return replication token if we have one
if (State == AwaitToken || State == WaitQueues || State == Replication) {
Send(MakeBlobStorageReplBrokerID(), new TEvReleaseReplToken);
Y_DEBUG_ABORT_UNLESS(RequestedReplicationToken || HoldingReplicationToken);
if (RequestedReplicationToken || HoldingReplicationToken) {
Send(MakeBlobStorageReplBrokerID(), new TEvReleaseReplToken);
}
} else {
Y_DEBUG_ABORT_UNLESS(!RequestedReplicationToken && !HoldingReplicationToken);
if (RequestedReplicationToken || HoldingReplicationToken) {
STLOG(PRI_CRIT, BS_REPL, BSVR37, ReplCtx->VCtx->VDiskLogPrefix << "stuck replication token");
}
}

if (ReplJobActorId) {
Expand Down
22 changes: 17 additions & 5 deletions ydb/core/blobstorage/vdisk/repl/blobstorage_replproxy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,7 @@ namespace NKikimr {
ui64 NextReceiveCookie;
TResultQueue ResultQueue;
std::shared_ptr<TMessageRelevanceTracker> Tracker = std::make_shared<TMessageRelevanceTracker>();
bool Terminated = false;

TQueue<std::unique_ptr<TEvBlobStorage::TEvVGet>> SchedulerRequestQ;
THashMap<ui64, TReplMemTokenId> RequestTokens;
Expand Down Expand Up @@ -227,9 +228,7 @@ namespace NKikimr {
PrefetchDataSize = 0;
RequestFromVDiskProxyPending = false;
if (Finished) {
Send(MakeBlobStorageReplBrokerID(), new TEvPruneQueue);
RequestTokens.clear();
return PassAway(); // TODO(alexvru): check correctness of invocations
return PassAway();
}
}
// send request(s) if prefetch queue is not full
Expand Down Expand Up @@ -297,6 +296,9 @@ namespace NKikimr {
if (msg->Record.GetCookie() == NextReceiveCookie) {
ui64 cookie = NextReceiveCookie;
ProcessResult(msg);
if (Terminated) {
return;
}
ReleaseMemToken(cookie);
while (!ResultQueue.empty()) {
const TQueueItem& top = ResultQueue.top();
Expand All @@ -305,6 +307,9 @@ namespace NKikimr {
}
ui64 cookie = NextReceiveCookie;
ProcessResult(top.get());
if (Terminated) {
return;
}
ReleaseMemToken(cookie);
ResultQueue.pop();
}
Expand All @@ -314,6 +319,7 @@ namespace NKikimr {
}

void ReleaseMemToken(ui64 cookie) {
Y_ABORT_UNLESS(!Terminated);
if (RequestTokens) {
auto it = RequestTokens.find(cookie);
Y_ABORT_UNLESS(it != RequestTokens.end());
Expand Down Expand Up @@ -428,6 +434,13 @@ namespace NKikimr {
}
}

void PassAway() override {
Y_ABORT_UNLESS(!Terminated);
Terminated = true;
Send(MakeBlobStorageReplBrokerID(), new TEvPruneQueue);
TActorBootstrapped::PassAway();
}

STRICT_STFUNC(StateFunc,
hFunc(TEvReplProxyNext, Handle)
hFunc(TEvReplMemToken, Handle)
Expand All @@ -446,8 +459,7 @@ namespace NKikimr {
TTrackableVector<TVDiskProxy::TScheduledBlob>&& ids,
const TVDiskID& vdiskId,
const TActorId& serviceId)
: TActorBootstrapped<TVDiskProxyActor>()
, ReplCtx(std::move(replCtx))
: ReplCtx(std::move(replCtx))
, GType(ReplCtx->VCtx->Top->GType)
, Ids(std::move(ids))
, VDiskId(vdiskId)
Expand Down

0 comments on commit 2414fb7

Please sign in to comment.