Skip to content

Commit

Permalink
KAFKA-16116: Rebalance Metrics for AsyncKafkaConsumer (apache#15339)
Browse files Browse the repository at this point in the history
Adding the following rebalance metrics to the consumer:

rebalance-latency-avg
rebalance-latency-max
rebalance-latency-total
rebalance-rate-per-hour
rebalance-total
failed-rebalance-rate-per-hour
failed-rebalance-total

Due to the difference in protocol, we need to redefine when rebalance starts and ends.
Start of Rebalance:
Current: Right before sending out JoinGroup
ConsumerGroup: When the client receives assignments from the HB

End of Rebalance - Successful Case:
Current: Receiving the SyncGroup response after transitioning to "COMPLETING_REBALANCE"
ConsumerGroup: After completing reconciliation and right before sending out "Ack" heartbeat

End of Rebalance - Failed Case:
Current: Any failure in the JoinGroup/SyncGroup response
ConsumerGroup: Failure in the heartbeat

Note: After all, we try to be consistent with the current protocol: rebalances start and end with sending and receiving network requests. Failures in network requests signify rebalance failures to the user, and it is entirely possible to have multiple failures before having a successful one.

Reviewers: Lucas Brutschy <[email protected]>
  • Loading branch information
philipnee authored Feb 28, 2024
1 parent 5d6936a commit 53c41ac
Show file tree
Hide file tree
Showing 8 changed files with 408 additions and 55 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -327,7 +327,7 @@ private void onResponse(final ConsumerGroupHeartbeatResponse response, long curr
heartbeatRequestState.updateHeartbeatIntervalMs(response.data().heartbeatIntervalMs());
heartbeatRequestState.onSuccessfulAttempt(currentTimeMs);
heartbeatRequestState.resetTimer();
membershipManager.onHeartbeatResponseReceived(response.data());
membershipManager.onHeartbeatSuccess(response.data());
maybeSendGroupMetadataUpdateEvent();
return;
}
Expand Down Expand Up @@ -357,6 +357,7 @@ private void onErrorResponse(final ConsumerGroupHeartbeatResponse response,

this.heartbeatState.reset();
this.heartbeatRequestState.onFailedAttempt(currentTimeMs);
membershipManager.onHeartbeatFailure();

switch (error) {
case NOT_COORDINATOR:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,12 @@ public interface MembershipManager extends RequestManager {
*
* @param response Heartbeat response to extract member info and errors from.
*/
void onHeartbeatResponseReceived(ConsumerGroupHeartbeatResponseData response);
void onHeartbeatSuccess(ConsumerGroupHeartbeatResponseData response);

/**
* Notify the member that an error heartbeat response was received.
*/
void onHeartbeatFailure();

/**
* Update state when a heartbeat is sent out. This will transition out of the states that end
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,13 @@
import org.apache.kafka.clients.consumer.internals.events.ConsumerRebalanceListenerCallbackCompletedEvent;
import org.apache.kafka.clients.consumer.internals.events.ConsumerRebalanceListenerCallbackNeededEvent;
import org.apache.kafka.clients.consumer.internals.events.ErrorBackgroundEvent;
import org.apache.kafka.clients.consumer.internals.metrics.RebalanceMetricsManager;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest;
import org.apache.kafka.common.telemetry.internals.ClientTelemetryProvider;
Expand Down Expand Up @@ -263,6 +265,11 @@ public class MembershipManagerImpl implements MembershipManager {
* when the timer is reset, only when it completes releasing its assignment.
*/
private CompletableFuture<Void> staleMemberAssignmentRelease;

/*
* Measures successful rebalance latency and number of failed rebalances.
*/
private final RebalanceMetricsManager metricsManager;

private final Time time;

Expand All @@ -284,7 +291,35 @@ public MembershipManagerImpl(String groupId,
LogContext logContext,
Optional<ClientTelemetryReporter> clientTelemetryReporter,
BackgroundEventHandler backgroundEventHandler,
Time time) {
Time time,
Metrics metrics) {
this(groupId,
groupInstanceId,
rebalanceTimeoutMs,
serverAssignor,
subscriptions,
commitRequestManager,
metadata,
logContext,
clientTelemetryReporter,
backgroundEventHandler,
time,
new RebalanceMetricsManager(metrics));
}

// Visible for testing
MembershipManagerImpl(String groupId,
Optional<String> groupInstanceId,
int rebalanceTimeoutMs,
Optional<String> serverAssignor,
SubscriptionState subscriptions,
CommitRequestManager commitRequestManager,
ConsumerMetadata metadata,
LogContext logContext,
Optional<ClientTelemetryReporter> clientTelemetryReporter,
BackgroundEventHandler backgroundEventHandler,
Time time,
RebalanceMetricsManager metricsManager) {
this.groupId = groupId;
this.state = MemberState.UNSUBSCRIBED;
this.serverAssignor = serverAssignor;
Expand All @@ -301,6 +336,7 @@ public MembershipManagerImpl(String groupId,
this.rebalanceTimeoutMs = rebalanceTimeoutMs;
this.backgroundEventHandler = backgroundEventHandler;
this.time = time;
this.metricsManager = metricsManager;
}

/**
Expand All @@ -314,10 +350,27 @@ private void transitionTo(MemberState nextState) {
throw new IllegalStateException(String.format("Invalid state transition from %s to %s",
state, nextState));
}

if (isCompletingRebalance(state, nextState)) {
metricsManager.recordRebalanceEnded(time.milliseconds());
}
if (isStartingRebalance(state, nextState)) {
metricsManager.recordRebalanceStarted(time.milliseconds());
}

log.trace("Member {} with epoch {} transitioned from {} to {}.", memberId, memberEpoch, state, nextState);
this.state = nextState;
}

/**
 * Tells whether a state transition marks the end of a rebalance.
 *
 * @param currentState state the member is leaving
 * @param nextState    state the member is entering
 * @return {@code true} only when the member leaves {@code RECONCILING} for either
 *         {@code STABLE} or {@code ACKNOWLEDGING}
 */
private static boolean isCompletingRebalance(MemberState currentState, MemberState nextState) {
    // Only a member that was reconciling can complete a rebalance.
    if (currentState != MemberState.RECONCILING) {
        return false;
    }
    boolean becomesStable = nextState == MemberState.STABLE;
    return becomesStable || nextState == MemberState.ACKNOWLEDGING;
}

/**
 * Tells whether a state transition marks the start of a rebalance.
 *
 * @param currentState state the member is leaving
 * @param nextState    state the member is entering
 * @return {@code true} only when the member enters {@code RECONCILING} from any other
 *         state; staying in {@code RECONCILING} does not count as a new start
 */
private static boolean isStartingRebalance(MemberState currentState, MemberState nextState) {
    boolean enteringReconciling = nextState == MemberState.RECONCILING;
    boolean alreadyReconciling = currentState == MemberState.RECONCILING;
    return enteringReconciling && !alreadyReconciling;
}

/**
* {@inheritDoc}
*/
Expand Down Expand Up @@ -354,7 +407,7 @@ public int memberEpoch() {
* {@inheritDoc}
*/
@Override
public void onHeartbeatResponseReceived(ConsumerGroupHeartbeatResponseData response) {
public void onHeartbeatSuccess(ConsumerGroupHeartbeatResponseData response) {
if (response.errorCode() != Errors.NONE.code()) {
String errorMessage = String.format(
"Unexpected error in Heartbeat response. Expected no error, but received: %s",
Expand Down Expand Up @@ -403,6 +456,11 @@ public void onHeartbeatResponseReceived(ConsumerGroupHeartbeatResponseData respo
}
}

/**
 * {@inheritDoc}
 *
 * Delegates to the rebalance metrics manager, which decides whether this failure
 * should be counted as a failed rebalance.
 */
@Override
public void onHeartbeatFailure() {
metricsManager.maybeRecordRebalanceFailed();
}

/**
* @return True if the consumer is not a member of the group.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,8 @@ protected RequestManagers create() {
logContext,
clientTelemetryReporter,
backgroundEventHandler,
time);
time,
metrics);
membershipManager.registerStateListener(commit);
heartbeatRequestManager = new HeartbeatRequestManager(
logContext,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.clients.consumer.internals.metrics;

import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.metrics.Measurable;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.Avg;
import org.apache.kafka.common.metrics.stats.CumulativeCount;
import org.apache.kafka.common.metrics.stats.CumulativeSum;
import org.apache.kafka.common.metrics.stats.Max;
import org.apache.kafka.common.metrics.stats.Rate;
import org.apache.kafka.common.metrics.stats.WindowedCount;

import java.util.concurrent.TimeUnit;

import static org.apache.kafka.clients.consumer.internals.ConsumerUtils.CONSUMER_METRIC_GROUP_PREFIX;
import static org.apache.kafka.clients.consumer.internals.ConsumerUtils.COORDINATOR_METRICS_SUFFIX;

/**
 * Registers the consumer rebalance metrics and keeps them up to date.
 *
 * <p>All metrics are created in the group named
 * {@code CONSUMER_METRIC_GROUP_PREFIX + COORDINATOR_METRICS_SUFFIX}. Successful rebalances are
 * recorded on the {@code "rebalance-latency"} sensor (avg / max / total latency, count and hourly
 * rate), failed ones on the {@code "failed-rebalance"} sensor (count and hourly rate), and
 * {@code last-rebalance-seconds-ago} is exposed as a gauge computed on demand.
 *
 * <p>A rebalance is considered in progress while the last recorded start timestamp is strictly
 * greater than the last recorded end timestamp.
 */
public class RebalanceMetricsManager {
    /** Sentinel held by the timestamp fields until the corresponding event has been recorded. */
    private static final long NOT_RECORDED = -1L;

    private final Sensor successfulRebalanceSensor;
    private final Sensor failedRebalanceSensor;
    private final String metricGroupName;

    public final MetricName rebalanceLatencyAvg;
    public final MetricName rebalanceLatencyMax;
    public final MetricName rebalanceLatencyTotal;
    public final MetricName rebalanceTotal;
    public final MetricName rebalanceRatePerHour;
    public final MetricName lastRebalanceSecondsAgo;
    public final MetricName failedRebalanceTotal;
    public final MetricName failedRebalanceRate;

    // NOTE(review): volatile because the gauge registered in the constructor reads
    // lastRebalanceEndMs whenever the metric value is requested, presumably from a thread other
    // than the one recording rebalance events (e.g. a metrics reporter) -- confirm.
    private volatile long lastRebalanceEndMs = NOT_RECORDED;
    private volatile long lastRebalanceStartMs = NOT_RECORDED;

    /**
     * Creates the metric names, registers them on their sensors and installs the
     * {@code last-rebalance-seconds-ago} gauge.
     *
     * @param metrics registry the sensors and metrics are added to
     */
    public RebalanceMetricsManager(Metrics metrics) {
        // Must be assigned first: createMetric() reads it.
        metricGroupName = CONSUMER_METRIC_GROUP_PREFIX + COORDINATOR_METRICS_SUFFIX;

        rebalanceLatencyAvg = createMetric(metrics, "rebalance-latency-avg",
            "The average time taken for a group to complete a rebalance");
        rebalanceLatencyMax = createMetric(metrics, "rebalance-latency-max",
            "The max time taken for a group to complete a rebalance");
        rebalanceLatencyTotal = createMetric(metrics, "rebalance-latency-total",
            "The total number of milliseconds spent in rebalances");
        rebalanceTotal = createMetric(metrics, "rebalance-total",
            "The total number of rebalance events");
        rebalanceRatePerHour = createMetric(metrics, "rebalance-rate-per-hour",
            "The number of rebalance events per hour");
        failedRebalanceTotal = createMetric(metrics, "failed-rebalance-total",
            "The total number of failed rebalance events");
        failedRebalanceRate = createMetric(metrics, "failed-rebalance-rate-per-hour",
            "The number of failed rebalance events per hour");

        successfulRebalanceSensor = metrics.sensor("rebalance-latency");
        successfulRebalanceSensor.add(rebalanceLatencyAvg, new Avg());
        successfulRebalanceSensor.add(rebalanceLatencyMax, new Max());
        successfulRebalanceSensor.add(rebalanceLatencyTotal, new CumulativeSum());
        successfulRebalanceSensor.add(rebalanceTotal, new CumulativeCount());
        successfulRebalanceSensor.add(rebalanceRatePerHour, new Rate(TimeUnit.HOURS, new WindowedCount()));

        failedRebalanceSensor = metrics.sensor("failed-rebalance");
        failedRebalanceSensor.add(failedRebalanceTotal, new CumulativeSum());
        failedRebalanceSensor.add(failedRebalanceRate, new Rate(TimeUnit.HOURS, new WindowedCount()));

        // Gauge: whole seconds elapsed since the last recorded rebalance end, or -1 if none yet.
        Measurable lastRebalance = (config, now) -> {
            // Read the field once so the sentinel check and the subtraction see the same value.
            final long endMs = lastRebalanceEndMs;
            if (endMs == NOT_RECORDED) {
                return -1d;
            }
            return TimeUnit.SECONDS.convert(now - endMs, TimeUnit.MILLISECONDS);
        };
        lastRebalanceSecondsAgo = createMetric(metrics,
            "last-rebalance-seconds-ago",
            "The number of seconds since the last rebalance event");
        metrics.addMetric(lastRebalanceSecondsAgo, lastRebalance);
    }

    /**
     * Builds a metric name inside this manager's metric group.
     */
    private MetricName createMetric(Metrics metrics, String name, String description) {
        return metrics.metricName(name, metricGroupName, description);
    }

    /**
     * Remembers when the current rebalance started.
     *
     * @param nowMs start timestamp, in milliseconds
     */
    public void recordRebalanceStarted(long nowMs) {
        lastRebalanceStartMs = nowMs;
    }

    /**
     * Remembers when the rebalance ended and records its latency on the successful-rebalance
     * sensor.
     *
     * <p>If no start was ever recorded, only the end timestamp is updated: subtracting the
     * {@code -1} sentinel would otherwise feed a latency of {@code nowMs + 1} into the
     * avg / max / total metrics.
     *
     * @param nowMs end timestamp, in milliseconds
     */
    public void recordRebalanceEnded(long nowMs) {
        final long startMs = lastRebalanceStartMs;
        lastRebalanceEndMs = nowMs;
        if (startMs == NOT_RECORDED) {
            return;
        }
        successfulRebalanceSensor.record(nowMs - startMs);
    }

    /**
     * Records a failed rebalance, but only while a rebalance is in progress. The timestamps are
     * left untouched, so several failures can be recorded for the same rebalance.
     */
    public void maybeRecordRebalanceFailed() {
        if (!rebalanceStarted()) {
            return;
        }
        failedRebalanceSensor.record();
    }

    /**
     * @return {@code true} if a start has been recorded that is strictly later than the last
     *         recorded end
     */
    public boolean rebalanceStarted() {
        return lastRebalanceStartMs > lastRebalanceEndMs;
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.kafka.clients.consumer.internals.events.BackgroundEvent;
import org.apache.kafka.clients.consumer.internals.events.BackgroundEventHandler;
import org.apache.kafka.clients.consumer.internals.metrics.RebalanceCallbackMetricsManager;
import org.apache.kafka.clients.consumer.internals.metrics.RebalanceMetricsManager;
import org.apache.kafka.common.internals.ClusterResourceListeners;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.requests.MetadataResponse;
Expand Down Expand Up @@ -196,18 +197,19 @@ public ConsumerTestBuilder(Optional<GroupInformation> groupInfo, boolean enableA
gi.groupInstanceId,
metrics));
MembershipManager mm = spy(
new MembershipManagerImpl(
gi.groupId,
gi.groupInstanceId,
groupRebalanceConfig.rebalanceTimeoutMs,
gi.serverAssignor,
subscriptions,
commit,
metadata,
logContext,
Optional.empty(),
backgroundEventHandler,
time
new MembershipManagerImpl(
gi.groupId,
gi.groupInstanceId,
groupRebalanceConfig.rebalanceTimeoutMs,
gi.serverAssignor,
subscriptions,
commit,
metadata,
logContext,
Optional.empty(),
backgroundEventHandler,
time,
mock(RebalanceMetricsManager.class)
)
);
HeartbeatRequestManager.HeartbeatState heartbeatState = spy(new HeartbeatRequestManager.HeartbeatState(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -306,7 +306,7 @@ public void testValidateConsumerGroupHeartbeatRequest(final short version) {
new ConsumerGroupHeartbeatResponse(new ConsumerGroupHeartbeatResponseData()
.setMemberId(memberId)
.setMemberEpoch(memberEpoch));
membershipManager.onHeartbeatResponseReceived(result.data());
membershipManager.onHeartbeatSuccess(result.data());

// Create a ConsumerHeartbeatRequest and verify the payload
NetworkClientDelegate.PollResult pollResult = heartbeatRequestManager.poll(time.milliseconds());
Expand Down Expand Up @@ -441,7 +441,7 @@ public void testHeartbeatResponseOnErrorHandling(final Errors error, final boole
switch (error) {
case NONE:
verify(backgroundEventHandler).add(any(GroupMetadataUpdateEvent.class));
verify(membershipManager, times(2)).onHeartbeatResponseReceived(mockResponse.data());
verify(membershipManager, times(2)).onHeartbeatSuccess(mockResponse.data());
assertEquals(DEFAULT_HEARTBEAT_INTERVAL_MS, heartbeatRequestState.nextHeartbeatMs(time.milliseconds()));
break;

Expand Down Expand Up @@ -547,7 +547,7 @@ public void testHeartbeatState() {
.setMemberEpoch(1)
.setAssignment(assignmentTopic1));
when(metadata.topicNames()).thenReturn(Collections.singletonMap(topicId, "topic1"));
membershipManager.onHeartbeatResponseReceived(rs1.data());
membershipManager.onHeartbeatSuccess(rs1.data());

// We remain in RECONCILING state, as the assignment will be reconciled on the next poll
assertEquals(MemberState.RECONCILING, membershipManager.state());
Expand Down Expand Up @@ -712,7 +712,7 @@ private void mockStableMember() {
.setHeartbeatIntervalMs(DEFAULT_HEARTBEAT_INTERVAL_MS)
.setMemberId(memberId)
.setMemberEpoch(memberEpoch));
membershipManager.onHeartbeatResponseReceived(rs1.data());
membershipManager.onHeartbeatSuccess(rs1.data());
assertEquals(MemberState.STABLE, membershipManager.state());
}

Expand Down
Loading

0 comments on commit 53c41ac

Please sign in to comment.