Skip to content
This repository has been archived by the owner on Jan 24, 2024. It is now read-only.

Commit

Permalink
[transactions] Implement KIP-664 - DESCRIBE_TRANSACTIONS (#77)
Browse files Browse the repository at this point in the history
(cherry picked from commit 1f2fe99)
  • Loading branch information
eolivelli authored and gaoran10 committed Jul 30, 2023
1 parent 3a4e749 commit c9e90c2
Show file tree
Hide file tree
Showing 4 changed files with 126 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -317,6 +317,9 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
case LIST_TRANSACTIONS:
handleListTransactionsRequest(kafkaHeaderAndRequest, responseFuture);
break;
case DESCRIBE_TRANSACTIONS:
handleDescribeTransactionsRequest(kafkaHeaderAndRequest, responseFuture);
break;
case DELETE_GROUPS:
handleDeleteGroupsRequest(kafkaHeaderAndRequest, responseFuture);
break;
Expand Down Expand Up @@ -578,6 +581,9 @@ protected void handleError(KafkaHeaderAndRequest kafkaHeaderAndRequest,
/**
 * Handles a LIST_TRANSACTIONS request (KIP-664).
 *
 * @param listTransactions the decoded LIST_TRANSACTIONS request
 * @param response completed with the response to send back to the client
 */
protected abstract void
handleListTransactionsRequest(KafkaHeaderAndRequest listTransactions, CompletableFuture<AbstractResponse> response);

/**
 * Handles a DESCRIBE_TRANSACTIONS request (KIP-664).
 *
 * @param describeTransactions the decoded DESCRIBE_TRANSACTIONS request
 * @param response completed with the response to send back to the client
 */
protected abstract void
handleDescribeTransactionsRequest(KafkaHeaderAndRequest describeTransactions, CompletableFuture<AbstractResponse> response);

/**
 * Handles a DELETE_GROUPS request.
 *
 * @param deleteGroups the decoded DELETE_GROUPS request
 * @param response completed with the response to send back to the client
 */
protected abstract void
handleDeleteGroupsRequest(KafkaHeaderAndRequest deleteGroups, CompletableFuture<AbstractResponse> response);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@
import org.apache.kafka.common.message.DescribeClusterResponseData;
import org.apache.kafka.common.message.DescribeConfigsRequestData;
import org.apache.kafka.common.message.DescribeConfigsResponseData;
import org.apache.kafka.common.message.DescribeTransactionsResponseData;
import org.apache.kafka.common.message.EndTxnRequestData;
import org.apache.kafka.common.message.EndTxnResponseData;
import org.apache.kafka.common.message.FetchRequestData;
Expand Down Expand Up @@ -159,6 +160,8 @@
import org.apache.kafka.common.requests.DescribeConfigsRequest;
import org.apache.kafka.common.requests.DescribeConfigsResponse;
import org.apache.kafka.common.requests.DescribeGroupsRequest;
import org.apache.kafka.common.requests.DescribeTransactionsRequest;
import org.apache.kafka.common.requests.DescribeTransactionsResponse;
import org.apache.kafka.common.requests.EndTxnRequest;
import org.apache.kafka.common.requests.EndTxnResponse;
import org.apache.kafka.common.requests.FetchRequest;
Expand Down Expand Up @@ -2064,6 +2067,16 @@ protected void handleListTransactionsRequest(KafkaHeaderAndRequest listTransacti
resultFuture.complete(new ListTransactionsResponse(listResult));
}

/**
 * Handles a DESCRIBE_TRANSACTIONS request (KIP-664) by delegating to the
 * transaction coordinator and completing the response future with the
 * per-transactional-id description result.
 *
 * @param describeTransactions the decoded request; must wrap a {@link DescribeTransactionsRequest}
 * @param response completed with a {@link DescribeTransactionsResponse}
 */
@Override
protected void handleDescribeTransactionsRequest(KafkaHeaderAndRequest describeTransactions,
                                                 CompletableFuture<AbstractResponse> response) {
    // Parameter renamed from the copy-pasted "listGroups" to reflect the actual request type.
    checkArgument(describeTransactions.getRequest() instanceof DescribeTransactionsRequest);
    DescribeTransactionsRequest request = (DescribeTransactionsRequest) describeTransactions.getRequest();
    DescribeTransactionsResponseData describeResult = getTransactionCoordinator()
            .handleDescribeTransactions(request.data().transactionalIds());
    response.complete(new DescribeTransactionsResponse(describeResult));
}

@Override
protected void handleDeleteGroupsRequest(KafkaHeaderAndRequest deleteGroups,
CompletableFuture<AbstractResponse> resultFuture) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
*/
package io.streamnative.pulsar.handlers.kop.coordinator.transaction;

import static io.streamnative.pulsar.handlers.kop.coordinator.transaction.TransactionState.DEAD;
import static io.streamnative.pulsar.handlers.kop.coordinator.transaction.TransactionState.ONGOING;
import static io.streamnative.pulsar.handlers.kop.coordinator.transaction.TransactionState.PREPARE_ABORT;
import static io.streamnative.pulsar.handlers.kop.coordinator.transaction.TransactionState.PREPARE_COMMIT;
Expand Down Expand Up @@ -52,6 +53,7 @@
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.internals.Topic;
import org.apache.kafka.common.message.DescribeTransactionsResponseData;
import org.apache.kafka.common.message.ListTransactionsResponseData;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.record.RecordBatch;
Expand Down Expand Up @@ -230,6 +232,76 @@ public ListTransactionsResponseData handleListTransactions(List<String> filtered
return this.txnManager.listTransactionStates(filteredProducerIds, filteredStates);
}

/**
 * Builds the DESCRIBE_TRANSACTIONS response for the given transactional ids.
 * A {@code null} id list is tolerated and yields a response with no
 * transaction states; each id is described independently via the
 * single-id overload.
 *
 * @param transactionalIds the ids to describe, or {@code null}
 * @return the aggregated response data, one state entry per id
 */
public DescribeTransactionsResponseData handleDescribeTransactions(List<String> transactionalIds) {
    DescribeTransactionsResponseData response = new DescribeTransactionsResponseData();
    if (transactionalIds == null) {
        return response;
    }
    for (String transactionalId : transactionalIds) {
        response.transactionStates().add(handleDescribeTransactions(transactionalId));
    }
    return response;
}

/**
 * Describes a single transactional id, mirroring the upstream Kafka
 * TransactionCoordinator behavior. Per-id failures are reported through the
 * returned state's error code rather than by throwing; only a null id is
 * treated as a programming error.
 *
 * @param transactionalId the id to describe; must not be null
 * @return a TransactionState carrying either the transaction metadata or an error code
 * @throws IllegalArgumentException if {@code transactionalId} is null
 */
private DescribeTransactionsResponseData.TransactionState handleDescribeTransactions(String transactionalId) {
// Ported from the upstream Scala implementation:
// https://github.com/apache/kafka/blob/915991445fde106d02e61a70425ae2601c813db0/core/
// src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala#L270
if (transactionalId == null) {
throw new IllegalArgumentException("Invalid null transactionalId");
}

DescribeTransactionsResponseData.TransactionState transactionState =
new DescribeTransactionsResponseData.TransactionState()
.setTransactionalId(transactionalId);

if (!isActive.get()) {
// Coordinator is not (or no longer) serving requests.
transactionState.setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code());
} else if (transactionalId.isEmpty()) {
// An empty id is rejected, matching the upstream coordinator.
transactionState.setErrorCode(Errors.INVALID_REQUEST.code());
} else {
// Either: left = lookup error, right = optional metadata for the id.
Either<Errors, Optional<CoordinatorEpochAndTxnMetadata>> tState =
txnManager.getTransactionState(transactionalId);
if (tState.isLeft()) {
transactionState.setErrorCode(tState.getLeft().code());
} else {
Optional<CoordinatorEpochAndTxnMetadata> right = tState.getRight();
if (!right.isPresent()) {
// No metadata for this id on this coordinator.
transactionState.setErrorCode(Errors.TRANSACTIONAL_ID_NOT_FOUND.code());
} else {
CoordinatorEpochAndTxnMetadata coordinatorEpochAndMetadata = right.get();
TransactionMetadata txnMetadata = coordinatorEpochAndMetadata.getTransactionMetadata();
// Read the metadata under its lock so the state, partitions, and
// producer fields form a consistent snapshot.
txnMetadata.inLock(() -> {
if (txnMetadata.getState() == DEAD) {
// The transaction state is being expired, so ignore it
transactionState.setErrorCode(Errors.TRANSACTIONAL_ID_NOT_FOUND.code());
} else {
// Group the transaction's partitions by topic in the response.
txnMetadata.getTopicPartitions().forEach(topicPartition -> {
var topicData = transactionState.topics().find(topicPartition.topic());
if (topicData == null) {
topicData = new DescribeTransactionsResponseData.TopicData()
.setTopic(topicPartition.topic());
transactionState.topics().add(topicData);
}
topicData.partitions().add(topicPartition.partition());
});

transactionState
.setErrorCode(Errors.NONE.code())
.setProducerId(txnMetadata.getProducerId())
.setProducerEpoch(txnMetadata.getProducerEpoch())
.setTransactionState(txnMetadata.getState().toAdminState().toString())
.setTransactionTimeoutMs(txnMetadata.getTxnTimeoutMs())
.setTransactionStartTimeMs(txnMetadata.getTxnStartTimestamp());
}
// inLock expects a return value; nothing to return here.
return null;
});
}
}
}
return transactionState;
}

@Data
@EqualsAndHashCode
@AllArgsConstructor
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import static org.hamcrest.MatcherAssert.assertThat;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertSame;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.expectThrows;
Expand Down Expand Up @@ -50,6 +51,7 @@
import org.apache.kafka.clients.admin.ListTransactionsOptions;
import org.apache.kafka.clients.admin.ListTransactionsResult;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.admin.TransactionDescription;
import org.apache.kafka.clients.admin.TransactionListing;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
Expand Down Expand Up @@ -81,6 +83,8 @@
@Slf4j
public class TransactionTest extends KopProtocolHandlerTestBase {

private static final int TRANSACTION_TIMEOUT_CONFIG_VALUE = 600 * 1000;

protected void setupTransactions() {
this.conf.setDefaultNumberOfNamespaceBundles(4);
this.conf.setOffsetsTopicNumPartitions(10);
Expand Down Expand Up @@ -1157,7 +1161,7 @@ private KafkaProducer<Integer, String> buildTransactionProducer(String transacti
producerProps.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, txTimeout);
} else {
// very long time-out
producerProps.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 600 * 1000);
producerProps.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, TRANSACTION_TIMEOUT_CONFIG_VALUE);
}
addCustomizeProps(producerProps);

Expand Down Expand Up @@ -1324,10 +1328,10 @@ public void testNotFencedWithBeginTransaction() throws Exception {
producer2.close();
}

@Test(timeOut = 100000 * 30)
public void testListTransactions() throws Exception {
@Test(timeOut = 1000 * 30)
public void testListAndDescribeTransactions() throws Exception {

String topicName = "testListTransactions";
String topicName = "testListAndDescribeTransactions";
String transactionalId = "myProducer_" + UUID.randomUUID();

@Cleanup
Expand Down Expand Up @@ -1432,6 +1436,33 @@ private static void assertTransactionState(AdminClient kafkaAdmin, String transa
.findFirst()
.get();
assertEquals(transactionState, transactionListing.state());

Map<String, TransactionDescription> map =
kafkaAdmin.describeTransactions(Collections.singleton(transactionalId))
.all().get();
assertEquals(1, map.size());
TransactionDescription transactionDescription = map.get(transactionalId);
log.info("transactionDescription {}", transactionDescription);
assertNotNull(transactionDescription);
assertEquals(transactionDescription.state(), transactionState);
assertTrue(transactionDescription.producerEpoch() >= 0);
assertEquals(TRANSACTION_TIMEOUT_CONFIG_VALUE, transactionDescription.transactionTimeoutMs());
assertTrue(transactionDescription.transactionStartTimeMs().isPresent());
assertTrue(transactionDescription.coordinatorId() >= 0);

switch (transactionState) {
case EMPTY:
case COMPLETE_COMMIT:
case COMPLETE_ABORT:
assertEquals(0, transactionDescription.topicPartitions().size());
break;
case ONGOING:
assertEquals(1, transactionDescription.topicPartitions().size());
break;
default:
fail("unhandled " + transactionState);
}

}

/**
Expand Down

0 comments on commit c9e90c2

Please sign in to comment.