Skip to content

Commit

Permalink
speeds up fate lock acquisition (#5262)
Browse files Browse the repository at this point in the history
Stores the lock data for fate locks in the zookeeper node name instead
of the zookeeper data for the node.  Ran some local performance test
with hundreds of fate operations and saw lock times go from 750ms to
15ms.

fixes #5181


Co-authored-by: Christopher Tubbs <[email protected]>
keith-turner and ctubbsii authored Feb 1, 2025
1 parent e37cae1 commit 3213646
Showing 7 changed files with 284 additions and 117 deletions.
24 changes: 12 additions & 12 deletions core/src/main/java/org/apache/accumulo/core/fate/AdminUtil.java
Original file line number Diff line number Diff line change
@@ -33,12 +33,14 @@
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.SortedSet;
import java.util.stream.Stream;

import org.apache.accumulo.core.fate.FateStore.FateTxStore;
import org.apache.accumulo.core.fate.ReadOnlyFateStore.FateIdStatus;
import org.apache.accumulo.core.fate.ReadOnlyFateStore.ReadOnlyFateTxStore;
import org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus;
import org.apache.accumulo.core.fate.zookeeper.DistributedReadWriteLock.LockType;
import org.apache.accumulo.core.fate.zookeeper.FateLock;
import org.apache.accumulo.core.fate.zookeeper.FateLock.FateLockPath;
import org.apache.accumulo.core.fate.zookeeper.ZooUtil.NodeMissingPolicy;
@@ -275,38 +277,36 @@ private void findLocks(ZooSession zk, final ServiceLockPath lockPath,
List<String> lockedIds = zr.getChildren(lockPath.toString());

for (String id : lockedIds) {

try {

FateLockPath fLockPath = FateLock.path(lockPath + "/" + id);
List<String> lockNodes =
FateLock.validateAndSort(fLockPath, zr.getChildren(fLockPath.toString()));
SortedSet<FateLock.NodeName> lockNodes =
FateLock.validateAndWarn(fLockPath, zr.getChildren(fLockPath.toString()));

int pos = 0;
boolean sawWriteLock = false;

for (String node : lockNodes) {
for (FateLock.NodeName node : lockNodes) {
try {
byte[] data = zr.getData(lockPath + "/" + id + "/" + node);
// Example data: "READ:<FateId>". FateId contains ':' hence the limit of 2
String[] lda = new String(data, UTF_8).split(":", 2);
FateId fateId = FateId.from(lda[1]);
FateLock.FateLockEntry fateLockEntry = node.fateLockEntry.get();
var fateId = fateLockEntry.getFateId();
var lockType = fateLockEntry.getLockType();

if (lda[0].charAt(0) == 'W') {
if (lockType == LockType.WRITE) {
sawWriteLock = true;
}

Map<FateId,List<String>> locks;

if (pos == 0) {
locks = heldLocks;
} else if (lda[0].charAt(0) == 'R' && !sawWriteLock) {
} else if (lockType == LockType.READ && !sawWriteLock) {
locks = heldLocks;
} else {
locks = waitingLocks;
}

locks.computeIfAbsent(fateId, k -> new ArrayList<>()).add(lda[0].charAt(0) + ":" + id);
locks.computeIfAbsent(fateId, k -> new ArrayList<>())
.add(lockType.name().charAt(0) + ":" + id);

} catch (Exception e) {
log.error("{}", e.getMessage(), e);
Original file line number Diff line number Diff line change
@@ -27,6 +27,7 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.function.BiPredicate;
import java.util.function.Supplier;

import org.apache.accumulo.core.fate.FateId;
@@ -50,9 +51,10 @@ public enum LockType {
// them,
// a writer only runs when they are at the top of the queue.
public interface QueueLock {
SortedMap<Long,Supplier<FateLockEntry>> getEarlierEntries(long entry);
SortedMap<Long,Supplier<FateLockEntry>>
getEntries(BiPredicate<Long,Supplier<FateLockEntry>> predicate);

void removeEntry(long entry);
void removeEntry(FateLockEntry data, long seq);

long addEntry(FateLockEntry entry);
}
@@ -115,7 +117,9 @@ public boolean tryLock() {
entry = qlock.addEntry(FateLockEntry.from(this.getType(), this.fateId));
log.info("Added lock entry {} fateId {} lockType {}", entry, fateId, getType());
}
SortedMap<Long,Supplier<FateLockEntry>> entries = qlock.getEarlierEntries(entry);

SortedMap<Long,Supplier<FateLockEntry>> entries =
qlock.getEntries((seq, lockData) -> seq <= entry);
for (Entry<Long,Supplier<FateLockEntry>> entry : entries.entrySet()) {
if (entry.getKey().equals(this.entry)) {
return true;
@@ -150,7 +154,7 @@ public void unlock() {
return;
}
log.debug("Removing lock entry {} fateId {} lockType {}", entry, this.fateId, getType());
qlock.removeEntry(entry);
qlock.removeEntry(FateLockEntry.from(this.getType(), this.fateId), entry);
entry = -1;
}

@@ -181,7 +185,8 @@ public boolean tryLock() {
entry = qlock.addEntry(FateLockEntry.from(this.getType(), this.fateId));
log.info("Added lock entry {} fateId {} lockType {}", entry, this.fateId, getType());
}
SortedMap<Long,Supplier<FateLockEntry>> entries = qlock.getEarlierEntries(entry);
SortedMap<Long,Supplier<FateLockEntry>> entries =
qlock.getEntries((seq, locData) -> seq <= entry);
Iterator<Entry<Long,Supplier<FateLockEntry>>> iterator = entries.entrySet().iterator();
if (!iterator.hasNext()) {
throw new IllegalStateException("Did not find our own lock in the queue: " + this.entry
@@ -200,19 +205,26 @@ public DistributedReadWriteLock(QueueLock qlock, FateId fateId) {
}

public static DistributedLock recoverLock(QueueLock qlock, FateId fateId) {
SortedMap<Long,Supplier<FateLockEntry>> entries = qlock.getEarlierEntries(Long.MAX_VALUE);
for (Entry<Long,Supplier<FateLockEntry>> entry : entries.entrySet()) {
FateLockEntry lockEntry = entry.getValue().get();
if (fateId.equals(lockEntry.getFateId())) {
SortedMap<Long,Supplier<FateLockEntry>> entries =
qlock.getEntries((seq, lockData) -> lockData.get().fateId.equals(fateId));

switch (entries.size()) {
case 0:
return null;
case 1:
var entry = entries.entrySet().iterator().next();
FateLockEntry lockEntry = entry.getValue().get();
switch (lockEntry.getLockType()) {
case READ:
return new ReadLock(qlock, lockEntry.getFateId(), entry.getKey());
case WRITE:
return new WriteLock(qlock, lockEntry.getFateId(), entry.getKey());
default:
throw new IllegalStateException("Unknown lock type " + lockEntry.getLockType());
}
}
default:
throw new IllegalStateException("Found more than one lock node " + entries);
}
return null;
}

@Override
Original file line number Diff line number Diff line change
@@ -18,16 +18,16 @@
*/
package org.apache.accumulo.core.fate.zookeeper;

import static java.nio.charset.StandardCharsets.UTF_8;
import static java.util.Objects.requireNonNull;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.SortedMap;
import java.util.SortedSet;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.function.BiPredicate;
import java.util.function.Supplier;

import org.apache.accumulo.core.fate.FateId;
@@ -41,6 +41,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.base.Preconditions;
import com.google.common.base.Suppliers;

/**
@@ -49,7 +50,7 @@
public class FateLock implements QueueLock {
private static final Logger log = LoggerFactory.getLogger(FateLock.class);

private static final String PREFIX = "flock#";
static final String PREFIX = "flock#";

private final ZooReaderWriter zoo;
private final FateLockPath path;
@@ -67,7 +68,7 @@ public String toString() {
}
}

public static class FateLockEntry {
public static class FateLockEntry implements Comparable<FateLockEntry> {
final LockType lockType;
final FateId fateId;

@@ -76,26 +77,10 @@ private FateLockEntry(LockType lockType, FateId fateId) {
this.fateId = Objects.requireNonNull(fateId);
}

private FateLockEntry(byte[] entry) {
if (entry == null || entry.length < 1) {
throw new IllegalArgumentException();
}

int split = -1;
for (int i = 0; i < entry.length; i++) {
if (entry[i] == ':') {
split = i;
break;
}
}

if (split == -1) {
throw new IllegalArgumentException();
}

this.lockType = LockType.valueOf(new String(entry, 0, split, UTF_8));
this.fateId =
FateId.from(new String(Arrays.copyOfRange(entry, split + 1, entry.length), UTF_8));
private FateLockEntry(String entry) {
var fields = entry.split(":", 2);
this.lockType = LockType.valueOf(fields[0]);
this.fateId = FateId.from(fields[1]);
}

public LockType getLockType() {
@@ -106,14 +91,8 @@ public FateId getFateId() {
return fateId;
}

public byte[] serialize() {
byte[] typeBytes = lockType.name().getBytes(UTF_8);
byte[] fateIdBytes = fateId.canonical().getBytes(UTF_8);
byte[] result = new byte[fateIdBytes.length + 1 + typeBytes.length];
System.arraycopy(typeBytes, 0, result, 0, typeBytes.length);
result[typeBytes.length] = ':';
System.arraycopy(fateIdBytes, 0, result, typeBytes.length + 1, fateIdBytes.length);
return result;
public String serialize() {
return lockType.name() + ":" + fateId.canonical();
}

@Override
@@ -137,9 +116,18 @@ public static FateLockEntry from(LockType lockType, FateId fateId) {
return new FateLockEntry(lockType, fateId);
}

public static FateLockEntry deserialize(byte[] serialized) {
public static FateLockEntry deserialize(String serialized) {
return new FateLockEntry(serialized);
}

@Override
public int compareTo(FateLockEntry o) {
int cmp = lockType.compareTo(o.lockType);
if (cmp == 0) {
cmp = fateId.compareTo(o.fateId);
}
return cmp;
}
}

public static FateLockPath path(String path) {
@@ -151,16 +139,59 @@ public FateLock(ZooReaderWriter zrw, FateLockPath path) {
this.path = requireNonNull(path);
}

public static class NodeName implements Comparable<NodeName> {
public final long sequence;
public final Supplier<FateLockEntry> fateLockEntry;

NodeName(String nodeName) {
int len = nodeName.length();
Preconditions.checkArgument(nodeName.startsWith(PREFIX) && nodeName.charAt(len - 11) == '#',
"Illegal node name %s", nodeName);
sequence = Long.parseUnsignedLong(nodeName.substring(len - 10), 10);
// Use a supplier so we don't need to deserialize unless the calling code cares about
// the value for that entry.
fateLockEntry = Suppliers
.memoize(() -> FateLockEntry.deserialize(nodeName.substring(PREFIX.length(), len - 11)));
}

@Override
public int compareTo(NodeName o) {
int cmp = Long.compare(sequence, o.sequence);
if (cmp == 0) {
cmp = fateLockEntry.get().compareTo(o.fateLockEntry.get());
}
return cmp;
}

@Override
public boolean equals(Object o) {
if (o instanceof NodeName) {
return this.compareTo((NodeName) o) == 0;
}
return false;
}

@Override
public int hashCode() {
return Objects.hash(sequence, fateLockEntry.get());
}
}

@Override
public long addEntry(FateLockEntry entry) {

String dataString = entry.serialize();
Preconditions.checkState(!dataString.contains("#"));

String newPath;
try {
while (true) {
try {
newPath = zoo.putPersistentSequential(path + "/" + PREFIX, entry.serialize());
newPath =
zoo.putPersistentSequential(path + "/" + PREFIX + dataString + "#", new byte[0]);
String[] parts = newPath.split("/");
String last = parts[parts.length - 1];
return Long.parseLong(last.substring(PREFIX.length()));
return new NodeName(last).sequence;
} catch (NoNodeException nne) {
// the parent does not exist so try to create it
zoo.putPersistentData(path.toString(), new byte[] {}, NodeExistsPolicy.SKIP);
@@ -172,7 +203,8 @@ public long addEntry(FateLockEntry entry) {
}

@Override
public SortedMap<Long,Supplier<FateLockEntry>> getEarlierEntries(long entry) {
public SortedMap<Long,Supplier<FateLockEntry>>
getEntries(BiPredicate<Long,Supplier<FateLockEntry>> predicate) {
SortedMap<Long,Supplier<FateLockEntry>> result = new TreeMap<>();
try {
List<String> children = Collections.emptyList();
@@ -184,17 +216,9 @@ public SortedMap<Long,Supplier<FateLockEntry>> getEarlierEntries(long entry) {
}

for (String name : children) {
// this try catch must be done inside the loop because some subset of the children may exist
try {
long order = Long.parseLong(name.substring(PREFIX.length()));
if (order <= entry) {
byte[] data = zoo.getData(path + "/" + name);
// Use a supplier so we don't need to deserialize unless the calling code cares about
// the value for that entry.
result.put(order, Suppliers.memoize(() -> FateLockEntry.deserialize(data)));
}
} catch (KeeperException.NoNodeException ex) {
// ignored
var parsed = new NodeName(name);
if (predicate.test(parsed.sequence, parsed.fateLockEntry)) {
Preconditions.checkState(result.put(parsed.sequence, parsed.fateLockEntry) == null);
}
}
} catch (KeeperException | InterruptedException ex) {
@@ -204,9 +228,12 @@ public SortedMap<Long,Supplier<FateLockEntry>> getEarlierEntries(long entry) {
}

@Override
public void removeEntry(long entry) {
public void removeEntry(FateLockEntry data, long entry) {
String dataString = data.serialize();
Preconditions.checkState(!dataString.contains("#"));
try {
zoo.recursiveDelete(path + String.format("/%s%010d", PREFIX, entry), NodeMissingPolicy.SKIP);
zoo.recursiveDelete(path + String.format("/%s%s#%010d", PREFIX, dataString, entry),
NodeMissingPolicy.SKIP);
try {
// try to delete the parent if it has no children
zoo.delete(path.toString());
@@ -221,50 +248,25 @@ public void removeEntry(long entry) {
/**
* Validate and sort child nodes at this lock path by the lock prefix
*/
public static List<String> validateAndSort(FateLockPath path, List<String> children) {
public static SortedSet<NodeName> validateAndWarn(FateLockPath path, List<String> children) {
log.trace("validating and sorting children at path {}", path);
List<String> validChildren = new ArrayList<>();

SortedSet<NodeName> validChildren = new TreeSet<>();

if (children == null || children.isEmpty()) {
return validChildren;
}

children.forEach(c -> {
log.trace("Validating {}", c);
if (c.startsWith(PREFIX)) {
int idx = c.indexOf('#');
String sequenceNum = c.substring(idx + 1);
if (sequenceNum.length() == 10) {
try {
log.trace("Testing number format of {}", sequenceNum);
Integer.parseInt(sequenceNum);
validChildren.add(c);
} catch (NumberFormatException e) {
log.warn("Fate lock found with invalid sequence number format: {} (not a number)", c);
}
} else {
log.warn("Fate lock found with invalid sequence number format: {} (not 10 characters)",
c);
}
} else {
log.warn("Fate lock found with invalid lock format: {} (does not start with {})", c,
PREFIX);
try {
var fateLockNode = new NodeName(c);
validChildren.add(fateLockNode);
} catch (RuntimeException e) {
log.warn("Illegal fate lock node {}", c, e);
}
});

if (validChildren.size() > 1) {
validChildren.sort((o1, o2) -> {
// Lock should be of the form:
// lock-sequenceNumber
// Example:
// flock#0000000000

// Lock length - sequenceNumber length
// 16 - 10
int secondHashIdx = 6;
return Integer.valueOf(o1.substring(secondHashIdx))
.compareTo(Integer.valueOf(o2.substring(secondHashIdx)));
});
}
log.trace("Children nodes (size: {}): {}", validChildren.size(), validChildren);
return validChildren;
}
}
Original file line number Diff line number Diff line change
@@ -27,6 +27,7 @@
import java.util.concurrent.atomic.AtomicIntegerArray;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.function.BiPredicate;
import java.util.function.Supplier;

import org.apache.accumulo.core.fate.FateId;
@@ -45,14 +46,19 @@ public static class MockQueueLock implements QueueLock {
final SortedMap<Long,FateLockEntry> locks = new TreeMap<>();

@Override
public synchronized SortedMap<Long,Supplier<FateLockEntry>> getEarlierEntries(long entry) {
public synchronized SortedMap<Long,Supplier<FateLockEntry>>
getEntries(BiPredicate<Long,Supplier<FateLockEntry>> predicate) {
SortedMap<Long,Supplier<FateLockEntry>> result = new TreeMap<>();
locks.headMap(entry + 1).forEach((k, v) -> result.put(k, () -> v));
locks.forEach((seq, lockData) -> {
if (predicate.test(seq, () -> lockData)) {
result.put(seq, () -> lockData);
}
});
return result;
}

@Override
public synchronized void removeEntry(long entry) {
public synchronized void removeEntry(FateLockEntry data, long entry) {
synchronized (locks) {
locks.remove(entry);
locks.notifyAll();
@@ -147,7 +153,7 @@ public void testFateLockEntrySerDes() {
assertEquals(LockType.READ, entry.getLockType());
assertEquals(FateId.from(FateInstanceType.USER, uuid), entry.getFateId());

byte[] serialized = entry.serialize();
String serialized = entry.serialize();
var deserialized = FateLockEntry.deserialize(serialized);
assertEquals(entry, deserialized);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.accumulo.core.fate.zookeeper;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.fail;

import java.util.UUID;

import org.apache.accumulo.core.fate.FateId;
import org.apache.accumulo.core.fate.FateInstanceType;
import org.junit.jupiter.api.Test;

public class FateLockTest {

@Test
public void testParsing() {
var fateId = FateId.from(FateInstanceType.USER, UUID.randomUUID());
// ZooKeeper docs state that sequence numbers are formatted using %010d
String lockData = "WRITE:" + fateId.canonical();
var lockNode =
new FateLock.NodeName(FateLock.PREFIX + lockData + "#" + String.format("%010d", 40));
assertEquals(40, lockNode.sequence);
assertEquals(lockData, lockNode.fateLockEntry.get().serialize());

assertThrows(IllegalArgumentException.class,
() -> new FateLock.NodeName(lockData + "#" + String.format("%010d", 40)));
assertThrows(IllegalArgumentException.class,
() -> new FateLock.NodeName(FateLock.PREFIX + lockData + "#" + String.format("%d", 40)));
assertThrows(IllegalArgumentException.class,
() -> new FateLock.NodeName(FateLock.PREFIX + lockData + "#" + String.format("%09d", 40)));
assertThrows(IllegalArgumentException.class,
() -> new FateLock.NodeName(FateLock.PREFIX + lockData + "#" + String.format("%011d", 40)));
assertThrows(IllegalArgumentException.class,
() -> new FateLock.NodeName(FateLock.PREFIX + lockData + "#abc"));
assertThrows(IllegalArgumentException.class,
() -> new FateLock.NodeName(FateLock.PREFIX + lockData + String.format("%010d", 40)));

// ZooKeeper docs state that sequence numbers can roll and become negative. The FateLock code
// does not support this, so make sure it fails if this happens.
for (int i : new int[] {Integer.MIN_VALUE, Integer.MIN_VALUE / 2, Integer.MIN_VALUE / 10,
Integer.MIN_VALUE / 1000, -40}) {
var seq = String.format("%010d", i);
if (seq.length() == 10) {
assertThrows(NumberFormatException.class,
() -> new FateLock.NodeName(FateLock.PREFIX + lockData + "#" + seq));
} else if (seq.length() == 11) {
assertThrows(IllegalArgumentException.class,
() -> new FateLock.NodeName(FateLock.PREFIX + lockData + "#" + seq));
} else {
fail("Unexpected length " + seq.length());
}
}

// Test a negative number that is not formatted w/ %010d
assertThrows(IllegalArgumentException.class,
() -> new FateLock.NodeName(FateLock.PREFIX + lockData + "#" + String.format("%d", -40)));
}
}
Original file line number Diff line number Diff line change
@@ -99,15 +99,20 @@ public static String row(int r) {
return String.format("r:%04d", r);
}

public static void compact(final AccumuloClient client, String table1, int modulus,
String expectedQueue, boolean wait)
throws AccumuloSecurityException, TableNotFoundException, AccumuloException {
public static void addCompactionIterators(CompactionConfig config, int modulus,
String expectedQueue) {
IteratorSetting iterSetting = new IteratorSetting(100, TestFilter.class);
// make sure iterator options make it to compactor process
iterSetting.addOption("expectedQ", expectedQueue);
iterSetting.addOption("modulus", modulus + "");
CompactionConfig config =
new CompactionConfig().setIterators(List.of(iterSetting)).setWait(wait);
config.setIterators(List.of(iterSetting));
}

public static void compact(final AccumuloClient client, String table1, int modulus,
String expectedQueue, boolean wait)
throws AccumuloSecurityException, TableNotFoundException, AccumuloException {
CompactionConfig config = new CompactionConfig().setWait(wait);
addCompactionIterators(config, modulus, expectedQueue);
client.tableOperations().compact(table1, config);
}

Original file line number Diff line number Diff line change
@@ -25,6 +25,7 @@
import static org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.GROUP6;
import static org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.GROUP8;
import static org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.MAX_DATA;
import static org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.addCompactionIterators;
import static org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.assertNoCompactionMetadata;
import static org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.compact;
import static org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.createTable;
@@ -58,12 +59,14 @@
import org.apache.accumulo.core.client.Accumulo;
import org.apache.accumulo.core.client.AccumuloClient;
import org.apache.accumulo.core.client.AccumuloException;
import org.apache.accumulo.core.client.BatchScanner;
import org.apache.accumulo.core.client.BatchWriter;
import org.apache.accumulo.core.client.IteratorSetting;
import org.apache.accumulo.core.client.Scanner;
import org.apache.accumulo.core.client.admin.CompactionConfig;
import org.apache.accumulo.core.client.admin.NewTableConfiguration;
import org.apache.accumulo.core.client.admin.PluginConfig;
import org.apache.accumulo.core.client.admin.TabletInformation;
import org.apache.accumulo.core.client.admin.compaction.CompactableFile;
import org.apache.accumulo.core.client.admin.compaction.CompactionSelector;
import org.apache.accumulo.core.client.admin.compaction.CompressionConfigurer;
@@ -72,6 +75,7 @@
import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.data.TableId;
import org.apache.accumulo.core.data.TabletId;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.fate.Fate;
import org.apache.accumulo.core.fate.FateId;
@@ -514,6 +518,68 @@ public void testManytablets() throws Exception {
compact(client, table1, 3, GROUP4, true);

verify(client, table1, 3);

List<TabletId> tabletIds;
// start a compaction on each tablet
try (var tablets = client.tableOperations().getTabletInformation(table1, new Range())) {
tabletIds = tablets.map(TabletInformation::getTabletId).collect(Collectors.toList());
}
// compact the even tablet with a modulus filter of 2
List<Range> evenRanges = new ArrayList<>();
for (int i = 0; i < tabletIds.size(); i += 2) {
var tabletId = tabletIds.get(i);
CompactionConfig compactionConfig = new CompactionConfig()
.setStartRow(tabletId.getPrevEndRow()).setEndRow(tabletId.getEndRow()).setWait(false);
addCompactionIterators(compactionConfig, 2, GROUP4);
client.tableOperations().compact(table1, compactionConfig);
evenRanges.add(tabletId.toRange());
}

// compact the odd tablets with a modulus filter of 5
List<Range> oddRanges = new ArrayList<>();
for (int i = 1; i < tabletIds.size(); i += 2) {
var tabletId = tabletIds.get(i);
CompactionConfig compactionConfig = new CompactionConfig()
.setStartRow(tabletId.getPrevEndRow()).setEndRow(tabletId.getEndRow()).setWait(false);
addCompactionIterators(compactionConfig, 5, GROUP4);
client.tableOperations().compact(table1, compactionConfig);
oddRanges.add(tabletId.toRange());
}

Wait.waitFor(() -> {
try (BatchScanner scanner = client.createBatchScanner(table1)) {
scanner.setRanges(evenRanges);
// filtered out data that was divisible by 3 and then 2 by compactions, so should end up
// w/ only data divisible by 6
int matching = 0;
int nonMatching = 0;
for (var entry : scanner) {
int val = Integer.parseInt(entry.getValue().toString());
if (val % 6 == 0) {
matching++;
} else {
nonMatching++;
}
}
boolean evenDone = matching > 0 && nonMatching == 0;
// filtered out data that was divisible by 3 and then 5 by compactions, so should end up
// w/ only data divisible by 15
scanner.setRanges(oddRanges);
matching = 0;
nonMatching = 0;
for (var entry : scanner) {
int val = Integer.parseInt(entry.getValue().toString());
if (val % 15 == 0) {
matching++;
} else {
nonMatching++;
}
}
boolean oddDone = matching > 0 && nonMatching == 0;
return evenDone && oddDone;
}
});

}
}

0 comments on commit 3213646

Please sign in to comment.