diff --git a/dora/core/common/pom.xml b/dora/core/common/pom.xml index 1663d3f802ae..fdaae5c91a49 100644 --- a/dora/core/common/pom.xml +++ b/dora/core/common/pom.xml @@ -55,6 +55,10 @@ com.google.code.gson gson + + com.github.ben-manes.caffeine + caffeine + com.google.guava guava diff --git a/dora/core/common/src/main/java/alluxio/conf/PropertyKey.java b/dora/core/common/src/main/java/alluxio/conf/PropertyKey.java index 67c01b716ba4..36456086b84e 100755 --- a/dora/core/common/src/main/java/alluxio/conf/PropertyKey.java +++ b/dora/core/common/src/main/java/alluxio/conf/PropertyKey.java @@ -7927,10 +7927,12 @@ public String toString() { .setScope(Scope.WORKER) .build(); - public static final PropertyKey DORA_UFS_LIST_STATUS_CACHE_NR_DIRS = - intBuilder(Name.DORA_UFS_LIST_STATUS_CACHE_NR_DIRS) - .setDefaultValue(50) - .setDescription("Number of the file/dir cache of UFS list status results") + public static final PropertyKey DORA_UFS_LIST_STATUS_CACHE_NR_FILES = + intBuilder(Name.DORA_UFS_LIST_STATUS_CACHE_NR_FILES) + .setDefaultValue(100000) + .setDescription("Number of the file ufs statuses of UFS listing cache. " + + "The more files a directory contain, the more weight it is counted " + + "when the cache capacity is calculated.") .setConsistencyCheckLevel(ConsistencyCheckLevel.ENFORCE) .setScope(Scope.WORKER) .build(); @@ -9585,8 +9587,8 @@ public static final class Name { public static final String DORA_UFS_LIST_STATUS_CACHE_TTL = "alluxio.dora.ufs.list.status.cache.ttl"; - public static final String DORA_UFS_LIST_STATUS_CACHE_NR_DIRS = - "alluxio.dora.ufs.list.status.cache.nr.dirs"; + public static final String DORA_UFS_LIST_STATUS_CACHE_NR_FILES = + "alluxio.dora.ufs.list.status.cache.nr.files"; // // Extra class to be loaded diff --git a/dora/core/common/src/main/java/alluxio/exception/ListOnNonDirectoryException.java b/dora/core/common/src/main/java/alluxio/exception/ListOnNonDirectoryException.java new file mode 100644 index 000000000000..fb9dac6a6416 --- /dev/null +++ b/dora/core/common/src/main/java/alluxio/exception/ListOnNonDirectoryException.java @@ -0,0 +1,64 @@ +/* + * The Alluxio Open Foundation licenses this work under the Apache License, version 2.0 + * (the "License"). You may not use this work except in compliance with the License, which is + * available at www.apache.org/licenses/LICENSE-2.0 + * + * This software is distributed on an "AS IS" basis, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied, as more fully set forth in the License. + * + * See the NOTICE file distributed with this work for information regarding copyright ownership. + */ + +package alluxio.exception; + +import javax.annotation.concurrent.ThreadSafe; + +/** + * Exception for situations where a listing operation happens on a non-directory file/object. + */ +@ThreadSafe +public class ListOnNonDirectoryException extends AlluxioException { + private static final long serialVersionUID = -98364579823401L; + + /** + * Constructs a new exception with the specified detail message. + * + * @param message the detail message + */ + public ListOnNonDirectoryException(String message) { + super(message); + } + + /** + * Constructs a new exception with the specified detail message and cause. + * + * @param message the detail message + * @param cause the cause + */ + public ListOnNonDirectoryException(String message, Throwable cause) { + super(message, cause); + } + + /** + * Constructs a new exception with the specified exception message and multiple parameters. + * + * @param message the exception message + * @param params the parameters + */ + public ListOnNonDirectoryException(ExceptionMessage message, Object... params) { + this(message.getMessage(params)); + } + + /** + * Constructs a new exception with the specified exception message, the cause and multiple + * parameters. + * + * @param message the exception message + * @param cause the cause + * @param params the parameters + */ + public ListOnNonDirectoryException(ExceptionMessage message, Throwable cause, + Object... params) { + this(message.getMessage(params), cause); + } +} diff --git a/dora/core/server/worker/src/main/java/alluxio/worker/dora/DoraMetaManager.java b/dora/core/server/worker/src/main/java/alluxio/worker/dora/DoraMetaManager.java index 814a2b5e3e11..1c24f591fbad 100644 --- a/dora/core/server/worker/src/main/java/alluxio/worker/dora/DoraMetaManager.java +++ b/dora/core/server/worker/src/main/java/alluxio/worker/dora/DoraMetaManager.java @@ -13,20 +13,27 @@ import alluxio.AlluxioURI; import alluxio.client.file.cache.CacheManager; -import alluxio.client.file.cache.PageId; import alluxio.conf.Configuration; import alluxio.conf.PropertyKey; +import alluxio.exception.InvalidPathException; +import alluxio.exception.ListOnNonDirectoryException; import alluxio.file.FileId; import alluxio.proto.meta.DoraMeta; import alluxio.proto.meta.DoraMeta.FileStatus; import alluxio.underfs.Fingerprint; import alluxio.underfs.UfsStatus; import alluxio.underfs.UnderFileSystem; +import alluxio.underfs.options.ListOptions; +import alluxio.util.io.PathUtils; +import com.github.benmanes.caffeine.cache.Cache; +import com.github.benmanes.caffeine.cache.Caffeine; import com.google.common.annotations.VisibleForTesting; +import java.io.Closeable; import java.io.FileNotFoundException; import java.io.IOException; +import java.time.Duration; import java.util.Optional; /** @@ -35,25 +42,33 @@ * TODO(elega) Invalidating page cache synchronously causes performance issue and currently it * also lacks concurrency control. Address this problem in the future. */ -public class DoraMetaManager { - private final DoraMetaStore mMetastore; +public class DoraMetaManager implements Closeable { + private final DoraMetaStore mMetaStore; private final CacheManager mCacheManager; private final PagedDoraWorker mDoraWorker; private final UnderFileSystem mUfs; private boolean mPopulateMetadataFingerprint = Configuration.getBoolean(PropertyKey.DORA_WORKER_POPULATE_METADATA_FINGERPRINT); + private final Cache mListStatusCache = Caffeine.newBuilder() + .maximumWeight(Configuration.getInt(PropertyKey.DORA_UFS_LIST_STATUS_CACHE_NR_FILES)) + .weigher((String k, ListStatusResult v) -> v == null ? 0 : v.mUfsStatuses.length) + .expireAfterWrite(Configuration.getDuration(PropertyKey.DORA_UFS_LIST_STATUS_CACHE_TTL)) + .build(); + /** * Creates a dora meta manager. * @param doraWorker the dora worker instance - * @param metaStore the dora meta store * @param cacheManger the cache manager to manage the page cache * @param ufs the associated ufs */ public DoraMetaManager( - PagedDoraWorker doraWorker, DoraMetaStore metaStore, CacheManager cacheManger, + PagedDoraWorker doraWorker, CacheManager cacheManger, UnderFileSystem ufs) { - mMetastore = metaStore; + String dbDir = Configuration.getString(PropertyKey.DORA_WORKER_METASTORE_ROCKSDB_DIR); + Duration duration = Configuration.getDuration(PropertyKey.DORA_WORKER_METASTORE_ROCKSDB_TTL); + long ttl = (duration.isNegative() || duration.isZero()) ? -1 : duration.getSeconds(); + mMetaStore = new RocksDBDoraMetaStore(dbDir, ttl); mCacheManager = cacheManger; mDoraWorker = doraWorker; mUfs = ufs; @@ -97,7 +112,7 @@ public Optional loadFromUfs(String path) throws IOException { * @return the file status, or empty optional if not found */ public Optional getFromMetaStore(String path) { - return mMetastore.getDoraMeta(path); + return mMetaStore.getDoraMeta(path); } /** @@ -106,11 +121,15 @@ public Optional getFromMetaStore(String path) { * @param meta the file meta */ public void put(String path, FileStatus meta) { - Optional status = mMetastore.getDoraMeta(path); + // TODO(elega) as an optimization, we can edit the listing cache to reflect the + // metadata change of a single file instead of invalidate it. + invalidateListingCache(getPathParent(path)); + + Optional status = mMetaStore.getDoraMeta(path); if (status.isEmpty() || status.get().getFileInfo().getFolder() || status.get().getFileInfo().getLength() == 0) { - mMetastore.putDoraMeta(path, meta); + mMetaStore.putDoraMeta(path, meta); return; } if (mPopulateMetadataFingerprint) { @@ -128,12 +147,12 @@ public void put(String path, FileStatus meta) { && newFingerprint.isValid() && originalFingerprint.matchContent(newFingerprint); if (!contentMatched) { - invalidateCachedFile(path, status.get().getFileInfo().getLength()); + invalidateCachedFile(path); } } else { - invalidateCachedFile(path, status.get().getFileInfo().getLength()); + invalidateCachedFile(path); } - mMetastore.putDoraMeta(path, meta); + mMetaStore.putDoraMeta(path, meta); } /** @@ -142,19 +161,86 @@ public void put(String path, FileStatus meta) { * @return the removed file meta, if exists */ public Optional removeFromMetaStore(String path) { - Optional status = mMetastore.getDoraMeta(path); + invalidateListingCache(getPathParent(path)); + Optional status = mMetaStore.getDoraMeta(path); if (status.isPresent()) { - mMetastore.removeDoraMeta(path); - invalidateCachedFile(path, status.get().getFileInfo().getLength()); + mMetaStore.removeDoraMeta(path); } + invalidateCachedFile(path); return status; } - private void invalidateCachedFile(String path, long length) { - FileId fileId = FileId.of(AlluxioURI.hash(path)); - mCacheManager.deleteFile(fileId.toString()); - for (PageId page: mCacheManager.getCachedPageIdsByFileId(fileId.toString(), length)) { - mCacheManager.delete(page); + public void invalidateListingCache(String path) { + mListStatusCache.invalidate(path); + } + + public Optional listCached(String path, boolean isRecursive) { + // We don't cache recursive listing result as usually the number of files are too + // large to cache. + if (isRecursive) { + return Optional.empty(); + } + ListStatusResult result = mListStatusCache.getIfPresent(path); + return Optional.ofNullable(result); + } + + /** + * Lists a directory from UFS and cache it into the listing cache if it exists. + * @param path the ufs path + * @param isRecursive if the listing is recursive + * @return an empty option if the directory does not exist, otherwise an option contains an + * ufs status array. + * @throws IOException if the UFS call failed + * @throws ListOnNonDirectoryException if the target path exists but not a valid listing target + * (e.g.) the path is a file or object. + */ + public Optional listFromUfsThenCache(String path, boolean isRecursive) + throws IOException, ListOnNonDirectoryException { + try { + var cached = mListStatusCache.get(path, (k) -> { + try { + Optional listResults = listFromUfs(path, isRecursive); + // Question: should we cache absent value? + return listResults.map( + ufsStatuses -> new ListStatusResult(System.nanoTime(), ufsStatuses)) + .orElseGet(() -> new ListStatusResult(System.nanoTime(), null)); + } catch (Exception e) { + throw new RuntimeException(e); + } + }); + return Optional.ofNullable(cached.mUfsStatuses); + } catch (RuntimeException e) { + if (e.getCause() instanceof IOException) { + throw (IOException) e.getCause(); + } + if (e.getCause() instanceof ListOnNonDirectoryException) { + throw (ListOnNonDirectoryException) e.getCause(); + } + throw new RuntimeException(e); + } + } + + /** + * Lists a directory from UFS. + * @param path the ufs path + * @param isRecursive if the listing is recursive + * @return an empty option if the directory does not exist, otherwise an option contains an + * ufs status array. + * @throws IOException if the UFS call failed + * @throws ListOnNonDirectoryException if the target path exists but not a valid listing target + * (e.g.) the path is a file or object. + */ + public Optional listFromUfs(String path, boolean isRecursive) + throws IOException, ListOnNonDirectoryException { + ListOptions ufsListOptions = ListOptions.defaults().setRecursive(isRecursive); + try { + UfsStatus[] listResults = mUfs.listStatus(path, ufsListOptions); + if (listResults == null) { + throw new ListOnNonDirectoryException(path + " is not a directory"); + } + return Optional.of(listResults); + } catch (FileNotFoundException e) { + return Optional.empty(); } } @@ -162,4 +248,25 @@ private void invalidateCachedFile(String path, long length) { void setPopulateMetadataFingerprint(boolean value) { mPopulateMetadataFingerprint = value; } + + private void invalidateCachedFile(String path) { + FileId fileId = FileId.of(AlluxioURI.hash(path)); + mCacheManager.deleteFile(fileId.toString()); + } + + private String getPathParent(String path) { + if (path.equals("/")) { + return path; + } + try { + return PathUtils.getParent(path); + } catch (InvalidPathException e) { + throw new RuntimeException(e); + } + } + + @Override + public void close() throws IOException { + mMetaStore.close(); + } } diff --git a/dora/core/server/worker/src/main/java/alluxio/worker/dora/ListStatusResult.java b/dora/core/server/worker/src/main/java/alluxio/worker/dora/ListStatusResult.java new file mode 100644 index 000000000000..e7b923099876 --- /dev/null +++ b/dora/core/server/worker/src/main/java/alluxio/worker/dora/ListStatusResult.java @@ -0,0 +1,16 @@ +package alluxio.worker.dora; + +import alluxio.underfs.UfsStatus; + +/** + * The list status results stored in the cache. + */ +public class ListStatusResult { + public long mTimeStamp; + public UfsStatus[] mUfsStatuses; + + ListStatusResult(long timeStamp, UfsStatus[] ufsStatuses) { + mTimeStamp = timeStamp; + mUfsStatuses = ufsStatuses; + } +} diff --git a/dora/core/server/worker/src/main/java/alluxio/worker/dora/PagedDoraWorker.java b/dora/core/server/worker/src/main/java/alluxio/worker/dora/PagedDoraWorker.java index 90a44660a1b8..b4ef7ed8fc77 100644 --- a/dora/core/server/worker/src/main/java/alluxio/worker/dora/PagedDoraWorker.java +++ b/dora/core/server/worker/src/main/java/alluxio/worker/dora/PagedDoraWorker.java @@ -29,6 +29,7 @@ import alluxio.conf.PropertyKey; import alluxio.exception.AccessControlException; import alluxio.exception.FileAlreadyExistsException; +import alluxio.exception.ListOnNonDirectoryException; import alluxio.exception.runtime.AlluxioRuntimeException; import alluxio.exception.status.NotFoundException; import alluxio.file.FileId; @@ -136,17 +137,6 @@ public class PagedDoraWorker extends AbstractWorker implements DoraWorker { private boolean mPopulateMetadataFingerprint = Configuration.getBoolean(PropertyKey.DORA_WORKER_POPULATE_METADATA_FINGERPRINT); - private static class ListStatusResult { - public long mTimeStamp; - public UfsStatus[] mUfsStatuses; - - ListStatusResult(long timeStamp, UfsStatus[] ufsStatuses) { - mTimeStamp = timeStamp; - mUfsStatuses = ufsStatuses; - } - } - - private final Cache mListStatusCache; private WorkerNetAddress mAddress; private DoraMetaStore mMetaStore; @@ -177,10 +167,6 @@ public PagedDoraWorker( mUfs = UnderFileSystem.Factory.create( mRootUFS, UnderFileSystemConfiguration.defaults(Configuration.global())); - mListStatusCache = CacheBuilder.newBuilder() - .maximumSize(Configuration.getInt(PropertyKey.DORA_UFS_LIST_STATUS_CACHE_NR_DIRS)) - .expireAfterWrite(Configuration.getDuration(PropertyKey.DORA_UFS_LIST_STATUS_CACHE_TTL)) - .build(); mPageSize = Configuration.global().getBytes(PropertyKey.WORKER_PAGE_STORE_PAGE_SIZE); mBlockMasterClientPool = new BlockMasterClientPool(); @@ -190,7 +176,8 @@ public PagedDoraWorker( long ttl = (duration.isNegative() || duration.isZero()) ? -1 : duration.getSeconds(); mMetaStore = new RocksDBDoraMetaStore(dbDir, ttl); mCacheManager = cacheManager; - mMetaManager = new DoraMetaManager(this, mMetaStore, mCacheManager, mUfs); + mMetaManager = mResourceCloser.register( + new DoraMetaManager(this, mCacheManager, mUfs)); mOpenFileHandleContainer = new DoraOpenFileHandleContainer(); } @@ -290,65 +277,26 @@ public UfsStatus[] listStatus(String path, ListStatusPOptions options) ? options.getCommonOptions().getSyncIntervalMs() : -1) : -1; - final boolean skipCache = options.hasRecursive() && options.getRecursive(); - final UfsStatus[] cachedStatuses; - final ListStatusResult resultFromCache = mListStatusCache.getIfPresent(path); - if (resultFromCache == null) { - cachedStatuses = null; - } else if (skipCache) { - // Only use the cached result when its not recursive listing - cachedStatuses = null; - } else { - // Metadata is cached. Check if it is expired. - if (syncIntervalMs >= 0 - && System.nanoTime() - resultFromCache.mTimeStamp > syncIntervalMs * Constants.MS_NANO) { - // The metadata is expired. Remove it from in-memory cache. - mListStatusCache.invalidate(path); - cachedStatuses = null; - } else { - // Cache is still valid. Use cached statuses. - cachedStatuses = resultFromCache.mUfsStatuses; - } - } - if (cachedStatuses != null) { - return cachedStatuses; - } - - // Not found in cache. Query the Under File System. - ListOptions ufsListOptions = ListOptions.defaults().setRecursive( - options.hasRecursive() ? options.getRecursive() : false); - UfsStatus[] freshStatusesFromUfs = mUfs.listStatus(path, ufsListOptions); + boolean isRecursive = options.getRecursive(); + final Optional resultFromCache = mMetaManager.listCached(path, isRecursive); - if (freshStatusesFromUfs == null) { - // If empty, the request path might be a regular file/object. Let's retry getStatus(). - try { - UfsStatus status = mUfs.getStatus(path); - // Success. Create an array with only one element. - status.setName(""); // listStatus() expects relative name to the @path. - freshStatusesFromUfs = new UfsStatus[1]; - freshStatusesFromUfs[0] = status; - } catch (FileNotFoundException e) { - // Do nothing. - // The freshStatusesFromUfs is still null to indicate empty listStatus() result. - } + if (resultFromCache.isPresent() + && (syncIntervalMs < 0 + || System.nanoTime() - resultFromCache.get().mTimeStamp + <= syncIntervalMs * Constants.MS_NANO)) { + return resultFromCache.get().mUfsStatuses; } - - // Add this into cache. Return value of listStatus() might be null if not found. - if (freshStatusesFromUfs != null && !skipCache) { - ListStatusResult newResult = new ListStatusResult(System.nanoTime(), freshStatusesFromUfs); - mListStatusCache.put(path, newResult); + mMetaManager.invalidateListingCache(path); + try { + return mMetaManager.listFromUfsThenCache(path, isRecursive).orElseGet(() -> new UfsStatus[0]); + } catch (ListOnNonDirectoryException e) { + // If path is not a directory, the request path might be a regular file/object. + // Try getStatus() instead. + UfsStatus status = mUfs.getStatus(path); + // Success. Create an array with only one element. + status.setName(""); // listStatus() expects relative name to the @path. + return new UfsStatus[] {status}; } - return freshStatusesFromUfs; - } - - /** - * Invalidate the given cached File by deleting it from local cache. - * - * @param path the full path of this file - */ - private void invalidateCachedFile(String path) { - FileId file = FileId.of(new AlluxioURI(path).hash()); - mCacheManager.deleteFile(file.toString()); } @Override @@ -696,8 +644,7 @@ public OpenFileHandle createFile(String path, CreateFilePOptions options) new FileAlreadyExistsException("File already exists but no overwrite flag")); } else if (overWrite) { // client is going to overwrite this file. We need to invalidate the cached meta and data. - invalidateFileMeta(path); - invalidateCachedFile(path); + mMetaManager.removeFromMetaStore(path); } // Open UFS OutputStream and use it in write operation. @@ -727,20 +674,6 @@ public OpenFileHandle createFile(String path, CreateFilePOptions options) return handle; } - private void invalidateFileMeta(String path) { - // The simplest way of updating metadata is invalidating cache in worker. - // Next time, worker will get fresh metadata from ufs. - AlluxioURI fullPathUri = new AlluxioURI(path); - AlluxioURI parentDir; - if (fullPathUri.isRoot()) { - parentDir = fullPathUri; - } else { - parentDir = fullPathUri.getParent(); - } - mListStatusCache.invalidate(parentDir.toString()); // invalidate dir cache - mMetaStore.removeDoraMeta(path); // invalidate in-Rocks cache - } - @Override public void completeFile(String path, CompleteFilePOptions options, String uuid) throws IOException, AccessControlException { @@ -760,16 +693,16 @@ public void completeFile(String path, CompleteFilePOptions options, String uuid) public void delete(String path, DeletePOptions options) throws IOException, AccessControlException { try { - invalidateFileMeta(path); - invalidateCachedFile(path); + mMetaManager.removeFromMetaStore(path); // TODO(hua) Close the open file handle? - - UfsStatus status = mUfs.getStatus(path); - if (status.isFile()) { - mUfs.deleteFile(path); - } else { - mUfs.deleteDirectory(path); + if (!options.getAlluxioOnly()) { + UfsStatus status = mUfs.getStatus(path); + if (status.isFile()) { + mUfs.deleteFile(path); + } else { + mUfs.deleteDirectory(path); + } } } catch (IOException e) { throw new RuntimeException(e); @@ -780,17 +713,14 @@ public void delete(String path, DeletePOptions options) throws IOException, public void rename(String src, String dst, RenamePOptions options) throws IOException, AccessControlException { try { - invalidateFileMeta(src); - invalidateCachedFile(src); - invalidateFileMeta(dst); - invalidateCachedFile(dst); - UfsStatus status = mUfs.getStatus(src); if (status.isFile()) { mUfs.renameFile(src, dst); } else { mUfs.renameDirectory(src, dst); } + mMetaManager.removeFromMetaStore(src); + mMetaManager.loadFromUfs(dst); } catch (IOException e) { throw new RuntimeException(e); } @@ -800,10 +730,8 @@ public void rename(String src, String dst, RenamePOptions options) public void createDirectory(String path, CreateDirectoryPOptions options) throws IOException, AccessControlException { try { - invalidateFileMeta(path); - invalidateCachedFile(path); - boolean success = mUfs.mkdirs(path); + mMetaManager.loadFromUfs(path); if (!success) { throw new RuntimeException( new FileAlreadyExistsException(String.format("%s already exists", path))); diff --git a/pom.xml b/pom.xml index 98ee5640daaf..581591bb5782 100644 --- a/pom.xml +++ b/pom.xml @@ -128,6 +128,7 @@ 2.20.56 build 1.2.1 + 3.1.6 2.3.13 1.54.1 2.8.9 @@ -248,6 +249,11 @@ jackson-dataformat-xml ${jackson.version} + + com.github.ben-manes.caffeine + caffeine + 3.1.6 + com.github.serceman jnr-fuse