Skip to content

Commit

Permalink
add unittest for LoadWeightBasedPlacement
Browse files Browse the repository at this point in the history
  • Loading branch information
fanjianye committed Mar 27, 2024
1 parent be25428 commit 28056fb
Showing 1 changed file with 164 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1932,6 +1932,170 @@ public void testRemoveBookieFromCluster() {
repp.onClusterChanged(addrs, new HashSet<BookieId>());
}

@Test
public void testLoadWeightedPlacementAndNewEnsemble() throws Exception {
BookieSocketAddress addr1 = new BookieSocketAddress("127.0.0.1", 3181);
BookieSocketAddress addr2 = new BookieSocketAddress("127.0.0.2", 3181);
BookieSocketAddress addr3 = new BookieSocketAddress("127.0.0.3", 3181);
BookieSocketAddress addr4 = new BookieSocketAddress("127.0.0.4", 3181);
// update dns mapping
StaticDNSResolver.addNodeToRack(addr1.getSocketAddress().getAddress().getHostAddress(),
NetworkTopology.DEFAULT_REGION + "/r1");
StaticDNSResolver.addNodeToRack(addr2.getSocketAddress().getAddress().getHostAddress(),
NetworkTopology.DEFAULT_REGION + "/r2");
StaticDNSResolver.addNodeToRack(addr3.getSocketAddress().getAddress().getHostAddress(),
NetworkTopology.DEFAULT_REGION + "/r3");
StaticDNSResolver.addNodeToRack(addr4.getSocketAddress().getAddress().getHostAddress(),
NetworkTopology.DEFAULT_REGION + "/r3");
// Update cluster
Set<BookieId> addrs = new HashSet<>();
addrs.add(addr1.toBookieId());
addrs.add(addr2.toBookieId());
addrs.add(addr3.toBookieId());
addrs.add(addr4.toBookieId());


conf.setLoadWeightBasedPlacementEnabled(true);
conf.setLowLoadBookieRatio(0.5);
conf.setLoadThreshold(70);
repp.initialize(conf, Optional.<DNSToSwitchMapping>empty(), timer,
DISABLE_ALL, NullStatsLogger.INSTANCE, BookieSocketAddress.LEGACY_BOOKIEID_RESOLVER);
repp.withDefaultRack(NetworkTopology.DEFAULT_REGION_AND_RACK);

repp.onClusterChanged(addrs, new HashSet<>());
Map<BookieId, BookieInfo> bookieInfoMap = new HashMap<>();
bookieInfoMap.put(addr1.toBookieId(), new BookieInfo(100L, 100L, 56, 60, 60, 1024 * 1024));
bookieInfoMap.put(addr2.toBookieId(), new BookieInfo(100L, 100L, 56, 60, 60, 1024 * 1024));
bookieInfoMap.put(addr3.toBookieId(), new BookieInfo(100L, 100L, 56, 60, 60, 1024 * 1024));
bookieInfoMap.put(addr4.toBookieId(), new BookieInfo(100L, 100L, 75, 60, 60, 1024 * 1024));
repp.updateBookieInfo(bookieInfoMap);


// low-load-bookie's number <= all-selected-bookie's number * ratio
// fallback to random select in LoadWeightedSelectionImpl
// so addr4 still can be selected
Map<BookieId, Long> selectionCounts = new HashMap<>();
for (BookieId b : addrs) {
selectionCounts.put(b, 0L);
}
int numTries = 50000;
Set<BookieId> excludeList = new HashSet<>();
EnsemblePlacementPolicy.PlacementResult<List<BookieId>> ensembleResponse;
List<BookieId> ensemble;
int ensembleSize = 3;
int writeQuorumSize = 2;
int acqQuorumSize = 2;
for (int i = 0; i < numTries; i++) {
ensembleResponse = repp.newEnsemble(ensembleSize, writeQuorumSize, acqQuorumSize, null, excludeList);
ensemble = ensembleResponse.getResult();
assertTrue(
"Rackaware selection not happening "
+ getNumCoveredWriteQuorums(ensemble, writeQuorumSize,
conf.getMinNumRacksPerWriteQuorum(), repp.bookieAddressResolver),
getNumCoveredWriteQuorums(ensemble, writeQuorumSize,
conf.getMinNumRacksPerWriteQuorum(), repp.bookieAddressResolver) >= 2);
for (BookieId b : ensemble) {
selectionCounts.put(b, selectionCounts.get(b) + 1);
}
}
assertTrue(selectionCounts.get(addr3.toBookieId()) != 0);
assertTrue(selectionCounts.get(addr4.toBookieId()) != 0);


// add addr5, then enough bookie in the same rack
// so LoadWeightedSelectionImpl would exclude addr4 which is high load
BookieSocketAddress addr5 = new BookieSocketAddress("127.0.0.5", 3181);
StaticDNSResolver.addNodeToRack(addr5.getSocketAddress().getAddress().getHostAddress(),
NetworkTopology.DEFAULT_REGION + "/r3");
repp.handleBookiesThatJoined(Collections.singleton(addr5.toBookieId()));
bookieInfoMap.put(addr5.toBookieId(), new BookieInfo(100L, 100L, 56, 60, 60, 1024 * 1024));
repp.updateBookieInfo(bookieInfoMap);

addrs.add(addr5.toBookieId());
selectionCounts = new HashMap<>();
for (BookieId b : addrs) {
selectionCounts.put(b, 0L);
}
for (int i = 0; i < numTries; i++) {
ensembleResponse = repp.newEnsemble(ensembleSize, writeQuorumSize, acqQuorumSize, null, excludeList);
ensemble = ensembleResponse.getResult();
assertTrue(
"Rackaware selection not happening "
+ getNumCoveredWriteQuorums(ensemble, writeQuorumSize,
conf.getMinNumRacksPerWriteQuorum(), repp.bookieAddressResolver),
getNumCoveredWriteQuorums(ensemble, writeQuorumSize,
conf.getMinNumRacksPerWriteQuorum(), repp.bookieAddressResolver) >= 2);
for (BookieId b : ensemble) {
selectionCounts.put(b, selectionCounts.get(b) + 1);
}
}
assertTrue(selectionCounts.get(addr3.toBookieId()) != 0);
assertTrue(selectionCounts.get(addr4.toBookieId()) == 0);
assertTrue(selectionCounts.get(addr5.toBookieId()) != 0);
}

@Test
public void testLoadWeightedPlacementAndReplaceBookie() throws Exception {
BookieSocketAddress addr1 = new BookieSocketAddress("127.0.0.1", 3181);
BookieSocketAddress addr2 = new BookieSocketAddress("127.0.0.2", 3181);
BookieSocketAddress addr3 = new BookieSocketAddress("127.0.0.3", 3181);
BookieSocketAddress addr4 = new BookieSocketAddress("127.0.0.4", 3181);
// update dns mapping
StaticDNSResolver.addNodeToRack(addr1.getSocketAddress().getAddress().getHostAddress(),
NetworkTopology.DEFAULT_REGION + "/r1");
StaticDNSResolver.addNodeToRack(addr2.getSocketAddress().getAddress().getHostAddress(),
NetworkTopology.DEFAULT_REGION + "/r3");
StaticDNSResolver.addNodeToRack(addr3.getSocketAddress().getAddress().getHostAddress(),
NetworkTopology.DEFAULT_REGION + "/r3");
StaticDNSResolver.addNodeToRack(addr4.getSocketAddress().getAddress().getHostAddress(),
NetworkTopology.DEFAULT_REGION + "/r3");
// Update cluster
Set<BookieId> addrs = new HashSet<>();
addrs.add(addr1.toBookieId());
addrs.add(addr2.toBookieId());
addrs.add(addr3.toBookieId());
addrs.add(addr4.toBookieId());

conf.setLoadWeightBasedPlacementEnabled(true);
conf.setLowLoadBookieRatio(0.5);
conf.setLoadThreshold(70);
repp.initialize(conf, Optional.<DNSToSwitchMapping>empty(), timer,
DISABLE_ALL, NullStatsLogger.INSTANCE, BookieSocketAddress.LEGACY_BOOKIEID_RESOLVER);
repp.withDefaultRack(NetworkTopology.DEFAULT_REGION_AND_RACK);

repp.onClusterChanged(addrs, new HashSet<>());
Map<BookieId, BookieInfo> bookieInfoMap = new HashMap<>();
bookieInfoMap.put(addr1.toBookieId(), new BookieInfo(100L, 100L, 56, 60, 60, 1024 * 1024));
bookieInfoMap.put(addr2.toBookieId(), new BookieInfo(100L, 100L, 56, 60, 60, 1024 * 1024));
bookieInfoMap.put(addr3.toBookieId(), new BookieInfo(100L, 100L, 56, 60, 60, 1024 * 1024));
bookieInfoMap.put(addr4.toBookieId(), new BookieInfo(100L, 100L, 75, 60, 60, 1024 * 1024));
repp.updateBookieInfo(bookieInfoMap);

// although addr2 is in excludedBookies
// since it can be added to LoadWeightedSelectionImpl
// low load bookie's number is enough and addr4 would be excluded
Map<BookieId, Long> selectionCounts = new HashMap<>();
for (BookieId b : addrs) {
selectionCounts.put(b, 0L);
}
int numTries = 50000;
EnsemblePlacementPolicy.PlacementResult<BookieId> replaceBookieResponse;
PlacementPolicyAdherence isEnsembleAdheringToPlacementPolicy;
BookieId replacedBookie;
for (int i = 0; i < numTries; i++) {
// replace node under r3
replaceBookieResponse = repp.replaceBookie(1, 1, 1, null, new ArrayList<>(),
addr2.toBookieId(), new HashSet<>());
replacedBookie = replaceBookieResponse.getResult();
isEnsembleAdheringToPlacementPolicy = replaceBookieResponse.getAdheringToPolicy();
assertTrue("replaced : " + replacedBookie, addr3.toBookieId().equals(replacedBookie));
assertEquals(PlacementPolicyAdherence.MEETS_STRICT, isEnsembleAdheringToPlacementPolicy);
selectionCounts.put(replacedBookie, selectionCounts.get(replacedBookie) + 1);
}
assertTrue(selectionCounts.get(addr3.toBookieId()) != 0);
assertTrue(selectionCounts.get(addr4.toBookieId()) == 0);
}

@Test
public void testWeightedPlacementAndReplaceBookieWithEnoughBookiesInSameRack() throws Exception {
BookieSocketAddress addr1 = new BookieSocketAddress("127.0.0.1", 3181);
Expand Down

0 comments on commit 28056fb

Please sign in to comment.