Skip to content

Commit

Permalink
KAFKA-17869: Adding tests to ensure KIP-1076 doesn't interfere with c…
Browse files Browse the repository at this point in the history
…onsumer metrics[1/3] (apache#17781)

Adding tests to ensure the KIP-1076 methods don't interfere with existing metrics in clients

Reviewers: Apoorv Mittal <[email protected]>, Matthias Sax <[email protected]>
  • Loading branch information
bbejeck authored Nov 21, 2024
1 parent f5781d5 commit 1c998f8
Show file tree
Hide file tree
Showing 4 changed files with 140 additions and 12 deletions.
1 change: 1 addition & 0 deletions checkstyle/import-control.xml
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,7 @@
<allow pkg="org.apache.kafka.common" />
<allow pkg="org.apache.kafka.clients" exact-match="true"/>
<allow pkg="org.apache.kafka.test" />
<allow class="org.apache.log4j.Level" />

<subpackage name="consumer">
<allow pkg="org.apache.kafka.clients.consumer" />
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -662,17 +662,19 @@ void setGroupAssignmentSnapshot(final Set<TopicPartition> partitions) {

@Override
public void registerMetricForSubscription(KafkaMetric metric) {
if (clientTelemetryReporter.isPresent()) {
ClientTelemetryReporter reporter = clientTelemetryReporter.get();
reporter.metricChange(metric);
if (!metrics().containsKey(metric.metricName())) {
clientTelemetryReporter.ifPresent(reporter -> reporter.metricChange(metric));
} else {
log.debug("Skipping registration for metric {}. Existing consumer metrics cannot be overwritten.", metric.metricName());
}
}

@Override
public void unregisterMetricFromSubscription(KafkaMetric metric) {
if (clientTelemetryReporter.isPresent()) {
ClientTelemetryReporter reporter = clientTelemetryReporter.get();
reporter.metricRemoval(metric);
if (!metrics().containsKey(metric.metricName())) {
clientTelemetryReporter.ifPresent(reporter -> reporter.metricRemoval(metric));
} else {
log.debug("Skipping unregistration for metric {}. Existing consumer metrics cannot be removed.", metric.metricName());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -430,17 +430,19 @@ public void subscribe(Collection<String> topics, ConsumerRebalanceListener liste

@Override
public void registerMetricForSubscription(KafkaMetric metric) {
if (clientTelemetryReporter.isPresent()) {
ClientTelemetryReporter reporter = clientTelemetryReporter.get();
reporter.metricChange(metric);
if (!metrics().containsKey(metric.metricName())) {
clientTelemetryReporter.ifPresent(reporter -> reporter.metricChange(metric));
} else {
log.debug("Skipping registration for metric {}. Existing consumer metrics cannot be overwritten.", metric.metricName());
}
}

@Override
public void unregisterMetricFromSubscription(KafkaMetric metric) {
if (clientTelemetryReporter.isPresent()) {
ClientTelemetryReporter reporter = clientTelemetryReporter.get();
reporter.metricRemoval(metric);
if (!metrics().containsKey(metric.metricName())) {
clientTelemetryReporter.ifPresent(reporter -> reporter.metricRemoval(metric));
} else {
log.debug("Skipping unregistration for metric {}. Existing consumer metrics cannot be removed.", metric.metricName());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
import org.apache.kafka.clients.KafkaClient;
import org.apache.kafka.clients.MockClient;
import org.apache.kafka.clients.NodeApiVersions;
import org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer;
import org.apache.kafka.clients.consumer.internals.ClassicKafkaConsumer;
import org.apache.kafka.clients.consumer.internals.ConsumerMetadata;
import org.apache.kafka.clients.consumer.internals.ConsumerProtocol;
import org.apache.kafka.clients.consumer.internals.MockRebalanceListener;
Expand Down Expand Up @@ -58,6 +60,9 @@
import org.apache.kafka.common.message.ListOffsetsResponseData.ListOffsetsTopicResponse;
import org.apache.kafka.common.message.SyncGroupResponseData;
import org.apache.kafka.common.metrics.JmxReporter;
import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.metrics.Measurable;
import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.Avg;
Expand Down Expand Up @@ -88,6 +93,7 @@
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter;
import org.apache.kafka.common.telemetry.internals.ClientTelemetrySender;
import org.apache.kafka.common.utils.LogCaptureAppender;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
Expand All @@ -96,11 +102,13 @@
import org.apache.kafka.test.MockMetricsReporter;
import org.apache.kafka.test.TestUtils;

import org.apache.log4j.Level;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;
import org.mockito.MockedStatic;
import org.mockito.Mockito;
import org.mockito.internal.stubbing.answers.CallsRealMethods;

import java.lang.management.ManagementFactory;
Expand Down Expand Up @@ -145,6 +153,7 @@
import static org.apache.kafka.clients.consumer.internals.ClassicKafkaConsumer.DEFAULT_REASON;
import static org.apache.kafka.common.requests.FetchMetadata.INVALID_SESSION_ID;
import static org.apache.kafka.common.utils.Utils.propsToMap;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
Expand All @@ -157,8 +166,10 @@
import static org.junit.jupiter.api.Assertions.fail;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.atMostOnce;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.mockStatic;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
Expand Down Expand Up @@ -224,6 +235,107 @@ public void cleanup() {
}
}

@ParameterizedTest
@EnumSource(GroupProtocol.class)
public void testSubscribingCustomMetricsDoesntAffectConsumerMetrics(GroupProtocol groupProtocol) {
Properties props = new Properties();
props.setProperty(ConsumerConfig.GROUP_PROTOCOL_CONFIG, groupProtocol.name());
props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
consumer = newConsumer(props, new StringDeserializer(), new StringDeserializer());

Map<MetricName, KafkaMetric> customMetrics = customMetrics();
customMetrics.forEach((name, metric) -> consumer.registerMetricForSubscription(metric));

Map<MetricName, ? extends Metric> consumerMetrics = consumer.metrics();
customMetrics.forEach((name, metric) -> assertFalse(consumerMetrics.containsKey(name)));
}

@ParameterizedTest
@EnumSource(GroupProtocol.class)
public void testSubscribingCustomMetricsWithSameNameDoesntAffectConsumerMetrics(GroupProtocol groupProtocol) {
Properties props = new Properties();
props.setProperty(ConsumerConfig.GROUP_PROTOCOL_CONFIG, groupProtocol.name());
props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
Class<?> consumerClass = groupProtocol == GroupProtocol.CLASSIC ? ClassicKafkaConsumer.class : AsyncKafkaConsumer.class;
try (final LogCaptureAppender appender = LogCaptureAppender.createAndRegister()) {
appender.setClassLogger(consumerClass, Level.DEBUG);
consumer = newConsumer(props, new StringDeserializer(), new StringDeserializer());
KafkaMetric existingMetricToAdd = (KafkaMetric) consumer.metrics().entrySet().iterator().next().getValue();
consumer.registerMetricForSubscription(existingMetricToAdd);
final String expectedMessage = String.format("Skipping registration for metric %s. Existing consumer metrics cannot be overwritten.", existingMetricToAdd.metricName());
assertTrue(appender.getMessages().stream().anyMatch(m -> m.contains(expectedMessage)));
}
}

@ParameterizedTest
@EnumSource(GroupProtocol.class)
public void testUnsubscribingCustomMetricsWithSameNameDoesntAffectConsumerMetrics(GroupProtocol groupProtocol) {
Properties props = new Properties();
props.setProperty(ConsumerConfig.GROUP_PROTOCOL_CONFIG, groupProtocol.name());
props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
Class<?> consumerClass = groupProtocol == GroupProtocol.CLASSIC ? ClassicKafkaConsumer.class : AsyncKafkaConsumer.class;
try (final LogCaptureAppender appender = LogCaptureAppender.createAndRegister()) {
appender.setClassLogger(consumerClass, Level.DEBUG);
consumer = newConsumer(props, new StringDeserializer(), new StringDeserializer());
KafkaMetric existingMetricToRemove = (KafkaMetric) consumer.metrics().entrySet().iterator().next().getValue();
consumer.unregisterMetricFromSubscription(existingMetricToRemove);
final String expectedMessage = String.format("Skipping unregistration for metric %s. Existing consumer metrics cannot be removed.", existingMetricToRemove.metricName());
assertTrue(appender.getMessages().stream().anyMatch(m -> m.contains(expectedMessage)));
}
}

@ParameterizedTest
@EnumSource(GroupProtocol.class)
public void testShouldOnlyCallMetricReporterMetricChangeOnceWithExistingConsumerMetric(GroupProtocol groupProtocol) {
try (MockedStatic<CommonClientConfigs> mockedCommonClientConfigs = mockStatic(CommonClientConfigs.class, new CallsRealMethods())) {
ClientTelemetryReporter clientTelemetryReporter = mock(ClientTelemetryReporter.class);
clientTelemetryReporter.configure(any());
mockedCommonClientConfigs.when(() -> CommonClientConfigs.telemetryReporter(anyString(), any())).thenReturn(Optional.of(clientTelemetryReporter));

Properties props = new Properties();
props.setProperty(ConsumerConfig.GROUP_PROTOCOL_CONFIG, groupProtocol.name());
props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
consumer = newConsumer(props, new StringDeserializer(), new StringDeserializer());

KafkaMetric existingMetric = (KafkaMetric) consumer.metrics().entrySet().iterator().next().getValue();
consumer.registerMetricForSubscription(existingMetric);
// This test would fail without the check as the exising metric is registered in the consumer on startup
Mockito.verify(clientTelemetryReporter, atMostOnce()).metricChange(existingMetric);
}
}

@ParameterizedTest
@EnumSource(GroupProtocol.class)
public void testShouldNotCallMetricReporterMetricRemovalWithExistingConsumerMetric(GroupProtocol groupProtocol) {
try (MockedStatic<CommonClientConfigs> mockedCommonClientConfigs = mockStatic(CommonClientConfigs.class, new CallsRealMethods())) {
ClientTelemetryReporter clientTelemetryReporter = mock(ClientTelemetryReporter.class);
clientTelemetryReporter.configure(any());
mockedCommonClientConfigs.when(() -> CommonClientConfigs.telemetryReporter(anyString(), any())).thenReturn(Optional.of(clientTelemetryReporter));

Properties props = new Properties();
props.setProperty(ConsumerConfig.GROUP_PROTOCOL_CONFIG, groupProtocol.name());
props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
consumer = newConsumer(props, new StringDeserializer(), new StringDeserializer());

KafkaMetric existingMetric = (KafkaMetric) consumer.metrics().entrySet().iterator().next().getValue();
consumer.unregisterMetricFromSubscription(existingMetric);
Mockito.verify(clientTelemetryReporter, never()).metricRemoval(existingMetric);
}
}

@ParameterizedTest
@EnumSource(GroupProtocol.class)
public void testUnSubscribingNonExisingMetricsDoesntCauseError(GroupProtocol groupProtocol) {
Properties props = new Properties();
props.setProperty(ConsumerConfig.GROUP_PROTOCOL_CONFIG, groupProtocol.name());
props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
consumer = newConsumer(props, new StringDeserializer(), new StringDeserializer());

Map<MetricName, KafkaMetric> customMetrics = customMetrics();
//Metrics never registered but removed should not cause an error
customMetrics.forEach((name, metric) -> assertDoesNotThrow(() -> consumer.unregisterMetricFromSubscription(metric)));
}

@ParameterizedTest
@EnumSource(GroupProtocol.class)
public void testMetricsReporterAutoGeneratedClientId(GroupProtocol groupProtocol) {
Expand Down Expand Up @@ -3501,6 +3613,17 @@ private boolean requestGenerated(MockClient client, ApiKeys apiKey) {
return client.requests().stream().anyMatch(request -> request.requestBuilder().apiKey().equals(apiKey));
}

private Map<MetricName, KafkaMetric> customMetrics() {
MetricConfig metricConfig = new MetricConfig();
Object lock = new Object();
MetricName metricNameOne = new MetricName("metricOne", "stream-metrics", "description for metric one", new HashMap<>());
MetricName metricNameTwo = new MetricName("metricTwo", "stream-metrics", "description for metric two", new HashMap<>());

KafkaMetric streamClientMetricOne = new KafkaMetric(lock, metricNameOne, (Measurable) (m, now) -> 1.0, metricConfig, Time.SYSTEM);
KafkaMetric streamClientMetricTwo = new KafkaMetric(lock, metricNameTwo, (Measurable) (m, now) -> 2.0, metricConfig, Time.SYSTEM);
return Map.of(metricNameOne, streamClientMetricOne, metricNameTwo, streamClientMetricTwo);
}

private static final List<String> CLIENT_IDS = new ArrayList<>();
public static class DeserializerForClientId implements Deserializer<byte[]> {
@Override
Expand Down

0 comments on commit 1c998f8

Please sign in to comment.