From fdb3a35f2f6a01d100119c0c039abdc2c43faa7f Mon Sep 17 00:00:00 2001 From: "denis.plotnikov" Date: Mon, 30 Sep 2024 16:21:49 +0300 Subject: [PATCH] Added recovery logs --- README.md | 1 + build.gradle | 2 +- .../java/com/exactpro/th2/FixHandler.java | 32 +++++++++++++++---- .../th2/conn/dirty/fix/MessageLoader.kt | 25 +++++++++++++-- 4 files changed, 49 insertions(+), 11 deletions(-) diff --git a/README.md b/README.md index 0c1b60a..f95141a 100644 --- a/README.md +++ b/README.md @@ -336,6 +336,7 @@ spec: ## 1.4.1 * Use keep open gRPC query to recover messages for Resend Request +* Update `com.exactpro.th2.gradle` plugin to `0.1.3` ## 1.4.0 diff --git a/build.gradle b/build.gradle index a71b4c5..5f68075 100644 --- a/build.gradle +++ b/build.gradle @@ -1,6 +1,6 @@ plugins { id "application" - id "com.exactpro.th2.gradle.component" version "0.1.1" + id "com.exactpro.th2.gradle.component" version "0.1.3" id 'org.jetbrains.kotlin.jvm' version '1.8.22' id "org.jetbrains.kotlin.kapt" version "1.8.22" } diff --git a/src/main/java/com/exactpro/th2/FixHandler.java b/src/main/java/com/exactpro/th2/FixHandler.java index d22b726..ca4405a 100644 --- a/src/main/java/com/exactpro/th2/FixHandler.java +++ b/src/main/java/com/exactpro/th2/FixHandler.java @@ -854,23 +854,36 @@ private void recovery(int beginSeqNo, int endSeqNo, RecoveryConfig recoveryConfi Function1 processMessage = (buf) -> { FixField seqNum = findField(buf, MSG_SEQ_NUM_TAG); FixField msgTypeField = findField(buf, MSG_TYPE_TAG); + + LOGGER.info("Processing cradle recovery message {}", buf.toString(US_ASCII)); + if(seqNum == null || seqNum.getValue() == null || msgTypeField == null || msgTypeField.getValue() == null) { + LOGGER.info("Dropping recovery message. Missing SeqNum tag: {}", buf.toString(US_ASCII)); return true; } int sequence = Integer.parseInt(seqNum.getValue()); String msgType = msgTypeField.getValue(); - if(sequence < beginSeqNo) return true; - if(sequence > endSeq) return false; + if(sequence < beginSeqNo) { + LOGGER.info("Dropping recovery message. SeqNum is less than BeginSeqNo: {}", buf.toString(US_ASCII)); + return true; + } + if(sequence > endSeq) { + LOGGER.info("Finishing recovery. SeqNum > EndSeq: {}", buf.toString(US_ASCII)); + return false; + } - if(recoveryConfig.getSequenceResetForAdmin() && ADMIN_MESSAGES.contains(msgType)) return true; + if(recoveryConfig.getSequenceResetForAdmin() && ADMIN_MESSAGES.contains(msgType)) { + LOGGER.info("Dropping recovery message. Admin message sequence reset: {}", buf.toString(US_ASCII)); + return true; + } FixField possDup = findField(buf, POSS_DUP_TAG); if(possDup != null && Objects.equals(possDup.getValue(), IS_POSS_DUP)) return true; if(sequence - 1 != lastProcessedSequence.get() ) { int seqNo = Math.max(beginSeqNo, lastProcessedSequence.get() + 1); - LOGGER.error("Messages [{}, {}] couldn't be recovered", seqNo, sequence); + LOGGER.error("Messages [{}, {}] couldn't be recovered in the middle of recovery", seqNo, sequence); StringBuilder sequenceReset = createSequenceReset(seqNo, sequence); channel.send(Unpooled.wrappedBuffer(sequenceReset.toString().getBytes(StandardCharsets.UTF_8)), strategy.getState().enrichProperties(), @@ -884,6 +897,7 @@ private void recovery(int beginSeqNo, int endSeqNo, RecoveryConfig recoveryConfi updateLength(buf); updateChecksum(buf); if(!skip.get()) { + LOGGER.info("Sending recovery message: {}", buf.toString(US_ASCII)); channel.send(buf, strategy.getState().enrichProperties(), null, SendMode.MANGLE) .thenAcceptAsync(x -> strategy.getState().addMessageID(x), executorService); try { @@ -894,11 +908,13 @@ private void recovery(int beginSeqNo, int endSeqNo, RecoveryConfig recoveryConfi } if(skip.get() && recoveryConfig.getOutOfOrder()) { + LOGGER.info("Skipping recovery message. OutOfOrder: {}", buf.toString(US_ASCII)); skipped.set(buf); skip.set(false); } if(!skip.get() && recoveryConfig.getOutOfOrder()) { + LOGGER.info("Sending recovery message. OutOfOrder: {}", skipped.get().toString(US_ASCII)); skip.set(true); channel.send(skipped.get(), strategy.getState().enrichProperties(), null, SendMode.MANGLE) .thenAcceptAsync(x -> strategy.getState().addMessageID(x), executorService); @@ -926,7 +942,7 @@ private void recovery(int beginSeqNo, int endSeqNo, RecoveryConfig recoveryConfi if(lastProcessedSequence.get() < endSeq && msgSeqNum.get() + 1 != lastProcessedSequence.get() + 1) { int seqNo = Math.max(lastProcessedSequence.get() + 1, beginSeqNo); int newSeqNo = msgSeqNum.get() + 1; - LOGGER.error("Messages [{}, {}] couldn't be recovered", seqNo, newSeqNo); + LOGGER.error("Messages [{}, {}] couldn't be recovered in the end of recovery", seqNo, newSeqNo); String seqReset = createSequenceReset(seqNo, newSeqNo).toString(); channel.send( Unpooled.wrappedBuffer(seqReset.getBytes(StandardCharsets.UTF_8)), @@ -2323,8 +2339,8 @@ private void recoveryFromState(Integer beginSeqNo, Integer endSeqNo) { updateLength(missedMessage); updateChecksum(missedMessage); - LOGGER.info("Sending recovery message from state: {}", missedMessage.toString(US_ASCII)); if(!skip) { + LOGGER.info("Sending recovery message from state: {}", missedMessage.toString(US_ASCII)); channel.send(missedMessage, strategy.getState().enrichProperties(), null, SendMode.MANGLE) .thenAcceptAsync(x -> strategy.getState().addMessageID(x), executorService); try { @@ -2335,11 +2351,13 @@ private void recoveryFromState(Integer beginSeqNo, Integer endSeqNo) { } if(skip && recoveryConfig.getOutOfOrder()) { + LOGGER.info("Skip recovery message out of order: {}", missedMessage.toString(US_ASCII)); skip = false; skipped = missedMessage; } - if(!skip && recoveryConfig.getOutOfOrder()) { + if(!skip && recoveryConfig.getOutOfOrder() && skipped != null) { + LOGGER.info("Sending recovery message from state out of order: {}", skipped.toString(US_ASCII)); channel.send(skipped, strategy.getState().enrichProperties(), null, SendMode.MANGLE) .thenAcceptAsync(x -> strategy.getState().addMessageID(x), executorService); try { diff --git a/src/main/kotlin/com/exactpro/th2/conn/dirty/fix/MessageLoader.kt b/src/main/kotlin/com/exactpro/th2/conn/dirty/fix/MessageLoader.kt index 23992a5..a1adf81 100644 --- a/src/main/kotlin/com/exactpro/th2/conn/dirty/fix/MessageLoader.kt +++ b/src/main/kotlin/com/exactpro/th2/conn/dirty/fix/MessageLoader.kt @@ -47,6 +47,7 @@ import kotlin.concurrent.withLock import mu.KotlinLogging import java.util.concurrent.ScheduledExecutorService import java.util.concurrent.TimeUnit +import kotlin.text.Charsets.US_ASCII class MessageLoader( private val executor: ScheduledExecutorService, @@ -145,7 +146,15 @@ class MessageLoader( ) ) - val firstValidMessage = firstValidMessageDetails(backwardIterator) ?: return@withCancellation + val firstValidMessage = firstValidMessageDetails(backwardIterator) + + if (firstValidMessage == null) { + K_LOGGER.info { "Not found valid messages to recover." } + return@withCancellation + } + firstValidMessage.let { + K_LOGGER.info { "Backward search. First valid message seq num: ${it.payloadSequence} timestamp: ${it.timestamp} cradle sequence: ${it.messageSequence}" } + } var messagesToSkip = firstValidMessage.payloadSequence - fromSequence @@ -157,14 +166,19 @@ class MessageLoader( continue } timestamp = message.messageId.timestamp + val buf = Unpooled.copiedBuffer(message.bodyRaw.toByteArray()) + val sequence = buf.findField(MSG_SEQ_NUM_TAG)?.value?.toInt() + + K_LOGGER.debug { "Backward search: Skip message with sequence - $sequence" } + messagesToSkip -= 1 if(messagesToSkip == 0L) { - val buf = Unpooled.copiedBuffer(message.bodyRaw.toByteArray()) - val sequence = buf.findField(MSG_SEQ_NUM_TAG)?.value?.toInt() ?: continue + sequence ?: continue if(sequence > 1 && lastProcessedSequence == 1 || sequence > 2 && lastProcessedSequence == 2) { skipRetransmission = true + K_LOGGER.info { "Retransmission will be skipped. Not found valid message with sequence more than 1." } return@withCancellation } @@ -175,17 +189,21 @@ class MessageLoader( timestamp = validMessage.timestamp if(validMessage.payloadSequence <= fromSequence) { + K_LOGGER.info { "Found valid message with start recovery sequence: ${buf.toString(US_ASCII)}" } break } else { messagesToSkip = validMessage.payloadSequence - fromSequence + K_LOGGER.info { "Adjusted number of messages to skip: $messagesToSkip using ${validMessage.payloadSequence} - $fromSequence" } } } else { if(sequence <= fromSequence) { + K_LOGGER.info { "Found valid message with start recovery sequence: ${buf.toString(US_ASCII)}" } break } else { messagesToSkip = sequence - fromSequence + K_LOGGER.info { "Adjusted number of messages to skip: $messagesToSkip using $sequence - $fromSequence" } } } } @@ -213,6 +231,7 @@ class MessageLoader( while (iterator.hasNext()) { val message = Unpooled.buffer().writeBytes(iterator.next().message.bodyRaw.toByteArray()) + K_LOGGER.info { "Sending message to recovery processor: ${message.toString(US_ASCII)}" } if (!processMessage(message)) break } }.onFailure {