From 1c998f8ef3620ed6bf782a01696f61811da2dea2 Mon Sep 17 00:00:00 2001 From: Bill Bejeck Date: Thu, 21 Nov 2024 13:41:29 -0500 Subject: [PATCH] KAFKA-17869: Adding tests to ensure KIP-1076 doesn't interfere with consumer metrics[1/3] (#17781) Adding tests to ensure the KIP-1076 methods don't interfere with existing metrics in clients Reviewers: Apoorv Mittal , Matthias Sax --- checkstyle/import-control.xml | 1 + .../internals/AsyncKafkaConsumer.java | 14 +- .../internals/ClassicKafkaConsumer.java | 14 +- .../clients/consumer/KafkaConsumerTest.java | 123 ++++++++++++++++++ 4 files changed, 140 insertions(+), 12 deletions(-) diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml index eb4131fbc47f1..bb6f48a73f071 100644 --- a/checkstyle/import-control.xml +++ b/checkstyle/import-control.xml @@ -225,6 +225,7 @@ + diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java index da3f3f2f25b45..ba7eed19f11fd 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java @@ -662,17 +662,19 @@ void setGroupAssignmentSnapshot(final Set partitions) { @Override public void registerMetricForSubscription(KafkaMetric metric) { - if (clientTelemetryReporter.isPresent()) { - ClientTelemetryReporter reporter = clientTelemetryReporter.get(); - reporter.metricChange(metric); + if (!metrics().containsKey(metric.metricName())) { + clientTelemetryReporter.ifPresent(reporter -> reporter.metricChange(metric)); + } else { + log.debug("Skipping registration for metric {}. Existing consumer metrics cannot be overwritten.", metric.metricName()); } } @Override public void unregisterMetricFromSubscription(KafkaMetric metric) { - if (clientTelemetryReporter.isPresent()) { - ClientTelemetryReporter reporter = clientTelemetryReporter.get(); - reporter.metricRemoval(metric); + if (!metrics().containsKey(metric.metricName())) { + clientTelemetryReporter.ifPresent(reporter -> reporter.metricRemoval(metric)); + } else { + log.debug("Skipping unregistration for metric {}. Existing consumer metrics cannot be removed.", metric.metricName()); } } diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ClassicKafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ClassicKafkaConsumer.java index 82a9bd2a53bfc..e423c26176349 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ClassicKafkaConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ClassicKafkaConsumer.java @@ -430,17 +430,19 @@ public void subscribe(Collection topics, ConsumerRebalanceListener liste @Override public void registerMetricForSubscription(KafkaMetric metric) { - if (clientTelemetryReporter.isPresent()) { - ClientTelemetryReporter reporter = clientTelemetryReporter.get(); - reporter.metricChange(metric); + if (!metrics().containsKey(metric.metricName())) { + clientTelemetryReporter.ifPresent(reporter -> reporter.metricChange(metric)); + } else { + log.debug("Skipping registration for metric {}. Existing consumer metrics cannot be overwritten.", metric.metricName()); } } @Override public void unregisterMetricFromSubscription(KafkaMetric metric) { - if (clientTelemetryReporter.isPresent()) { - ClientTelemetryReporter reporter = clientTelemetryReporter.get(); - reporter.metricRemoval(metric); + if (!metrics().containsKey(metric.metricName())) { + clientTelemetryReporter.ifPresent(reporter -> reporter.metricRemoval(metric)); + } else { + log.debug("Skipping unregistration for metric {}. Existing consumer metrics cannot be removed.", metric.metricName()); } } diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java index c260fa48c019b..7d122c2986c0d 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java @@ -21,6 +21,8 @@ import org.apache.kafka.clients.KafkaClient; import org.apache.kafka.clients.MockClient; import org.apache.kafka.clients.NodeApiVersions; +import org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer; +import org.apache.kafka.clients.consumer.internals.ClassicKafkaConsumer; import org.apache.kafka.clients.consumer.internals.ConsumerMetadata; import org.apache.kafka.clients.consumer.internals.ConsumerProtocol; import org.apache.kafka.clients.consumer.internals.MockRebalanceListener; @@ -58,6 +60,9 @@ import org.apache.kafka.common.message.ListOffsetsResponseData.ListOffsetsTopicResponse; import org.apache.kafka.common.message.SyncGroupResponseData; import org.apache.kafka.common.metrics.JmxReporter; +import org.apache.kafka.common.metrics.KafkaMetric; +import org.apache.kafka.common.metrics.Measurable; +import org.apache.kafka.common.metrics.MetricConfig; import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.metrics.Sensor; import org.apache.kafka.common.metrics.stats.Avg; @@ -88,6 +93,7 @@ import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter; import org.apache.kafka.common.telemetry.internals.ClientTelemetrySender; +import org.apache.kafka.common.utils.LogCaptureAppender; import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.common.utils.MockTime; import org.apache.kafka.common.utils.Time; @@ -96,11 +102,13 @@ import org.apache.kafka.test.MockMetricsReporter; import org.apache.kafka.test.TestUtils; +import org.apache.log4j.Level; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Timeout; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.EnumSource; import org.mockito.MockedStatic; +import org.mockito.Mockito; import org.mockito.internal.stubbing.answers.CallsRealMethods; import java.lang.management.ManagementFactory; @@ -145,6 +153,7 @@ import static org.apache.kafka.clients.consumer.internals.ClassicKafkaConsumer.DEFAULT_REASON; import static org.apache.kafka.common.requests.FetchMetadata.INVALID_SESSION_ID; import static org.apache.kafka.common.utils.Utils.propsToMap; +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertInstanceOf; @@ -157,8 +166,10 @@ import static org.junit.jupiter.api.Assertions.fail; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.atMostOnce; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mockStatic; +import static org.mockito.Mockito.never; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -224,6 +235,107 @@ public void cleanup() { } } + @ParameterizedTest + @EnumSource(GroupProtocol.class) + public void testSubscribingCustomMetricsDoesntAffectConsumerMetrics(GroupProtocol groupProtocol) { + Properties props = new Properties(); + props.setProperty(ConsumerConfig.GROUP_PROTOCOL_CONFIG, groupProtocol.name()); + props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999"); + consumer = newConsumer(props, new StringDeserializer(), new StringDeserializer()); + + Map customMetrics = customMetrics(); + customMetrics.forEach((name, metric) -> consumer.registerMetricForSubscription(metric)); + + Map consumerMetrics = consumer.metrics(); + customMetrics.forEach((name, metric) -> assertFalse(consumerMetrics.containsKey(name))); + } + + @ParameterizedTest + @EnumSource(GroupProtocol.class) + public void testSubscribingCustomMetricsWithSameNameDoesntAffectConsumerMetrics(GroupProtocol groupProtocol) { + Properties props = new Properties(); + props.setProperty(ConsumerConfig.GROUP_PROTOCOL_CONFIG, groupProtocol.name()); + props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999"); + Class consumerClass = groupProtocol == GroupProtocol.CLASSIC ? ClassicKafkaConsumer.class : AsyncKafkaConsumer.class; + try (final LogCaptureAppender appender = LogCaptureAppender.createAndRegister()) { + appender.setClassLogger(consumerClass, Level.DEBUG); + consumer = newConsumer(props, new StringDeserializer(), new StringDeserializer()); + KafkaMetric existingMetricToAdd = (KafkaMetric) consumer.metrics().entrySet().iterator().next().getValue(); + consumer.registerMetricForSubscription(existingMetricToAdd); + final String expectedMessage = String.format("Skipping registration for metric %s. Existing consumer metrics cannot be overwritten.", existingMetricToAdd.metricName()); + assertTrue(appender.getMessages().stream().anyMatch(m -> m.contains(expectedMessage))); + } + } + + @ParameterizedTest + @EnumSource(GroupProtocol.class) + public void testUnsubscribingCustomMetricsWithSameNameDoesntAffectConsumerMetrics(GroupProtocol groupProtocol) { + Properties props = new Properties(); + props.setProperty(ConsumerConfig.GROUP_PROTOCOL_CONFIG, groupProtocol.name()); + props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999"); + Class consumerClass = groupProtocol == GroupProtocol.CLASSIC ? ClassicKafkaConsumer.class : AsyncKafkaConsumer.class; + try (final LogCaptureAppender appender = LogCaptureAppender.createAndRegister()) { + appender.setClassLogger(consumerClass, Level.DEBUG); + consumer = newConsumer(props, new StringDeserializer(), new StringDeserializer()); + KafkaMetric existingMetricToRemove = (KafkaMetric) consumer.metrics().entrySet().iterator().next().getValue(); + consumer.unregisterMetricFromSubscription(existingMetricToRemove); + final String expectedMessage = String.format("Skipping unregistration for metric %s. Existing consumer metrics cannot be removed.", existingMetricToRemove.metricName()); + assertTrue(appender.getMessages().stream().anyMatch(m -> m.contains(expectedMessage))); + } + } + + @ParameterizedTest + @EnumSource(GroupProtocol.class) + public void testShouldOnlyCallMetricReporterMetricChangeOnceWithExistingConsumerMetric(GroupProtocol groupProtocol) { + try (MockedStatic mockedCommonClientConfigs = mockStatic(CommonClientConfigs.class, new CallsRealMethods())) { + ClientTelemetryReporter clientTelemetryReporter = mock(ClientTelemetryReporter.class); + clientTelemetryReporter.configure(any()); + mockedCommonClientConfigs.when(() -> CommonClientConfigs.telemetryReporter(anyString(), any())).thenReturn(Optional.of(clientTelemetryReporter)); + + Properties props = new Properties(); + props.setProperty(ConsumerConfig.GROUP_PROTOCOL_CONFIG, groupProtocol.name()); + props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999"); + consumer = newConsumer(props, new StringDeserializer(), new StringDeserializer()); + + KafkaMetric existingMetric = (KafkaMetric) consumer.metrics().entrySet().iterator().next().getValue(); + consumer.registerMetricForSubscription(existingMetric); + // This test would fail without the check as the exising metric is registered in the consumer on startup + Mockito.verify(clientTelemetryReporter, atMostOnce()).metricChange(existingMetric); + } + } + + @ParameterizedTest + @EnumSource(GroupProtocol.class) + public void testShouldNotCallMetricReporterMetricRemovalWithExistingConsumerMetric(GroupProtocol groupProtocol) { + try (MockedStatic mockedCommonClientConfigs = mockStatic(CommonClientConfigs.class, new CallsRealMethods())) { + ClientTelemetryReporter clientTelemetryReporter = mock(ClientTelemetryReporter.class); + clientTelemetryReporter.configure(any()); + mockedCommonClientConfigs.when(() -> CommonClientConfigs.telemetryReporter(anyString(), any())).thenReturn(Optional.of(clientTelemetryReporter)); + + Properties props = new Properties(); + props.setProperty(ConsumerConfig.GROUP_PROTOCOL_CONFIG, groupProtocol.name()); + props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999"); + consumer = newConsumer(props, new StringDeserializer(), new StringDeserializer()); + + KafkaMetric existingMetric = (KafkaMetric) consumer.metrics().entrySet().iterator().next().getValue(); + consumer.unregisterMetricFromSubscription(existingMetric); + Mockito.verify(clientTelemetryReporter, never()).metricRemoval(existingMetric); + } + } + + @ParameterizedTest + @EnumSource(GroupProtocol.class) + public void testUnSubscribingNonExisingMetricsDoesntCauseError(GroupProtocol groupProtocol) { + Properties props = new Properties(); + props.setProperty(ConsumerConfig.GROUP_PROTOCOL_CONFIG, groupProtocol.name()); + props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999"); + consumer = newConsumer(props, new StringDeserializer(), new StringDeserializer()); + + Map customMetrics = customMetrics(); + //Metrics never registered but removed should not cause an error + customMetrics.forEach((name, metric) -> assertDoesNotThrow(() -> consumer.unregisterMetricFromSubscription(metric))); + } + @ParameterizedTest @EnumSource(GroupProtocol.class) public void testMetricsReporterAutoGeneratedClientId(GroupProtocol groupProtocol) { @@ -3501,6 +3613,17 @@ private boolean requestGenerated(MockClient client, ApiKeys apiKey) { return client.requests().stream().anyMatch(request -> request.requestBuilder().apiKey().equals(apiKey)); } + private Map customMetrics() { + MetricConfig metricConfig = new MetricConfig(); + Object lock = new Object(); + MetricName metricNameOne = new MetricName("metricOne", "stream-metrics", "description for metric one", new HashMap<>()); + MetricName metricNameTwo = new MetricName("metricTwo", "stream-metrics", "description for metric two", new HashMap<>()); + + KafkaMetric streamClientMetricOne = new KafkaMetric(lock, metricNameOne, (Measurable) (m, now) -> 1.0, metricConfig, Time.SYSTEM); + KafkaMetric streamClientMetricTwo = new KafkaMetric(lock, metricNameTwo, (Measurable) (m, now) -> 2.0, metricConfig, Time.SYSTEM); + return Map.of(metricNameOne, streamClientMetricOne, metricNameTwo, streamClientMetricTwo); + } + private static final List CLIENT_IDS = new ArrayList<>(); public static class DeserializerForClientId implements Deserializer { @Override