Skip to content
This repository has been archived by the owner on Jan 24, 2024. It is now read-only.

[transactions] Better handling of network exceptions while sending TX markers #1907

Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2195,6 +2195,9 @@ protected void handleAddPartitionsToTxn(KafkaHeaderAndRequest kafkaHeaderAndRequ
responseData.results().add(topicResult);
topicPartitionErrorsMap.forEach((TopicPartition tp, Errors error) -> {
if (tp.topic().equals(topicName)) {
if (log.isDebugEnabled() && error != Errors.NONE) {
log.info("Error {} for {}", error, tp);
}
BewareMyPower marked this conversation as resolved.
Show resolved Hide resolved
topicResult.results()
.add(new AddPartitionsToTxnResponseData
.AddPartitionsToTxnPartitionResult()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,10 @@ public int getCorrelationId() {
return requestHeader.correlationId();
}

public AbstractResponse createErrorResponse(Throwable error) {
return request.getErrorResponse(error);
}

public void complete(final ResponseContext responseContext) {
responseConsumerHandler.accept(responseContext);
sendFuture.complete(responseContext.getResponse());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -492,8 +492,17 @@ public void handleAddPartitionsToTransaction(String transactionalId,
return Either.left(producerEpochFenceErrors());
} else if (txnMetadata.getPendingState().isPresent()) {
// return a retriable exception to let the client backoff and retry
if (log.isDebugEnabled()) {
log.debug("Producer {} is in pending state {}, responding CONCURRENT_TRANSACTIONS",
transactionalId, txnMetadata.getPendingState());
}
return Either.left(Errors.CONCURRENT_TRANSACTIONS);
} else if (txnMetadata.getState() == PREPARE_COMMIT || txnMetadata.getState() == PREPARE_ABORT) {
if (log.isDebugEnabled()) {
log.debug("Producer {} is in state {}, responding CONCURRENT_TRANSACTIONS",
transactionalId, txnMetadata.getState()
);
}
BewareMyPower marked this conversation as resolved.
Show resolved Hide resolved
return Either.left(Errors.CONCURRENT_TRANSACTIONS);
} else if (txnMetadata.getState() == ONGOING
&& txnMetadata.getTopicPartitions().containsAll(partitionList)) {
Expand Down Expand Up @@ -521,6 +530,7 @@ public void complete() {

@Override
public void fail(Errors e) {
log.error("Error writing to TX log for {}, answer {}", transactionalId, e);
responseCallback.accept(e);
}
}, errors -> true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import javax.security.sasl.SaslException;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.util.collections.ConcurrentLongHashMap;
import org.apache.kafka.common.errors.NetworkException;
import org.apache.kafka.common.message.SaslAuthenticateRequestData;
import org.apache.kafka.common.message.SaslHandshakeRequestData;
import org.apache.kafka.common.protocol.ApiKeys;
Expand Down Expand Up @@ -112,8 +113,15 @@ public void channelActive(ChannelHandlerContext channelHandlerContext) throws Ex
public void channelInactive(ChannelHandlerContext channelHandlerContext) throws Exception {
log.info("[TransactionMarkerChannelHandler] channelInactive, failing {} pending requests",
pendingRequestMap.size());
pendingRequestMap.forEach((__, pendingRequest) ->
log.warn("Pending request ({}) was not sent when the txn marker channel is inactive", pendingRequest));
pendingRequestMap.forEach((correlationId, pendingRequest) -> {
log.warn("Pending request ({}) was not sent when the txn marker channel is inactive", pendingRequest);
pendingRequest.complete(responseContext.set(
channelHandlerContext.channel().remoteAddress(),
pendingRequest.getApiVersion(),
(int) correlationId,
pendingRequest.createErrorResponse(new NetworkException())
));
});
pendingRequestMap.clear();
transactionMarkerChannelManager.channelFailed((InetSocketAddress) channelHandlerContext
.channel()
Expand Down Expand Up @@ -151,9 +159,15 @@ public void channelRead(ChannelHandlerContext channelHandlerContext, Object o) t
@Override
public void exceptionCaught(ChannelHandlerContext channelHandlerContext, Throwable throwable) throws Exception {
log.error("Transaction marker channel handler caught exception.", throwable);
pendingRequestMap.forEach((__, pendingRequest) ->
pendingRequestMap.forEach((correlationId, pendingRequest) -> {
log.warn("Pending request ({}) failed because the txn marker channel caught exception",
pendingRequest, throwable));
pendingRequest, throwable);
pendingRequest.complete(responseContext.set(
channelHandlerContext.channel().remoteAddress(),
pendingRequest.getApiVersion(),
(int) correlationId,
pendingRequest.createErrorResponse(new NetworkException(throwable))));
});
pendingRequestMap.clear();
channelHandlerContext.close();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,7 @@ private AbortSendingRetryPartitions hasAbortSendOrRetryPartitions(
case NOT_ENOUGH_REPLICAS:
case NOT_ENOUGH_REPLICAS_AFTER_APPEND:
case REQUEST_TIMED_OUT:
case NETWORK_EXCEPTION:
case UNKNOWN_SERVER_ERROR:
case KAFKA_STORAGE_ERROR: // these are retriable errors
log.info("Sending {}'s transaction marker for partition {} has failed with error {}, "
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -775,6 +775,10 @@ private void completeLoadedTransactions(TopicPartition topicPartition, long star
public void removeTransactionsForTxnTopicPartition(int partition) {
TopicPartition topicPartition =
new TopicPartition(transactionConfig.getTransactionMetadataTopicName(), partition);
if (scheduler.isShutdown()) {
log.info("Skip unloading transaction metadata from {} as broker is stopping", topicPartition);
return;
}
log.info("Scheduling unloading transaction metadata from {}", topicPartition);

CoreUtils.inWriteLock(stateLock, () -> {
Expand Down