KAFKA-15277: Design & implement support for internal Consumer delegates (apache#14670)

The consumer refactoring project introduced another `Consumer` implementation, creating two different, coexisting implementations of the `Consumer` interface:

* `KafkaConsumer` (AKA "existing", "legacy" consumer)
* `PrototypeAsyncConsumer` (AKA "new", "refactored" consumer)

The goal of this task is to refactor the code via the delegation pattern so that we can keep a top-level `KafkaConsumer` but then delegate to another implementation under the covers. There will be two delegates at first:

* `LegacyKafkaConsumer`
* `AsyncKafkaConsumer`

`LegacyKafkaConsumer` is essentially a renamed `KafkaConsumer`. That implementation handles the existing group protocol. `AsyncKafkaConsumer` is renamed from `PrototypeAsyncConsumer` and will implement the new consumer group protocol from KIP-848. Both of those implementations will live in the `internals` sub-package to discourage their use.
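
From the caller's point of view nothing changes: code keeps constructing a `KafkaConsumer`, and the delegate is chosen internally from the configuration (see `ConsumerDelegateCreator` in the diff below). A rough usage sketch — the bootstrap servers, group id, and topic are illustrative values, not part of this change:

```java
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class DelegateSelectionExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        // "consumer" selects the new KIP-848 group protocol, i.e. the AsyncKafkaConsumer
        // delegate; otherwise the LegacyKafkaConsumer delegate is used.
        props.put(ConsumerConfig.GROUP_PROTOCOL_CONFIG, "consumer");

        // Still the same public facade; which delegate backs it is an internal detail.
        try (Consumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("my-topic"));
        }
    }
}
```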

This task is part of the work to implement support for the new KIP-848 consumer group protocol.

Reviewers: Philip Nee <[email protected]>, Andrew Schofield <[email protected]>, David Jacot <[email protected]>
kirktrue authored Nov 15, 2023
1 parent a64037c commit 22f7ffe
Showing 18 changed files with 2,434 additions and 1,495 deletions.
@@ -662,6 +662,7 @@ protected Map<String, Object> postProcessParsedConfig(final Map<String, Object>
         CommonClientConfigs.warnDisablingExponentialBackoff(this);
         Map<String, Object> refinedConfigs = CommonClientConfigs.postProcessReconnectBackoffConfigs(this, parsedValues);
         maybeOverrideClientId(refinedConfigs);
+        maybeOverrideEnableAutoCommit(refinedConfigs);
         return refinedConfigs;
     }

@@ -695,17 +696,17 @@ else if (newConfigs.get(VALUE_DESERIALIZER_CLASS_CONFIG) == null)
         return newConfigs;
     }

-    boolean maybeOverrideEnableAutoCommit() {
+    private void maybeOverrideEnableAutoCommit(Map<String, Object> configs) {
         Optional<String> groupId = Optional.ofNullable(getString(CommonClientConfigs.GROUP_ID_CONFIG));
-        boolean enableAutoCommit = getBoolean(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG);
+        Map<String, Object> originals = originals();
+        boolean enableAutoCommit = originals.containsKey(ENABLE_AUTO_COMMIT_CONFIG) ? getBoolean(ENABLE_AUTO_COMMIT_CONFIG) : false;
         if (!groupId.isPresent()) { // overwrite in case of default group id where the config is not explicitly provided
-            if (!originals().containsKey(ENABLE_AUTO_COMMIT_CONFIG)) {
-                enableAutoCommit = false;
+            if (!originals.containsKey(ENABLE_AUTO_COMMIT_CONFIG)) {
+                configs.put(ENABLE_AUTO_COMMIT_CONFIG, false);
             } else if (enableAutoCommit) {
-                throw new InvalidConfigurationException(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG + " cannot be set to true when default group id (null) is used.");
+                throw new InvalidConfigurationException(ENABLE_AUTO_COMMIT_CONFIG + " cannot be set to true when default group id (null) is used.");
             }
         }
-        return enableAutoCommit;
     }

     public ConsumerConfig(Properties props) {
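
With this change, the `enable.auto.commit` override is applied once during `postProcessParsedConfig`, writing the resolved value back into the parsed config map, instead of each consumer implementation calling `maybeOverrideEnableAutoCommit()` itself. A hedged sketch of the resulting behavior — the class name and property values are illustrative; only the config keys and the exception come from the code above:

```java
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.errors.InvalidConfigurationException;
import org.apache.kafka.common.serialization.StringDeserializer;

public class AutoCommitOverrideExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        // No group.id and enable.auto.commit left unset: post-processing now writes
        // enable.auto.commit=false into the parsed config (the plain default would be true).
        ConsumerConfig config = new ConsumerConfig(props);
        System.out.println(config.getBoolean(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG)); // false

        // No group.id but enable.auto.commit=true set explicitly: rejected up front.
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
        try {
            new ConsumerConfig(props);
        } catch (InvalidConfigurationException e) {
            System.out.println(e.getMessage());
        }
    }
}
```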

Large diffs are not rendered by default.

Large diffs are not rendered by default.

@@ -56,6 +56,7 @@
 import java.util.concurrent.CompletableFuture;
 import java.util.stream.Collectors;

+import static org.apache.kafka.clients.consumer.internals.ConsumerUtils.THROW_ON_FETCH_STABLE_OFFSET_UNSUPPORTED;
 import static org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.PollResult.EMPTY;
 import static org.apache.kafka.common.protocol.Errors.COORDINATOR_LOAD_IN_PROGRESS;
 import static org.apache.kafka.common.protocol.Errors.COORDINATOR_NOT_AVAILABLE;
@@ -64,8 +65,6 @@

 public class CommitRequestManager implements RequestManager {

-    // TODO: current in ConsumerConfig but inaccessible in the internal package.
-    private static final String THROW_ON_FETCH_STABLE_OFFSET_UNSUPPORTED = "internal.throw.on.fetch.stable.offset.unsupported";
     private final SubscriptionState subscriptions;
     private final LogContext logContext;
     private final Logger log;
@@ -0,0 +1,42 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.clients.consumer.internals;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.utils.Timer;

/**
* This extension interface provides a handful of methods to expose internals of the {@link Consumer} for
* various tests.
*
* <p/>
*
* <em>Note</em>: this is for internal use only and is not intended for use by end users. Internal users should
* not attempt to determine the underlying implementation to avoid coding to an unstable interface. Rather, it is
* the {@link Consumer} API contract that should serve as the caller's interface.
*/
public interface ConsumerDelegate<K, V> extends Consumer<K, V> {

String clientId();

Metrics metricsRegistry();

KafkaConsumerMetrics kafkaConsumerMetrics();

boolean updateAssignmentMetadataIfNeeded(final Timer timer);
}
@@ -0,0 +1,115 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.clients.consumer.internals;

import org.apache.kafka.clients.KafkaClient;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor;
import org.apache.kafka.clients.consumer.GroupProtocol;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;

import java.util.List;
import java.util.Locale;

/**
* {@code ConsumerDelegateCreator} implements a quasi-factory pattern to allow the caller to remain unaware of the
* underlying {@link Consumer} implementation that is created. This provides the means by which {@link KafkaConsumer}
* can remain the top-level facade for implementations, but allow different implementations to co-exist under
* the covers.
*
* <p/>
*
* The current logic for the {@code ConsumerCreator} inspects the incoming configuration and determines if
* it is using the new consumer group protocol (KIP-848) or if it should fall back to the existing, legacy group
* protocol. This is based on the presence and value of the {@link ConsumerConfig#GROUP_PROTOCOL_CONFIG group.protocol}
* configuration. If the value is present and equal to &quot;{@code consumer}&quot;, the {@link AsyncKafkaConsumer}
* will be returned. Otherwise, the {@link LegacyKafkaConsumer} will be returned.
*
*
* <p/>
*
* <em>Note</em>: this is for internal use only and is not intended for use by end users. Internal users should
* not attempt to determine the underlying implementation to avoid coding to an unstable interface. Rather, it is
* the {@link Consumer} API contract that should serve as the caller's interface.
*/
public class ConsumerDelegateCreator {

public <K, V> ConsumerDelegate<K, V> create(ConsumerConfig config,
Deserializer<K> keyDeserializer,
Deserializer<V> valueDeserializer) {
try {
GroupProtocol groupProtocol = GroupProtocol.valueOf(config.getString(ConsumerConfig.GROUP_PROTOCOL_CONFIG).toUpperCase(Locale.ROOT));

if (groupProtocol == GroupProtocol.CONSUMER)
return new AsyncKafkaConsumer<>(config, keyDeserializer, valueDeserializer);
else
return new LegacyKafkaConsumer<>(config, keyDeserializer, valueDeserializer);
} catch (KafkaException e) {
throw e;
} catch (Throwable t) {
throw new KafkaException("Failed to construct Kafka consumer", t);
}
}

public <K, V> ConsumerDelegate<K, V> create(LogContext logContext,
Time time,
ConsumerConfig config,
Deserializer<K> keyDeserializer,
Deserializer<V> valueDeserializer,
KafkaClient client,
SubscriptionState subscriptions,
ConsumerMetadata metadata,
List<ConsumerPartitionAssignor> assignors) {
try {
GroupProtocol groupProtocol = GroupProtocol.valueOf(config.getString(ConsumerConfig.GROUP_PROTOCOL_CONFIG).toUpperCase(Locale.ROOT));

if (groupProtocol == GroupProtocol.CONSUMER)
return new AsyncKafkaConsumer<>(
logContext,
time,
config,
keyDeserializer,
valueDeserializer,
client,
subscriptions,
metadata,
assignors
);
else
return new LegacyKafkaConsumer<>(
logContext,
time,
config,
keyDeserializer,
valueDeserializer,
client,
subscriptions,
metadata,
assignors
);
} catch (KafkaException e) {
throw e;
} catch (Throwable t) {
throw new KafkaException("Failed to construct Kafka consumer", t);
}
}
}
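
Production code never touches these types directly; together, `ConsumerDelegate` and `ConsumerDelegateCreator` exist so tests can build and inspect a delegate without casting to a concrete implementation. A hypothetical sketch along those lines — the class name and config values are assumptions; only the `create` overload and the delegate accessors come from this commit:

```java
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.internals.ConsumerDelegate;
import org.apache.kafka.clients.consumer.internals.ConsumerDelegateCreator;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.utils.Time;

public class ConsumerDelegateSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "delegate-sketch");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.GROUP_PROTOCOL_CONFIG, "consumer"); // selects AsyncKafkaConsumer

        // The creator inspects group.protocol and hands back the matching delegate.
        ConsumerDelegate<String, String> delegate = new ConsumerDelegateCreator()
                .create(new ConsumerConfig(props), new StringDeserializer(), new StringDeserializer());

        // The extra accessors let tests reach internals without caring whether the
        // delegate is the legacy or the async implementation.
        System.out.println(delegate.clientId());
        delegate.updateAssignmentMetadataIfNeeded(Time.SYSTEM.timer(100));
        delegate.close();
    }
}
```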
@@ -247,12 +247,10 @@ private void closeInternal(final Duration timeout) {
         closeTimeout = timeout;
         wakeup();

-        if (timeoutMs > 0) {
-            try {
-                join(timeoutMs);
-            } catch (InterruptedException e) {
-                log.error("Interrupted while waiting for consumer network thread to complete", e);
-            }
+        try {
+            join();
+        } catch (InterruptedException e) {
+            log.error("Interrupted while waiting for consumer network thread to complete", e);
         }
     }

@@ -56,6 +56,12 @@

 public final class ConsumerUtils {

+    /**
+     * This configuration has only package-level visibility in {@link ConsumerConfig}, so it's inaccessible in the
+     * internals package where most of its uses live. Attempts were made to move things around, but it was deemed
+     * better to leave it as is.
+     */
+    static final String THROW_ON_FETCH_STABLE_OFFSET_UNSUPPORTED = "internal.throw.on.fetch.stable.offset.unsupported";
     public static final long DEFAULT_CLOSE_TIMEOUT_MS = 30 * 1000;
     public static final String CONSUMER_JMX_PREFIX = "kafka.consumer";
     public static final String CONSUMER_METRIC_GROUP_PREFIX = "consumer";