From 2bf3eb9bbd614559f5ef812eed19c37a5da7a534 Mon Sep 17 00:00:00 2001 From: Maurice van Veen Date: Thu, 2 Jan 2025 15:43:52 +0100 Subject: [PATCH] NRG: Don't mark current/healthy while paused with pending commits Signed-off-by: Maurice van Veen --- server/raft.go | 6 ++++++ server/raft_test.go | 49 +++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 55 insertions(+) diff --git a/server/raft.go b/server/raft.go index 72a66f3a21..3361cdfead 100644 --- a/server/raft.go +++ b/server/raft.go @@ -1437,6 +1437,12 @@ func (n *raft) isCurrent(includeForwardProgress bool) bool { return false } + if n.paused && n.hcommit > n.commit { + // We're currently paused, waiting to be resumed to apply pending commits. + n.debug("Not current, waiting to resume applies commit=%d, hcommit=%d", n.commit, n.hcommit) + return false + } + if n.commit == n.applied { // At this point if we are current, we can return saying so. clearBehindState() diff --git a/server/raft_test.go b/server/raft_test.go index fbce5251e6..2b6dcddf29 100644 --- a/server/raft_test.go +++ b/server/raft_test.go @@ -1896,3 +1896,52 @@ func TestNRGHealthCheckWaitForDoubleCatchup(t *testing.T) { n.Applied(3) require_True(t, n.Healthy()) } + +func TestNRGHealthCheckWaitForPendingCommitsWhenPaused(t *testing.T) { + n, cleanup := initSingleMemRaftNode(t) + defer cleanup() + + // Create a sample entry, the content doesn't matter, just that it's stored. + esm := encodeStreamMsgAllowCompress("foo", "_INBOX.foo", nil, nil, 0, 0, true) + entries := []*Entry{newEntry(EntryNormal, esm)} + + nats0 := "S1Nunr6R" // "nats-0" + + // Timeline + aeMsg1 := encode(t, &appendEntry{leader: nats0, term: 1, commit: 0, pterm: 0, pindex: 0, entries: entries}) + aeMsg2 := encode(t, &appendEntry{leader: nats0, term: 1, commit: 1, pterm: 1, pindex: 1, entries: entries}) + aeHeartbeat := encode(t, &appendEntry{leader: nats0, term: 1, commit: 2, pterm: 1, pindex: 2, entries: nil}) + + // Process first message. + n.processAppendEntry(aeMsg1, n.aesub) + require_Equal(t, n.pindex, 1) + require_False(t, n.Healthy()) + + // Process second message, moves commit up. + n.processAppendEntry(aeMsg2, n.aesub) + require_Equal(t, n.pindex, 2) + require_False(t, n.Healthy()) + + // We're healthy once we've applied the first message. + n.Applied(1) + require_True(t, n.Healthy()) + + // If we're paused we still are healthy if there are no pending commits. + err := n.PauseApply() + require_NoError(t, err) + require_True(t, n.Healthy()) + + // Heartbeat marks second message to be committed. + n.processAppendEntry(aeHeartbeat, n.aesub) + require_Equal(t, n.pindex, 2) + require_False(t, n.Healthy()) + + // Resuming apply commits the message. + n.ResumeApply() + require_NoError(t, err) + require_False(t, n.Healthy()) + + // But still waiting for it to be applied before marking healthy. + n.Applied(2) + require_True(t, n.Healthy()) +}