Skip to content

Commit

Permalink
Moved ZooCache management into ZooSession
Browse files Browse the repository at this point in the history
  • Loading branch information
dlmarion committed Jan 22, 2025
1 parent 4da2f2c commit e5b9308
Show file tree
Hide file tree
Showing 33 changed files with 300 additions and 188 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -78,8 +78,6 @@
import org.apache.accumulo.core.data.KeyValue;
import org.apache.accumulo.core.data.NamespaceId;
import org.apache.accumulo.core.data.TableId;
import org.apache.accumulo.core.fate.zookeeper.ZooCache;
import org.apache.accumulo.core.fate.zookeeper.ZooCache.ZcStat;
import org.apache.accumulo.core.fate.zookeeper.ZooUtil;
import org.apache.accumulo.core.lock.ServiceLock;
import org.apache.accumulo.core.lock.ServiceLockData;
Expand All @@ -105,6 +103,8 @@
import org.apache.accumulo.core.util.tables.TableZooHelper;
import org.apache.accumulo.core.util.threads.ThreadPools;
import org.apache.accumulo.core.util.threads.Threads;
import org.apache.accumulo.core.zookeeper.ZcStat;
import org.apache.accumulo.core.zookeeper.ZooCache;
import org.apache.accumulo.core.zookeeper.ZooSession;
import org.apache.hadoop.conf.Configuration;
import org.slf4j.Logger;
Expand Down Expand Up @@ -238,8 +238,8 @@ public ClientContext(SingletonReservation reservation, ClientInfo info,
return zk;
});

this.zooCache = memoize(() -> new ZooCache(getZooSession(),
createPersistentWatcherPaths(ZooUtil.getRoot(getInstanceID()))));
this.zooCache = memoize(() -> getZooSession()
.getCache(createPersistentWatcherPaths(ZooUtil.getRoot(getInstanceID()))).get());
this.accumuloConf = serverConf;
timeoutSupplier = memoizeWithExpiration(
() -> getConfiguration().getTimeInMillis(Property.GENERAL_RPC_TIMEOUT), 100, MILLISECONDS);
Expand Down Expand Up @@ -1068,7 +1068,10 @@ public synchronized ZookeeperLockChecker getTServerLockChecker() {
var zk = info.getZooKeeperSupplier(ZookeeperLockChecker.class.getSimpleName()).get();
String zkRoot = getZooKeeperRoot();
this.zkLockChecker =
new ZookeeperLockChecker(new ZooCache(zk, Set.of(zkRoot + Constants.ZTSERVERS)), zkRoot);
new ZookeeperLockChecker(zk.getCache(Set.of(
zkRoot + RootTable.ZROOT_TABLET,
zkRoot + Constants.ZSSERVERS,
zkRoot + Constants.ZTSERVERS)).get(), zkRoot);
}
return this.zkLockChecker;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,9 @@
import org.apache.accumulo.core.clientImpl.thrift.TableOperation;
import org.apache.accumulo.core.clientImpl.thrift.TableOperationExceptionType;
import org.apache.accumulo.core.data.NamespaceId;
import org.apache.accumulo.core.fate.zookeeper.ZooCache;
import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter;
import org.apache.accumulo.core.zookeeper.ZcStat;
import org.apache.accumulo.core.zookeeper.ZooCache;
import org.apache.zookeeper.KeeperException;

import com.google.common.collect.ImmutableSortedMap;
Expand Down Expand Up @@ -147,7 +148,7 @@ public static Map<String,String> deserialize(byte[] data) {
private synchronized void update() {
final ZooCache zc = context.getZooCache();
final String zPath = context.getZooKeeperRoot() + Constants.ZNAMESPACES;
final ZooCache.ZcStat stat = new ZooCache.ZcStat();
final ZcStat stat = new ZcStat();

byte[] data = zc.get(zPath, stat);
if (stat.getMzxid() > lastMzxid) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,11 @@

import org.apache.accumulo.core.Constants;
import org.apache.accumulo.core.clientImpl.ClientTabletCacheImpl.TabletServerLockChecker;
import org.apache.accumulo.core.fate.zookeeper.ZooCache;
import org.apache.accumulo.core.lock.ServiceLock;
import org.apache.accumulo.core.lock.ServiceLockPaths;
import org.apache.accumulo.core.lock.ServiceLockPaths.AddressSelector;
import org.apache.accumulo.core.lock.ServiceLockPaths.ServiceLockPath;
import org.apache.accumulo.core.zookeeper.ZooCache;

import com.google.common.net.HostAndPort;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,15 +27,15 @@
import java.util.Optional;
import java.util.UUID;

import org.apache.accumulo.core.fate.zookeeper.ZooCache;
import org.apache.accumulo.core.fate.zookeeper.ZooCache.ZcStat;
import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter;
import org.apache.accumulo.core.fate.zookeeper.ZooUtil;
import org.apache.accumulo.core.fate.zookeeper.ZooUtil.LockID;
import org.apache.accumulo.core.fate.zookeeper.ZooUtil.NodeMissingPolicy;
import org.apache.accumulo.core.lock.ServiceLockPaths.ServiceLockPath;
import org.apache.accumulo.core.util.Timer;
import org.apache.accumulo.core.util.UuidUtil;
import org.apache.accumulo.core.zookeeper.ZcStat;
import org.apache.accumulo.core.zookeeper.ZooCache;
import org.apache.accumulo.core.zookeeper.ZooSession;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
Expand Down Expand Up @@ -672,7 +672,7 @@ public static Optional<ServiceLockData> getLockData(ZooSession zk, ServiceLockPa
}

public static Optional<ServiceLockData> getLockData(
org.apache.accumulo.core.fate.zookeeper.ZooCache zc, ServiceLockPath path, ZcStat stat) {
org.apache.accumulo.core.zookeeper.ZooCache zc, ServiceLockPath path, ZcStat stat) {

List<String> children = validateAndSort(path, zc.getChildren(path.toString()));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,10 @@
import java.util.function.Predicate;

import org.apache.accumulo.core.Constants;
import org.apache.accumulo.core.fate.zookeeper.ZooCache;
import org.apache.accumulo.core.fate.zookeeper.ZooCache.ZcStat;
import org.apache.accumulo.core.util.threads.ThreadPoolNames;
import org.apache.accumulo.core.util.threads.ThreadPools;
import org.apache.accumulo.core.zookeeper.ZcStat;
import org.apache.accumulo.core.zookeeper.ZooCache;

import com.google.common.base.Preconditions;
import com.google.common.net.HostAndPort;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.dataImpl.KeyExtent;
import org.apache.accumulo.core.fate.FateId;
import org.apache.accumulo.core.fate.zookeeper.ZooCache;
import org.apache.accumulo.core.lock.ServiceLock;
import org.apache.accumulo.core.lock.ServiceLockData;
import org.apache.accumulo.core.lock.ServiceLockPaths.AddressSelector;
Expand All @@ -76,6 +75,7 @@
import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.TabletColumnFamily;
import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.UserCompactionRequestedColumnFamily;
import org.apache.accumulo.core.tabletserver.log.LogEntry;
import org.apache.accumulo.core.zookeeper.ZcStat;
import org.apache.commons.lang3.builder.ToStringBuilder;
import org.apache.commons.lang3.builder.ToStringStyle;
import org.apache.hadoop.io.Text;
Expand Down Expand Up @@ -663,7 +663,7 @@ public static synchronized Set<TServerInstance> getLiveTServers(ClientContext co
*/
private static Optional<TServerInstance> checkTabletServer(ClientContext context,
ServiceLockPath slp) {
ZooCache.ZcStat stat = new ZooCache.ZcStat();
ZcStat stat = new ZcStat();
log.trace("Checking server at ZK path: {}", slp);
return ServiceLock.getLockData(context.getZooCache(), slp, stat)
.map(sld -> sld.getAddress(ServiceLockData.ThriftService.TSERV))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@
import org.apache.accumulo.core.clientImpl.AccumuloServerException;
import org.apache.accumulo.core.clientImpl.ClientContext;
import org.apache.accumulo.core.clientImpl.thrift.ThriftSecurityException;
import org.apache.accumulo.core.fate.zookeeper.ZooCache;
import org.apache.accumulo.core.lock.ServiceLockData;
import org.apache.accumulo.core.lock.ServiceLockData.ThriftService;
import org.apache.accumulo.core.lock.ServiceLockPaths.AddressSelector;
Expand All @@ -45,6 +44,7 @@
import org.apache.accumulo.core.rpc.clients.ThriftClientTypes.Exec;
import org.apache.accumulo.core.rpc.clients.ThriftClientTypes.ExecVoid;
import org.apache.accumulo.core.util.Pair;
import org.apache.accumulo.core.zookeeper.ZooCache;
import org.apache.thrift.TApplicationException;
import org.apache.thrift.TException;
import org.apache.thrift.TServiceClient;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@
import org.apache.accumulo.core.clientImpl.ClientContext;
import org.apache.accumulo.core.clientImpl.thrift.ThriftSecurityException;
import org.apache.accumulo.core.compaction.thrift.CompactorService;
import org.apache.accumulo.core.fate.zookeeper.ZooCache.ZcStat;
import org.apache.accumulo.core.lock.ServiceLock;
import org.apache.accumulo.core.lock.ServiceLockData.ThriftService;
import org.apache.accumulo.core.lock.ServiceLockPaths.AddressSelector;
Expand All @@ -50,6 +49,7 @@
import org.apache.accumulo.core.trace.TraceUtil;
import org.apache.accumulo.core.util.Timer;
import org.apache.accumulo.core.util.threads.ThreadPools;
import org.apache.accumulo.core.zookeeper.ZcStat;
import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
import org.apache.accumulo.core.clientImpl.Namespaces;
import org.apache.accumulo.core.data.NamespaceId;
import org.apache.accumulo.core.data.TableId;
import org.apache.accumulo.core.fate.zookeeper.ZooCache;
import org.apache.accumulo.core.zookeeper.ZooCache;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,10 @@
import org.apache.accumulo.core.clientImpl.Namespaces;
import org.apache.accumulo.core.data.NamespaceId;
import org.apache.accumulo.core.data.TableId;
import org.apache.accumulo.core.fate.zookeeper.ZooCache;
import org.apache.accumulo.core.manager.state.tables.TableState;
import org.apache.accumulo.core.metadata.AccumuloTable;
import org.apache.accumulo.core.util.cache.Caches.CacheName;
import org.apache.accumulo.core.zookeeper.ZooCache;

import com.github.benmanes.caffeine.cache.Cache;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.accumulo.core.fate.zookeeper;
package org.apache.accumulo.core.zookeeper;

import java.util.List;
import java.util.Objects;
Expand Down Expand Up @@ -48,7 +48,7 @@
class ZcNode {

private final byte[] data;
private final ZooCache.ZcStat stat;
private final ZcStat stat;
private final List<String> children;

static final ZcNode NON_EXISTENT = new ZcNode();
Expand Down Expand Up @@ -82,7 +82,7 @@ private ZcNode() {
* Creates a new ZcNode that combines the children from an existing ZcNode and sets the data and
* stat.
*/
ZcNode(byte[] data, ZooCache.ZcStat zstat, ZcNode existing) {
ZcNode(byte[] data, ZcStat zstat, ZcNode existing) {
this.data = Objects.requireNonNull(data);
this.stat = Objects.requireNonNull(zstat);
if (existing == null) {
Expand All @@ -108,7 +108,7 @@ byte[] getData() {
* exist
* @throws IllegalStateException in the case where the node exists and the data was never set
*/
ZooCache.ZcStat getStat() {
ZcStat getStat() {
Preconditions.checkState(cachedData());
return stat;
}
Expand Down
53 changes: 53 additions & 0 deletions core/src/main/java/org/apache/accumulo/core/zookeeper/ZcStat.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.accumulo.core.zookeeper;

import org.apache.zookeeper.data.Stat;

import com.google.common.annotations.VisibleForTesting;

public class ZcStat {
private long ephemeralOwner;
private long mzxid;

public ZcStat() {}

public ZcStat(Stat stat) {
this.ephemeralOwner = stat.getEphemeralOwner();
this.mzxid = stat.getMzxid();
}

public long getEphemeralOwner() {
return ephemeralOwner;
}

public void set(ZcStat cachedStat) {
this.ephemeralOwner = cachedStat.ephemeralOwner;
this.mzxid = cachedStat.mzxid;
}

@VisibleForTesting
public void setEphemeralOwner(long ephemeralOwner) {
this.ephemeralOwner = ephemeralOwner;
}

public long getMzxid() {
return mzxid;
}
}
107 changes: 107 additions & 0 deletions core/src/main/java/org/apache/accumulo/core/zookeeper/ZooCache.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.accumulo.core.zookeeper;

import java.util.List;
import java.util.Optional;
import java.util.function.Consumer;
import java.util.function.Predicate;

import org.apache.accumulo.core.lock.ServiceLockData;
import org.apache.accumulo.core.lock.ServiceLockPaths.ServiceLockPath;
import org.apache.zookeeper.WatchedEvent;

public interface ZooCache {

interface ZooCacheWatcher extends Consumer<WatchedEvent> {}

/**
* Add a ZooCacheWatcher object to this ZooCache
*
* @param watcher
*/
void addZooCacheWatcher(ZooCacheWatcher watcher);

/**
* Gets the children of the given node. A watch is established by this call.
*
* @param zPath path of node
* @return children list, or null if node has no children or does not exist
*/
List<String> getChildren(String zPath);

/**
* Gets data at the given path. Status information is not returned. A watch is established by this
* call.
*
* @param zPath path to get
* @return path data, or null if non-existent
*/
byte[] get(String zPath);

/**
* Gets data at the given path, filling status information into the given <code>Stat</code>
* object. A watch is established by this call.
*
* @param zPath path to get
* @param status status object to populate
* @return path data, or null if non-existent
*/
byte[] get(String zPath, ZcStat status);

/**
* Returns a monotonically increasing count of the number of time the cache was updated. If the
* count is the same, then it means cache did not change.
*/
long getUpdateCount();

/**
* Checks if a data value (or lack of one) is cached.
*
* @param zPath path of node
* @return true if data value is cached
*/
boolean dataCached(String zPath);

/**
* Checks if children of a node (or lack of them) are cached.
*
* @param zPath path of node
* @return true if children are cached
*/
boolean childrenCached(String zPath);

/**
* Removes all paths in the cache match the predicate.
*/
void clear(Predicate<String> pathPredicate);

/**
* Clears this cache of all information about nodes rooted at the given path.
*
* @param zPath path of top node
*/
void clear(String zPath);

/**
* Gets the lock data from the node in the cache at the specified path
*/
Optional<ServiceLockData> getLockData(ServiceLockPath path);

}
Loading

0 comments on commit e5b9308

Please sign in to comment.