Skip to content

Commit

Permalink
speeds up fate lock acquisition
Browse files Browse the repository at this point in the history
Stores the lock data for fate locks in the zookeeper node name instead
of the zookeeper data for the node.  Ran some local performance test
with hundreds of fate operations and saw lock times go from 750ms to
15ms.

fixes apache#5181
  • Loading branch information
keith-turner committed Jan 15, 2025
1 parent 57e3dda commit 20fbbe8
Show file tree
Hide file tree
Showing 6 changed files with 157 additions and 74 deletions.
13 changes: 7 additions & 6 deletions core/src/main/java/org/apache/accumulo/core/fate/AdminUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.Date;
import java.util.EnumSet;
import java.util.Formatter;
Expand Down Expand Up @@ -285,19 +286,19 @@ private void findLocks(ZooSession zk, final ServiceLockPath lockPath,
List<String> lockedIds = zr.getChildren(lockPath.toString());

for (String id : lockedIds) {

try {

FateLockPath fLockPath = FateLock.path(lockPath + "/" + id);
List<String> lockNodes =
FateLock.validateAndSort(fLockPath, zr.getChildren(fLockPath.toString()));
List<FateLock.FateLockNode> lockNodes =
FateLock.validateAndWarn(fLockPath, zr.getChildren(fLockPath.toString()));

lockNodes.sort(Comparator.comparingLong(ln -> ln.sequence));

int pos = 0;
boolean sawWriteLock = false;

for (String node : lockNodes) {
for (FateLock.FateLockNode node : lockNodes) {
try {
byte[] data = zr.getData(lockPath + "/" + id + "/" + node);
byte[] data = node.lockData.getBytes(UTF_8);
// Example data: "READ:<FateId>". FateId contains ':' hence the limit of 2
String[] lda = new String(data, UTF_8).split(":", 2);
FateId fateId = FateId.from(lda[1]);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.function.BiPredicate;

import org.apache.accumulo.core.util.UtilWaitThread;
import org.slf4j.Logger;
Expand Down Expand Up @@ -98,9 +99,9 @@ public byte[] getLockData() {
// them,
// a writer only runs when they are at the top of the queue.
public interface QueueLock {
SortedMap<Long,byte[]> getEarlierEntries(long entry);
SortedMap<Long,byte[]> getEntries(BiPredicate<Long,byte[]> predicate);

void removeEntry(long entry);
void removeEntry(byte[] data, long entry);

long addEntry(byte[] data);
}
Expand Down Expand Up @@ -164,7 +165,8 @@ public boolean tryLock() {
log.info("Added lock entry {} userData {} lockType {}", entry,
new String(this.userData, UTF_8), getType());
}
SortedMap<Long,byte[]> entries = qlock.getEarlierEntries(entry);

SortedMap<Long,byte[]> entries = qlock.getEntries((seq, lockData) -> seq <= entry);
for (Entry<Long,byte[]> entry : entries.entrySet()) {
ParsedLock parsed = new ParsedLock(entry.getValue());
if (entry.getKey().equals(this.entry)) {
Expand Down Expand Up @@ -200,7 +202,7 @@ public void unlock() {
}
log.debug("Removing lock entry {} userData {} lockType {}", entry,
new String(this.userData, UTF_8), getType());
qlock.removeEntry(entry);
qlock.removeEntry(new ParsedLock(this.getType(), this.userData).getLockData(), entry);
entry = -1;
}

Expand Down Expand Up @@ -232,7 +234,7 @@ public boolean tryLock() {
log.info("Added lock entry {} userData {} lockType {}", entry,
new String(this.userData, UTF_8), getType());
}
SortedMap<Long,byte[]> entries = qlock.getEarlierEntries(entry);
SortedMap<Long,byte[]> entries = qlock.getEntries((seq, locData) -> seq <= entry);
Iterator<Entry<Long,byte[]>> iterator = entries.entrySet().iterator();
if (!iterator.hasNext()) {
throw new IllegalStateException("Did not find our own lock in the queue: " + this.entry
Expand All @@ -251,19 +253,28 @@ public DistributedReadWriteLock(QueueLock qlock, byte[] data) {
}

public static DistributedLock recoverLock(QueueLock qlock, byte[] data) {
SortedMap<Long,byte[]> entries = qlock.getEarlierEntries(Long.MAX_VALUE);
for (Entry<Long,byte[]> entry : entries.entrySet()) {
ParsedLock parsed = new ParsedLock(entry.getValue());
if (Arrays.equals(data, parsed.getUserData())) {
SortedMap<Long,byte[]> entries = qlock.getEntries((seq, lockData) -> {
ParsedLock parsed = new ParsedLock(lockData);
return Arrays.equals(data, parsed.getUserData());
});

switch (entries.size()) {
case 0:
return null;
case 1:
var entry = entries.entrySet().iterator().next();
ParsedLock parsed = new ParsedLock(entry.getValue());
switch (parsed.getType()) {
case READ:
return new ReadLock(qlock, parsed.getUserData(), entry.getKey());
case WRITE:
return new WriteLock(qlock, parsed.getUserData(), entry.getKey());
default:
throw new IllegalStateException("Uknown lock type " + parsed.getType());
}
}
default:
throw new IllegalStateException("Found more than one lock node " + entries);
}
return null;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,15 @@
*/
package org.apache.accumulo.core.fate.zookeeper;

import static java.nio.charset.StandardCharsets.UTF_8;
import static java.util.Objects.requireNonNull;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.function.BiPredicate;

import org.apache.accumulo.core.fate.zookeeper.DistributedReadWriteLock.QueueLock;
import org.apache.accumulo.core.fate.zookeeper.ZooUtil.NodeExistsPolicy;
Expand All @@ -35,6 +37,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.base.Preconditions;

/**
* A persistent lock mechanism in ZooKeeper used for locking tables during FaTE operations.
*/
Expand Down Expand Up @@ -68,16 +72,34 @@ public FateLock(ZooReaderWriter zrw, FateLockPath path) {
this.path = requireNonNull(path);
}

public static class FateLockNode {
public final long sequence;
public final String lockData;

private FateLockNode(String nodeName) {
int len = nodeName.length();
Preconditions.checkArgument(nodeName.startsWith(PREFIX) && nodeName.charAt(len - 11) == '#',
"Illegal node name %s", nodeName);
sequence = Long.parseLong(nodeName.substring(len - 10));
lockData = nodeName.substring(PREFIX.length(), len - 11);
}
}

// TODO change data arg from byte[] to String.. in the rest of the code its always a String.
@Override
public long addEntry(byte[] data) {

String dataString = new String(data, UTF_8);
Preconditions.checkState(!dataString.contains("#"));

String newPath;
try {
while (true) {
try {
newPath = zoo.putPersistentSequential(path + "/" + PREFIX, data);
newPath = zoo.putPersistentSequential(path + "/" + PREFIX + dataString + "#", data);
String[] parts = newPath.split("/");
String last = parts[parts.length - 1];
return Long.parseLong(last.substring(PREFIX.length()));
return new FateLockNode(last).sequence;
} catch (NoNodeException nne) {
// the parent does not exist so try to create it
zoo.putPersistentData(path.toString(), new byte[] {}, NodeExistsPolicy.SKIP);
Expand All @@ -89,7 +111,7 @@ public long addEntry(byte[] data) {
}

@Override
public SortedMap<Long,byte[]> getEarlierEntries(long entry) {
public SortedMap<Long,byte[]> getEntries(BiPredicate<Long,byte[]> predicate) {
SortedMap<Long,byte[]> result = new TreeMap<>();
try {
List<String> children = Collections.emptyList();
Expand All @@ -101,15 +123,10 @@ public SortedMap<Long,byte[]> getEarlierEntries(long entry) {
}

for (String name : children) {
// this try catch must be done inside the loop because some subset of the children may exist
try {
long order = Long.parseLong(name.substring(PREFIX.length()));
if (order <= entry) {
byte[] data = zoo.getData(path + "/" + name);
result.put(order, data);
}
} catch (KeeperException.NoNodeException ex) {
// ignored
var parsed = new FateLockNode(name);
byte[] data = parsed.lockData.getBytes(UTF_8);
if (predicate.test(parsed.sequence, data)) {
result.put(parsed.sequence, data);
}
}
} catch (KeeperException | InterruptedException ex) {
Expand All @@ -119,9 +136,12 @@ public SortedMap<Long,byte[]> getEarlierEntries(long entry) {
}

@Override
public void removeEntry(long entry) {
public void removeEntry(byte[] data, long entry) {
String dataString = new String(data, UTF_8);
Preconditions.checkState(!dataString.contains("#"));
try {
zoo.recursiveDelete(path + String.format("/%s%010d", PREFIX, entry), NodeMissingPolicy.SKIP);
zoo.recursiveDelete(path + String.format("/%s%s#%010d", PREFIX, dataString, entry),
NodeMissingPolicy.SKIP);
try {
// try to delete the parent if it has no children
zoo.delete(path.toString());
Expand All @@ -136,50 +156,25 @@ public void removeEntry(long entry) {
/**
* Validate and sort child nodes at this lock path by the lock prefix
*/
public static List<String> validateAndSort(FateLockPath path, List<String> children) {
public static List<FateLockNode> validateAndWarn(FateLockPath path, List<String> children) {
log.trace("validating and sorting children at path {}", path);
List<String> validChildren = new ArrayList<>();

List<FateLockNode> validChildren = new ArrayList<>();

if (children == null || children.isEmpty()) {
return validChildren;
}

children.forEach(c -> {
log.trace("Validating {}", c);
if (c.startsWith(PREFIX)) {
int idx = c.indexOf('#');
String sequenceNum = c.substring(idx + 1);
if (sequenceNum.length() == 10) {
try {
log.trace("Testing number format of {}", sequenceNum);
Integer.parseInt(sequenceNum);
validChildren.add(c);
} catch (NumberFormatException e) {
log.warn("Fate lock found with invalid sequence number format: {} (not a number)", c);
}
} else {
log.warn("Fate lock found with invalid sequence number format: {} (not 10 characters)",
c);
}
} else {
log.warn("Fate lock found with invalid lock format: {} (does not start with {})", c,
PREFIX);
try {
var fateLockNode = new FateLockNode(c);
validChildren.add(fateLockNode);
} catch (RuntimeException e) {
log.warn("Illegal fate lock node {}", c, e);
}
});

if (validChildren.size() > 1) {
validChildren.sort((o1, o2) -> {
// Lock should be of the form:
// lock-sequenceNumber
// Example:
// flock#0000000000

// Lock length - sequenceNumber length
// 16 - 10
int secondHashIdx = 6;
return Integer.valueOf(o1.substring(secondHashIdx))
.compareTo(Integer.valueOf(o2.substring(secondHashIdx)));
});
}
log.trace("Children nodes (size: {}): {}", validChildren.size(), validChildren);
return validChildren;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.util.concurrent.atomic.AtomicIntegerArray;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.function.BiPredicate;

import org.apache.accumulo.core.fate.zookeeper.DistributedReadWriteLock.QueueLock;
import org.junit.jupiter.api.Test;
Expand All @@ -40,14 +41,18 @@ public static class MockQueueLock implements QueueLock {
final SortedMap<Long,byte[]> locks = new TreeMap<>();

@Override
public synchronized SortedMap<Long,byte[]> getEarlierEntries(long entry) {
public synchronized SortedMap<Long,byte[]> getEntries(BiPredicate<Long,byte[]> predicate) {
SortedMap<Long,byte[]> result = new TreeMap<>();
result.putAll(locks.headMap(entry + 1));
locks.forEach((seq, lockData) -> {
if (predicate.test(seq, lockData)) {
result.put(seq, lockData);
}
});
return result;
}

@Override
public synchronized void removeEntry(long entry) {
public synchronized void removeEntry(byte[] data, long entry) {
synchronized (locks) {
locks.remove(entry);
locks.notifyAll();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,15 +99,20 @@ public static String row(int r) {
return String.format("r:%04d", r);
}

public static void compact(final AccumuloClient client, String table1, int modulus,
String expectedQueue, boolean wait)
throws AccumuloSecurityException, TableNotFoundException, AccumuloException {
public static void addCompactionIterators(CompactionConfig config, int modulus,
String expectedQueue) {
IteratorSetting iterSetting = new IteratorSetting(100, TestFilter.class);
// make sure iterator options make it to compactor process
iterSetting.addOption("expectedQ", expectedQueue);
iterSetting.addOption("modulus", modulus + "");
CompactionConfig config =
new CompactionConfig().setIterators(List.of(iterSetting)).setWait(wait);
config.setIterators(List.of(iterSetting));
}

public static void compact(final AccumuloClient client, String table1, int modulus,
String expectedQueue, boolean wait)
throws AccumuloSecurityException, TableNotFoundException, AccumuloException {
CompactionConfig config = new CompactionConfig().setWait(wait);
addCompactionIterators(config, modulus, expectedQueue);
client.tableOperations().compact(table1, config);
}

Expand Down
Loading

0 comments on commit 20fbbe8

Please sign in to comment.