Skip to content

Commit

Permalink
[transactions] Better handling of network exceptions while sending TX…
Browse files Browse the repository at this point in the history
… markers (streamnative#1907)

This PR is cherry-picked from
datastax/starlight-for-kafka@e0636995

### Motivation

Complete the pending request futures when the channel becomes inactive or
an exception is caught.

Co-authored-by: Enrico Olivelli <[email protected]>
(cherry picked from commit 8287e60)
  • Loading branch information
Demogorgon314 authored and BewareMyPower committed Jul 5, 2023
1 parent 1d7f6c5 commit 541edc5
Show file tree
Hide file tree
Showing 5 changed files with 37 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,10 @@ public int getCorrelationId() {
return requestHeader.correlationId();
}

/**
 * Builds an error response for this pending request by delegating to
 * {@code request.getErrorResponse(error)}.
 *
 * @param error the failure to report in the response
 * @return the error response produced by the underlying request
 */
public AbstractResponse createErrorResponse(Throwable error) {
return request.getErrorResponse(error);
}

public void complete(final ResponseContext responseContext) {
responseConsumerHandler.accept(responseContext);
sendFuture.complete(responseContext.getResponse());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -492,8 +492,17 @@ public void handleAddPartitionsToTransaction(String transactionalId,
return Either.left(producerEpochFenceErrors());
} else if (txnMetadata.getPendingState().isPresent()) {
// return a retriable exception to let the client backoff and retry
if (log.isDebugEnabled()) {
log.debug("Producer {} is in pending state {}, responding CONCURRENT_TRANSACTIONS",
transactionalId, txnMetadata.getPendingState());
}
return Either.left(Errors.CONCURRENT_TRANSACTIONS);
} else if (txnMetadata.getState() == PREPARE_COMMIT || txnMetadata.getState() == PREPARE_ABORT) {
if (log.isDebugEnabled()) {
log.debug("Producer {} is in state {}, responding CONCURRENT_TRANSACTIONS",
transactionalId, txnMetadata.getState()
);
}
return Either.left(Errors.CONCURRENT_TRANSACTIONS);
} else if (txnMetadata.getState() == ONGOING
&& txnMetadata.getTopicPartitions().containsAll(partitionList)) {
Expand Down Expand Up @@ -521,6 +530,7 @@ public void complete() {

@Override
public void fail(Errors e) {
// Record the failure before handing the error code to the response callback,
// so the log entry exists even if the callback itself throws.
log.error("Error writing to TX log for {}, answer {}", transactionalId, e);
responseCallback.accept(e);
}
}, errors -> true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import javax.security.sasl.SaslException;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.util.collections.ConcurrentLongHashMap;
import org.apache.kafka.common.errors.NetworkException;
import org.apache.kafka.common.message.SaslAuthenticateRequestData;
import org.apache.kafka.common.message.SaslHandshakeRequestData;
import org.apache.kafka.common.protocol.ApiKeys;
Expand Down Expand Up @@ -112,8 +113,15 @@ public void channelActive(ChannelHandlerContext channelHandlerContext) throws Ex
public void channelInactive(ChannelHandlerContext channelHandlerContext) throws Exception {
log.info("[TransactionMarkerChannelHandler] channelInactive, failing {} pending requests",
pendingRequestMap.size());
pendingRequestMap.forEach((__, pendingRequest) ->
log.warn("Pending request ({}) was not sent when the txn marker channel is inactive", pendingRequest));
pendingRequestMap.forEach((correlationId, pendingRequest) -> {
log.warn("Pending request ({}) was not sent when the txn marker channel is inactive", pendingRequest);
pendingRequest.complete(responseContext.set(
channelHandlerContext.channel().remoteAddress(),
pendingRequest.getApiVersion(),
(int) correlationId,
pendingRequest.createErrorResponse(new NetworkException())
));
});
pendingRequestMap.clear();
transactionMarkerChannelManager.channelFailed((InetSocketAddress) channelHandlerContext
.channel()
Expand Down Expand Up @@ -151,9 +159,15 @@ public void channelRead(ChannelHandlerContext channelHandlerContext, Object o) t
@Override
public void exceptionCaught(ChannelHandlerContext channelHandlerContext, Throwable throwable) throws Exception {
log.error("Transaction marker channel handler caught exception.", throwable);
pendingRequestMap.forEach((__, pendingRequest) ->
pendingRequestMap.forEach((correlationId, pendingRequest) -> {
log.warn("Pending request ({}) failed because the txn marker channel caught exception",
pendingRequest, throwable));
pendingRequest, throwable);
pendingRequest.complete(responseContext.set(
channelHandlerContext.channel().remoteAddress(),
pendingRequest.getApiVersion(),
(int) correlationId,
pendingRequest.createErrorResponse(new NetworkException(throwable))));
});
pendingRequestMap.clear();
channelHandlerContext.close();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,7 @@ private AbortSendingRetryPartitions hasAbortSendOrRetryPartitions(
case NOT_ENOUGH_REPLICAS:
case NOT_ENOUGH_REPLICAS_AFTER_APPEND:
case REQUEST_TIMED_OUT:
case NETWORK_EXCEPTION:
case UNKNOWN_SERVER_ERROR:
case KAFKA_STORAGE_ERROR: // these are retriable errors
log.info("Sending {}'s transaction marker for partition {} has failed with error {}, "
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -775,6 +775,10 @@ private void completeLoadedTransactions(TopicPartition topicPartition, long star
public void removeTransactionsForTxnTopicPartition(int partition) {
TopicPartition topicPartition =
new TopicPartition(transactionConfig.getTransactionMetadataTopicName(), partition);
if (scheduler.isShutdown()) {
log.info("Skip unloading transaction metadata from {} as broker is stopping", topicPartition);
return;
}
log.info("Scheduling unloading transaction metadata from {}", topicPartition);

CoreUtils.inWriteLock(stateLock, () -> {
Expand Down

0 comments on commit 541edc5

Please sign in to comment.