Skip to content

Commit

Permalink
KAFKA-18041: Update key for storing global consumer instance id for c…
Browse files Browse the repository at this point in the history
…onsistency (apache#17869)

This PR updates the key for storing the KIP-714 client instance id for the global consumer to follow a more consistent pattern of the other embedded Kafka Streams consumer clients.

Reviewers: Matthias Sax <[email protected]>
  • Loading branch information
bbejeck authored Nov 20, 2024
1 parent aa7a3db commit fd9de50
Show file tree
Hide file tree
Showing 3 changed files with 8 additions and 2 deletions.
6 changes: 6 additions & 0 deletions docs/streams/upgrade-guide.html
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,12 @@ <h3 class="anchor-heading"><a id="streams_notable_changes" class="anchor-link"><

<h3><a id="streams_api_changes_400" href="#streams_api_changes_400">Streams API changes in 4.0.0</a></h3>

<p>
In this release the <code>ClientInstanceIds</code> instance stores the global consumer<code>Uuid</code> for the
<a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-714%3A+Client+metrics+and+observability#KIP714:Clientmetricsandobservability-Clientidentificationandtheclientinstanceid">KIP-714</a>
id with a key of global stream-thread name appended with <code>"-global-consumer"</code> where before it was only the global stream-thread name.
</p>

<p>
In this release two configs <code>default.deserialization.exception.handler</code> and <code>default.production.exception.handler</code> are deprecated, as they don't have any overwrites, which is described in
<a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-1056%3A+Remove+%60default.%60+prefix+for+exception+handler+StreamsConfig">KIP-1056</a>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ public void shouldPushMetricsToBroker(final String recordingLevel) throws Except

final Uuid mainConsumerInstanceId = clientInstanceIds.consumerInstanceIds().entrySet().stream()
.filter(entry -> !entry.getKey().endsWith("-restore-consumer")
&& !entry.getKey().endsWith("GlobalStreamThread"))
&& !entry.getKey().endsWith("GlobalStreamThread-global-consumer"))
.map(Map.Entry::getValue)
.findFirst().orElseThrow();
assertNotNull(adminInstanceId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1939,7 +1939,7 @@ public synchronized ClientInstanceIds clientInstanceIds(final Duration timeout)
// could be `null` if telemetry is disabled on the client itself
if (instanceId != null) {
clientInstanceIds.addConsumerInstanceId(
globalStreamThread.getName(),
globalStreamThread.getName() + "-global-consumer",
instanceId
);
} else {
Expand Down

0 comments on commit fd9de50

Please sign in to comment.