diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index eb4131fbc47f1..bb6f48a73f071 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -225,6 +225,7 @@
+
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
index da3f3f2f25b45..ba7eed19f11fd 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
@@ -662,17 +662,19 @@ void setGroupAssignmentSnapshot(final Set partitions) {
@Override
public void registerMetricForSubscription(KafkaMetric metric) {
- if (clientTelemetryReporter.isPresent()) {
- ClientTelemetryReporter reporter = clientTelemetryReporter.get();
- reporter.metricChange(metric);
+ if (!metrics().containsKey(metric.metricName())) {
+ clientTelemetryReporter.ifPresent(reporter -> reporter.metricChange(metric));
+ } else {
+ log.debug("Skipping registration for metric {}. Existing consumer metrics cannot be overwritten.", metric.metricName());
}
}
@Override
public void unregisterMetricFromSubscription(KafkaMetric metric) {
- if (clientTelemetryReporter.isPresent()) {
- ClientTelemetryReporter reporter = clientTelemetryReporter.get();
- reporter.metricRemoval(metric);
+ if (!metrics().containsKey(metric.metricName())) {
+ clientTelemetryReporter.ifPresent(reporter -> reporter.metricRemoval(metric));
+ } else {
+ log.debug("Skipping unregistration for metric {}. Existing consumer metrics cannot be removed.", metric.metricName());
}
}
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ClassicKafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ClassicKafkaConsumer.java
index 82a9bd2a53bfc..e423c26176349 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ClassicKafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ClassicKafkaConsumer.java
@@ -430,17 +430,19 @@ public void subscribe(Collection topics, ConsumerRebalanceListener liste
@Override
public void registerMetricForSubscription(KafkaMetric metric) {
- if (clientTelemetryReporter.isPresent()) {
- ClientTelemetryReporter reporter = clientTelemetryReporter.get();
- reporter.metricChange(metric);
+ if (!metrics().containsKey(metric.metricName())) {
+ clientTelemetryReporter.ifPresent(reporter -> reporter.metricChange(metric));
+ } else {
+ log.debug("Skipping registration for metric {}. Existing consumer metrics cannot be overwritten.", metric.metricName());
}
}
@Override
public void unregisterMetricFromSubscription(KafkaMetric metric) {
- if (clientTelemetryReporter.isPresent()) {
- ClientTelemetryReporter reporter = clientTelemetryReporter.get();
- reporter.metricRemoval(metric);
+ if (!metrics().containsKey(metric.metricName())) {
+ clientTelemetryReporter.ifPresent(reporter -> reporter.metricRemoval(metric));
+ } else {
+ log.debug("Skipping unregistration for metric {}. Existing consumer metrics cannot be removed.", metric.metricName());
}
}
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
index c260fa48c019b..7d122c2986c0d 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
@@ -21,6 +21,8 @@
import org.apache.kafka.clients.KafkaClient;
import org.apache.kafka.clients.MockClient;
import org.apache.kafka.clients.NodeApiVersions;
+import org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer;
+import org.apache.kafka.clients.consumer.internals.ClassicKafkaConsumer;
import org.apache.kafka.clients.consumer.internals.ConsumerMetadata;
import org.apache.kafka.clients.consumer.internals.ConsumerProtocol;
import org.apache.kafka.clients.consumer.internals.MockRebalanceListener;
@@ -58,6 +60,9 @@
import org.apache.kafka.common.message.ListOffsetsResponseData.ListOffsetsTopicResponse;
import org.apache.kafka.common.message.SyncGroupResponseData;
import org.apache.kafka.common.metrics.JmxReporter;
+import org.apache.kafka.common.metrics.KafkaMetric;
+import org.apache.kafka.common.metrics.Measurable;
+import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.Avg;
@@ -88,6 +93,7 @@
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter;
import org.apache.kafka.common.telemetry.internals.ClientTelemetrySender;
+import org.apache.kafka.common.utils.LogCaptureAppender;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
@@ -96,11 +102,13 @@
import org.apache.kafka.test.MockMetricsReporter;
import org.apache.kafka.test.TestUtils;
+import org.apache.log4j.Level;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;
import org.mockito.MockedStatic;
+import org.mockito.Mockito;
import org.mockito.internal.stubbing.answers.CallsRealMethods;
import java.lang.management.ManagementFactory;
@@ -145,6 +153,7 @@
import static org.apache.kafka.clients.consumer.internals.ClassicKafkaConsumer.DEFAULT_REASON;
import static org.apache.kafka.common.requests.FetchMetadata.INVALID_SESSION_ID;
import static org.apache.kafka.common.utils.Utils.propsToMap;
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
@@ -157,8 +166,10 @@
import static org.junit.jupiter.api.Assertions.fail;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.atMostOnce;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.mockStatic;
+import static org.mockito.Mockito.never;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@@ -224,6 +235,107 @@ public void cleanup() {
}
}
+ @ParameterizedTest
+ @EnumSource(GroupProtocol.class)
+ public void testSubscribingCustomMetricsDoesntAffectConsumerMetrics(GroupProtocol groupProtocol) {
+ Properties props = new Properties();
+ props.setProperty(ConsumerConfig.GROUP_PROTOCOL_CONFIG, groupProtocol.name());
+ props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
+ consumer = newConsumer(props, new StringDeserializer(), new StringDeserializer());
+
+ Map customMetrics = customMetrics();
+ customMetrics.forEach((name, metric) -> consumer.registerMetricForSubscription(metric));
+
+ Map consumerMetrics = consumer.metrics();
+ customMetrics.forEach((name, metric) -> assertFalse(consumerMetrics.containsKey(name)));
+ }
+
+ @ParameterizedTest
+ @EnumSource(GroupProtocol.class)
+ public void testSubscribingCustomMetricsWithSameNameDoesntAffectConsumerMetrics(GroupProtocol groupProtocol) {
+ Properties props = new Properties();
+ props.setProperty(ConsumerConfig.GROUP_PROTOCOL_CONFIG, groupProtocol.name());
+ props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
+ Class> consumerClass = groupProtocol == GroupProtocol.CLASSIC ? ClassicKafkaConsumer.class : AsyncKafkaConsumer.class;
+ try (final LogCaptureAppender appender = LogCaptureAppender.createAndRegister()) {
+ appender.setClassLogger(consumerClass, Level.DEBUG);
+ consumer = newConsumer(props, new StringDeserializer(), new StringDeserializer());
+ KafkaMetric existingMetricToAdd = (KafkaMetric) consumer.metrics().entrySet().iterator().next().getValue();
+ consumer.registerMetricForSubscription(existingMetricToAdd);
+ final String expectedMessage = String.format("Skipping registration for metric %s. Existing consumer metrics cannot be overwritten.", existingMetricToAdd.metricName());
+ assertTrue(appender.getMessages().stream().anyMatch(m -> m.contains(expectedMessage)));
+ }
+ }
+
+ @ParameterizedTest
+ @EnumSource(GroupProtocol.class)
+ public void testUnsubscribingCustomMetricsWithSameNameDoesntAffectConsumerMetrics(GroupProtocol groupProtocol) {
+ Properties props = new Properties();
+ props.setProperty(ConsumerConfig.GROUP_PROTOCOL_CONFIG, groupProtocol.name());
+ props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
+ Class> consumerClass = groupProtocol == GroupProtocol.CLASSIC ? ClassicKafkaConsumer.class : AsyncKafkaConsumer.class;
+ try (final LogCaptureAppender appender = LogCaptureAppender.createAndRegister()) {
+ appender.setClassLogger(consumerClass, Level.DEBUG);
+ consumer = newConsumer(props, new StringDeserializer(), new StringDeserializer());
+ KafkaMetric existingMetricToRemove = (KafkaMetric) consumer.metrics().entrySet().iterator().next().getValue();
+ consumer.unregisterMetricFromSubscription(existingMetricToRemove);
+ final String expectedMessage = String.format("Skipping unregistration for metric %s. Existing consumer metrics cannot be removed.", existingMetricToRemove.metricName());
+ assertTrue(appender.getMessages().stream().anyMatch(m -> m.contains(expectedMessage)));
+ }
+ }
+
+ @ParameterizedTest
+ @EnumSource(GroupProtocol.class)
+ public void testShouldOnlyCallMetricReporterMetricChangeOnceWithExistingConsumerMetric(GroupProtocol groupProtocol) {
+ try (MockedStatic mockedCommonClientConfigs = mockStatic(CommonClientConfigs.class, new CallsRealMethods())) {
+ ClientTelemetryReporter clientTelemetryReporter = mock(ClientTelemetryReporter.class);
+ clientTelemetryReporter.configure(any());
+ mockedCommonClientConfigs.when(() -> CommonClientConfigs.telemetryReporter(anyString(), any())).thenReturn(Optional.of(clientTelemetryReporter));
+
+ Properties props = new Properties();
+ props.setProperty(ConsumerConfig.GROUP_PROTOCOL_CONFIG, groupProtocol.name());
+ props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
+ consumer = newConsumer(props, new StringDeserializer(), new StringDeserializer());
+
+ KafkaMetric existingMetric = (KafkaMetric) consumer.metrics().entrySet().iterator().next().getValue();
+ consumer.registerMetricForSubscription(existingMetric);
+ // This test would fail without the check as the exising metric is registered in the consumer on startup
+ Mockito.verify(clientTelemetryReporter, atMostOnce()).metricChange(existingMetric);
+ }
+ }
+
+ @ParameterizedTest
+ @EnumSource(GroupProtocol.class)
+ public void testShouldNotCallMetricReporterMetricRemovalWithExistingConsumerMetric(GroupProtocol groupProtocol) {
+ try (MockedStatic mockedCommonClientConfigs = mockStatic(CommonClientConfigs.class, new CallsRealMethods())) {
+ ClientTelemetryReporter clientTelemetryReporter = mock(ClientTelemetryReporter.class);
+ clientTelemetryReporter.configure(any());
+ mockedCommonClientConfigs.when(() -> CommonClientConfigs.telemetryReporter(anyString(), any())).thenReturn(Optional.of(clientTelemetryReporter));
+
+ Properties props = new Properties();
+ props.setProperty(ConsumerConfig.GROUP_PROTOCOL_CONFIG, groupProtocol.name());
+ props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
+ consumer = newConsumer(props, new StringDeserializer(), new StringDeserializer());
+
+ KafkaMetric existingMetric = (KafkaMetric) consumer.metrics().entrySet().iterator().next().getValue();
+ consumer.unregisterMetricFromSubscription(existingMetric);
+ Mockito.verify(clientTelemetryReporter, never()).metricRemoval(existingMetric);
+ }
+ }
+
+ @ParameterizedTest
+ @EnumSource(GroupProtocol.class)
+ public void testUnSubscribingNonExisingMetricsDoesntCauseError(GroupProtocol groupProtocol) {
+ Properties props = new Properties();
+ props.setProperty(ConsumerConfig.GROUP_PROTOCOL_CONFIG, groupProtocol.name());
+ props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
+ consumer = newConsumer(props, new StringDeserializer(), new StringDeserializer());
+
+ Map customMetrics = customMetrics();
+ //Metrics never registered but removed should not cause an error
+ customMetrics.forEach((name, metric) -> assertDoesNotThrow(() -> consumer.unregisterMetricFromSubscription(metric)));
+ }
+
@ParameterizedTest
@EnumSource(GroupProtocol.class)
public void testMetricsReporterAutoGeneratedClientId(GroupProtocol groupProtocol) {
@@ -3501,6 +3613,17 @@ private boolean requestGenerated(MockClient client, ApiKeys apiKey) {
return client.requests().stream().anyMatch(request -> request.requestBuilder().apiKey().equals(apiKey));
}
+ private Map customMetrics() {
+ MetricConfig metricConfig = new MetricConfig();
+ Object lock = new Object();
+ MetricName metricNameOne = new MetricName("metricOne", "stream-metrics", "description for metric one", new HashMap<>());
+ MetricName metricNameTwo = new MetricName("metricTwo", "stream-metrics", "description for metric two", new HashMap<>());
+
+ KafkaMetric streamClientMetricOne = new KafkaMetric(lock, metricNameOne, (Measurable) (m, now) -> 1.0, metricConfig, Time.SYSTEM);
+ KafkaMetric streamClientMetricTwo = new KafkaMetric(lock, metricNameTwo, (Measurable) (m, now) -> 2.0, metricConfig, Time.SYSTEM);
+ return Map.of(metricNameOne, streamClientMetricOne, metricNameTwo, streamClientMetricTwo);
+ }
+
private static final List CLIENT_IDS = new ArrayList<>();
public static class DeserializerForClientId implements Deserializer {
@Override