Skip to content
This repository has been archived by the owner on Jan 24, 2024. It is now read-only.

Commit

Permalink
[transactions] Implement KIP-664 - DESCRIBE_TRANSACTIONS (#77)
Browse files Browse the repository at this point in the history
(cherry picked from commit 1f2fe99)
  • Loading branch information
eolivelli authored and gaoran10 committed Aug 9, 2023
1 parent 9d440e6 commit 6108953
Show file tree
Hide file tree
Showing 4 changed files with 123 additions and 112 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -328,6 +328,9 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
case LIST_TRANSACTIONS:
handleListTransactionsRequest(kafkaHeaderAndRequest, responseFuture);
break;
case DESCRIBE_TRANSACTIONS:
handleDescribeTransactionsRequest(kafkaHeaderAndRequest, responseFuture);
break;
case DELETE_GROUPS:
handleDeleteGroupsRequest(kafkaHeaderAndRequest, responseFuture);
break;
Expand Down Expand Up @@ -589,6 +592,9 @@ protected void handleError(KafkaHeaderAndRequest kafkaHeaderAndRequest,
protected abstract void
handleListTransactionsRequest(KafkaHeaderAndRequest listGroups, CompletableFuture<AbstractResponse> response);

/**
 * Handle a DESCRIBE_TRANSACTIONS request (KIP-664).
 *
 * @param describeTransactions the decoded Kafka request; must wrap a DescribeTransactionsRequest
 * @param response future completed with the DescribeTransactionsResponse to send back
 */
protected abstract void
handleDescribeTransactionsRequest(KafkaHeaderAndRequest describeTransactions,
                                  CompletableFuture<AbstractResponse> response);

protected abstract void
handleDeleteGroupsRequest(KafkaHeaderAndRequest deleteGroups, CompletableFuture<AbstractResponse> response);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@
import org.apache.kafka.common.message.DescribeClusterResponseData;
import org.apache.kafka.common.message.DescribeConfigsRequestData;
import org.apache.kafka.common.message.DescribeConfigsResponseData;
import org.apache.kafka.common.message.DescribeTransactionsResponseData;
import org.apache.kafka.common.message.EndTxnRequestData;
import org.apache.kafka.common.message.EndTxnResponseData;
import org.apache.kafka.common.message.FetchRequestData;
Expand Down Expand Up @@ -161,6 +162,8 @@
import org.apache.kafka.common.requests.DescribeConfigsRequest;
import org.apache.kafka.common.requests.DescribeConfigsResponse;
import org.apache.kafka.common.requests.DescribeGroupsRequest;
import org.apache.kafka.common.requests.DescribeTransactionsRequest;
import org.apache.kafka.common.requests.DescribeTransactionsResponse;
import org.apache.kafka.common.requests.EndTxnRequest;
import org.apache.kafka.common.requests.EndTxnResponse;
import org.apache.kafka.common.requests.FetchRequest;
Expand Down Expand Up @@ -2043,6 +2046,16 @@ protected void handleListTransactionsRequest(KafkaHeaderAndRequest listTransacti
resultFuture.complete(new ListTransactionsResponse(listResult));
}

@Override
protected void handleDescribeTransactionsRequest(KafkaHeaderAndRequest describeTransactions,
                                                 CompletableFuture<AbstractResponse> response) {
    // KIP-664 DESCRIBE_TRANSACTIONS: ask the transaction coordinator for the
    // state of every transactional id listed in the request, then complete the
    // response future with the aggregated result.
    // NOTE: parameter renamed from the copy-pasted "listGroups" for clarity;
    // Java override compatibility does not depend on parameter names.
    checkArgument(describeTransactions.getRequest() instanceof DescribeTransactionsRequest);
    DescribeTransactionsRequest request = (DescribeTransactionsRequest) describeTransactions.getRequest();
    DescribeTransactionsResponseData describeResult = getTransactionCoordinator()
            .handleDescribeTransactions(request.data().transactionalIds());
    response.complete(new DescribeTransactionsResponse(describeResult));
}

@Override
protected void handleDeleteGroupsRequest(KafkaHeaderAndRequest deleteGroups,
CompletableFuture<AbstractResponse> resultFuture) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
*/
package io.streamnative.pulsar.handlers.kop.coordinator.transaction;

import static io.streamnative.pulsar.handlers.kop.coordinator.transaction.TransactionState.DEAD;
import static io.streamnative.pulsar.handlers.kop.coordinator.transaction.TransactionState.ONGOING;
import static io.streamnative.pulsar.handlers.kop.coordinator.transaction.TransactionState.PREPARE_ABORT;
import static io.streamnative.pulsar.handlers.kop.coordinator.transaction.TransactionState.PREPARE_COMMIT;
Expand Down Expand Up @@ -53,6 +54,7 @@
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.internals.Topic;
import org.apache.kafka.common.message.DescribeTransactionsResponseData;
import org.apache.kafka.common.message.ListTransactionsResponseData;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.record.RecordBatch;
Expand Down Expand Up @@ -233,6 +235,76 @@ public ListTransactionsResponseData handleListTransactions(List<String> filtered
return this.txnManager.listTransactionStates(filteredProducerIds, filteredStates);
}

/**
 * Build the DESCRIBE_TRANSACTIONS response for the requested transactional ids.
 * A {@code null} id list yields an empty response; otherwise each id contributes
 * exactly one {@code TransactionState} entry (including per-id error entries).
 */
public DescribeTransactionsResponseData handleDescribeTransactions(List<String> transactionalIds) {
    DescribeTransactionsResponseData result = new DescribeTransactionsResponseData();
    if (transactionalIds == null) {
        return result;
    }
    for (String transactionalId : transactionalIds) {
        result.transactionStates().add(handleDescribeTransactions(transactionalId));
    }
    return result;
}

/**
 * Describe a single transactional id, mirroring the upstream Kafka coordinator:
 * https://github.com/apache/kafka/blob/915991445fde106d02e61a70425ae2601c813db0/core/
 * src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala#L270
 *
 * @param transactionalId the id to describe; must not be null
 * @return a TransactionState entry carrying either the transaction details or a per-id error code
 * @throws IllegalArgumentException if {@code transactionalId} is null
 */
private DescribeTransactionsResponseData.TransactionState handleDescribeTransactions(String transactionalId) {
    if (transactionalId == null) {
        throw new IllegalArgumentException("Invalid null transactionalId");
    }

    DescribeTransactionsResponseData.TransactionState result =
            new DescribeTransactionsResponseData.TransactionState()
                    .setTransactionalId(transactionalId);

    // Coordinator not running: nothing can be answered for any id.
    if (!isActive.get()) {
        result.setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code());
        return result;
    }
    // Empty ids are rejected per the protocol.
    if (transactionalId.isEmpty()) {
        result.setErrorCode(Errors.INVALID_REQUEST.code());
        return result;
    }

    Either<Errors, Optional<CoordinatorEpochAndTxnMetadata>> lookup =
            txnManager.getTransactionState(transactionalId);
    if (lookup.isLeft()) {
        // Lookup itself failed (e.g. not coordinator for this id).
        result.setErrorCode(lookup.getLeft().code());
        return result;
    }

    Optional<CoordinatorEpochAndTxnMetadata> epochAndMetadata = lookup.getRight();
    if (!epochAndMetadata.isPresent()) {
        result.setErrorCode(Errors.TRANSACTIONAL_ID_NOT_FOUND.code());
        return result;
    }

    TransactionMetadata txnMetadata = epochAndMetadata.get().getTransactionMetadata();
    // Snapshot the metadata under its lock so partitions/state/epoch are consistent.
    txnMetadata.inLock(() -> {
        if (txnMetadata.getState() == DEAD) {
            // The transaction state is being expired, so report it as not found.
            result.setErrorCode(Errors.TRANSACTIONAL_ID_NOT_FOUND.code());
            return null;
        }
        // Group the transaction's partitions by topic in the response payload.
        txnMetadata.getTopicPartitions().forEach(topicPartition -> {
            DescribeTransactionsResponseData.TopicData topicData =
                    result.topics().find(topicPartition.topic());
            if (topicData == null) {
                topicData = new DescribeTransactionsResponseData.TopicData()
                        .setTopic(topicPartition.topic());
                result.topics().add(topicData);
            }
            topicData.partitions().add(topicPartition.partition());
        });

        result.setErrorCode(Errors.NONE.code())
                .setProducerId(txnMetadata.getProducerId())
                .setProducerEpoch(txnMetadata.getProducerEpoch())
                .setTransactionState(txnMetadata.getState().toAdminState().toString())
                .setTransactionTimeoutMs(txnMetadata.getTxnTimeoutMs())
                .setTransactionStartTimeMs(txnMetadata.getTxnStartTimestamp());
        return null;
    });
    return result;
}

@Data
@EqualsAndHashCode
@AllArgsConstructor
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
import org.apache.kafka.clients.admin.ListTransactionsOptions;
import org.apache.kafka.clients.admin.ListTransactionsResult;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.admin.TransactionDescription;
import org.apache.kafka.clients.admin.TransactionListing;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
Expand Down Expand Up @@ -87,6 +88,8 @@
@Slf4j
public class TransactionTest extends KopProtocolHandlerTestBase {

private static final int TRANSACTION_TIMEOUT_CONFIG_VALUE = 600 * 1000;

protected void setupTransactions() {
this.conf.setDefaultNumberOfNamespaceBundles(4);
this.conf.setOffsetsTopicNumPartitions(10);
Expand Down Expand Up @@ -1162,7 +1165,7 @@ private KafkaProducer<Integer, String> buildTransactionProducer(String transacti
producerProps.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, txTimeout);
} else {
// very long time-out
producerProps.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 600 * 1000);
producerProps.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, TRANSACTION_TIMEOUT_CONFIG_VALUE);
}
producerProps.put(CLIENT_ID_CONFIG, "dummy_client_" + UUID.randomUUID());
addCustomizeProps(producerProps);
Expand Down Expand Up @@ -1491,10 +1494,10 @@ public void testAbortedTxEventuallyPurged() throws Exception {
}
}

@Test(timeOut = 100000 * 30)
public void testListTransactions() throws Exception {
@Test(timeOut = 1000 * 30)
public void testListAndDescribeTransactions() throws Exception {

String topicName = "testListTransactions";
String topicName = "testListAndDescribeTransactions";
String transactionalId = "myProducer_" + UUID.randomUUID();

@Cleanup
Expand Down Expand Up @@ -1599,116 +1602,33 @@ private static void assertTransactionState(AdminClient kafkaAdmin, String transa
.findFirst()
.get();
assertEquals(transactionState, transactionListing.state());
}

// Verifies that kafkaAdmin.listTransactions() reflects the lifecycle of a
// transactional producer: EMPTY -> ONGOING -> COMPLETE_COMMIT -> COMPLETE_ABORT.
// NOTE(review): this span comes from a diff view that appears to interleave old
// (removed) and new (added) lines — several asserts appear both bare and wrapped
// in Awaitility — so the exact shape should be confirmed against the real file.
// NOTE(review): timeOut = 100000 * 30 (~50 min) looks like a typo for 1000 * 30;
// the sibling test above uses 1000 * 30 — confirm before relying on it.
@Test(timeOut = 100000 * 30)
public void testListTransactions() throws Exception {

String topicName = "testListTransactions";
String transactionalId = "myProducer_" + UUID.randomUUID();

@Cleanup
KafkaProducer<Integer, String> producer = buildTransactionProducer(transactionalId);
@Cleanup
AdminClient kafkaAdmin = AdminClient.create(newKafkaAdminClientProperties());

// A freshly initialized transaction with no begun work reports EMPTY.
producer.initTransactions();
producer.beginTransaction();
assertTransactionState(kafkaAdmin, transactionalId,
org.apache.kafka.clients.admin.TransactionState.EMPTY);
producer.send(new ProducerRecord<>(topicName, 1, "bar")).get();
producer.flush();

// Log every transaction the broker currently knows about (debug aid only).
ListTransactionsResult listTransactionsResult = kafkaAdmin.listTransactions();
listTransactionsResult.all().get().forEach(t -> {
log.info("Found transactionalId: {} {} {}",
t.transactionalId(),
t.producerId(),
t.state());
});
// After a send inside the transaction, state becomes ONGOING.
assertTransactionState(kafkaAdmin, transactionalId,
org.apache.kafka.clients.admin.TransactionState.ONGOING);
Awaitility.await().untilAsserted(() -> {
assertTransactionState(kafkaAdmin, transactionalId,
org.apache.kafka.clients.admin.TransactionState.ONGOING);
});
// Commit completes asynchronously on the coordinator; poll until visible.
producer.commitTransaction();
Awaitility.await().untilAsserted(() -> {
assertTransactionState(kafkaAdmin, transactionalId,
org.apache.kafka.clients.admin.TransactionState.COMPLETE_COMMIT);
});
producer.beginTransaction();

assertTransactionState(kafkaAdmin, transactionalId,
org.apache.kafka.clients.admin.TransactionState.COMPLETE_COMMIT);

// Abort path: send, flush, abort, then wait for COMPLETE_ABORT to be visible.
producer.send(new ProducerRecord<>(topicName, 1, "bar")).get();
producer.flush();
producer.abortTransaction();
Awaitility.await().untilAsserted(() -> {
assertTransactionState(kafkaAdmin, transactionalId,
org.apache.kafka.clients.admin.TransactionState.COMPLETE_ABORT);
});
producer.close();
assertTransactionState(kafkaAdmin, transactionalId,
org.apache.kafka.clients.admin.TransactionState.COMPLETE_ABORT);
}

// Asserts that the broker reports {@code transactionState} for the given
// transactional id, via listTransactions() (unfiltered and filtered) and, in
// the new KIP-664 path, via describeTransactions().
// NOTE(review): this span is reconstructed from a diff view that interleaves
// removed and added lines; the filter-based sections and the describe section
// may not all coexist in the final file — confirm against the real source.
private static void assertTransactionState(AdminClient kafkaAdmin, String transactionalId,
org.apache.kafka.clients.admin.TransactionState transactionState)
throws Exception {
// Unfiltered listing must contain the id with the expected state.
ListTransactionsResult listTransactionsResult = kafkaAdmin.listTransactions();
Collection<TransactionListing> transactionListings = listTransactionsResult.all().get();
transactionListings.forEach(t -> {
log.info("Found transactionalId: {} {} {}",
t.transactionalId(),
t.producerId(),
t.state());
});
TransactionListing transactionListing = transactionListings
.stream()
.filter(t -> t.transactionalId().equals(transactionalId))
.findFirst()
.get();
assertEquals(transactionState, transactionListing.state());

// filter for the same state
ListTransactionsOptions optionFilterState = new ListTransactionsOptions()
.filterStates(Collections.singleton(transactionState));
listTransactionsResult = kafkaAdmin.listTransactions(optionFilterState);
transactionListings = listTransactionsResult.all().get();
transactionListing = transactionListings
.stream()
.filter(t -> t.transactionalId().equals(transactionalId))
.findFirst()
.get();
assertEquals(transactionState, transactionListing.state());


// filter for the same producer id
ListTransactionsOptions optionFilterProducer = new ListTransactionsOptions()
.filterProducerIds(Collections.singleton(transactionListing.producerId()));
listTransactionsResult = kafkaAdmin.listTransactions(optionFilterProducer);
transactionListings = listTransactionsResult.all().get();
transactionListing = transactionListings
.stream()
.filter(t -> t.transactionalId().equals(transactionalId))
.findFirst()
.get();
assertEquals(transactionState, transactionListing.state());
// KIP-664: describeTransactions must agree with listTransactions for this id.
Map<String, TransactionDescription> map =
kafkaAdmin.describeTransactions(Collections.singleton(transactionalId))
.all().get();
assertEquals(1, map.size());
TransactionDescription transactionDescription = map.get(transactionalId);
log.info("transactionDescription {}", transactionDescription);
assertNotNull(transactionDescription);
assertEquals(transactionDescription.state(), transactionState);
assertTrue(transactionDescription.producerEpoch() >= 0);
assertEquals(TRANSACTION_TIMEOUT_CONFIG_VALUE, transactionDescription.transactionTimeoutMs());
assertTrue(transactionDescription.transactionStartTimeMs().isPresent());
assertTrue(transactionDescription.coordinatorId() >= 0);

// Only an ONGOING transaction has registered partitions; completed/empty
// transactions must report none.
switch (transactionState) {
case EMPTY:
case COMPLETE_COMMIT:
case COMPLETE_ABORT:
assertEquals(0, transactionDescription.topicPartitions().size());
break;
case ONGOING:
assertEquals(1, transactionDescription.topicPartitions().size());
break;
default:
fail("unhandled " + transactionState);
}

// filter for the same producer id and state
ListTransactionsOptions optionFilterProducerAndState = new ListTransactionsOptions()
.filterStates(Collections.singleton(transactionState))
.filterProducerIds(Collections.singleton(transactionListing.producerId()));
listTransactionsResult = kafkaAdmin.listTransactions(optionFilterProducerAndState);
transactionListings = listTransactionsResult.all().get();
transactionListing = transactionListings
.stream()
.filter(t -> t.transactionalId().equals(transactionalId))
.findFirst()
.get();
assertEquals(transactionState, transactionListing.state());
}

/**
Expand Down

0 comments on commit 6108953

Please sign in to comment.