From 8287e6015d3b9a8cb956aad5e9afb51f67e9e4fe Mon Sep 17 00:00:00 2001 From: Kai Wang Date: Wed, 28 Jun 2023 20:17:00 +0800 Subject: [PATCH] [transactions] Better handling of network exceptions while sending TX markers (#1907) This PR is cherry-picked from https://github.com/datastax/starlight-for-kafka/commit/e0636995 ### Motivation Complete the pending request futures when the channel becomes inactive or an exception is caught. Co-authored-by: Enrico Olivelli --- .../transaction/PendingRequest.java | 4 ++++ .../transaction/TransactionCoordinator.java | 10 +++++++++ .../TransactionMarkerChannelHandler.java | 22 +++++++++++++++---- ...sactionMarkerRequestCompletionHandler.java | 1 + .../transaction/TransactionStateManager.java | 4 ++++ 5 files changed, 37 insertions(+), 4 deletions(-) diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/transaction/PendingRequest.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/transaction/PendingRequest.java index c7e5ef6d60..6199d1b476 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/transaction/PendingRequest.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/transaction/PendingRequest.java @@ -57,6 +57,10 @@ public int getCorrelationId() { return requestHeader.correlationId(); } + public AbstractResponse createErrorResponse(Throwable error) { + return request.getErrorResponse(error); + } + public void complete(final ResponseContext responseContext) { responseConsumerHandler.accept(responseContext); sendFuture.complete(responseContext.getResponse()); diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/transaction/TransactionCoordinator.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/transaction/TransactionCoordinator.java index 62a8e66819..d81d135265 100644 --- 
a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/transaction/TransactionCoordinator.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/transaction/TransactionCoordinator.java @@ -492,8 +492,17 @@ public void handleAddPartitionsToTransaction(String transactionalId, return Either.left(producerEpochFenceErrors()); } else if (txnMetadata.getPendingState().isPresent()) { // return a retriable exception to let the client backoff and retry + if (log.isDebugEnabled()) { + log.debug("Producer {} is in pending state {}, responding CONCURRENT_TRANSACTIONS", + transactionalId, txnMetadata.getPendingState()); + } return Either.left(Errors.CONCURRENT_TRANSACTIONS); } else if (txnMetadata.getState() == PREPARE_COMMIT || txnMetadata.getState() == PREPARE_ABORT) { + if (log.isDebugEnabled()) { + log.debug("Producer {} is in state {}, responding CONCURRENT_TRANSACTIONS", + transactionalId, txnMetadata.getState() + ); + } return Either.left(Errors.CONCURRENT_TRANSACTIONS); } else if (txnMetadata.getState() == ONGOING && txnMetadata.getTopicPartitions().containsAll(partitionList)) { @@ -521,6 +530,7 @@ public void complete() { @Override public void fail(Errors e) { + log.error("Error writing to TX log for {}, answer {}", transactionalId, e); responseCallback.accept(e); } }, errors -> true); diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/transaction/TransactionMarkerChannelHandler.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/transaction/TransactionMarkerChannelHandler.java index 79b91884df..40fef2aa2f 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/transaction/TransactionMarkerChannelHandler.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/transaction/TransactionMarkerChannelHandler.java @@ -30,6 +30,7 @@ import javax.security.sasl.SaslException; import lombok.extern.slf4j.Slf4j; 
import org.apache.bookkeeper.util.collections.ConcurrentLongHashMap; +import org.apache.kafka.common.errors.NetworkException; import org.apache.kafka.common.message.SaslAuthenticateRequestData; import org.apache.kafka.common.message.SaslHandshakeRequestData; import org.apache.kafka.common.protocol.ApiKeys; @@ -112,8 +113,15 @@ public void channelActive(ChannelHandlerContext channelHandlerContext) throws Ex public void channelInactive(ChannelHandlerContext channelHandlerContext) throws Exception { log.info("[TransactionMarkerChannelHandler] channelInactive, failing {} pending requests", pendingRequestMap.size()); - pendingRequestMap.forEach((__, pendingRequest) -> - log.warn("Pending request ({}) was not sent when the txn marker channel is inactive", pendingRequest)); + pendingRequestMap.forEach((correlationId, pendingRequest) -> { + log.warn("Pending request ({}) was not sent when the txn marker channel is inactive", pendingRequest); + pendingRequest.complete(responseContext.set( + channelHandlerContext.channel().remoteAddress(), + pendingRequest.getApiVersion(), + (int) correlationId, + pendingRequest.createErrorResponse(new NetworkException()) + )); + }); pendingRequestMap.clear(); transactionMarkerChannelManager.channelFailed((InetSocketAddress) channelHandlerContext .channel() @@ -151,9 +159,15 @@ public void channelRead(ChannelHandlerContext channelHandlerContext, Object o) t @Override public void exceptionCaught(ChannelHandlerContext channelHandlerContext, Throwable throwable) throws Exception { log.error("Transaction marker channel handler caught exception.", throwable); - pendingRequestMap.forEach((__, pendingRequest) -> + pendingRequestMap.forEach((correlationId, pendingRequest) -> { log.warn("Pending request ({}) failed because the txn marker channel caught exception", - pendingRequest, throwable)); + pendingRequest, throwable); + pendingRequest.complete(responseContext.set( + channelHandlerContext.channel().remoteAddress(), + 
pendingRequest.getApiVersion(), + (int) correlationId, + pendingRequest.createErrorResponse(new NetworkException(throwable)))); + }); pendingRequestMap.clear(); channelHandlerContext.close(); } diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/transaction/TransactionMarkerRequestCompletionHandler.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/transaction/TransactionMarkerRequestCompletionHandler.java index e09f84ab19..9bf88d49c1 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/transaction/TransactionMarkerRequestCompletionHandler.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/transaction/TransactionMarkerRequestCompletionHandler.java @@ -169,6 +169,7 @@ private AbortSendingRetryPartitions hasAbortSendOrRetryPartitions( case NOT_ENOUGH_REPLICAS: case NOT_ENOUGH_REPLICAS_AFTER_APPEND: case REQUEST_TIMED_OUT: + case NETWORK_EXCEPTION: case UNKNOWN_SERVER_ERROR: case KAFKA_STORAGE_ERROR: // these are retriable errors log.info("Sending {}'s transaction marker for partition {} has failed with error {}, " diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/transaction/TransactionStateManager.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/transaction/TransactionStateManager.java index a77cedd568..31b1519748 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/transaction/TransactionStateManager.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/transaction/TransactionStateManager.java @@ -775,6 +775,10 @@ private void completeLoadedTransactions(TopicPartition topicPartition, long star public void removeTransactionsForTxnTopicPartition(int partition) { TopicPartition topicPartition = new TopicPartition(transactionConfig.getTransactionMetadataTopicName(), partition); + if (scheduler.isShutdown()) { + 
log.info("Skip unloading transaction metadata from {} as broker is stopping", topicPartition); + return; + } log.info("Scheduling unloading transaction metadata from {}", topicPartition); CoreUtils.inWriteLock(stateLock, () -> {