Skip to content

Commit

Permalink
KAFKA-15196 Additional ZK migration metrics (apache#14028)
Browse files Browse the repository at this point in the history
This patch adds several metrics defined in KIP-866:

* MigratingZkBrokerCount: the number of zk brokers registered with KRaft
* ZkWriteDeltaTimeMs: time spent writing MetadataDelta to ZK
* ZkWriteSnapshotTimeMs: time spent writing MetadataImage to ZK
* Adds value 4 for "ZK" to ZkMigrationState

Also fixes a typo in the metric name introduced in apache#14009 (ZKWriteBehindLag -> ZkWriteBehindLag)

Reviewers: Luke Chen <[email protected]>, Colin P. McCabe <[email protected]>
  • Loading branch information
mumrah authored Jul 26, 2023
1 parent 6d81698 commit a900794
Show file tree
Hide file tree
Showing 12 changed files with 152 additions and 25 deletions.
6 changes: 5 additions & 1 deletion core/src/main/scala/kafka/controller/KafkaController.scala
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import java.util.concurrent.TimeUnit
import kafka.api._
import kafka.common._
import kafka.cluster.Broker
import kafka.controller.KafkaController.{ActiveBrokerCountMetricName, ActiveControllerCountMetricName, AlterReassignmentsCallback, ControllerStateMetricName, ElectLeadersCallback, FencedBrokerCountMetricName, GlobalPartitionCountMetricName, GlobalTopicCountMetricName, ListReassignmentsCallback, OfflinePartitionsCountMetricName, PreferredReplicaImbalanceCountMetricName, ReplicasIneligibleToDeleteCountMetricName, ReplicasToDeleteCountMetricName, TopicsIneligibleToDeleteCountMetricName, TopicsToDeleteCountMetricName, UpdateFeaturesCallback}
import kafka.controller.KafkaController.{ActiveBrokerCountMetricName, ActiveControllerCountMetricName, AlterReassignmentsCallback, ControllerStateMetricName, ElectLeadersCallback, FencedBrokerCountMetricName, GlobalPartitionCountMetricName, GlobalTopicCountMetricName, ListReassignmentsCallback, OfflinePartitionsCountMetricName, PreferredReplicaImbalanceCountMetricName, ReplicasIneligibleToDeleteCountMetricName, ReplicasToDeleteCountMetricName, TopicsIneligibleToDeleteCountMetricName, TopicsToDeleteCountMetricName, UpdateFeaturesCallback, ZkMigrationStateMetricName}
import kafka.coordinator.transaction.ZkProducerIdManager
import kafka.server._
import kafka.server.metadata.ZkFinalizedFeatureCache
Expand All @@ -44,6 +44,7 @@ import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.requests.{AbstractControlRequest, ApiError, LeaderAndIsrResponse, UpdateFeaturesRequest, UpdateMetadataResponse}
import org.apache.kafka.common.utils.{Time, Utils}
import org.apache.kafka.metadata.LeaderRecoveryState
import org.apache.kafka.metadata.migration.ZkMigrationState
import org.apache.kafka.server.common.{AdminOperationException, ProducerIdsBlock}
import org.apache.kafka.server.metrics.KafkaMetricsGroup
import org.apache.kafka.server.util.KafkaScheduler
Expand Down Expand Up @@ -81,9 +82,11 @@ object KafkaController extends Logging {
private val ReplicasIneligibleToDeleteCountMetricName = "ReplicasIneligibleToDeleteCount"
private val ActiveBrokerCountMetricName = "ActiveBrokerCount"
private val FencedBrokerCountMetricName = "FencedBrokerCount"
private val ZkMigrationStateMetricName = "ZkMigrationState"

// package private for testing
private[controller] val MetricNames = Set(
ZkMigrationStateMetricName,
ActiveControllerCountMetricName,
OfflinePartitionsCountMetricName,
PreferredReplicaImbalanceCountMetricName,
Expand Down Expand Up @@ -172,6 +175,7 @@ class KafkaController(val config: KafkaConfig,
/* single-thread scheduler to clean expired tokens */
private val tokenCleanScheduler = new KafkaScheduler(1, true, "delegation-token-cleaner")

// Publish the numeric state value (4 for ZK) rather than the enum object, so this gauge
// has the same integer type as the ZkMigrationState gauge emitted by the KRaft controller.
metricsGroup.newGauge(ZkMigrationStateMetricName, () => ZkMigrationState.ZK.value().toInt)
metricsGroup.newGauge(ActiveControllerCountMetricName, () => if (isActive) 1 else 0)
metricsGroup.newGauge(OfflinePartitionsCountMetricName, () => offlinePartitionCount)
metricsGroup.newGauge(PreferredReplicaImbalanceCountMetricName, () => preferredReplicaImbalanceCount)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1273,6 +1273,8 @@ public static List<ApiMessageAndVersion> generateActivationRecords(
"has been completed.");
}
break;
default:
throw new IllegalStateException("Unsupported ZkMigrationState " + featureControl.zkMigrationState());
}
} else {
if (zkMigrationEnabled) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ public final class ControllerMetadataMetrics implements AutoCloseable {
"KafkaController", "FencedBrokerCount");
private final static MetricName ACTIVE_BROKER_COUNT = getMetricName(
"KafkaController", "ActiveBrokerCount");
private final static MetricName MIGRATING_ZK_BROKER_COUNT = getMetricName(
"KafkaController", "MigratingZkBrokerCount");
private final static MetricName GLOBAL_TOPIC_COUNT = getMetricName(
"KafkaController", "GlobalTopicCount");
private final static MetricName GLOBAL_PARTITION_COUNT = getMetricName(
Expand All @@ -55,6 +57,7 @@ public final class ControllerMetadataMetrics implements AutoCloseable {
private final Optional<MetricsRegistry> registry;
private final AtomicInteger fencedBrokerCount = new AtomicInteger(0);
private final AtomicInteger activeBrokerCount = new AtomicInteger(0);
private final AtomicInteger migratingZkBrokerCount = new AtomicInteger(0);
private final AtomicInteger globalTopicCount = new AtomicInteger(0);
private final AtomicInteger globalPartitionCount = new AtomicInteger(0);
private final AtomicInteger offlinePartitionCount = new AtomicInteger(0);
Expand All @@ -65,7 +68,7 @@ public final class ControllerMetadataMetrics implements AutoCloseable {
/**
* Create a new ControllerMetadataMetrics object.
*
* @param registry The metrics registry, or Optional.empty if this is a test and we don't have one.
* @param registry The metrics registry, or Optional.empty if this is a test and we don't have one.
*/
public ControllerMetadataMetrics(Optional<MetricsRegistry> registry) {
this.registry = registry;
Expand Down Expand Up @@ -117,6 +120,14 @@ public Integer value() {
return (int) zkMigrationState();
}
}));

registry.ifPresent(r -> r.newGauge(MIGRATING_ZK_BROKER_COUNT, new Gauge<Integer>() {
@Override
public Integer value() {
return migratingZkBrokerCount();
}
}));

}

public void setFencedBrokerCount(int brokerCount) {
Expand All @@ -143,6 +154,18 @@ public int activeBrokerCount() {
return this.activeBrokerCount.get();
}

/**
 * Overwrites the migrating ZK broker counter with an absolute value.
 *
 * @param brokerCount the count to store
 */
public void setMigratingZkBrokerCount(int brokerCount) {
    migratingZkBrokerCount.set(brokerCount);
}

/**
 * Adjusts the migrating ZK broker counter by a signed amount.
 *
 * @param brokerCountDelta the amount to add; may be negative
 */
public void addToMigratingZkBrokerCount(int brokerCountDelta) {
    // The updated total is not needed here, only the atomic adjustment.
    migratingZkBrokerCount.addAndGet(brokerCountDelta);
}

/**
 * Reads the current migrating ZK broker counter.
 *
 * @return the value currently held by the counter
 */
public int migratingZkBrokerCount() {
    final int current = migratingZkBrokerCount.get();
    return current;
}

public void setGlobalTopicCount(int topicCount) {
this.globalTopicCount.set(topicCount);
}
Expand Down Expand Up @@ -212,6 +235,7 @@ public void close() {
registry.ifPresent(r -> Arrays.asList(
FENCED_BROKER_COUNT,
ACTIVE_BROKER_COUNT,
MIGRATING_ZK_BROKER_COUNT,
GLOBAL_TOPIC_COUNT,
GLOBAL_PARTITION_COUNT,
OFFLINE_PARTITION_COUNT,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,15 +124,21 @@ private void publishSnapshot(MetadataImage newImage) {
metrics.setGlobalTopicCount(newImage.topics().topicsById().size());
int fencedBrokers = 0;
int activeBrokers = 0;
int zkBrokers = 0;
for (BrokerRegistration broker : newImage.cluster().brokers().values()) {
if (broker.fenced()) {
fencedBrokers++;
} else {
activeBrokers++;
}
if (broker.isMigratingZkBroker()) {
zkBrokers++;
}
}
metrics.setFencedBrokerCount(fencedBrokers);
metrics.setActiveBrokerCount(activeBrokers);
metrics.setMigratingZkBrokerCount(zkBrokers);

int totalPartitions = 0;
int offlinePartitions = 0;
int partitionsWithoutPreferredLeader = 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ static int delta(boolean prev, boolean next) {

private int fencedBrokersChange = 0;
private int activeBrokersChange = 0;
private int migratingZkBrokersChange = 0;
private int globalTopicsChange = 0;
private int globalPartitionsChange = 0;
private int offlinePartitionsChange = 0;
Expand All @@ -56,6 +57,10 @@ public int activeBrokersChange() {
return activeBrokersChange;
}

/**
 * @return the accumulated change in the number of migrating ZK brokers
 */
public int migratingZkBrokersChange() {
    return this.migratingZkBrokersChange;
}

public int globalTopicsChange() {
return globalTopicsChange;
}
Expand All @@ -75,18 +80,23 @@ public int partitionsWithoutPreferredLeaderChange() {
/**
 * Folds a single broker registration change into the accumulated fenced,
 * active and migrating-ZK broker deltas.
 *
 * @param prev the registration before the change, or null if there was none
 * @param next the registration after the change, or null if there is none
 */
void handleBrokerChange(BrokerRegistration prev, BrokerRegistration next) {
    final boolean hadPrev = prev != null;
    final boolean hasNext = next != null;

    // A missing registration is neither fenced, active, nor a migrating ZK broker.
    final boolean fencedBefore = hadPrev && prev.fenced();
    final boolean activeBefore = hadPrev && !prev.fenced();
    final boolean zkBefore = hadPrev && prev.isMigratingZkBroker();

    final boolean fencedAfter = hasNext && next.fenced();
    final boolean activeAfter = hasNext && !next.fenced();
    final boolean zkAfter = hasNext && next.isMigratingZkBroker();

    fencedBrokersChange += delta(fencedBefore, fencedAfter);
    activeBrokersChange += delta(activeBefore, activeAfter);
    migratingZkBrokersChange += delta(zkBefore, zkAfter);
}

void handleDeletedTopic(TopicImage deletedTopic) {
Expand Down Expand Up @@ -141,6 +151,9 @@ void apply(ControllerMetadataMetrics metrics) {
if (activeBrokersChange != 0) {
metrics.addToActiveBrokerCount(activeBrokersChange);
}
if (migratingZkBrokersChange != 0) {
metrics.addToMigratingZkBrokerCount(migratingZkBrokersChange);
}
if (globalTopicsChange != 0) {
metrics.addToGlobalTopicCount(globalTopicsChange);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,11 @@ public class QuorumControllerMetrics implements AutoCloseable {
private final static MetricName EVENT_QUEUE_PROCESSING_TIME_MS = getMetricName(
"ControllerEventManager", "EventQueueProcessingTimeMs");
private final static MetricName ZK_WRITE_BEHIND_LAG = getMetricName(
"KafkaController", "ZKWriteBehindLag");
"KafkaController", "ZkWriteBehindLag");
private final static MetricName ZK_WRITE_SNAPSHOT_TIME_MS = getMetricName(
"KafkaController", "ZkWriteSnapshotTimeMs");
private final static MetricName ZK_WRITE_DELTA_TIME_MS = getMetricName(
"KafkaController", "ZkWriteDeltaTimeMs");
private final static MetricName LAST_APPLIED_RECORD_OFFSET = getMetricName(
"KafkaController", "LastAppliedRecordOffset");
private final static MetricName LAST_COMMITTED_RECORD_OFFSET = getMetricName(
Expand All @@ -71,6 +75,9 @@ public class QuorumControllerMetrics implements AutoCloseable {
private final AtomicLong dualWriteOffset = new AtomicLong(0);
private final Consumer<Long> eventQueueTimeUpdater;
private final Consumer<Long> eventQueueProcessingTimeUpdater;
private final Consumer<Long> zkWriteSnapshotTimeHandler;
private final Consumer<Long> zkWriteDeltaTimeHandler;

private final AtomicLong timedOutHeartbeats = new AtomicLong(0);
private final AtomicLong operationsStarted = new AtomicLong(0);
private final AtomicLong operationsTimedOut = new AtomicLong(0);
Expand All @@ -88,7 +95,7 @@ private Consumer<Long> newHistogram(MetricName name, boolean biased) {
public QuorumControllerMetrics(
Optional<MetricsRegistry> registry,
Time time,
boolean zkMigrationState
boolean zkMigrationEnabled
) {
this.registry = registry;
this.active = false;
Expand Down Expand Up @@ -148,7 +155,8 @@ public Long value() {
return newActiveControllers();
}
}));
if (zkMigrationState) {

if (zkMigrationEnabled) {
registry.ifPresent(r -> r.newGauge(ZK_WRITE_BEHIND_LAG, new Gauge<Long>() {
@Override
public Long value() {
Expand All @@ -158,6 +166,11 @@ public Long value() {
else return lastCommittedRecordOffset() - dualWriteOffset();
}
}));
this.zkWriteSnapshotTimeHandler = newHistogram(ZK_WRITE_SNAPSHOT_TIME_MS, true);
this.zkWriteDeltaTimeHandler = newHistogram(ZK_WRITE_DELTA_TIME_MS, true);
} else {
this.zkWriteSnapshotTimeHandler = __ -> { };
this.zkWriteDeltaTimeHandler = __ -> { };
}
}

Expand All @@ -177,6 +190,14 @@ public void updateEventQueueProcessingTime(long durationMs) {
eventQueueProcessingTimeUpdater.accept(durationMs);
}

/**
 * Forwards a ZK snapshot write duration to the snapshot-time handler.
 *
 * @param durationMs the duration to record, in milliseconds
 */
public void updateZkWriteSnapshotTimeMs(long durationMs) {
    // Explicit boxing: the handler is a Consumer<Long>.
    zkWriteSnapshotTimeHandler.accept(Long.valueOf(durationMs));
}

/**
 * Forwards a ZK delta write duration to the delta-time handler.
 *
 * @param durationMs the duration to record, in milliseconds
 */
public void updateZkWriteDeltaTimeMs(long durationMs) {
    // Explicit boxing: the handler is a Consumer<Long>.
    zkWriteDeltaTimeHandler.accept(Long.valueOf(durationMs));
}

public void setLastAppliedRecordOffset(long offset) {
lastAppliedRecordOffset.set(offset);
}
Expand Down Expand Up @@ -255,7 +276,9 @@ public void close() {
EVENT_QUEUE_OPERATIONS_STARTED_COUNT,
EVENT_QUEUE_OPERATIONS_TIMED_OUT_COUNT,
NEW_ACTIVE_CONTROLLERS_COUNT,
ZK_WRITE_BEHIND_LAG
ZK_WRITE_BEHIND_LAG,
ZK_WRITE_SNAPSHOT_TIME_MS,
ZK_WRITE_DELTA_TIME_MS
).forEach(r::removeMetric));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -475,12 +475,17 @@ public void run() throws Exception {
}

Map<String, Integer> dualWriteCounts = new TreeMap<>();
long startTime = time.nanoseconds();
if (isSnapshot) {
zkMetadataWriter.handleSnapshot(image, countingOperationConsumer(
dualWriteCounts, KRaftMigrationDriver.this::applyMigrationOperation));
dualWriteCounts, KRaftMigrationDriver.this::applyMigrationOperation));
controllerMetrics.updateZkWriteSnapshotTimeMs(NANOSECONDS.toMillis(time.nanoseconds() - startTime));
} else {
zkMetadataWriter.handleDelta(prevImage, image, delta, countingOperationConsumer(
dualWriteCounts, KRaftMigrationDriver.this::applyMigrationOperation));
if (zkMetadataWriter.handleDelta(prevImage, image, delta, countingOperationConsumer(
dualWriteCounts, KRaftMigrationDriver.this::applyMigrationOperation))) {
// Only record delta write time if we changed something. Otherwise, no-op records will skew timings.
controllerMetrics.updateZkWriteDeltaTimeMs(NANOSECONDS.toMillis(time.nanoseconds() - startTime));
}
}
if (dualWriteCounts.isEmpty()) {
log.trace("Did not make any ZK writes when handling KRaft {}", isSnapshot ? "snapshot" : "delta");
Expand Down Expand Up @@ -556,6 +561,8 @@ public void run() throws Exception {
log.error("KRaft controller indicates a completed migration, but the migration driver is somehow active.");
transitionTo(MigrationDriverState.INACTIVE);
break;
default:
throw new IllegalStateException("Unsupported ZkMigrationState " + zkMigrationState);
}
}
}
Expand Down Expand Up @@ -658,8 +665,11 @@ public void run() throws Exception {
if (migrationState == MigrationDriverState.SYNC_KRAFT_TO_ZK) {
log.info("Performing a full metadata sync from KRaft to ZK.");
Map<String, Integer> dualWriteCounts = new TreeMap<>();
long startTime = time.nanoseconds();
zkMetadataWriter.handleSnapshot(image, countingOperationConsumer(
dualWriteCounts, KRaftMigrationDriver.this::applyMigrationOperation));
dualWriteCounts, KRaftMigrationDriver.this::applyMigrationOperation));
long endTime = time.nanoseconds();
// Elapsed time is end minus start; the reversed subtraction recorded a negative duration.
controllerMetrics.updateZkWriteSnapshotTimeMs(NANOSECONDS.toMillis(endTime - startTime));
log.info("Made the following ZK writes when reconciling with KRaft state: {}", dualWriteCounts);
transitionTo(MigrationDriverState.KRAFT_CONTROLLER_TO_BROKER_COMM);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,27 +93,34 @@ public void handleSnapshot(MetadataImage image, KRaftMigrationOperationConsumer
handleAclsSnapshot(image.acls(), operationConsumer);
}

public void handleDelta(
/**
 * Dispatches each non-null section of a metadata delta to its matching handler.
 *
 * @param previousImage     the image before the delta; used to look up topic names by ID
 * @param image             the image after the delta
 * @param delta             the delta whose sections are inspected
 * @param operationConsumer passed through to every handler that is invoked
 * @return true if at least one section was non-null and its handler was invoked,
 *         false if no section of the delta was present
 */
public boolean handleDelta(
MetadataImage previousImage,
MetadataImage image,
MetadataDelta delta,
KRaftMigrationOperationConsumer operationConsumer
) {
// Set as soon as any section is present, whether or not its handler ends up producing an operation.
boolean updated = false;
if (delta.topicsDelta() != null) {
handleTopicsDelta(previousImage.topics().topicIdToNameView()::get, image.topics(), delta.topicsDelta(), operationConsumer);
updated = true;
}
if (delta.configsDelta() != null) {
handleConfigsDelta(image.configs(), delta.configsDelta(), operationConsumer);
updated = true;
}
// Client quota and SCRAM changes share one handler, so either one triggers it.
if ((delta.clientQuotasDelta() != null) || (delta.scramDelta() != null)) {
handleClientQuotasDelta(image, delta, operationConsumer);
updated = true;
}
if (delta.producerIdsDelta() != null) {
handleProducerIdDelta(delta.producerIdsDelta(), operationConsumer);
updated = true;
}
if (delta.aclsDelta() != null) {
handleAclsDelta(image.acls(), delta.aclsDelta(), operationConsumer);
updated = true;
}
return updated;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,14 @@ public enum ZkMigrationState {
* will persist indefinitely after the migration. In operational terms, this is the same as the NONE
* state.
*/
POST_MIGRATION((byte) 3);
POST_MIGRATION((byte) 3),

/**
* The controller is a ZK controller. No migration has been performed. This state is never persisted
* and is only used by KafkaController in order to have a unified metric that indicates what kind of
* metadata state the controller is in.
*/
ZK((byte) 4);

private final byte value;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ public void testMetricNames() {
new HashSet<>(Arrays.asList(
"kafka.controller:type=KafkaController,name=ActiveBrokerCount",
"kafka.controller:type=KafkaController,name=FencedBrokerCount",
"kafka.controller:type=KafkaController,name=MigratingZkBrokerCount",
"kafka.controller:type=KafkaController,name=GlobalPartitionCount",
"kafka.controller:type=KafkaController,name=GlobalTopicCount",
"kafka.controller:type=KafkaController,name=MetadataErrorCount",
Expand Down
Loading

0 comments on commit a900794

Please sign in to comment.