diff --git a/aeron-archive/src/main/java/io/aeron/archive/client/AeronArchive.java b/aeron-archive/src/main/java/io/aeron/archive/client/AeronArchive.java index 67ec52db24..3ab1844128 100644 --- a/aeron-archive/src/main/java/io/aeron/archive/client/AeronArchive.java +++ b/aeron-archive/src/main/java/io/aeron/archive/client/AeronArchive.java @@ -85,9 +85,32 @@ public final class AeronArchive implements AutoCloseable */ public static final String NOT_CONNECTED_MSG = "not connected"; + /** + * Describes state of the client instance. + */ + public enum State + { + /** + * Client connected to the archive. + */ + CONNECTED, + + /** + * Connection to the archive was lost. It is only possible to close this client instance. A new client instance + * must be created in order to establish connection with archive again. + */ + DISCONNECTED, + + /** + * Client was closed and can no longer be used. A new client instance must be created in order to establish + * connection with archive again. + */ + CLOSED; + } + private static final int FRAGMENT_LIMIT = 10; - private boolean isClosed = false; + private volatile State state; private boolean isInCallback = false; private long lastCorrelationId = Aeron.NULL_VALUE; private final long controlSessionId; @@ -124,6 +147,7 @@ public final class AeronArchive implements AutoCloseable this.archiveProxy = archiveProxy; this.controlSessionId = controlSessionId; this.archiveId = archiveId; + state = State.CONNECTED; } /** @@ -146,6 +170,16 @@ public static long segmentFileBasePosition( return startTermBasePosition + segments; } + /** + * Returns the state of this client. + * + * @return client state. + */ + public State state() + { + return state; + } + /** * Notify the archive that this control session is closed, so it can promptly release resources then close the * local resources associated with the client. @@ -155,9 +189,9 @@ public void close() lock.lock(); try { - if (!isClosed) + if (State.CLOSED != state) { - isClosed = true; + state = State.CLOSED; final ErrorHandler errorHandler = context.errorHandler(); Exception resultEx = null; @@ -399,11 +433,12 @@ public String pollForErrorResponse() lock.lock(); try { - ensureOpen(); + ensureConnected(); final ControlResponsePoller poller = controlResponsePoller; if (!poller.subscription().isConnected()) { + state = State.DISCONNECTED; return NOT_CONNECTED_MSG; } @@ -443,11 +478,12 @@ public void checkForErrorResponse() lock.lock(); try { - ensureOpen(); + ensureConnected(); final ControlResponsePoller poller = controlResponsePoller; if (!poller.subscription().isConnected()) { + state = State.DISCONNECTED; if (null != context.errorHandler()) { context.errorHandler().onError(new ArchiveException(NOT_CONNECTED_MSG)); @@ -501,7 +537,7 @@ public int pollForRecordingSignals() lock.lock(); try { - ensureOpen(); + ensureConnected(); final ControlResponsePoller poller = controlResponsePoller; if (poller.poll() != 0 && poller.isPollComplete()) @@ -557,7 +593,7 @@ public Publication addRecordedPublication(final String channel, final int stream lock.lock(); try { - ensureOpen(); + ensureConnected(); ensureNotReentrant(); publication = aeron.addPublication(channel, streamId); @@ -597,7 +633,7 @@ public ExclusivePublication addRecordedExclusivePublication(final String channel lock.lock(); try { - ensureOpen(); + ensureConnected(); ensureNotReentrant(); publication = aeron.addExclusivePublication(channel, streamId); @@ -634,7 +670,7 @@ public long startRecording(final String channel, final int streamId, final Sourc lock.lock(); try { - ensureOpen(); + ensureConnected(); ensureNotReentrant(); lastCorrelationId = aeron.nextCorrelationId(); @@ -673,7 +709,7 @@ public long startRecording( lock.lock(); try { - ensureOpen(); + ensureConnected(); ensureNotReentrant(); lastCorrelationId = aeron.nextCorrelationId(); @@ -715,7 +751,7 @@ public long extendRecording( lock.lock(); try { - ensureOpen(); + ensureConnected(); ensureNotReentrant(); lastCorrelationId = aeron.nextCorrelationId(); @@ -760,7 +796,7 @@ public long extendRecording( lock.lock(); try { - ensureOpen(); + ensureConnected(); ensureNotReentrant(); lastCorrelationId = aeron.nextCorrelationId(); @@ -794,7 +830,7 @@ public void stopRecording(final String channel, final int streamId) lock.lock(); try { - ensureOpen(); + ensureConnected(); ensureNotReentrant(); lastCorrelationId = aeron.nextCorrelationId(); @@ -828,7 +864,7 @@ public boolean tryStopRecording(final String channel, final int streamId) lock.lock(); try { - ensureOpen(); + ensureConnected(); ensureNotReentrant(); lastCorrelationId = aeron.nextCorrelationId(); @@ -858,7 +894,7 @@ public void stopRecording(final long subscriptionId) lock.lock(); try { - ensureOpen(); + ensureConnected(); ensureNotReentrant(); lastCorrelationId = aeron.nextCorrelationId(); @@ -889,7 +925,7 @@ public boolean tryStopRecording(final long subscriptionId) lock.lock(); try { - ensureOpen(); + ensureConnected(); ensureNotReentrant(); lastCorrelationId = aeron.nextCorrelationId(); @@ -918,7 +954,7 @@ public boolean tryStopRecordingByIdentity(final long recordingId) lock.lock(); try { - ensureOpen(); + ensureConnected(); ensureNotReentrant(); lastCorrelationId = aeron.nextCorrelationId(); @@ -973,7 +1009,7 @@ public long startReplay( lock.lock(); try { - ensureOpen(); + ensureConnected(); ensureNotReentrant(); lastCorrelationId = aeron.nextCorrelationId(); @@ -1027,7 +1063,7 @@ public long startBoundedReplay( lock.lock(); try { - ensureOpen(); + ensureConnected(); ensureNotReentrant(); lastCorrelationId = aeron.nextCorrelationId(); @@ -1074,7 +1110,7 @@ public long startReplay( lock.lock(); try { - ensureOpen(); + ensureConnected(); ensureNotReentrant(); final ChannelUri replayChannelUri = ChannelUri.parse(replayChannel); @@ -1115,7 +1151,7 @@ public void stopReplay(final long replaySessionId) lock.lock(); try { - ensureOpen(); + ensureConnected(); ensureNotReentrant(); lastCorrelationId = aeron.nextCorrelationId(); @@ -1143,7 +1179,7 @@ public void stopAllReplays(final long recordingId) lock.lock(); try { - ensureOpen(); + ensureConnected(); ensureNotReentrant(); lastCorrelationId = aeron.nextCorrelationId(); @@ -1182,7 +1218,7 @@ public Subscription replay( lock.lock(); try { - ensureOpen(); + ensureConnected(); ensureNotReentrant(); final ChannelUri replayChannelUri = ChannelUri.parse(replayChannel); @@ -1236,7 +1272,7 @@ public Subscription replay( lock.lock(); try { - ensureOpen(); + ensureConnected(); ensureNotReentrant(); final ChannelUri replayChannelUri = ChannelUri.parse(replayChannel); @@ -1286,7 +1322,7 @@ public Subscription replay( lock.lock(); try { - ensureOpen(); + ensureConnected(); ensureNotReentrant(); final ChannelUri replayChannelUri = ChannelUri.parse(replayChannel); @@ -1335,7 +1371,7 @@ public int listRecordings( lock.lock(); try { - ensureOpen(); + ensureConnected(); ensureNotReentrant(); isInCallback = true; @@ -1378,7 +1414,7 @@ public int listRecordingsForUri( lock.lock(); try { - ensureOpen(); + ensureConnected(); ensureNotReentrant(); isInCallback = true; @@ -1418,7 +1454,7 @@ public int listRecording(final long recordingId, final RecordingDescriptorConsum lock.lock(); try { - ensureOpen(); + ensureConnected(); ensureNotReentrant(); isInCallback = true; @@ -1450,7 +1486,7 @@ public long getStartPosition(final long recordingId) lock.lock(); try { - ensureOpen(); + ensureConnected(); ensureNotReentrant(); lastCorrelationId = aeron.nextCorrelationId(); @@ -1480,7 +1516,7 @@ public long getRecordingPosition(final long recordingId) lock.lock(); try { - ensureOpen(); + ensureConnected(); ensureNotReentrant(); lastCorrelationId = aeron.nextCorrelationId(); @@ -1510,7 +1546,7 @@ public long getStopPosition(final long recordingId) lock.lock(); try { - ensureOpen(); + ensureConnected(); ensureNotReentrant(); lastCorrelationId = aeron.nextCorrelationId(); @@ -1541,7 +1577,7 @@ public long getMaxRecordedPosition(final long recordingId) lock.lock(); try { - ensureOpen(); + ensureConnected(); ensureNotReentrant(); lastCorrelationId = aeron.nextCorrelationId(); @@ -1585,7 +1621,7 @@ public long findLastMatchingRecording( lock.lock(); try { - ensureOpen(); + ensureConnected(); ensureNotReentrant(); lastCorrelationId = aeron.nextCorrelationId(); @@ -1617,7 +1653,7 @@ public long truncateRecording(final long recordingId, final long position) lock.lock(); try { - ensureOpen(); + ensureConnected(); ensureNotReentrant(); lastCorrelationId = aeron.nextCorrelationId(); @@ -1648,7 +1684,7 @@ public long purgeRecording(final long recordingId) lock.lock(); try { - ensureOpen(); + ensureConnected(); ensureNotReentrant(); lastCorrelationId = aeron.nextCorrelationId(); @@ -1691,7 +1727,7 @@ public int listRecordingSubscriptions( lock.lock(); try { - ensureOpen(); + ensureConnected(); ensureNotReentrant(); isInCallback = true; @@ -1748,7 +1784,7 @@ public long replicate( lock.lock(); try { - ensureOpen(); + ensureConnected(); ensureNotReentrant(); lastCorrelationId = aeron.nextCorrelationId(); @@ -1810,7 +1846,7 @@ public long replicate( lock.lock(); try { - ensureOpen(); + ensureConnected(); ensureNotReentrant(); lastCorrelationId = aeron.nextCorrelationId(); @@ -1871,7 +1907,7 @@ public long taggedReplicate( lock.lock(); try { - ensureOpen(); + ensureConnected(); ensureNotReentrant(); lastCorrelationId = aeron.nextCorrelationId(); @@ -1937,7 +1973,7 @@ public long taggedReplicate( lock.lock(); try { - ensureOpen(); + ensureConnected(); ensureNotReentrant(); lastCorrelationId = aeron.nextCorrelationId(); @@ -1993,7 +2029,7 @@ public long replicate( lock.lock(); try { - ensureOpen(); + ensureConnected(); ensureNotReentrant(); lastCorrelationId = aeron.nextCorrelationId(); @@ -2028,7 +2064,7 @@ public void stopReplication(final long replicationId) lock.lock(); try { - ensureOpen(); + ensureConnected(); ensureNotReentrant(); lastCorrelationId = aeron.nextCorrelationId(); @@ -2059,7 +2095,7 @@ public boolean tryStopReplication(final long replicationId) lock.lock(); try { - ensureOpen(); + ensureConnected(); ensureNotReentrant(); lastCorrelationId = aeron.nextCorrelationId(); @@ -2093,7 +2129,7 @@ public void detachSegments(final long recordingId, final long newStartPosition) lock.lock(); try { - ensureOpen(); + ensureConnected(); ensureNotReentrant(); lastCorrelationId = aeron.nextCorrelationId(); @@ -2123,7 +2159,7 @@ public long deleteDetachedSegments(final long recordingId) lock.lock(); try { - ensureOpen(); + ensureConnected(); ensureNotReentrant(); lastCorrelationId = aeron.nextCorrelationId(); @@ -2160,7 +2196,7 @@ public long purgeSegments(final long recordingId, final long newStartPosition) lock.lock(); try { - ensureOpen(); + ensureConnected(); ensureNotReentrant(); lastCorrelationId = aeron.nextCorrelationId(); @@ -2193,7 +2229,7 @@ public long attachSegments(final long recordingId) lock.lock(); try { - ensureOpen(); + ensureConnected(); ensureNotReentrant(); lastCorrelationId = aeron.nextCorrelationId(); @@ -2232,7 +2268,7 @@ public long migrateSegments(final long srcRecordingId, final long dstRecordingId lock.lock(); try { - ensureOpen(); + ensureConnected(); ensureNotReentrant(); lastCorrelationId = aeron.nextCorrelationId(); @@ -2289,14 +2325,8 @@ private void pollNextResponse(final long correlationId, final long deadlineNs, f continue; } - if (!poller.subscription().isConnected()) - { - throw new ArchiveException( - "response channel from archive is not connected, " + - "channel=" + poller.subscription().channel() + - ", streamId=" + poller.subscription().streamId() + - ", imageCount=" + poller.subscription().imageCount()); - } + final Subscription subscription = poller.subscription(); + checkForDisconnect(subscription); checkDeadline(deadlineNs, "awaiting response", correlationId); idleStrategy.idle(); @@ -2304,6 +2334,19 @@ private void pollNextResponse(final long correlationId, final long deadlineNs, f } } + private void checkForDisconnect(final Subscription subscription) + { + if (!subscription.isConnected()) + { + state = State.DISCONNECTED; + throw new ArchiveException( + "response channel from archive is not connected, " + + "channel=" + subscription.channel() + + ", streamId=" + subscription.streamId() + + ", imageCount=" + subscription.imageCount()); + } + } + private long pollForResponse(final long correlationId) { final long deadlineNs = nanoClock.nanoTime() + messageTimeoutNs; @@ -2431,14 +2474,7 @@ private int pollForDescriptors( continue; } - if (!poller.subscription().isConnected()) - { - throw new ArchiveException( - "response channel from archive is not connected, " + - "channel=" + poller.subscription().channel() + - ", streamId=" + poller.subscription().streamId() + - ", imageCount=" + poller.subscription().imageCount()); - } + checkForDisconnect(poller.subscription()); checkDeadline(deadlineNs, "awaiting recording descriptors", correlationId); idleStrategy.idle(); @@ -2477,14 +2513,7 @@ private int pollForSubscriptionDescriptors( continue; } - if (!poller.subscription().isConnected()) - { - throw new ArchiveException( - "response channel from archive is not connected, " + - "channel=" + poller.subscription().channel() + - ", streamId=" + poller.subscription().streamId() + - ", imageCount=" + poller.subscription().imageCount()); - } + checkForDisconnect(poller.subscription()); checkDeadline(deadlineNs, "awaiting subscription descriptors", correlationId); idleStrategy.idle(); @@ -2515,10 +2544,11 @@ private void invokeInvokers() } } - private void ensureOpen() + private void ensureConnected() { - if (isClosed) + if (State.CONNECTED != state) { + close(); throw new ArchiveException("client is closed"); } } @@ -3599,6 +3629,7 @@ public enum State private byte[] encodedCredentialsFromChallenge = null; private State state = State.ADD_PUBLICATION; private ArchiveProxy archiveProxy; + private AeronArchive aeronArchive; AsyncConnect(final Context ctx) { @@ -3609,7 +3640,18 @@ public enum State final Aeron aeron = ctx.aeron(); controlResponsePoller = new ControlResponsePoller( - aeron.addSubscription(ctx.controlResponseChannel(), ctx.controlResponseStreamId())); + aeron.addSubscription( + ctx.controlResponseChannel(), + ctx.controlResponseStreamId(), + null, + (image) -> + { + final AeronArchive client = aeronArchive; + if (null != client) + { + client.state = AeronArchive.State.DISCONNECTED; + } + })); checkAndSetupResponseChannel(ctx, controlResponsePoller.subscription()); @@ -3699,7 +3741,6 @@ public State state() public AeronArchive poll() { checkDeadline(); - AeronArchive aeronArchive = null; if (State.ADD_PUBLICATION == state) { diff --git a/aeron-archive/src/test/java/io/aeron/archive/ArchiveTest.java b/aeron-archive/src/test/java/io/aeron/archive/ArchiveTest.java index d2b178b0e4..de57496944 100644 --- a/aeron-archive/src/test/java/io/aeron/archive/ArchiveTest.java +++ b/aeron-archive/src/test/java/io/aeron/archive/ArchiveTest.java @@ -941,10 +941,19 @@ void shouldTimeoutInactiveArchiveClients(final String controlRequestChannel, fin } } + assertEquals(AeronArchive.State.CONNECTED, client1.state()); assertTrue(client1.archiveProxy().publication().isConnected()); assertTrue(client1.controlResponsePoller().subscription().isConnected()); + assertEquals(AeronArchive.State.DISCONNECTED, client2.state()); assertTrue(client2.archiveProxy().publication().isConnected()); assertFalse(client2.controlResponsePoller().subscription().isConnected()); + + final ArchiveException exception = + assertThrowsExactly(ArchiveException.class, () -> client2.getMaxRecordedPosition(4)); + assertEquals("ERROR - client is closed", exception.getMessage()); + assertEquals(AeronArchive.State.CLOSED, client2.state()); + assertFalse(client2.archiveProxy().publication().isConnected()); + assertFalse(client2.controlResponsePoller().subscription().isConnected()); } } } diff --git a/aeron-archive/src/test/java/io/aeron/archive/client/AeronArchiveTest.java b/aeron-archive/src/test/java/io/aeron/archive/client/AeronArchiveTest.java index 1ed5f6a37e..75cc0cb86e 100644 --- a/aeron-archive/src/test/java/io/aeron/archive/client/AeronArchiveTest.java +++ b/aeron-archive/src/test/java/io/aeron/archive/client/AeronArchiveTest.java @@ -16,9 +16,11 @@ package io.aeron.archive.client; import io.aeron.Aeron; +import io.aeron.AvailableImageHandler; import io.aeron.ChannelUri; import io.aeron.Publication; import io.aeron.Subscription; +import io.aeron.UnavailableImageHandler; import io.aeron.archive.client.AeronArchive.Context; import io.aeron.archive.codecs.ControlResponseCode; import io.aeron.exceptions.AeronException; @@ -76,7 +78,11 @@ void asyncConnectedShouldCloseContext() when(ctx.controlResponseChannel()).thenReturn(responseChannel); when(ctx.controlResponseStreamId()).thenReturn(responseStreamId); final RuntimeException error = new RuntimeException("subscription"); - when(aeron.addSubscription(responseChannel, responseStreamId)).thenThrow(error); + when(aeron.addSubscription( + eq(responseChannel), + eq(responseStreamId), + nullable(AvailableImageHandler.class), + any(UnavailableImageHandler.class))).thenThrow(error); final RuntimeException actualException = assertThrowsExactly(RuntimeException.class, () -> AeronArchive.asyncConnect(ctx)); @@ -87,7 +93,11 @@ void asyncConnectedShouldCloseContext() inOrder.verify(ctx).aeron(); inOrder.verify(ctx).controlResponseChannel(); inOrder.verify(ctx).controlResponseStreamId(); - inOrder.verify(aeron).addSubscription(responseChannel, responseStreamId); + inOrder.verify(aeron).addSubscription( + eq(responseChannel), + eq(responseStreamId), + nullable(AvailableImageHandler.class), + any(UnavailableImageHandler.class)); inOrder.verify(ctx).close(); inOrder.verifyNoMoreInteractions(); } @@ -108,7 +118,11 @@ void asyncConnectedShouldCloseResourceInCaseOfExceptionUponStartup() when(ctx.controlRequestChannel()).thenReturn(requestChannel); when(ctx.controlRequestStreamId()).thenReturn(requestStreamId); final Subscription subscription = mock(Subscription.class); - when(aeron.addSubscription(responseChannel, responseStreamId)).thenReturn(subscription); + when(aeron.addSubscription( + eq(responseChannel), + eq(responseStreamId), + nullable(AvailableImageHandler.class), + any(UnavailableImageHandler.class))).thenReturn(subscription); when(aeron.asyncAddExclusivePublication(requestChannel, requestStreamId)).thenReturn(pubId); final IndexOutOfBoundsException error = new IndexOutOfBoundsException("exception"); when(aeron.context()).thenThrow(error); @@ -122,7 +136,11 @@ void asyncConnectedShouldCloseResourceInCaseOfExceptionUponStartup() inOrder.verify(ctx).aeron(); inOrder.verify(ctx).controlResponseChannel(); inOrder.verify(ctx).controlResponseStreamId(); - inOrder.verify(aeron).addSubscription(responseChannel, responseStreamId); + inOrder.verify(aeron).addSubscription( + eq(responseChannel), + eq(responseStreamId), + nullable(AvailableImageHandler.class), + any(UnavailableImageHandler.class)); inOrder.verify(aeron).asyncAddExclusivePublication(requestChannel, requestStreamId); inOrder.verify(subscription).close(); inOrder.verify(aeron).asyncRemovePublication(pubId);