Skip to content

Commit

Permalink
[Java] Emit WARN event when ControlSession is closed abruptly + add r…
Browse files Browse the repository at this point in the history
…eason to the ControlSession state transition log + increase default stale session check interval to 1s.
  • Loading branch information
vyazelenko committed Dec 18, 2024
1 parent 10a0362 commit d421bde
Show file tree
Hide file tree
Showing 22 changed files with 130 additions and 90 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -606,7 +606,12 @@ static void dissectControlSessionStateChange(

builder.append(": controlSessionId=").append(controlSessionId);
builder.append(" ");
absoluteOffset += buffer.getStringAscii(absoluteOffset, builder, LITTLE_ENDIAN);
absoluteOffset += SIZE_OF_INT;

builder.append(" reason=\"");
buffer.getStringAscii(absoluteOffset, builder, LITTLE_ENDIAN);
builder.append("\"");
}

static void dissectReplaySessionError(
Expand Down
16 changes: 11 additions & 5 deletions aeron-agent/src/main/java/io/aeron/agent/ArchiveEventEncoder.java
Original file line number Diff line number Diff line change
Expand Up @@ -131,26 +131,32 @@ static <E extends Enum<E>> int replicationSessionStateChangeLength(final E from,
return stateTransitionStringLength(from, to) + (4 * SIZE_OF_LONG) + (SIZE_OF_INT + reason.length());
}

static <E extends Enum<E>> int encodeSessionStateChange(
static <E extends Enum<E>> int encodeControlSessionStateChange(
final UnsafeBuffer encodingBuffer,
final int offset,
final int captureLength,
final int length,
final E from,
final E to,
final long id)
final long id,
final String reason)
{
int encodedLength = encodeLogHeader(encodingBuffer, offset, captureLength, length);

encodingBuffer.putLong(offset + encodedLength, id, LITTLE_ENDIAN);
encodedLength += SIZE_OF_LONG;

return encodeTrailingStateChange(encodingBuffer, offset, encodedLength, captureLength, from, to);
encodedLength += encodeStateChange(encodingBuffer, offset + encodedLength, from, to);

encodedLength += encodeTrailingString(encodingBuffer, offset + encodedLength,
captureLength + LOG_HEADER_LENGTH - encodedLength, reason);

return encodedLength;
}

static <E extends Enum<E>> int sessionStateChangeLength(final E from, final E to)
static <E extends Enum<E>> int sessionStateChangeLength(final E from, final E to, final String reason)
{
return stateTransitionStringLength(from, to) + SIZE_OF_LONG;
return stateTransitionStringLength(from, to) + SIZE_OF_LONG + (SIZE_OF_INT + reason.length());
}

static void encodeReplaySessionError(
Expand Down
12 changes: 7 additions & 5 deletions aeron-agent/src/main/java/io/aeron/agent/ArchiveEventLogger.java
Original file line number Diff line number Diff line change
Expand Up @@ -259,13 +259,15 @@ public <E extends Enum<E>> void logReplicationSessionStateChange(
* @param oldState before the change.
* @param newState after the change.
* @param controlSessionId identity for the control session on the Archive.
* @param reason a string indicating the reason for the state change.
*/
public <E extends Enum<E>> void logControlSessionStateChange(
final E oldState,
final E newState,
final long controlSessionId)
final long controlSessionId,
final String reason)
{
final int length = sessionStateChangeLength(oldState, newState);
final int length = sessionStateChangeLength(oldState, newState, reason);
final int captureLength = captureLength(length);
final int encodedLength = encodedLength(captureLength);
final ManyToOneRingBuffer ringBuffer = this.ringBuffer;
Expand All @@ -275,15 +277,15 @@ public <E extends Enum<E>> void logControlSessionStateChange(
{
try
{
encodeSessionStateChange(
encodeControlSessionStateChange(
(UnsafeBuffer)ringBuffer.buffer(),
index,
captureLength,
length,
oldState,
newState,
controlSessionId
);
controlSessionId,
reason);
}
finally
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,9 +105,10 @@ static void logReplicationSessionDone(
static class ControlSessionStateChange
{
@Advice.OnMethodEnter
static <E extends Enum<E>> void logStateChange(final E oldState, final E newState, final long controlSessionId)
static <E extends Enum<E>> void logStateChange(
final E oldState, final E newState, final long controlSessionId, final String reason)
{
LOGGER.logControlSessionStateChange(oldState, newState, controlSessionId);
LOGGER.logControlSessionStateChange(oldState, newState, controlSessionId, reason);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -162,10 +162,7 @@ static int encode(
}

static <E extends Enum<E>> int encodeStateChange(
final UnsafeBuffer encodingBuffer,
final int offset,
final E from,
final E to)
final UnsafeBuffer encodingBuffer, final int offset, final E from, final E to)
{
int encodedLength = 0;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -906,13 +906,14 @@ void controlSessionStateChange()
{
internalEncodeLogHeader(buffer, 0, 10, 20, () -> 1_500_000_000L);
buffer.putLong(LOG_HEADER_LENGTH, -10_000_000_000L, LITTLE_ENDIAN);
buffer.putStringAscii(LOG_HEADER_LENGTH + SIZE_OF_LONG, "x -> y");
final int length = buffer.putStringAscii(LOG_HEADER_LENGTH + SIZE_OF_LONG, "x -> y");
buffer.putStringAscii(LOG_HEADER_LENGTH + SIZE_OF_LONG + length, "the very reason to report");

dissectControlSessionStateChange(buffer, 0, builder);

assertEquals("[1.500000000] " + CONTEXT + ": " + CONTROL_SESSION_STATE_CHANGE.name() + " [10/20]:" +
" controlSessionId=-10000000000" +
" x -> y",
" x -> y reason=\"the very reason to report\"",
builder.toString());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,20 +42,21 @@ enum State
private final UnsafeBuffer buffer = new UnsafeBuffer(new byte[MAX_EVENT_LENGTH]);

@Test
void testEncodeSessionStateChange()
void testEncodeControlSessionStateChange()
{
final int offset = 24;
final TimeUnit from = DAYS;
final TimeUnit to = MILLISECONDS;
final long sessionId = Long.MAX_VALUE;
final String payload = from.name() + STATE_SEPARATOR + to.name();
final int length = payload.length() + SIZE_OF_LONG + SIZE_OF_INT;
final String reason = "test reason for now";
final int length = payload.length() + SIZE_OF_LONG + SIZE_OF_INT + SIZE_OF_INT + reason.length();
final int captureLength = captureLength(length);

final int encodedLength = encodeSessionStateChange(
buffer, offset, captureLength, length, from, to, sessionId);
final int encodedLength = encodeControlSessionStateChange(
buffer, offset, captureLength, length, from, to, sessionId, reason);

assertEquals(encodedLength(sessionStateChangeLength(from, to)), encodedLength);
assertEquals(encodedLength(sessionStateChangeLength(from, to, reason)), encodedLength);
assertEquals(captureLength, buffer.getInt(offset, LITTLE_ENDIAN));
assertEquals(length, buffer.getInt(offset + SIZE_OF_INT, LITTLE_ENDIAN));
assertNotEquals(0, buffer.getLong(offset + SIZE_OF_INT * 2, LITTLE_ENDIAN));
Expand All @@ -69,8 +70,11 @@ void testSessionStateChangeLength()
final ChronoUnit from = ChronoUnit.ERAS;
final ChronoUnit to = ChronoUnit.MILLENNIA;
final String payload = from.name() + STATE_SEPARATOR + to.name();
final String reason = "hfskhflkdhfldshlfkhllkshflkhsldfhaslkfhsaklhflksahdflsahlhalks";

assertEquals(payload.length() + SIZE_OF_LONG + SIZE_OF_INT, sessionStateChangeLength(from, to));
assertEquals(
payload.length() + SIZE_OF_LONG + SIZE_OF_INT * 2 + reason.length(),
sessionStateChangeLength(from, to, reason));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -184,15 +184,18 @@ void logControlSessionStateChange()
final ChronoUnit to = ChronoUnit.MICROS;
final long id = 555_000_000_000L;
final String payload = from.name() + STATE_SEPARATOR + to.name();
final int captureLength = SIZE_OF_LONG + SIZE_OF_INT + payload.length();
final String reason = "test reason to check";
final int captureLength = SIZE_OF_LONG + SIZE_OF_INT + payload.length() + SIZE_OF_INT + reason.length();

logger.logControlSessionStateChange(from, to, id);
logger.logControlSessionStateChange(from, to, id, reason);

verifyLogHeader(
logBuffer, offset, CONTROL_SESSION_STATE_CHANGE.toEventCodeId(), captureLength, captureLength);
assertEquals(id, logBuffer.getLong(encodedMsgOffset(offset + LOG_HEADER_LENGTH), LITTLE_ENDIAN));
assertEquals(
payload, logBuffer.getStringAscii(encodedMsgOffset(offset + LOG_HEADER_LENGTH + SIZE_OF_LONG)));
assertEquals(reason, logBuffer.getStringAscii(encodedMsgOffset(
offset + LOG_HEADER_LENGTH + SIZE_OF_LONG + SIZE_OF_INT + payload.length())));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ abstract class AbstractListRecordingsSession implements Session
/**
* {@inheritDoc}
*/
public void abort()
public void abort(final String reason)
{
isDone = true;
}
Expand Down
6 changes: 3 additions & 3 deletions aeron-archive/src/main/java/io/aeron/archive/Archive.java
Original file line number Diff line number Diff line change
Expand Up @@ -447,10 +447,10 @@ public static final class Configuration
/**
* Default time interval in nanoseconds for checking session liveness.
*
* @see #CONNECT_TIMEOUT_PROP_NAME
* @see #SESSION_LIVENESS_CHECK_INTERVAL_PROP_NAME
*/
@Config(defaultType = DefaultType.LONG, defaultLong = 100 * 1000 * 1000)
public static final long SESSION_LIVENESS_CHECK_INTERVAL_DEFAULT_NS = TimeUnit.MILLISECONDS.toNanos(100);
@Config(defaultType = DefaultType.LONG, defaultLong = 1000L * 1000 * 1000)
public static final long SESSION_LIVENESS_CHECK_INTERVAL_DEFAULT_NS = TimeUnit.SECONDS.toNanos(1);

/**
* How long a replay publication should linger after all data is sent. Longer linger can help avoid tail loss.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -874,7 +874,7 @@ void stopReplay(final long correlationId, final long replaySessionId, final Cont
final ReplaySession replaySession = replaySessionByIdMap.get(replaySessionId);
if (null != replaySession)
{
replaySession.abort();
replaySession.abort("stop replay");
}

controlSession.sendOkResponse(correlationId);
Expand All @@ -886,7 +886,7 @@ void stopAllReplays(final long correlationId, final long recordingId, final Cont
{
if (NULL_VALUE == recordingId || replaySession.recordingId() == recordingId)
{
replaySession.abort();
replaySession.abort("stop all replays");
}
}

Expand Down Expand Up @@ -1136,7 +1136,7 @@ void stopRecordingByIdentity(final long correlationId, final long recordingId, f

if (null != recordingSession)
{
recordingSession.abort();
recordingSession.abort("stop recording by identity");

final long subscriptionId = recordingSession.subscription().registrationId();
final Subscription subscription = removeRecordingSubscription(subscriptionId);
Expand Down Expand Up @@ -1187,7 +1187,7 @@ void closeRecordingSession(final RecordingSession session)

if (subscriptionRefCountMap.decrementAndGet(subscriptionId) <= 0 || session.isAutoStop())
{
closeAndRemoveRecordingSubscription(subscription);
closeAndRemoveRecordingSubscription(subscription, "close recording session");
}
closeSession(session);
ctx.recordingSessionCounter().decrementOrdered();
Expand Down Expand Up @@ -1317,7 +1317,7 @@ void stopReplication(final long correlationId, final long replicationId, final C
}
else
{
session.abort();
session.abort("stop replication");
controlSession.sendOkResponse(correlationId);
}
}
Expand Down Expand Up @@ -1567,7 +1567,7 @@ private void abortRecordingSessionAndCloseSubscription(final Subscription subscr
{
if (subscription == session.subscription())
{
session.abort();
session.abort("stop recording");
}
}

Expand Down Expand Up @@ -1922,7 +1922,7 @@ private void extendRecordingSession(
errorHandler.onError(ex);
if (autoStop)
{
closeAndRemoveRecordingSubscription(image.subscription());
closeAndRemoveRecordingSubscription(image.subscription(), ex.getMessage());
}
}
}
Expand Down Expand Up @@ -2370,7 +2370,7 @@ private boolean eraseRemainingSegment(
return true;
}

private void closeAndRemoveRecordingSubscription(final Subscription subscription)
private void closeAndRemoveRecordingSubscription(final Subscription subscription, final String reason)
{
final long subscriptionId = subscription.registrationId();
subscriptionRefCountMap.remove(subscriptionId);
Expand All @@ -2379,7 +2379,7 @@ private void closeAndRemoveRecordingSubscription(final Subscription subscription
{
if (subscription == session.subscription())
{
session.abort();
session.abort(reason);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
import io.aeron.Publication;
import io.aeron.Subscription;
import io.aeron.archive.client.AeronArchive;
import io.aeron.archive.client.ArchiveException;
import io.aeron.archive.client.ArchiveEvent;
import io.aeron.archive.codecs.*;
import io.aeron.exceptions.AeronException;
import io.aeron.logbuffer.BufferClaim;
Expand Down Expand Up @@ -218,21 +218,20 @@ private static void checkResult(final ControlSession session, final long result)
{
if (result == Publication.NOT_CONNECTED)
{
session.abort();
throw new ArchiveException(
"response publication is not connected: " + session, AeronException.Category.WARN);
session.abort("response publication is not connected");
throw new ArchiveEvent("response publication is not connected: " + session);
}

if (result == Publication.CLOSED)
{
session.abort();
throw new ArchiveException("response publication is closed: " + session);
session.abort("response publication is closed");
throw new ArchiveEvent("response publication is closed: " + session, AeronException.Category.ERROR);
}

if (result == Publication.MAX_POSITION_EXCEEDED)
{
session.abort();
throw new ArchiveException("response publication at max position: " + session);
session.abort("response publication at max position");
throw new ArchiveEvent("response publication at max position: " + session, AeronException.Category.ERROR);
}
}

Expand Down
Loading

0 comments on commit d421bde

Please sign in to comment.