From 45ac1f198d55bd61bc76dc6c70786d2965d52931 Mon Sep 17 00:00:00 2001 From: Enrico Olivelli Date: Fri, 17 Feb 2023 18:34:04 +0800 Subject: [PATCH 1/2] [transactions] Better handling of network exceptions while sending TX markers (#60) --- .../handlers/kop/KafkaRequestHandler.java | 3 +++ .../transaction/PendingRequest.java | 4 ++++ .../transaction/TransactionCoordinator.java | 10 +++++++++ .../TransactionMarkerChannelHandler.java | 22 +++++++++++++++---- ...sactionMarkerRequestCompletionHandler.java | 1 + .../transaction/TransactionStateManager.java | 4 ++++ 6 files changed, 40 insertions(+), 4 deletions(-) diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandler.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandler.java index f4de60ce0a..cad441bba4 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandler.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandler.java @@ -2195,6 +2195,9 @@ protected void handleAddPartitionsToTxn(KafkaHeaderAndRequest kafkaHeaderAndRequ responseData.results().add(topicResult); topicPartitionErrorsMap.forEach((TopicPartition tp, Errors error) -> { if (tp.topic().equals(topicName)) { + if (log.isDebugEnabled() && error != Errors.NONE) { + log.info("Error {} for {}", error, tp); + } topicResult.results() .add(new AddPartitionsToTxnResponseData .AddPartitionsToTxnPartitionResult() diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/transaction/PendingRequest.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/transaction/PendingRequest.java index c7e5ef6d60..6199d1b476 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/transaction/PendingRequest.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/transaction/PendingRequest.java @@ -57,6 +57,10 @@ public int getCorrelationId() { return requestHeader.correlationId(); } + public AbstractResponse createErrorResponse(Throwable error) { + return request.getErrorResponse(error); + } + public void complete(final ResponseContext responseContext) { responseConsumerHandler.accept(responseContext); sendFuture.complete(responseContext.getResponse()); diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/transaction/TransactionCoordinator.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/transaction/TransactionCoordinator.java index 62a8e66819..d81d135265 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/transaction/TransactionCoordinator.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/transaction/TransactionCoordinator.java @@ -492,8 +492,17 @@ public void handleAddPartitionsToTransaction(String transactionalId, return Either.left(producerEpochFenceErrors()); } else if (txnMetadata.getPendingState().isPresent()) { // return a retriable exception to let the client backoff and retry + if (log.isDebugEnabled()) { + log.debug("Producer {} is in pending state {}, responding CONCURRENT_TRANSACTIONS", + transactionalId, txnMetadata.getPendingState()); + } return Either.left(Errors.CONCURRENT_TRANSACTIONS); } else if (txnMetadata.getState() == PREPARE_COMMIT || txnMetadata.getState() == PREPARE_ABORT) { + if (log.isDebugEnabled()) { + log.debug("Producer {} is in state {}, responding CONCURRENT_TRANSACTIONS", + transactionalId, txnMetadata.getState() + ); + } return Either.left(Errors.CONCURRENT_TRANSACTIONS); } else if (txnMetadata.getState() == ONGOING && txnMetadata.getTopicPartitions().containsAll(partitionList)) { @@ -521,6 +530,7 @@ public void complete() { @Override public void fail(Errors e) { + log.error("Error writing to TX log for {}, answer {}", transactionalId, e); responseCallback.accept(e); } }, errors -> true); diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/transaction/TransactionMarkerChannelHandler.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/transaction/TransactionMarkerChannelHandler.java index 79b91884df..40fef2aa2f 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/transaction/TransactionMarkerChannelHandler.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/transaction/TransactionMarkerChannelHandler.java @@ -30,6 +30,7 @@ import javax.security.sasl.SaslException; import lombok.extern.slf4j.Slf4j; import org.apache.bookkeeper.util.collections.ConcurrentLongHashMap; +import org.apache.kafka.common.errors.NetworkException; import org.apache.kafka.common.message.SaslAuthenticateRequestData; import org.apache.kafka.common.message.SaslHandshakeRequestData; import org.apache.kafka.common.protocol.ApiKeys; @@ -112,8 +113,15 @@ public void channelActive(ChannelHandlerContext channelHandlerContext) throws Ex public void channelInactive(ChannelHandlerContext channelHandlerContext) throws Exception { log.info("[TransactionMarkerChannelHandler] channelInactive, failing {} pending requests", pendingRequestMap.size()); - pendingRequestMap.forEach((__, pendingRequest) -> - log.warn("Pending request ({}) was not sent when the txn marker channel is inactive", pendingRequest)); + pendingRequestMap.forEach((correlationId, pendingRequest) -> { + log.warn("Pending request ({}) was not sent when the txn marker channel is inactive", pendingRequest); + pendingRequest.complete(responseContext.set( + channelHandlerContext.channel().remoteAddress(), + pendingRequest.getApiVersion(), + (int) correlationId, + pendingRequest.createErrorResponse(new NetworkException()) + )); + }); pendingRequestMap.clear(); transactionMarkerChannelManager.channelFailed((InetSocketAddress) channelHandlerContext .channel() @@ -151,9 +159,15 @@ public void channelRead(ChannelHandlerContext channelHandlerContext, Object o) t @Override public void exceptionCaught(ChannelHandlerContext channelHandlerContext, Throwable throwable) throws Exception { log.error("Transaction marker channel handler caught exception.", throwable); - pendingRequestMap.forEach((__, pendingRequest) -> + pendingRequestMap.forEach((correlationId, pendingRequest) -> { log.warn("Pending request ({}) failed because the txn marker channel caught exception", - pendingRequest, throwable)); + pendingRequest, throwable); + pendingRequest.complete(responseContext.set( + channelHandlerContext.channel().remoteAddress(), + pendingRequest.getApiVersion(), + (int) correlationId, + pendingRequest.createErrorResponse(new NetworkException(throwable)))); + }); pendingRequestMap.clear(); channelHandlerContext.close(); } diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/transaction/TransactionMarkerRequestCompletionHandler.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/transaction/TransactionMarkerRequestCompletionHandler.java index e09f84ab19..9bf88d49c1 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/transaction/TransactionMarkerRequestCompletionHandler.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/transaction/TransactionMarkerRequestCompletionHandler.java @@ -169,6 +169,7 @@ private AbortSendingRetryPartitions hasAbortSendOrRetryPartitions( case NOT_ENOUGH_REPLICAS: case NOT_ENOUGH_REPLICAS_AFTER_APPEND: case REQUEST_TIMED_OUT: + case NETWORK_EXCEPTION: case UNKNOWN_SERVER_ERROR: case KAFKA_STORAGE_ERROR: // these are retriable errors log.info("Sending {}'s transaction marker for partition {} has failed with error {}, " diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/transaction/TransactionStateManager.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/transaction/TransactionStateManager.java index a77cedd568..31b1519748 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/transaction/TransactionStateManager.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/transaction/TransactionStateManager.java @@ -775,6 +775,10 @@ private void completeLoadedTransactions(TopicPartition topicPartition, long star public void removeTransactionsForTxnTopicPartition(int partition) { TopicPartition topicPartition = new TopicPartition(transactionConfig.getTransactionMetadataTopicName(), partition); + if (scheduler.isShutdown()) { + log.info("Skip unloading transaction metadata from {} as broker is stopping", topicPartition); + return; + } log.info("Scheduling unloading transaction metadata from {}", topicPartition); CoreUtils.inWriteLock(stateLock, () -> { From c0bad1a4f9bf2a885c2265097675153014d0f59c Mon Sep 17 00:00:00 2001 From: Demogorgon314 Date: Mon, 26 Jun 2023 12:05:15 +0800 Subject: [PATCH 2/2] Remove unused log --- .../streamnative/pulsar/handlers/kop/KafkaRequestHandler.java | 3 --- 1 file changed, 3 deletions(-) diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandler.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandler.java index cad441bba4..f4de60ce0a 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandler.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandler.java @@ -2195,9 +2195,6 @@ protected void handleAddPartitionsToTxn(KafkaHeaderAndRequest kafkaHeaderAndRequ responseData.results().add(topicResult); topicPartitionErrorsMap.forEach((TopicPartition tp, Errors error) -> { if (tp.topic().equals(topicName)) { - if (log.isDebugEnabled() && error != Errors.NONE) { - log.info("Error {} for {}", error, tp); - } topicResult.results() .add(new AddPartitionsToTxnResponseData .AddPartitionsToTxnPartitionResult()