From b74b182841a340b7b92d852142201cd5c1ee1b85 Mon Sep 17 00:00:00 2001 From: Lianet Magrans <98415067+lianetm@users.noreply.github.com> Date: Thu, 6 Jun 2024 09:45:36 +0200 Subject: [PATCH] KAFKA-16786: Remove old assignment strategy usage in new consumer (#16214) Remove usage of the partition.assignment.strategy config in the new consumer. This config is deprecated with the new consumer protocol, so the AsyncKafkaConsumer should not use or validate the property. Reviewers: Lucas Brutschy --- .../internals/AsyncKafkaConsumer.java | 21 +----------------- .../internals/ConsumerDelegateCreator.java | 3 +-- .../clients/consumer/KafkaConsumerTest.java | 4 ++-- .../internals/AsyncKafkaConsumerTest.java | 22 ++++++++++--------- 4 files changed, 16 insertions(+), 34 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java index 6930cd02955d..248073e8b48d 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java @@ -26,7 +26,6 @@ import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerGroupMetadata; import org.apache.kafka.clients.consumer.ConsumerInterceptor; -import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor; import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.GroupProtocol; @@ -240,7 +239,6 @@ private void process(final ConsumerRebalanceListenerCallbackNeededEvent event) { private final int defaultApiTimeoutMs; private final boolean autoCommitEnabled; private volatile boolean closed = false; - private final List assignors; private final Optional clientTelemetryReporter; // to keep from repeatedly scanning subscriptions in poll(), cache the result during metadata updates @@ -373,10 +371,6 @@ private void process(final ConsumerRebalanceListenerCallbackNeededEvent event) { rebalanceListenerInvoker ); this.backgroundEventReaper = backgroundEventReaperFactory.build(logContext); - this.assignors = ConsumerPartitionAssignor.getAssignorInstances( - config.getList(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG), - config.originals(Collections.singletonMap(ConsumerConfig.CLIENT_ID_CONFIG, clientId)) - ); // The FetchCollector is only used on the application thread. this.fetchCollector = fetchCollectorFactory.build(logContext, @@ -424,7 +418,6 @@ private void process(final ConsumerRebalanceListenerCallbackNeededEvent event) { ConsumerMetadata metadata, long retryBackoffMs, int defaultApiTimeoutMs, - List assignors, String groupId, boolean autoCommitEnabled) { this.log = logContext.logger(getClass()); @@ -445,7 +438,6 @@ private void process(final ConsumerRebalanceListenerCallbackNeededEvent event) { this.defaultApiTimeoutMs = defaultApiTimeoutMs; this.deserializers = deserializers; this.applicationEventHandler = applicationEventHandler; - this.assignors = assignors; this.kafkaConsumerMetrics = new KafkaConsumerMetrics(metrics, "consumer"); this.clientTelemetryReporter = Optional.empty(); this.autoCommitEnabled = autoCommitEnabled; @@ -460,8 +452,7 @@ private void process(final ConsumerRebalanceListenerCallbackNeededEvent event) { Deserializer valueDeserializer, KafkaClient client, SubscriptionState subscriptions, - ConsumerMetadata metadata, - List assignors) { + ConsumerMetadata metadata) { this.log = logContext.logger(getClass()); this.subscriptions = subscriptions; this.clientId = config.getString(ConsumerConfig.CLIENT_ID_CONFIG); @@ -475,7 +466,6 @@ private void process(final ConsumerRebalanceListenerCallbackNeededEvent event) { this.retryBackoffMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG); this.defaultApiTimeoutMs = config.getInt(ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG); this.deserializers = new Deserializers<>(keyDeserializer, valueDeserializer); - this.assignors = assignors; this.clientTelemetryReporter = Optional.empty(); ConsumerMetrics metricsRegistry = new ConsumerMetrics(CONSUMER_METRIC_GROUP_PREFIX); @@ -1687,12 +1677,6 @@ private boolean initWithCommittedOffsetsIfNeeded(Timer timer) { } } - private void throwIfNoAssignorsConfigured() { - if (assignors.isEmpty()) - throw new IllegalStateException("Must configure at least one partition assigner class name to " + - ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG + " configuration property"); - } - private void updateLastSeenEpochIfNewer(TopicPartition topicPartition, OffsetAndMetadata offsetAndMetadata) { if (offsetAndMetadata != null) offsetAndMetadata.leaderEpoch().ifPresent(epoch -> metadata.updateLastSeenEpochIfNewer(topicPartition, epoch)); @@ -1780,7 +1764,6 @@ private void subscribeInternal(Pattern pattern, Optional topics, Optional currentTopicPartitions = new HashSet<>(); diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerDelegateCreator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerDelegateCreator.java index bd95e06c8644..81c45aba69c7 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerDelegateCreator.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerDelegateCreator.java @@ -91,8 +91,7 @@ public ConsumerDelegate create(LogContext logContext, valueDeserializer, client, subscriptions, - metadata, - assignors + metadata ); else return new LegacyKafkaConsumer<>( diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java index 850ac2bd8f9f..d34d09cd8a92 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java @@ -495,7 +495,7 @@ public void testSubscriptionOnEmptyPattern(GroupProtocol groupProtocol) { } @ParameterizedTest - @EnumSource(GroupProtocol.class) + @EnumSource(value = GroupProtocol.class, names = "CLASSIC") public void testSubscriptionWithEmptyPartitionAssignment(GroupProtocol groupProtocol) { Properties props = new Properties(); props.setProperty(ConsumerConfig.GROUP_PROTOCOL_CONFIG, groupProtocol.name()); @@ -3227,7 +3227,7 @@ public void testUnusedConfigs(GroupProtocol groupProtocol) { } @ParameterizedTest - @EnumSource(GroupProtocol.class) + @EnumSource(value = GroupProtocol.class, names = "CLASSIC") public void testAssignorNameConflict(GroupProtocol groupProtocol) { Map configs = new HashMap<>(); configs.put(ConsumerConfig.GROUP_PROTOCOL_CONFIG, groupProtocol.name()); diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java index 66ee724a0e51..c32d2e5e5c0f 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java @@ -19,7 +19,6 @@ import org.apache.kafka.clients.Metadata.LeaderAndEpoch; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerGroupMetadata; -import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor; import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; @@ -30,7 +29,6 @@ import org.apache.kafka.clients.consumer.OffsetCommitCallback; import org.apache.kafka.clients.consumer.OffsetResetStrategy; import org.apache.kafka.clients.consumer.RetriableCommitFailedException; -import org.apache.kafka.clients.consumer.RoundRobinAssignor; import org.apache.kafka.clients.consumer.internals.events.ApplicationEvent; import org.apache.kafka.clients.consumer.internals.events.ApplicationEventHandler; import org.apache.kafka.clients.consumer.internals.events.AssignmentChangeEvent; @@ -205,7 +203,6 @@ private AsyncKafkaConsumer newConsumer( ConsumerInterceptors interceptors, ConsumerRebalanceListenerInvoker rebalanceListenerInvoker, SubscriptionState subscriptions, - List assignors, String groupId, String clientId) { long retryBackoffMs = 100L; @@ -228,7 +225,6 @@ private AsyncKafkaConsumer newConsumer( metadata, retryBackoffMs, defaultApiTimeoutMs, - assignors, groupId, autoCommitEnabled); } @@ -564,7 +560,6 @@ public void testCommitAsyncLeaderEpochUpdate() { new ConsumerInterceptors<>(Collections.emptyList()), mock(ConsumerRebalanceListenerInvoker.class), subscriptions, - singletonList(new RoundRobinAssignor()), "group-id", "client-id"); completeCommitSyncApplicationEventSuccessfully(); @@ -784,7 +779,6 @@ public void testPartitionRevocationOnClose() { mock(ConsumerInterceptors.class), mock(ConsumerRebalanceListenerInvoker.class), subscriptions, - singletonList(new RoundRobinAssignor()), "group-id", "client-id"); @@ -806,7 +800,6 @@ public void testFailedPartitionRevocationOnClose() { new ConsumerInterceptors<>(Collections.emptyList()), mock(ConsumerRebalanceListenerInvoker.class), subscriptions, - singletonList(new RoundRobinAssignor()), "group-id", "client-id"); subscriptions.subscribe(singleton("topic"), Optional.of(listener)); @@ -844,7 +837,6 @@ public void testAutoCommitSyncEnabled() { mock(ConsumerInterceptors.class), mock(ConsumerRebalanceListenerInvoker.class), subscriptions, - singletonList(new RoundRobinAssignor()), "group-id", "client-id"); consumer.subscribe(singleton("topic"), mock(ConsumerRebalanceListener.class)); @@ -862,7 +854,6 @@ public void testAutoCommitSyncDisabled() { mock(ConsumerInterceptors.class), mock(ConsumerRebalanceListenerInvoker.class), subscriptions, - singletonList(new RoundRobinAssignor()), "group-id", "client-id"); consumer.subscribe(singleton("topic"), mock(ConsumerRebalanceListener.class)); @@ -1624,6 +1615,18 @@ public void testGroupRemoteAssignorUsedInConsumerProtocol() { assertFalse(config.unused().contains(ConsumerConfig.GROUP_REMOTE_ASSIGNOR_CONFIG)); } + @Test + public void testPartitionAssignmentStrategyUnusedInAsyncConsumer() { + final Properties props = requiredConsumerConfig(); + props.put(ConsumerConfig.GROUP_ID_CONFIG, "consumerGroup1"); + props.put(ConsumerConfig.GROUP_PROTOCOL_CONFIG, GroupProtocol.CONSUMER.name().toLowerCase(Locale.ROOT)); + props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, "CooperativeStickyAssignor"); + final ConsumerConfig config = new ConsumerConfig(props); + consumer = newConsumer(config); + + assertTrue(config.unused().contains(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG)); + } + @Test public void testGroupIdNull() { final Properties props = requiredConsumerConfig(); @@ -1666,7 +1669,6 @@ public void testEnsurePollEventSentOnConsumerPoll() { new ConsumerInterceptors<>(Collections.emptyList()), mock(ConsumerRebalanceListenerInvoker.class), subscriptions, - singletonList(new RoundRobinAssignor()), "group-id", "client-id"); final TopicPartition tp = new TopicPartition("topic", 0);