Skip to content

Commit

Permalink
[Java] Update RecoverPlan after standby snapshot replication complete…
Browse files Browse the repository at this point in the history
…s with the replicated snapshot entries.
  • Loading branch information
vyazelenko committed Nov 11, 2024
1 parent 7cc3512 commit a25c83d
Show file tree
Hide file tree
Showing 3 changed files with 35 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2324,7 +2324,8 @@ private int checkNodeControlToggle()
ctx.serviceCount(),
ctx.leaderArchiveControlChannel(),
ctx.archiveContext().controlRequestStreamId(),
ctx.replicationChannel());
ctx.replicationChannel(),
ctx.fileSyncLevel());
}
NodeControl.ToggleState.reset(nodeControlToggle);
break;
Expand Down Expand Up @@ -3549,7 +3550,8 @@ private void replicateStandbySnapshotsForStartup()
ctx.serviceCount(),
ctx.leaderArchiveControlChannel(),
ctx.archiveContext().controlRequestStreamId(),
ctx.replicationChannel()))
ctx.replicationChannel(),
ctx.fileSyncLevel()))
{
while (!standbySnapshotReplicator.isComplete())
{
Expand Down Expand Up @@ -3583,7 +3585,8 @@ private int pollStandbySnapshotReplication(final long nowNs)

if (standbySnapshotReplicator.isComplete())
{
ctx.snapshotCounter().increment();
recoveryPlan = recordingLog.createRecoveryPlan(archive, ctx.serviceCount(), Aeron.NULL_VALUE);
ctx.snapshotCounter().incrementOrdered();
CloseHelper.quietClose(standbySnapshotReplicator);
standbySnapshotReplicator = null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ class StandbySnapshotReplicator implements AutoCloseable
private final String archiveControlChannel;
private final int archiveControlStreamId;
private final String replicationChannel;
private final int fileSyncLevel;
private final Object2ObjectHashMap<String, String> errorsByEndpoint = new Object2ObjectHashMap<>();
private MultipleRecordingReplication recordingReplication;
private ArrayList<SnapshotReplicationEntry> snapshotsToReplicate;
Expand All @@ -53,7 +54,8 @@ class StandbySnapshotReplicator implements AutoCloseable
final int serviceCount,
final String archiveControlChannel,
final int archiveControlStreamId,
final String replicationChannel)
final String replicationChannel,
final int fileSyncLevel)
{
this.memberId = memberId;
this.archive = archive;
Expand All @@ -62,6 +64,7 @@ class StandbySnapshotReplicator implements AutoCloseable
this.archiveControlChannel = archiveControlChannel;
this.archiveControlStreamId = archiveControlStreamId;
this.replicationChannel = replicationChannel;
this.fileSyncLevel = fileSyncLevel;
}

static StandbySnapshotReplicator newInstance(
Expand All @@ -71,7 +74,8 @@ static StandbySnapshotReplicator newInstance(
final int serviceCount,
final String archiveControlChannel,
final int archiveControlStreamId,
final String replicationChannel)
final String replicationChannel,
final int fileSyncLevel)
{
final AeronArchive archive = AeronArchive.connect(archiveCtx.clone().errorHandler(null));
final StandbySnapshotReplicator standbySnapshotReplicator = new StandbySnapshotReplicator(
Expand All @@ -81,7 +85,8 @@ static StandbySnapshotReplicator newInstance(
serviceCount,
archiveControlChannel,
archiveControlStreamId,
replicationChannel);
replicationChannel,
fileSyncLevel);
archive.context().recordingSignalConsumer(standbySnapshotReplicator::onSignal);
return standbySnapshotReplicator;
}
Expand Down Expand Up @@ -163,7 +168,7 @@ int poll(final long nowNs)
entry.timestamp,
entry.serviceId);
}
recordingLog.force(0);
recordingLog.force(fileSyncLevel);

CloseHelper.quietClose(recordingReplication);
recordingReplication = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,7 @@
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.contains;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.mockStatic;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.mockito.Mockito.*;

class StandbySnapshotReplicatorTest
{
Expand All @@ -57,6 +53,7 @@ class StandbySnapshotReplicatorTest
private final String replicationChannel = "aeron:udp?endpoint=host0:0";
private final AeronArchive.Context ctx = new AeronArchive.Context();
private final int memberId = 12;
private final int fileSyncLevel = 1;

private final AeronArchive mockArchive = mock(AeronArchive.class);
private final MultipleRecordingReplication mockMultipleRecordingReplication0 = mock(
Expand Down Expand Up @@ -88,7 +85,7 @@ void shouldReplicateStandbySnapshots()
when(mockMultipleRecordingReplication0.completedDstRecordingId(anyLong())).thenAnswer(
(invocation) -> dstRecordingIds.get(invocation.<Long>getArgument(0)));

try (RecordingLog recordingLog = new RecordingLog(clusterDir, true))
try (RecordingLog recordingLog = spy(new RecordingLog(clusterDir, true)))
{
recordingLog.appendSnapshot(1, 0, 0, logPositionOldest, 1_000_000_000L, SERVICE_ID);
recordingLog.appendSnapshot(2, 0, 0, logPositionOldest, 1_000_000_000L, 0);
Expand Down Expand Up @@ -118,7 +115,8 @@ void shouldReplicateStandbySnapshots()
serviceCount,
archiveControlChannel,
archiveControlStreamId,
replicationChannel);
replicationChannel,
fileSyncLevel);

when(mockMultipleRecordingReplication0.isComplete()).thenReturn(true);

Expand All @@ -137,6 +135,7 @@ void shouldReplicateStandbySnapshots()
verify(mockMultipleRecordingReplication0).addRecording(1L, NULL_RECORDING_ID, NULL_POSITION);
verify(mockMultipleRecordingReplication0).addRecording(2L, NULL_RECORDING_ID, NULL_POSITION);
verify(mockMultipleRecordingReplication0).poll(nowNs);
verify(recordingLog).force(fileSyncLevel);
}
}

Expand Down Expand Up @@ -178,7 +177,8 @@ void shouldPassSignalsToRecordingReplication()
1,
archiveControlChannel,
archiveControlStreamId,
replicationChannel);
replicationChannel,
fileSyncLevel);

standbySnapshotReplicator.poll(0);
verify(mockArchive).pollForRecordingSignals();
Expand Down Expand Up @@ -209,7 +209,8 @@ void shouldHandleNoStandbySnapshots()
1,
archiveControlChannel,
archiveControlStreamId,
replicationChannel);
replicationChannel,
fileSyncLevel);

standbySnapshotReplicator.poll(0);
assertTrue(standbySnapshotReplicator.isComplete());
Expand Down Expand Up @@ -261,7 +262,8 @@ void shouldSwitchEndpointsOnMultipleReplicationException()
1,
archiveControlChannel,
archiveControlStreamId,
replicationChannel);
replicationChannel,
fileSyncLevel);

when(mockMultipleRecordingReplication0.poll(anyLong())).thenThrow(new ClusterException("fail"));
when(mockMultipleRecordingReplication1.isComplete()).thenReturn(true);
Expand Down Expand Up @@ -322,7 +324,8 @@ void shouldSwitchEndpointsOnArchivePollForSignalsException()
1,
archiveControlChannel,
archiveControlStreamId,
replicationChannel);
replicationChannel,
fileSyncLevel);

when(mockMultipleRecordingReplication1.isComplete()).thenReturn(true);

Expand Down Expand Up @@ -384,7 +387,8 @@ void shouldThrowExceptionIfUnableToReplicateAnySnapshotsDueToClusterExceptions()
1,
archiveControlChannel,
archiveControlStreamId,
replicationChannel);
replicationChannel,
fileSyncLevel);

standbySnapshotReplicator.poll(nowNs);
standbySnapshotReplicator.poll(nowNs);
Expand Down Expand Up @@ -441,7 +445,8 @@ void shouldThrowExceptionIfUnableToReplicateAnySnapshotsDueToArchiveExceptions()
1,
archiveControlChannel,
archiveControlStreamId,
replicationChannel);
replicationChannel,
fileSyncLevel);

standbySnapshotReplicator.poll(nowNs);
standbySnapshotReplicator.poll(nowNs);
Expand Down Expand Up @@ -486,7 +491,8 @@ void shouldNotRetrieveSnapshotsIfRecordingLogAlreadyHasUpToDateCopies()
1,
archiveControlChannel,
archiveControlStreamId,
replicationChannel);
replicationChannel,
fileSyncLevel);

standbySnapshotReplicator.poll(nowNs);
standbySnapshotReplicator.poll(nowNs);
Expand Down

0 comments on commit a25c83d

Please sign in to comment.