Skip to content

Commit

Permalink
Updates from meeting with Keith and Christopher
Browse files Browse the repository at this point in the history
  • Loading branch information
dlmarion committed Jan 23, 2025
1 parent bf4994c commit 919bca2
Show file tree
Hide file tree
Showing 3 changed files with 53 additions and 29 deletions.
33 changes: 22 additions & 11 deletions core/src/main/java/org/apache/accumulo/core/zookeeper/ZooCache.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@
import java.util.TreeSet;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.LockSupport;
import java.util.function.Consumer;
import java.util.function.Predicate;

Expand Down Expand Up @@ -83,7 +82,9 @@ public interface ZooCacheWatcher extends Consumer<WatchedEvent> {}

private volatile boolean closed = false;

private final AtomicLong updateCount = new AtomicLong(0);
private final AtomicLong updateCount = new AtomicLong();

private final AtomicLong zkClientTracker = new AtomicLong();

class ZCacheWatcher implements Watcher {
@Override
Expand Down Expand Up @@ -170,6 +171,7 @@ public ZooCache(ZooSession zk, Set<String> pathsToWatch) {
// visible for tests that use a Ticker
public ZooCache(ZooSession zk, Set<String> pathsToWatch, Ticker ticker) {
this.zk = requireNonNull(zk);
this.zkClientTracker.set(this.getZKClientObjectVersion());
this.cache = Caches.getInstance().createNewBuilder(Caches.CacheName.ZOO_CACHE, false)
.ticker(requireNonNull(ticker)).expireAfterAccess(CACHE_DURATION).build();
// The concurrent map returned by Caffiene will only allow one thread to run at a time for a
Expand All @@ -184,6 +186,23 @@ public void addZooCacheWatcher(ZooCacheWatcher watcher) {
externalWatchers.add(requireNonNull(watcher));
}

// visible for tests
long getZKClientObjectVersion() {
return zk.getConnectionCounter();
}

private boolean handleZKConnectionChange() {
final long currentCount = getZKClientObjectVersion();
final long oldCount = zkClientTracker.get();
if (oldCount != currentCount) {
if (zkClientTracker.compareAndSet(oldCount, currentCount)) {
setupWatchers(watchedPaths);
}
return true;
}
return false;
}

// Called on construction and when ZooKeeper connection changes
synchronized void setupWatchers(Set<String> pathsToWatch) {

Expand Down Expand Up @@ -221,11 +240,6 @@ private void ensureWatched(String path) {
}
}

// visible for tests
long getZKClientObjectVersion() {
return zk.getConnectionCounter();
}

private abstract class ZooRunnable<T> {
/**
* Runs an operation against ZooKeeper. Retries are performed by the retry method when
Expand Down Expand Up @@ -256,10 +270,8 @@ public T retry() {
while (true) {

try {
long counter = getZKClientObjectVersion();
T result = run();
if (counter != getZKClientObjectVersion()) {
setupWatchers(watchedPaths);
if (handleZKConnectionChange()) {
continue;
}
return result;
Expand Down Expand Up @@ -292,7 +304,6 @@ public T retry() {
} catch (InterruptedException e) {
log.debug("Wait in retry() was interrupted.", e);
}
LockSupport.parkNanos(sleepTime);
if (sleepTime < 10_000) {
sleepTime = (int) (sleepTime + sleepTime * RANDOM.get().nextDouble());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@

import java.io.IOException;
import java.net.UnknownHostException;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.UUID;
Expand Down Expand Up @@ -75,7 +77,7 @@ public static void visitSubTreeDFS(ZooSession zk, final String path, boolean wat
}
}

private class ZooSessionWatcher implements Watcher {
private static class ZooSessionWatcher implements Watcher {

private final String connectionName;
private final AtomicReference<KeeperState> lastState = new AtomicReference<>(null);
Expand Down Expand Up @@ -294,23 +296,29 @@ public void sync(final String path, VoidCallback cb, Object ctx) {

public void addPersistentRecursiveWatchers(Set<String> paths, Watcher watcher)
throws KeeperException, InterruptedException {
boolean sameZkConnection = false;
do {
final long counter = getConnectionCounter();
for (String path : paths) {
verifyConnected().addWatch(path, watcher, AddWatchMode.PERSISTENT_RECURSIVE);
sameZkConnection = counter == getConnectionCounter();
if (!sameZkConnection) {
// It's still possible that this last watch was added, let's remove
// it and try the entire set again. Cannot use WatcherType.PersistentRecursive
// here as that was added in 3.9.0. See
// https://issues.apache.org/jira/browse/ZOOKEEPER-4472
verifyConnected().removeWatches(path, watcher, WatcherType.Any, true);
break;

ZooKeeper localZK = verifyConnected();
Set<String> remainingPaths = new HashSet<>(paths);
while (true) {
try {
Iterator<String> remainingPathsIter = remainingPaths.iterator();
while (remainingPathsIter.hasNext()) {
String path = remainingPathsIter.next();
localZK.addWatch(path, watcher, AddWatchMode.PERSISTENT_RECURSIVE);
remainingPathsIter.remove();
}
break;
} catch (KeeperException e) {
log.error("Error setting persistent watcher in ZooKeeper, retrying...", e);
ZooKeeper currentZK = verifyConnected();
// If ZooKeeper object is different, then reset the localZK variable
// and start over.
if (localZK != currentZK) {
localZK = currentZK;
remainingPaths = new HashSet<>(paths);
}
log.debug("Added persistent recursive watcher at {}", path);
}
} while (!sameZkConnection);
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import static org.easymock.EasyMock.createStrictMock;
import static org.easymock.EasyMock.eq;
import static org.easymock.EasyMock.expect;
import static org.easymock.EasyMock.isA;
import static org.easymock.EasyMock.replay;
import static org.easymock.EasyMock.reset;
import static org.easymock.EasyMock.verify;
Expand Down Expand Up @@ -96,8 +97,12 @@ public void setUp() {
zc = new TestZooCache(zk, Set.of(root));
}

@SuppressWarnings("unchecked")
@Test
public void testOverlappingPaths() {
public void testOverlappingPaths() throws Exception {
expect(zk.getConnectionCounter()).andReturn(2L).times(2);
zk.addPersistentRecursiveWatchers(isA(Set.class), isA(Watcher.class));
replay(zk);
assertThrows(IllegalArgumentException.class,
() -> new ZooCache(zk, Set.of(root, root + "/localhost:9995")));

Expand All @@ -115,7 +120,7 @@ public void testOverlappingPaths() {
"/accumulo/8247eee6-a176-4e19-baf7-e3da965fe050/mini",
"/accumulo/8247eee6-a176-4e19-baf7-e3da965fe050/monitor/lock");
new ZooCache(zk, goodPaths);

verify(zk);
}

@Test
Expand Down

0 comments on commit 919bca2

Please sign in to comment.