From 6f399b646cdd99b93961642228f0570a5f18fc5a Mon Sep 17 00:00:00 2001 From: Allen Wang Date: Tue, 19 Nov 2024 15:00:35 -0800 Subject: [PATCH] Make the broker round robin selection in task execution configurable --- .../config/constants/ExecutorConfig.java | 14 +++++++++++++- .../executor/ExecutionTaskPlanner.java | 17 +++++++++++------ 2 files changed, 24 insertions(+), 7 deletions(-) diff --git a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/config/constants/ExecutorConfig.java b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/config/constants/ExecutorConfig.java index 7efba8246..137cd8074 100644 --- a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/config/constants/ExecutorConfig.java +++ b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/config/constants/ExecutorConfig.java @@ -559,6 +559,13 @@ public final class ExecutorConfig { public static final String MIN_EXECUTION_PROGRESS_CHECK_INTERVAL_MS_DOC = "The minimum execution progress check interval that users " + "can dynamically set the execution progress check interval to."; + /** + * prefer.broker.roundrobin.in.execution + */ + public static final String PREFER_BROKER_ROUND_ROBIN_CONFIG = "prefer.broker.roundrobin.in.execution"; + public static final boolean DEFAULT_PREFER_BROKER_ROUND_ROBIN = true; + public static final String PREFER_BROKER_ROUND_ROBIN_DOC = "whether to prefer round-robin of brokers in rebalance execution."; + /** * slow.task.alerting.backoff.ms */ @@ -990,6 +997,11 @@ public static ConfigDef define(ConfigDef configDef) { ConfigDef.Type.BOOLEAN, DEFAULT_AUTO_STOP_EXTERNAL_AGENT, ConfigDef.Importance.MEDIUM, - AUTO_STOP_EXTERNAL_AGENT_DOC); + AUTO_STOP_EXTERNAL_AGENT_DOC) + .define(PREFER_BROKER_ROUND_ROBIN_CONFIG, + ConfigDef.Type.BOOLEAN, + DEFAULT_PREFER_BROKER_ROUND_ROBIN, + ConfigDef.Importance.MEDIUM, + PREFER_BROKER_ROUND_ROBIN_DOC); } } diff --git a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/executor/ExecutionTaskPlanner.java b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/executor/ExecutionTaskPlanner.java index 2f3f671c0..32db67f0d 100644 --- a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/executor/ExecutionTaskPlanner.java +++ b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/executor/ExecutionTaskPlanner.java @@ -39,11 +39,14 @@ import org.slf4j.LoggerFactory; import static com.linkedin.kafka.cruisecontrol.config.constants.ExecutorConfig.DEFAULT_REPLICA_MOVEMENT_STRATEGIES_CONFIG; -import static com.linkedin.kafka.cruisecontrol.config.constants.ExecutorConfig.LOGDIR_RESPONSE_TIMEOUT_MS_CONFIG; -import static com.linkedin.kafka.cruisecontrol.config.constants.ExecutorConfig.TASK_EXECUTION_ALERTING_THRESHOLD_MS_CONFIG; import static com.linkedin.kafka.cruisecontrol.config.constants.ExecutorConfig.INTER_BROKER_REPLICA_MOVEMENT_RATE_ALERTING_THRESHOLD_CONFIG; import static com.linkedin.kafka.cruisecontrol.config.constants.ExecutorConfig.INTRA_BROKER_REPLICA_MOVEMENT_RATE_ALERTING_THRESHOLD_CONFIG; -import static com.linkedin.kafka.cruisecontrol.executor.ExecutionTask.TaskType.*; +import static com.linkedin.kafka.cruisecontrol.config.constants.ExecutorConfig.LOGDIR_RESPONSE_TIMEOUT_MS_CONFIG; +import static com.linkedin.kafka.cruisecontrol.config.constants.ExecutorConfig.PREFER_BROKER_ROUND_ROBIN_CONFIG; +import static com.linkedin.kafka.cruisecontrol.config.constants.ExecutorConfig.TASK_EXECUTION_ALERTING_THRESHOLD_MS_CONFIG; +import static com.linkedin.kafka.cruisecontrol.executor.ExecutionTask.TaskType.INTER_BROKER_REPLICA_ACTION; +import static com.linkedin.kafka.cruisecontrol.executor.ExecutionTask.TaskType.INTRA_BROKER_REPLICA_ACTION; +import static com.linkedin.kafka.cruisecontrol.executor.ExecutionTask.TaskType.LEADER_ACTION; import static org.apache.kafka.clients.admin.DescribeReplicaLogDirsResult.ReplicaLogDirInfo; /** @@ -80,6 +83,7 @@ public class ExecutionTaskPlanner { private final long _taskExecutionAlertingThresholdMs; private final double _interBrokerReplicaMovementRateAlertingThreshold; private final double _intraBrokerReplicaMovementRateAlertingThreshold; + private final boolean _preferRoundRobin; private static final int PRIORITIZE_BROKER_1 = -1; private static final int PRIORITIZE_BROKER_2 = 1; private static final int PRIORITIZE_NONE = 0; @@ -120,6 +124,7 @@ public ExecutionTaskPlanner(AdminClient adminClient, KafkaCruiseControlConfig co _defaultReplicaMovementTaskStrategy = _defaultReplicaMovementTaskStrategy.chainBaseReplicaMovementStrategyIfAbsent(); } + _preferRoundRobin = config.getBoolean(PREFER_BROKER_ROUND_ROBIN_CONFIG); } /** @@ -379,7 +384,7 @@ public List getInterBrokerReplicaMovementTasks(Map getInterBrokerReplicaMovementTasks(Map destinationBrokers = task.proposal().replicasToAdd().stream().mapToInt(ReplicaPlacementInfo::brokerId) .boxed().collect(Collectors.toSet()); - if (brokerInvolved.contains(sourceBroker) - || KafkaCruiseControlUtils.containsAny(brokerInvolved, destinationBrokers)) { + if (_preferRoundRobin && (brokerInvolved.contains(sourceBroker) + || KafkaCruiseControlUtils.containsAny(brokerInvolved, destinationBrokers))) { continue; } TopicPartition tp = task.proposal().topicPartition();