From fd9de50de120ec400220e6b960d11f302cd88486 Mon Sep 17 00:00:00 2001 From: Bill Bejeck Date: Wed, 20 Nov 2024 16:14:03 -0500 Subject: [PATCH] KAFKA-18041: Update key for storing global consumer instance id for consistency (#17869) This PR updates the key for storing the KIP-714 client instance id for the global consumer to follow a more consistent pattern of the other embedded Kafka Streams consumer clients. Reviewers: Matthias Sax --- docs/streams/upgrade-guide.html | 6 ++++++ .../integration/KafkaStreamsTelemetryIntegrationTest.java | 2 +- .../main/java/org/apache/kafka/streams/KafkaStreams.java | 2 +- 3 files changed, 8 insertions(+), 2 deletions(-) diff --git a/docs/streams/upgrade-guide.html b/docs/streams/upgrade-guide.html index 8d199263adffa..28da54b04b699 100644 --- a/docs/streams/upgrade-guide.html +++ b/docs/streams/upgrade-guide.html @@ -135,6 +135,12 @@

<

Streams API changes in 4.0.0

+

+ In this release the ClientInstanceIds instance stores the global consumerUuid for the + KIP-714 + id with a key of global stream-thread name appended with "-global-consumer" where before it was only the global stream-thread name. +

+

In this release two configs default.deserialization.exception.handler and default.production.exception.handler are deprecated, as they don't have any overwrites, which is described in KIP-1056 diff --git a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KafkaStreamsTelemetryIntegrationTest.java b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KafkaStreamsTelemetryIntegrationTest.java index bc6e09b6598f4..18dbd2fa6d8a6 100644 --- a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KafkaStreamsTelemetryIntegrationTest.java +++ b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KafkaStreamsTelemetryIntegrationTest.java @@ -160,7 +160,7 @@ public void shouldPushMetricsToBroker(final String recordingLevel) throws Except final Uuid mainConsumerInstanceId = clientInstanceIds.consumerInstanceIds().entrySet().stream() .filter(entry -> !entry.getKey().endsWith("-restore-consumer") - && !entry.getKey().endsWith("GlobalStreamThread")) + && !entry.getKey().endsWith("GlobalStreamThread-global-consumer")) .map(Map.Entry::getValue) .findFirst().orElseThrow(); assertNotNull(adminInstanceId); diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java index 991b073e2fdb1..79e6af29be7ee 100644 --- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java +++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java @@ -1939,7 +1939,7 @@ public synchronized ClientInstanceIds clientInstanceIds(final Duration timeout) // could be `null` if telemetry is disabled on the client itself if (instanceId != null) { clientInstanceIds.addConsumerInstanceId( - globalStreamThread.getName(), + globalStreamThread.getName() + "-global-consumer", instanceId ); } else {