Skip to content
This repository has been archived by the owner on Jan 24, 2024. It is now read-only.

Commit

Permalink
[fix][transaction] Handle some failures in transaction (#1977)
Browse files Browse the repository at this point in the history
### Modifications

Handle some failures while writing transaction logs and markers.

---------

Co-authored-by: Enrico Olivelli <[email protected]>
  • Loading branch information
gaoran10 and eolivelli authored Aug 7, 2023
1 parent 19801c1 commit 0c9f946
Show file tree
Hide file tree
Showing 4 changed files with 50 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -455,7 +455,7 @@ public class KafkaServiceConfiguration extends ServiceConfiguration {
category = CATEGORY_KOP_TRANSACTION,
doc = "Interval for purging aborted transactions from memory (requires reads from storage)"
)
private int kafkaTxnPurgeAbortedTxnIntervalSeconds = 60 * 60;
private int kafkaTxnPurgeAbortedTxnIntervalSeconds = 60 * 60 * 24;

@FieldContext(
category = CATEGORY_KOP_TRANSACTION,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -653,27 +653,43 @@ public void fail(Errors errors) {
if (isEpochFence.get()) {
Either<Errors, Optional<CoordinatorEpochAndTxnMetadata>>
errorsAndData = txnManager.getTransactionState(transactionalId);
if (!errorsAndData.getRight().isPresent()) {
log.warn("The coordinator still owns the transaction partition for {}, but there "
if (errorsAndData.isLeft()) {
log.error("Cannot get transaction metadata for {}, status {}", transactionalId,
errorsAndData.getLeft());
} else if (errorsAndData.isLeft() || !errorsAndData.getRight().isPresent()) {
log.error("The coordinator still owns the transaction partition for {}, but there "
+ "is no metadata in the cache; this is not expected", transactionalId);
return;
}
CoordinatorEpochAndTxnMetadata epochAndMetadata = errorsAndData.getRight().get();
if (epochAndMetadata.getCoordinatorEpoch() == coordinatorEpoch) {
} else {
CoordinatorEpochAndTxnMetadata epochAndMetadata = errorsAndData.getRight().get();
if (epochAndMetadata.getCoordinatorEpoch() == coordinatorEpoch) {
// This was attempted epoch fence that failed, so mark this state on the metadata
epochAndMetadata.getTransactionMetadata().setHasFailedEpochFence(true);
epochAndMetadata.getTransactionMetadata().setHasFailedEpochFence(true);

// this line is not present in Kafka code base ?
epochAndMetadata.getTransactionMetadata().setPendingState(Optional.empty());
// this line is not present in Kafka code base ?
epochAndMetadata.getTransactionMetadata().setPendingState(Optional.empty());

log.warn("The coordinator failed to write an epoch fence transition for producer "
+ "{} to the transaction log with error {}. The epoch was increased to {} "
+ "but not returned to the client", transactionalId, errors,
+ "{} to the transaction log with error {}. "
+ "The epoch was increased to {} "
+ "but not returned to the client", transactionalId, errors,
preAppendResult.getRight().getProducerEpoch());
}
}
} else {
Either<Errors, Optional<CoordinatorEpochAndTxnMetadata>>
errorsAndData = txnManager.getTransactionState(transactionalId);
if (errorsAndData.isLeft()) {
log.error("Cannot get transaction metadata for {}, status {}", transactionalId,
errorsAndData.getLeft());
} else if (errorsAndData.getRight().isPresent()) {
log.error("Resetting transactionalId {} pendingState to EMPTY, status {}",
transactionalId,
errorsAndData.getLeft());
CoordinatorEpochAndTxnMetadata epochAndMetadata = errorsAndData.getRight().get();
epochAndMetadata.getTransactionMetadata().setPendingState(Optional.empty());
}
}


callback.accept(errors);
}
}, retryErrors -> true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -190,18 +190,25 @@ public CompletableFuture<TransactionMarkerChannelHandler> getChannel(InetSocketA
return FutureUtil.failedFuture(new Exception("This TransactionMarkerChannelManager is closed"));
}
ensureDrainQueuedTransactionMarkersActivity();
return handlerMap.computeIfAbsent(socketAddress, address -> {
CompletableFuture<TransactionMarkerChannelHandler> handlerFuture = new CompletableFuture<>();
ChannelFutures.toCompletableFuture(bootstrap.connect(socketAddress))
.thenAccept(channel -> {
handlerFuture.complete(
(TransactionMarkerChannelHandler) channel.pipeline().get("txnHandler"));
}).exceptionally(e -> {
handlerFuture.completeExceptionally(e);
return null;
});
return handlerFuture;
});
CompletableFuture<TransactionMarkerChannelHandler> result =
handlerMap.computeIfAbsent(socketAddress, address -> new CompletableFuture<>());

ChannelFutures.toCompletableFuture(bootstrap.connect(socketAddress))
.thenAccept(channel -> {
result.complete(
(TransactionMarkerChannelHandler) channel.pipeline().get("txnHandler"));
}).exceptionally(e -> {
log.error("getChannel failed {} {}", socketAddress, e.getMessage(), e);
result.completeExceptionally(e);
handlerMap.remove(socketAddress, result);
return null;
});

if (result.isCompletedExceptionally()) {
// edge case, the future failed before it was cached
handlerMap.remove(socketAddress, result);
}
return result;
}

public void channelFailed(InetSocketAddress socketAddress, TransactionMarkerChannelHandler handler) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ public void safeRun() {
}

void maybeTakeSnapshot(Executor executor) {
if (mapEndOffset == -1) {
if (mapEndOffset == -1 || kafkaTxnProducerStateTopicSnapshotIntervalSeconds <= 0) {
return;
}
long now = System.currentTimeMillis();
Expand Down Expand Up @@ -181,7 +181,7 @@ void updateAbortedTxnsPurgeOffset(long abortedTxnsPurgeOffset) {
}

long maybePurgeAbortedTx() {
if (mapEndOffset == -1) {
if (mapEndOffset == -1 || kafkaTxnPurgeAbortedTxnIntervalSeconds <= 0) {
return 0;
}
long now = System.currentTimeMillis();
Expand Down Expand Up @@ -332,9 +332,6 @@ public long purgeAbortedTxns(long offset) {
}
return toRemove;
});
if (!abortedIndexList.isEmpty()) {
log.info("There are still {} aborted tx on {}", abortedIndexList.size(), topicPartition);
}
}
return count.get();
}
Expand Down

0 comments on commit 0c9f946

Please sign in to comment.