Skip to content

Commit

Permalink
[Java] Use ClusterEvent to report issues when stopping recording/repl…
Browse files Browse the repository at this point in the history
…ay + prevent an NPE when stopping a replay as `clusterArchive` could have been closed while in the BACKUP_QUERY stage.
  • Loading branch information
vyazelenko committed Jan 17, 2025
1 parent cc2d0df commit c37546e
Showing 1 changed file with 37 additions and 39 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -199,21 +199,8 @@ public void onClose()
{
aeron.removeUnavailableCounterHandler(unavailableCounterHandlerRegistrationId);

if (NULL_VALUE != liveLogRecordingSubscriptionId)
{
backupArchive.tryStopRecording(liveLogRecordingSubscriptionId);
}

if (NULL_VALUE != liveLogReplaySessionId)
{
try
{
clusterArchive.stopReplay(liveLogReplaySessionId);
}
catch (final Exception ignore)
{
}
}
stopRecording();
stopReplay();

CloseHelper.close(snapshotReplication);

Expand Down Expand Up @@ -346,43 +333,54 @@ private void reset()
CloseHelper.close(snapshotReplication);
snapshotReplication = null;

if (NULL_VALUE != liveLogRecordingSubscriptionId)
stopRecording();
stopReplay();

CloseHelper.closeAll(
(ErrorHandler)ctx.countedErrorHandler(),
consensusPublicationGroup,
clusterArchive,
clusterArchiveAsyncConnect,
recordingSubscription);

clusterArchive = null;
clusterArchiveAsyncConnect = null;
recordingSubscription = null;
}

private void stopReplay()
{
if (NULL_VALUE != liveLogReplaySessionId)
{
try
{
backupArchive.tryStopRecording(liveLogRecordingSubscriptionId);
}
catch (final Exception ex)
if (null != clusterArchive)
{
ctx.countedErrorHandler().onError(new ClusterException(
"failed to stop log recording", ex, Category.WARN));
try
{
clusterArchive.stopReplay(liveLogReplaySessionId);
}
catch (final Exception ex)
{
ctx.countedErrorHandler().onError(new ClusterEvent("failed to stop log replay: " + ex));
}
}
liveLogRecordingSubscriptionId = NULL_VALUE;
liveLogReplaySessionId = NULL_VALUE;
}
}

if (NULL_VALUE != liveLogReplaySessionId)
private void stopRecording()
{
if (NULL_VALUE != liveLogRecordingSubscriptionId)
{
try
{
clusterArchive.stopReplay(liveLogReplaySessionId);
backupArchive.tryStopRecording(liveLogRecordingSubscriptionId);
}
catch (final Exception ex)
{
ctx.countedErrorHandler().onError(new ClusterException("failed to stop log replay", ex, Category.WARN));
ctx.countedErrorHandler().onError(new ClusterEvent("failed to stop log recording: " + ex));
}
liveLogReplaySessionId = NULL_VALUE;
liveLogRecordingSubscriptionId = NULL_VALUE;
}

CloseHelper.closeAll(
(ErrorHandler)ctx.countedErrorHandler(),
consensusPublicationGroup,
clusterArchive,
clusterArchiveAsyncConnect,
recordingSubscription);

clusterArchive = null;
clusterArchiveAsyncConnect = null;
recordingSubscription = null;
}

private void onUnavailableCounter(final CountersReader counters, final long registrationId, final int counterId)
Expand Down

0 comments on commit c37546e

Please sign in to comment.