Skip to content

Commit

Permalink
Make the broker round robin selection in task execution configurable
Browse files Browse the repository at this point in the history
  • Loading branch information
allenxwang committed Nov 19, 2024
1 parent 54078cf commit 6f399b6
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -559,6 +559,13 @@ public final class ExecutorConfig {
public static final String MIN_EXECUTION_PROGRESS_CHECK_INTERVAL_MS_DOC = "The minimum execution progress check interval that users "
+ "can dynamically set the execution progress check interval to.";

/**
* <code>prefer.broker.roundrobin.in.execution</code>
*/
public static final String PREFER_BROKER_ROUND_ROBIN_CONFIG = "prefer.broker.roundrobin.in.execution";
public static final boolean DEFAULT_PREFER_BROKER_ROUND_ROBIN = true;
public static final String PREFER_BROKER_ROUND_ROBIN_DOC = "whether to prefer round-robin of brokers in rebalance execution.";

/**
* <code>slow.task.alerting.backoff.ms</code>
*/
Expand Down Expand Up @@ -990,6 +997,11 @@ public static ConfigDef define(ConfigDef configDef) {
ConfigDef.Type.BOOLEAN,
DEFAULT_AUTO_STOP_EXTERNAL_AGENT,
ConfigDef.Importance.MEDIUM,
AUTO_STOP_EXTERNAL_AGENT_DOC);
AUTO_STOP_EXTERNAL_AGENT_DOC)
.define(PREFER_BROKER_ROUND_ROBIN_CONFIG,
ConfigDef.Type.BOOLEAN,
DEFAULT_PREFER_BROKER_ROUND_ROBIN,
ConfigDef.Importance.MEDIUM,
PREFER_BROKER_ROUND_ROBIN_DOC);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,14 @@
import org.slf4j.LoggerFactory;

import static com.linkedin.kafka.cruisecontrol.config.constants.ExecutorConfig.DEFAULT_REPLICA_MOVEMENT_STRATEGIES_CONFIG;
import static com.linkedin.kafka.cruisecontrol.config.constants.ExecutorConfig.LOGDIR_RESPONSE_TIMEOUT_MS_CONFIG;
import static com.linkedin.kafka.cruisecontrol.config.constants.ExecutorConfig.TASK_EXECUTION_ALERTING_THRESHOLD_MS_CONFIG;
import static com.linkedin.kafka.cruisecontrol.config.constants.ExecutorConfig.INTER_BROKER_REPLICA_MOVEMENT_RATE_ALERTING_THRESHOLD_CONFIG;
import static com.linkedin.kafka.cruisecontrol.config.constants.ExecutorConfig.INTRA_BROKER_REPLICA_MOVEMENT_RATE_ALERTING_THRESHOLD_CONFIG;
import static com.linkedin.kafka.cruisecontrol.executor.ExecutionTask.TaskType.*;
import static com.linkedin.kafka.cruisecontrol.config.constants.ExecutorConfig.LOGDIR_RESPONSE_TIMEOUT_MS_CONFIG;
import static com.linkedin.kafka.cruisecontrol.config.constants.ExecutorConfig.PREFER_BROKER_ROUND_ROBIN_CONFIG;
import static com.linkedin.kafka.cruisecontrol.config.constants.ExecutorConfig.TASK_EXECUTION_ALERTING_THRESHOLD_MS_CONFIG;
import static com.linkedin.kafka.cruisecontrol.executor.ExecutionTask.TaskType.INTER_BROKER_REPLICA_ACTION;
import static com.linkedin.kafka.cruisecontrol.executor.ExecutionTask.TaskType.INTRA_BROKER_REPLICA_ACTION;
import static com.linkedin.kafka.cruisecontrol.executor.ExecutionTask.TaskType.LEADER_ACTION;
import static org.apache.kafka.clients.admin.DescribeReplicaLogDirsResult.ReplicaLogDirInfo;

/**
Expand Down Expand Up @@ -80,6 +83,7 @@ public class ExecutionTaskPlanner {
private final long _taskExecutionAlertingThresholdMs;
private final double _interBrokerReplicaMovementRateAlertingThreshold;
private final double _intraBrokerReplicaMovementRateAlertingThreshold;
private final boolean _preferRoundRobin;
private static final int PRIORITIZE_BROKER_1 = -1;
private static final int PRIORITIZE_BROKER_2 = 1;
private static final int PRIORITIZE_NONE = 0;
Expand Down Expand Up @@ -120,6 +124,7 @@ public ExecutionTaskPlanner(AdminClient adminClient, KafkaCruiseControlConfig co

_defaultReplicaMovementTaskStrategy = _defaultReplicaMovementTaskStrategy.chainBaseReplicaMovementStrategyIfAbsent();
}
_preferRoundRobin = config.getBoolean(PREFER_BROKER_ROUND_ROBIN_CONFIG);
}

/**
Expand Down Expand Up @@ -379,7 +384,7 @@ public List<ExecutionTask> getInterBrokerReplicaMovementTasks(Map<Integer, Integ
break;
}
// If this broker has already involved in this round, skip it.
if (brokerInvolved.contains(brokerId)) {
if (_preferRoundRobin && brokerInvolved.contains(brokerId)) {
continue;
}
// Check the available balancing proposals of this broker to see if we can find one ready to execute.
Expand All @@ -398,8 +403,8 @@ public List<ExecutionTask> getInterBrokerReplicaMovementTasks(Map<Integer, Integ
int sourceBroker = task.proposal().oldLeader().brokerId();
Set<Integer> destinationBrokers = task.proposal().replicasToAdd().stream().mapToInt(ReplicaPlacementInfo::brokerId)
.boxed().collect(Collectors.toSet());
if (brokerInvolved.contains(sourceBroker)
|| KafkaCruiseControlUtils.containsAny(brokerInvolved, destinationBrokers)) {
if (_preferRoundRobin && (brokerInvolved.contains(sourceBroker)
|| KafkaCruiseControlUtils.containsAny(brokerInvolved, destinationBrokers))) {
continue;
}
TopicPartition tp = task.proposal().topicPartition();
Expand Down

0 comments on commit 6f399b6

Please sign in to comment.