Skip to content

Commit c45197c

Browse files
gaoyajun02RexXiong
gaoyajun02
authored andcommitted
[CELEBORN-1843] Optimize roundrobin for more balanced disk slot allocation
### What changes were proposed in this pull request? This PR optimizes the RoundRobin algorithm to achieve a more balanced disk slot allocation across workers. Previously, when allocating 3000 partitions using RoundRobin, the slot distribution across worker disks was [668, 666, 666], which resulted in one disk having 2 more slots than the others. After the optimization, the slot distribution is now [667, 667, 666], ensuring a more balanced allocation. ### Why are the changes needed? The changes are necessary to improve load balancing across worker disks, reducing the risk of overloading a single disk. This ensures a more predictable and fair distribution of slots, which can lead to better performance and resource utilization. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? UT Closes #3074 from gaoyajun02/1843. Authored-by: gaoyajun02 <[email protected]> Signed-off-by: Shuang <[email protected]>
1 parent f952602 commit c45197c

File tree

2 files changed

+58
-24
lines changed

2 files changed

+58
-24
lines changed

master/src/main/java/org/apache/celeborn/service/deploy/master/SlotsAllocator.java

+6-13
Original file line numberDiff line numberDiff line change
@@ -334,9 +334,8 @@ private static List<Integer> roundRobin(
334334
boolean shouldReplicate,
335335
boolean shouldRackAware,
336336
int availableStorageTypes) {
337-
// workerInfo -> (diskIndexForPrimary, diskIndexForReplica)
338-
Map<WorkerInfo, Integer> workerDiskIndexForPrimary = new HashMap<>();
339-
Map<WorkerInfo, Integer> workerDiskIndexForReplica = new HashMap<>();
337+
// workerInfo -> (diskIndexForPrimaryAndReplica)
338+
Map<WorkerInfo, Integer> workerDiskIndex = new HashMap<>();
340339
List<Integer> partitionIdList = new LinkedList<>(partitionIds);
341340

342341
final int workerSize = workers.size();
@@ -361,11 +360,7 @@ private static List<Integer> roundRobin(
361360
}
362361
storageInfo =
363362
getStorageInfo(
364-
workers,
365-
nextPrimaryInd,
366-
slotsRestrictions,
367-
workerDiskIndexForPrimary,
368-
availableStorageTypes);
363+
workers, nextPrimaryInd, slotsRestrictions, workerDiskIndex, availableStorageTypes);
369364
} else {
370365
if (StorageInfo.localDiskAvailable(availableStorageTypes)) {
371366
while (!workers.get(nextPrimaryInd).haveDisk()) {
@@ -376,8 +371,7 @@ private static List<Integer> roundRobin(
376371
}
377372
}
378373
storageInfo =
379-
getStorageInfo(
380-
workers, nextPrimaryInd, null, workerDiskIndexForPrimary, availableStorageTypes);
374+
getStorageInfo(workers, nextPrimaryInd, null, workerDiskIndex, availableStorageTypes);
381375
}
382376
PartitionLocation primaryPartition =
383377
createLocation(partitionId, workers.get(nextPrimaryInd), null, storageInfo, true);
@@ -398,7 +392,7 @@ private static List<Integer> roundRobin(
398392
workers,
399393
nextReplicaInd,
400394
slotsRestrictions,
401-
workerDiskIndexForReplica,
395+
workerDiskIndex,
402396
availableStorageTypes);
403397
} else if (shouldRackAware) {
404398
while (nextReplicaInd == nextPrimaryInd
@@ -418,8 +412,7 @@ private static List<Integer> roundRobin(
418412
}
419413
}
420414
storageInfo =
421-
getStorageInfo(
422-
workers, nextReplicaInd, null, workerDiskIndexForReplica, availableStorageTypes);
415+
getStorageInfo(workers, nextReplicaInd, null, workerDiskIndex, availableStorageTypes);
423416
}
424417
PartitionLocation replicaPartition =
425418
createLocation(

master/src/test/java/org/apache/celeborn/service/deploy/master/SlotsAllocatorSuiteJ.java

+52-11
Original file line numberDiff line numberDiff line change
@@ -216,26 +216,54 @@ public void testAllocate3000ReduceIdsWithoutReplicate() {
216216
check(workers, partitionIds, shouldReplicate, true);
217217
}
218218

219+
@Test
220+
public void testAllocate3000ReduceIdsWithReplicateOnRoundRobin() {
221+
final List<WorkerInfo> workers = prepareWorkers(true);
222+
final List<Integer> partitionIds = new ArrayList<>();
223+
for (int i = 0; i < 3000; i++) {
224+
partitionIds.add(i);
225+
}
226+
final boolean shouldReplicate = true;
227+
228+
check(workers, partitionIds, shouldReplicate, true, true);
229+
}
230+
219231
private void check(
220232
List<WorkerInfo> workers,
221233
List<Integer> partitionIds,
222234
boolean shouldReplicate,
223235
boolean expectSuccess) {
236+
check(workers, partitionIds, shouldReplicate, expectSuccess, false);
237+
}
238+
239+
private void check(
240+
List<WorkerInfo> workers,
241+
List<Integer> partitionIds,
242+
boolean shouldReplicate,
243+
boolean expectSuccess,
244+
boolean roundrobin) {
224245
String shuffleKey = "appId-1";
225246
CelebornConf conf = new CelebornConf();
226247
conf.set(CelebornConf.MASTER_SLOT_ASSIGN_LOADAWARE_DISKGROUP_NUM().key(), "2");
227248
conf.set(CelebornConf.MASTER_SLOT_ASSIGN_LOADAWARE_DISKGROUP_GRADIENT().key(), "1");
228-
Map<WorkerInfo, Tuple2<List<PartitionLocation>, List<PartitionLocation>>> slots =
229-
SlotsAllocator.offerSlotsLoadAware(
230-
workers,
231-
partitionIds,
232-
shouldReplicate,
233-
false,
234-
conf.masterSlotAssignLoadAwareDiskGroupNum(),
235-
conf.masterSlotAssignLoadAwareDiskGroupGradient(),
236-
conf.masterSlotAssignLoadAwareFlushTimeWeight(),
237-
conf.masterSlotAssignLoadAwareFetchTimeWeight(),
238-
StorageInfo.ALL_TYPES_AVAILABLE_MASK);
249+
Map<WorkerInfo, Tuple2<List<PartitionLocation>, List<PartitionLocation>>> slots;
250+
if (roundrobin) {
251+
slots =
252+
SlotsAllocator.offerSlotsRoundRobin(
253+
workers, partitionIds, shouldReplicate, false, StorageInfo.ALL_TYPES_AVAILABLE_MASK);
254+
} else {
255+
slots =
256+
SlotsAllocator.offerSlotsLoadAware(
257+
workers,
258+
partitionIds,
259+
shouldReplicate,
260+
false,
261+
conf.masterSlotAssignLoadAwareDiskGroupNum(),
262+
conf.masterSlotAssignLoadAwareDiskGroupGradient(),
263+
conf.masterSlotAssignLoadAwareFlushTimeWeight(),
264+
conf.masterSlotAssignLoadAwareFetchTimeWeight(),
265+
StorageInfo.ALL_TYPES_AVAILABLE_MASK);
266+
}
239267
if (expectSuccess) {
240268
if (shouldReplicate) {
241269
slots.forEach(
@@ -266,6 +294,19 @@ private void check(
266294
if (allocationMap.containsKey("UNKNOWN_DISK")) {
267295
unknownDiskSlots += allocationMap.get("UNKNOWN_DISK");
268296
}
297+
if (roundrobin && !allocationMap.isEmpty()) {
298+
int maxSlots = Collections.max(allocationMap.values());
299+
int minSlots = Collections.min(allocationMap.values());
300+
assertTrue(
301+
"Worker "
302+
+ worker.host()
303+
+ " has unbalanced slot allocation. "
304+
+ "Max: "
305+
+ maxSlots
306+
+ ", Min: "
307+
+ minSlots,
308+
maxSlots - minSlots <= 1);
309+
}
269310
}
270311
int allocateToDiskSlots = 0;
271312
for (WorkerInfo worker : workers) {

0 commit comments

Comments
 (0)