diff --git a/master/src/main/java/org/apache/celeborn/service/deploy/master/SlotsAllocator.java b/master/src/main/java/org/apache/celeborn/service/deploy/master/SlotsAllocator.java index 2cc91493c28..d28d5d67695 100644 --- a/master/src/main/java/org/apache/celeborn/service/deploy/master/SlotsAllocator.java +++ b/master/src/main/java/org/apache/celeborn/service/deploy/master/SlotsAllocator.java @@ -334,9 +334,8 @@ private static List roundRobin( boolean shouldReplicate, boolean shouldRackAware, int availableStorageTypes) { - // workerInfo -> (diskIndexForPrimary, diskIndexForReplica) - Map workerDiskIndexForPrimary = new HashMap<>(); - Map workerDiskIndexForReplica = new HashMap<>(); + // workerInfo -> (diskIndexForPrimaryAndReplica) + Map workerDiskIndex = new HashMap<>(); List partitionIdList = new LinkedList<>(partitionIds); final int workerSize = workers.size(); @@ -361,11 +360,7 @@ private static List roundRobin( } storageInfo = getStorageInfo( - workers, - nextPrimaryInd, - slotsRestrictions, - workerDiskIndexForPrimary, - availableStorageTypes); + workers, nextPrimaryInd, slotsRestrictions, workerDiskIndex, availableStorageTypes); } else { if (StorageInfo.localDiskAvailable(availableStorageTypes)) { while (!workers.get(nextPrimaryInd).haveDisk()) { @@ -376,8 +371,7 @@ private static List roundRobin( } } storageInfo = - getStorageInfo( - workers, nextPrimaryInd, null, workerDiskIndexForPrimary, availableStorageTypes); + getStorageInfo(workers, nextPrimaryInd, null, workerDiskIndex, availableStorageTypes); } PartitionLocation primaryPartition = createLocation(partitionId, workers.get(nextPrimaryInd), null, storageInfo, true); @@ -398,7 +392,7 @@ private static List roundRobin( workers, nextReplicaInd, slotsRestrictions, - workerDiskIndexForReplica, + workerDiskIndex, availableStorageTypes); } else if (shouldRackAware) { while (nextReplicaInd == nextPrimaryInd @@ -418,8 +412,7 @@ private static List roundRobin( } } storageInfo = - getStorageInfo( - workers, nextReplicaInd, null, workerDiskIndexForReplica, availableStorageTypes); + getStorageInfo(workers, nextReplicaInd, null, workerDiskIndex, availableStorageTypes); } PartitionLocation replicaPartition = createLocation( diff --git a/master/src/test/java/org/apache/celeborn/service/deploy/master/SlotsAllocatorSuiteJ.java b/master/src/test/java/org/apache/celeborn/service/deploy/master/SlotsAllocatorSuiteJ.java index 0fb4c9d42ec..e5b7595a98a 100644 --- a/master/src/test/java/org/apache/celeborn/service/deploy/master/SlotsAllocatorSuiteJ.java +++ b/master/src/test/java/org/apache/celeborn/service/deploy/master/SlotsAllocatorSuiteJ.java @@ -216,26 +216,54 @@ public void testAllocate3000ReduceIdsWithoutReplicate() { check(workers, partitionIds, shouldReplicate, true); } + @Test + public void testAllocate3000ReduceIdsWithReplicateOnRoundRobin() { + final List workers = prepareWorkers(true); + final List partitionIds = new ArrayList<>(); + for (int i = 0; i < 3000; i++) { + partitionIds.add(i); + } + final boolean shouldReplicate = true; + + check(workers, partitionIds, shouldReplicate, true, true); + } + private void check( List workers, List partitionIds, boolean shouldReplicate, boolean expectSuccess) { + check(workers, partitionIds, shouldReplicate, expectSuccess, false); + } + + private void check( + List workers, + List partitionIds, + boolean shouldReplicate, + boolean expectSuccess, + boolean roundrobin) { String shuffleKey = "appId-1"; CelebornConf conf = new CelebornConf(); conf.set(CelebornConf.MASTER_SLOT_ASSIGN_LOADAWARE_DISKGROUP_NUM().key(), "2"); conf.set(CelebornConf.MASTER_SLOT_ASSIGN_LOADAWARE_DISKGROUP_GRADIENT().key(), "1"); - Map, List>> slots = - SlotsAllocator.offerSlotsLoadAware( - workers, - partitionIds, - shouldReplicate, - false, - conf.masterSlotAssignLoadAwareDiskGroupNum(), - conf.masterSlotAssignLoadAwareDiskGroupGradient(), - conf.masterSlotAssignLoadAwareFlushTimeWeight(), - conf.masterSlotAssignLoadAwareFetchTimeWeight(), - StorageInfo.ALL_TYPES_AVAILABLE_MASK); + Map, List>> slots; + if (roundrobin) { + slots = + SlotsAllocator.offerSlotsRoundRobin( + workers, partitionIds, shouldReplicate, false, StorageInfo.ALL_TYPES_AVAILABLE_MASK); + } else { + slots = + SlotsAllocator.offerSlotsLoadAware( + workers, + partitionIds, + shouldReplicate, + false, + conf.masterSlotAssignLoadAwareDiskGroupNum(), + conf.masterSlotAssignLoadAwareDiskGroupGradient(), + conf.masterSlotAssignLoadAwareFlushTimeWeight(), + conf.masterSlotAssignLoadAwareFetchTimeWeight(), + StorageInfo.ALL_TYPES_AVAILABLE_MASK); + } if (expectSuccess) { if (shouldReplicate) { slots.forEach( @@ -266,6 +294,19 @@ private void check( if (allocationMap.containsKey("UNKNOWN_DISK")) { unknownDiskSlots += allocationMap.get("UNKNOWN_DISK"); } + if (roundrobin && !allocationMap.isEmpty()) { + int maxSlots = Collections.max(allocationMap.values()); + int minSlots = Collections.min(allocationMap.values()); + assertTrue( + "Worker " + + worker.host() + + " has unbalanced slot allocation. " + + "Max: " + + maxSlots + + ", Min: " + + minSlots, + maxSlots - minSlots <= 1); + } } int allocateToDiskSlots = 0; for (WorkerInfo worker : workers) {