From fa70e37433647538b24f3e0a0a2dc0cf7a649aec Mon Sep 17 00:00:00 2001 From: Jon Lee Date: Tue, 16 Jul 2019 20:53:10 -0700 Subject: [PATCH] Federated Clients: implement commitSync() and committed() --- .../clients/common/LocationLookupResult.java | 33 +++ .../common/PartitionKeyedMapLookupResult.java | 37 +++ .../clients/common/PartitionLookupResult.java | 17 +- .../clients/common/TopicLookupResult.java | 17 +- .../consumer/LiKafkaConsumerConfig.java | 8 +- .../LiKafkaFederatedConsumerImpl.java | 211 +++++++++++++++++- .../MetadataServiceClient.java | 4 +- .../LiKafkaFederatedConsumerImplTest.java | 12 + 8 files changed, 306 insertions(+), 33 deletions(-) create mode 100644 li-apache-kafka-clients/src/main/java/com/linkedin/kafka/clients/common/LocationLookupResult.java create mode 100644 li-apache-kafka-clients/src/main/java/com/linkedin/kafka/clients/common/PartitionKeyedMapLookupResult.java diff --git a/li-apache-kafka-clients/src/main/java/com/linkedin/kafka/clients/common/LocationLookupResult.java b/li-apache-kafka-clients/src/main/java/com/linkedin/kafka/clients/common/LocationLookupResult.java new file mode 100644 index 0000000..e9c9875 --- /dev/null +++ b/li-apache-kafka-clients/src/main/java/com/linkedin/kafka/clients/common/LocationLookupResult.java @@ -0,0 +1,33 @@ +/* + * Copyright 2019 LinkedIn Corp. Licensed under the BSD 2-Clause License (the "License").
 See License in the project root for license information. + */ + +package com.linkedin.kafka.clients.common; + +import java.util.Collections; +import java.util.Set; + + +// This contains the result of the location lookup for a set of topics/partitions or a map of values keyed by +// topic/partitition. +public abstract class LocationLookupResult { + public enum ValueType { + PARTITIONS, TOPICS, PARTITION_KEYED_MAP + } + + private Set _nonexistentTopics; + + public LocationLookupResult() { + _nonexistentTopics = Collections.emptySet(); + } + + public LocationLookupResult(/*Map valuesByCluster, */Set nonexistentTopics) { + _nonexistentTopics = nonexistentTopics; + } + + public abstract ValueType getValueType(); + + public Set getNonexistentTopics() { + return _nonexistentTopics; + } +} \ No newline at end of file diff --git a/li-apache-kafka-clients/src/main/java/com/linkedin/kafka/clients/common/PartitionKeyedMapLookupResult.java b/li-apache-kafka-clients/src/main/java/com/linkedin/kafka/clients/common/PartitionKeyedMapLookupResult.java new file mode 100644 index 0000000..84b16ab --- /dev/null +++ b/li-apache-kafka-clients/src/main/java/com/linkedin/kafka/clients/common/PartitionKeyedMapLookupResult.java @@ -0,0 +1,37 @@ +/* + * Copyright 2019 LinkedIn Corp. Licensed under the BSD 2-Clause License (the "License").
 See License in the project root for license information. + */ + +package com.linkedin.kafka.clients.common; + +import java.util.Collections; +import java.util.Map; +import java.util.Set; + +import org.apache.kafka.common.TopicPartition; + + +// This contains the result of the location lookup for a map keyed by partition across multiple clusters in a cluster +// group. +public class PartitionKeyedMapLookupResult extends LocationLookupResult { + private Map> _partitionKeyedMapsByCluster; + + public PartitionKeyedMapLookupResult() { + this(Collections.emptyMap(), Collections.emptySet()); + } + + public PartitionKeyedMapLookupResult(Map> partitionKeyedMapsByCluster, + Set nonexistentTopics) { + super(nonexistentTopics); + _partitionKeyedMapsByCluster = partitionKeyedMapsByCluster; + } + + @Override + public LocationLookupResult.ValueType getValueType() { + return LocationLookupResult.ValueType.PARTITION_KEYED_MAP; + } + + public Map> getPartitionKeyedMapsByCluster() { + return _partitionKeyedMapsByCluster; + } +} \ No newline at end of file diff --git a/li-apache-kafka-clients/src/main/java/com/linkedin/kafka/clients/common/PartitionLookupResult.java b/li-apache-kafka-clients/src/main/java/com/linkedin/kafka/clients/common/PartitionLookupResult.java index 08759a0..f82f177 100644 --- a/li-apache-kafka-clients/src/main/java/com/linkedin/kafka/clients/common/PartitionLookupResult.java +++ b/li-apache-kafka-clients/src/main/java/com/linkedin/kafka/clients/common/PartitionLookupResult.java @@ -12,26 +12,25 @@ // This contains the result of the location lookup for a set of partitions across multiple clusters in a cluster group. -public class PartitionLookupResult { +public class PartitionLookupResult extends LocationLookupResult { private Map> _partitionsByCluster; - private Set _nonexistentTopics; public PartitionLookupResult() { - _partitionsByCluster = Collections.emptyMap(); - _nonexistentTopics = Collections.emptySet(); + this(Collections.emptyMap(), Collections.emptySet()); } public PartitionLookupResult(Map> partitionsByCluster, Set nonexistentTopics) { + super(nonexistentTopics); _partitionsByCluster = partitionsByCluster; - _nonexistentTopics = nonexistentTopics; } - public Map> getPartitionsByCluster() { - return _partitionsByCluster; + @Override + public LocationLookupResult.ValueType getValueType() { + return LocationLookupResult.ValueType.PARTITIONS; } - public Set getNonexistentTopics() { - return _nonexistentTopics; + public Map> getPartitionsByCluster() { + return _partitionsByCluster; } } \ No newline at end of file diff --git a/li-apache-kafka-clients/src/main/java/com/linkedin/kafka/clients/common/TopicLookupResult.java b/li-apache-kafka-clients/src/main/java/com/linkedin/kafka/clients/common/TopicLookupResult.java index 8d25c41..61bd4d8 100644 --- a/li-apache-kafka-clients/src/main/java/com/linkedin/kafka/clients/common/TopicLookupResult.java +++ b/li-apache-kafka-clients/src/main/java/com/linkedin/kafka/clients/common/TopicLookupResult.java @@ -10,25 +10,24 @@ // This contains the result of the location lookup for a set of topics across multiple clusters in a cluster group. -public class TopicLookupResult { +public class TopicLookupResult extends LocationLookupResult { private Map> _topicsByCluster; - private Set _nonexistentTopics; public TopicLookupResult() { - _topicsByCluster = Collections.emptyMap(); - _nonexistentTopics = Collections.emptySet(); + this(Collections.emptyMap(), Collections.emptySet()); } public TopicLookupResult(Map> topicsByCluster, Set nonexistentTopics) { + super(nonexistentTopics); _topicsByCluster = topicsByCluster; - _nonexistentTopics = nonexistentTopics; } - public Map> getTopicsByCluster() { - return _topicsByCluster; + @Override + public LocationLookupResult.ValueType getValueType() { + return LocationLookupResult.ValueType.TOPICS; } - public Set getNonexistentTopics() { - return _nonexistentTopics; + public Map> getTopicsByCluster() { + return _topicsByCluster; } } \ No newline at end of file diff --git a/li-apache-kafka-clients/src/main/java/com/linkedin/kafka/clients/consumer/LiKafkaConsumerConfig.java b/li-apache-kafka-clients/src/main/java/com/linkedin/kafka/clients/consumer/LiKafkaConsumerConfig.java index e73ed4d..cc1d25b 100644 --- a/li-apache-kafka-clients/src/main/java/com/linkedin/kafka/clients/consumer/LiKafkaConsumerConfig.java +++ b/li-apache-kafka-clients/src/main/java/com/linkedin/kafka/clients/consumer/LiKafkaConsumerConfig.java @@ -200,7 +200,13 @@ public class LiKafkaConsumerConfig extends AbstractConfig { 60 * 1000, atLeast(0), Importance.MEDIUM, - ConsumerConfig.DEFAULT_API_TIMEOUT_MS_DOC); + ConsumerConfig.DEFAULT_API_TIMEOUT_MS_DOC) + .define(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG, + Type.LONG, + 100L, + atLeast(0L), + Importance.LOW, + CommonClientConfigs.RETRY_BACKOFF_MS_DOC); } public LiKafkaConsumerConfig(Map props) { diff --git a/li-apache-kafka-clients/src/main/java/com/linkedin/kafka/clients/consumer/LiKafkaFederatedConsumerImpl.java b/li-apache-kafka-clients/src/main/java/com/linkedin/kafka/clients/consumer/LiKafkaFederatedConsumerImpl.java index 5aab5a8..3450c41 100644 --- a/li-apache-kafka-clients/src/main/java/com/linkedin/kafka/clients/consumer/LiKafkaFederatedConsumerImpl.java +++ b/li-apache-kafka-clients/src/main/java/com/linkedin/kafka/clients/consumer/LiKafkaFederatedConsumerImpl.java @@ -8,6 +8,7 @@ import com.linkedin.kafka.clients.common.ClusterGroupDescriptor; import com.linkedin.kafka.clients.common.LiKafkaFederatedClient; import com.linkedin.kafka.clients.common.LiKafkaFederatedClientType; +import com.linkedin.kafka.clients.common.PartitionKeyedMapLookupResult; import com.linkedin.kafka.clients.common.PartitionLookupResult; import com.linkedin.kafka.clients.common.TopicLookupResult; import com.linkedin.kafka.clients.metadataservice.MetadataServiceClient; @@ -46,6 +47,8 @@ import org.apache.kafka.common.MetricName; import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.errors.InterruptException; +import org.apache.kafka.common.errors.TimeoutException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -106,7 +109,10 @@ public LiKafkaConsumer getConsumer() { // The default timeout for blocking calls specified by default.api.timeout.ms consumer properties. // This is exposed for federated consumer methods with the default timeout that need to invoke the individual // consumer's corresponding methods concurrently. - private int _defaultApiTimeoutMs; + private Duration _defaultApiTimeout; + + // retry.backoff.ms + private long _retryBackoffMs; // The number of clusters in this cluster group to connect to for the current assignment/subscription private int _numClustersToConnectTo; @@ -189,7 +195,8 @@ private LiKafkaFederatedConsumerImpl(LiKafkaConsumerConfig configs, MetadataServ _mdsRequestTimeoutMs = configs.getInt(LiKafkaConsumerConfig.METADATA_SERVICE_REQUEST_TIMEOUT_MS_CONFIG); _maxPollRecordsForFederatedConsumer = configs.getInt(LiKafkaConsumerConfig.MAX_POLL_RECORDS_CONFIG); _metadataMaxAgeMs = configs.getInt(ConsumerConfig.METADATA_MAX_AGE_CONFIG); - _defaultApiTimeoutMs = configs.getInt(ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG); + _defaultApiTimeout = Duration.ofMillis(configs.getInt(ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG)); + _retryBackoffMs = configs.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG); _currentSubscription = Unsubscribed.getInstance(); _nextClusterIndexToPoll = 0; @@ -339,7 +346,7 @@ synchronized public void subscribe(Collection topics, ConsumerRebalanceL // consumers for the new subscription, close all existing consumers first. if (!_consumers.isEmpty()) { LOG.debug("closing all existing LiKafkaConsumers due to subscription change"); - closeNoLock(Duration.ofMillis(_defaultApiTimeoutMs)); + closeNoLock(_defaultApiTimeout); _consumers.clear(); } @@ -390,7 +397,7 @@ synchronized public void assign(Collection partitions) { // consumers for the new assignment, close all existing consumers first. if (!_consumers.isEmpty()) { LOG.debug("closing all existing LiKafkaConsumers due to assignment change"); - closeNoLock(Duration.ofMillis(_defaultApiTimeoutMs)); + closeNoLock(_defaultApiTimeout); _consumers.clear(); } @@ -487,22 +494,52 @@ public ConsumerRecords poll(Duration timeout) { @Override synchronized public void commitSync() { - throw new UnsupportedOperationException("Not implemented yet"); + commitSync(_defaultApiTimeout); } @Override synchronized public void commitSync(Duration timeout) { - throw new UnsupportedOperationException("Not implemented yet"); + executePerClusterCallback("commitSync", null /* call for all consumers */, + (consumer, ignored, remainingTimeout) -> consumer.commitSync(remainingTimeout), timeout); } @Override public void commitSync(Map offsets) { - throw new UnsupportedOperationException("Not implemented yet"); + commitSync(offsets, _defaultApiTimeout); } + // ATTN: UnknownTopicOrPartitionException may be received - this is a retriable exception.. + // if not resolved by the time, timeout exception @Override synchronized public void commitSync(Map offsets, Duration timeout) { - throw new UnsupportedOperationException("Not implemented yet"); + // Vanilla consumer's commitSync() retries on nonexistent partitions until the given timeout is reached. Since + // federated consumer does not know where the currently nonexistent topics in the input will be created, it mimics + // the vanilla consumer's behavior by retrying after waiting retry.backoff.ms until the given timeout is reached. + long now = System.currentTimeMillis(); + long deadlineTimeMs = now + timeout.toMillis(); + while (now < deadlineTimeMs) { + PartitionKeyedMapLookupResult offsetsByClusterResult = getPartitionKeyedMapsByCluster(offsets); + // Commit offset with existing partitions first. + executePerClusterCallback("commitSync", offsetsByClusterResult.getPartitionKeyedMapsByCluster(), + (consumer, perClusterOffsets, remainingTimeout) -> { + consumer.commitSync((Map) perClusterOffsets, remainingTimeout); + }, Duration.ofMillis(Math.min(deadlineTimeMs - System.currentTimeMillis(), 0))); + + if (offsetsByClusterResult.getNonexistentTopics().isEmpty()) { + return; + } + + // There are nonexistent topics in the given offsets. Sleep and retry; + try { + Thread.sleep(Math.min(deadlineTimeMs - System.currentTimeMillis(), _retryBackoffMs)); + } catch (InterruptedException e) { + LOG.warn("interrupted while retrying commitSync()"); + throw new InterruptException(e); + } + + now = System.currentTimeMillis(); + } + throw new TimeoutException("Failed to complete commitSync within " + timeout); } @Override @@ -552,17 +589,48 @@ synchronized public long position(TopicPartition partition, Duration timeout) { @Override synchronized public OffsetAndMetadata committed(TopicPartition partition) { - throw new UnsupportedOperationException("Not implemented yet"); + + return committed(partition, _defaultApiTimeout); } @Override synchronized public OffsetAndMetadata committed(TopicPartition partition, Duration timeout) { - throw new UnsupportedOperationException("Not implemented yet"); + if (partition == null) { + throw new IllegalArgumentException("partition cannot be null"); + } + + long now = System.currentTimeMillis(); + long deadlineTimeMs = now + timeout.toMillis(); + while (now < deadlineTimeMs) { + LiKafkaConsumer consumer = getPerClusterConsumerForTopic(partition.topic()); + if (consumer != null) { + return consumer.committed(partition, timeout); + } + + try { + Thread.sleep(Math.min(deadlineTimeMs - System.currentTimeMillis(), _retryBackoffMs)); + } catch (InterruptedException e) { + LOG.warn("interrupted while retrying committed()"); + throw new InterruptException(e); + } + + now = System.currentTimeMillis(); + } + throw new TimeoutException("Failed to complete committed() within " + timeout); } @Override - synchronized public Long committedSafeOffset(TopicPartition tp) { - throw new UnsupportedOperationException("Not implemented yet"); + synchronized public Long committedSafeOffset(TopicPartition partition) { + if (partition == null) { + throw new IllegalArgumentException("partition cannot be null"); + } + + LiKafkaConsumer consumer = getPerClusterConsumerForTopic(partition.topic()); + if (consumer != null) { + return consumer.committedSafeOffset(partition); + } else { + throw new IllegalStateException("partition " + partition + " does not exist"); + } } @Override @@ -838,6 +906,20 @@ void waitForReloadConfigFinish() throws InterruptedException { } } + private LiKafkaConsumer getPerClusterConsumerForTopic(String topic) { + if (topic == null || topic.isEmpty()) { + throw new IllegalArgumentException("Topic cannot be null or empty"); + } + + ClusterDescriptor cluster = null; + try { + cluster = _mdsClient.getClusterForTopic(topic, _clusterGroup, _mdsRequestTimeoutMs); + } catch (MetadataServiceClientException e) { + throw new KafkaException("failed to get cluster for topic " + topic + ": ", e); + } + return (cluster == null) ? null : getPerClusterConsumer(cluster); + } + // Intended for testing only LiKafkaConsumer getPerClusterConsumer(ClusterDescriptor cluster) { // Get an immutable copy of the current set of consumers. @@ -929,4 +1011,109 @@ private void refreshSubscriptionMetadata() { throw new IllegalStateException("Unsupportd subscription type: " + _currentSubscription.getSubscriptionType()); } } + + // Callback interface for concurrently calling a method against per-cluster consumers with cluster-specific input + // values with timeout. + interface PerClusterExecutorCallback { + void execute(LiKafkaConsumer consumer, I perClusterInput, Duration timeout); + } + + // For each cluster in the perClusterInput key set, concurrently call the given callback against the corresponding + // consumer with the input values for that cluster. If perClusterInput is null, the callback will be called for all + // existing consumers. + private void executePerClusterCallback(String methodName, + Map perClusterInput, PerClusterExecutorCallback callback, Duration timeout) { + List> consumers; + if (perClusterInput == null) { + // The callback needs to be executed against all consumers + consumers = _consumers; + } else { + consumers = new ArrayList<>(); + for (ClusterDescriptor cluster : perClusterInput.keySet()) { + LiKafkaConsumer consumer = getPerClusterConsumer(cluster); + if (cluster == null) { + LOG.warn("consumer for cluster {} does not exist for {}. ignoring.", cluster, methodName); + } else { + consumers.add(new ClusterConsumerPair(cluster, consumer)); + } + } + } + + if (consumers.isEmpty()) { + // Nothing to do + return; + } + + LOG.debug("starting {} for {} LiKafkaConsumers for cluster group {} in {} {}", methodName, consumers.size(), + _clusterGroup, timeout); + + long startTimeMs = System.currentTimeMillis(); + long deadlineTimeMs = startTimeMs + timeout.toMillis(); + CountDownLatch countDownLatch = new CountDownLatch(consumers.size()); + Set threads = new HashSet<>(); + for (ClusterConsumerPair entry : consumers) { + ClusterDescriptor cluster = entry.getCluster(); + LiKafkaConsumer consumer = entry.getConsumer(); + Thread t = new Thread(() -> { + try { + Duration remainingTime = Duration.ofMillis(Math.max(deadlineTimeMs - System.currentTimeMillis(), 0)); + callback.execute(consumer, perClusterInput == null ? null : perClusterInput.get(cluster), remainingTime); + } finally { + countDownLatch.countDown(); + } + }); + t.setDaemon(true); + t.setName("LiKafkaConsumer-" + methodName + "-" + cluster.getName()); + t.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() { + public void uncaughtException(Thread t, Throwable e) { + throw new KafkaException("Thread " + t.getName() + " throws exception", e); + } + }); + t.start(); + threads.add(t); + } + + try { + if (!countDownLatch.await(deadlineTimeMs - System.currentTimeMillis(), TimeUnit.MILLISECONDS)) { + LiKafkaClientsUtils.dumpStacksForAllLiveThreads(threads); + throw new KafkaException( + "fail to perform " + methodName + " for cluster group " + _clusterGroup + " in " + timeout); + } + } catch (InterruptedException e) { + throw new KafkaException("fail to perform " + methodName + " for cluster group " + _clusterGroup, e); + } + + LOG.info("{}: {} LiKafkaConsumers for cluster group {} complete in {} milliseconds", methodName, consumers.size(), + _clusterGroup, (System.currentTimeMillis() - startTimeMs)); + } + + // For the given parition to value map, construct a map ofConstruct a map of maps by cluster, where the inner map is keyed by partition. + private PartitionKeyedMapLookupResult getPartitionKeyedMapsByCluster(Map partitionKeyedMap) { + if (partitionKeyedMap == null) { + throw new IllegalArgumentException("partition map cannot be null"); + } + + if (partitionKeyedMap.isEmpty()) { + return new PartitionKeyedMapLookupResult(); + } + + PartitionLookupResult partitionLookupResult; + try { + partitionLookupResult = + _mdsClient.getClustersForTopicPartitions(partitionKeyedMap.keySet(), _clusterGroup, _mdsRequestTimeoutMs); + } catch (MetadataServiceClientException e) { + throw new KafkaException("failed to get clusters for topic partitions " + partitionKeyedMap.keySet() + ": ", e); + } + Map> partitionKeyedMapsByCluster = new HashMap<>(); + for (Map.Entry> entry : partitionLookupResult.getPartitionsByCluster().entrySet()) { + ClusterDescriptor cluster = entry.getKey(); + Set topicPartitions = entry.getValue(); + for (TopicPartition topicPartition : entry.getValue()) { + partitionKeyedMapsByCluster.computeIfAbsent(cluster, k -> new HashMap()) + .put(topicPartition, partitionKeyedMap.get(topicPartition)); + } + } + + return new PartitionKeyedMapLookupResult(partitionKeyedMapsByCluster, partitionLookupResult.getNonexistentTopics()); + } } diff --git a/li-apache-kafka-clients/src/main/java/com/linkedin/kafka/clients/metadataservice/MetadataServiceClient.java b/li-apache-kafka-clients/src/main/java/com/linkedin/kafka/clients/metadataservice/MetadataServiceClient.java index 7d77088..62e1c3a 100644 --- a/li-apache-kafka-clients/src/main/java/com/linkedin/kafka/clients/metadataservice/MetadataServiceClient.java +++ b/li-apache-kafka-clients/src/main/java/com/linkedin/kafka/clients/metadataservice/MetadataServiceClient.java @@ -49,7 +49,7 @@ ClusterDescriptor getClusterForTopic(String topicName, ClusterGroupDescriptor cl * @param topicPartitions The topic partitions * @param clusterGroup The cluster group descriptor * @param timeoutMs Timeout in milliseconds - * @return Location lookup result. + * @return Location lookup result */ PartitionLookupResult getClustersForTopicPartitions(Set topicPartitions, ClusterGroupDescriptor clusterGroup, int timeoutMs) throws MetadataServiceClientException; @@ -60,7 +60,7 @@ PartitionLookupResult getClustersForTopicPartitions(Set topicPar * @param topics The topics * @param clusterGroup The cluster group descriptor * @param timeoutMs Timeout in milliseconds - * @return Location lookup result. + * @return Location lookup result */ TopicLookupResult getClustersForTopics(Set topics, ClusterGroupDescriptor clusterGroup, int timeoutMs) throws MetadataServiceClientException; diff --git a/li-apache-kafka-clients/src/test/java/com/linkedin/kafka/clients/consumer/LiKafkaFederatedConsumerImplTest.java b/li-apache-kafka-clients/src/test/java/com/linkedin/kafka/clients/consumer/LiKafkaFederatedConsumerImplTest.java index be925a1..2042d0d 100644 --- a/li-apache-kafka-clients/src/test/java/com/linkedin/kafka/clients/consumer/LiKafkaFederatedConsumerImplTest.java +++ b/li-apache-kafka-clients/src/test/java/com/linkedin/kafka/clients/consumer/LiKafkaFederatedConsumerImplTest.java @@ -105,6 +105,9 @@ public void testBasicWorkflowWithAssignment() throws MetadataServiceClientExcept Collections.emptySet()); when(_mdsClient.getClustersForTopicPartitions(eq(expectedTopicPartitions), eq(CLUSTER_GROUP), anyInt())) .thenReturn(expectedPartitionLookupResult); + when(_mdsClient.getClusterForTopic(eq(TOPIC1), eq(CLUSTER_GROUP), anyInt())).thenReturn(CLUSTER1); + when(_mdsClient.getClusterForTopic(eq(TOPIC2), eq(CLUSTER_GROUP), anyInt())).thenReturn(CLUSTER2); + when(_mdsClient.getClusterForTopic(eq(TOPIC3), eq(CLUSTER_GROUP), anyInt())).thenReturn(CLUSTER1); _federatedConsumer = new LiKafkaFederatedConsumerImpl<>(_consumerConfig, _mdsClient, new MockConsumerBuilder()); @@ -161,6 +164,15 @@ public void testBasicWorkflowWithAssignment() throws MetadataServiceClientExcept assertEquals("poll should return two records for topic2", new ArrayList<>(Arrays.asList(record2, record3)), pollResult.records(TOPIC_PARTITION2)); assertEquals("poll should return one record for topic3", new ArrayList<>(Arrays.asList(record4)), pollResult.records(TOPIC_PARTITION3)); + assertEquals("There should be no prior offset commit for topic1 partition 0", null, _federatedConsumer.committed(TOPIC_PARTITION1)); + assertEquals("There should be no prior offset commit for topic2 partition 0", null, _federatedConsumer.committed(TOPIC_PARTITION2)); + assertEquals("There should be no prior offset commit for topic3 partition 0", null, _federatedConsumer.committed(TOPIC_PARTITION3)); + + _federatedConsumer.commitSync(); + assertEquals("Commit offset for topic1 partition 0 should be 1", 1, _federatedConsumer.committed(TOPIC_PARTITION1).offset()); + assertEquals("Commit offset for topic2 partition 0 should be 2", 2, _federatedConsumer.committed(TOPIC_PARTITION2).offset()); + assertEquals("Commit offset for topic3 partition 0 should be 1", 1, _federatedConsumer.committed(TOPIC_PARTITION3).offset()); + // Verify per-cluster consumers are not in closed state. assertFalse("Consumer for cluster 1 should have not been closed", consumer1.closed()); assertFalse("Consumer for cluster 2 should have not been closed", consumer2.closed());