From 268c342f41e8527ff08457d64842678fef70b39a Mon Sep 17 00:00:00 2001 From: "445092967@qq.com" <445092967@qq.com> Date: Fri, 30 Jun 2023 12:36:52 +0800 Subject: [PATCH] update --- .../alluxio/worker/dora/DoraMetaManager.java | 44 +++++-- .../alluxio/worker/dora/ListStatusResult.java | 11 ++ .../alluxio/worker/dora/PagedDoraWorker.java | 24 ++-- .../worker/dora/PagedDoraWorkerTest.java | 113 ++++++++++++++++-- 4 files changed, 163 insertions(+), 29 deletions(-) diff --git a/dora/core/server/worker/src/main/java/alluxio/worker/dora/DoraMetaManager.java b/dora/core/server/worker/src/main/java/alluxio/worker/dora/DoraMetaManager.java index 1c24f591fbad..26608bf364b2 100644 --- a/dora/core/server/worker/src/main/java/alluxio/worker/dora/DoraMetaManager.java +++ b/dora/core/server/worker/src/main/java/alluxio/worker/dora/DoraMetaManager.java @@ -50,7 +50,11 @@ public class DoraMetaManager implements Closeable { private boolean mPopulateMetadataFingerprint = Configuration.getBoolean(PropertyKey.DORA_WORKER_POPULATE_METADATA_FINGERPRINT); - private final Cache mListStatusCache = Caffeine.newBuilder() + private final long mListingCacheCapacity + = Configuration.getInt(PropertyKey.DORA_UFS_LIST_STATUS_CACHE_NR_FILES); + private final Cache mListStatusCache = mListingCacheCapacity == 0 + ? null + : Caffeine.newBuilder() .maximumWeight(Configuration.getInt(PropertyKey.DORA_UFS_LIST_STATUS_CACHE_NR_FILES)) .weigher((String k, ListStatusResult v) -> v == null ? 
0 : v.mUfsStatuses.length) .expireAfterWrite(Configuration.getDuration(PropertyKey.DORA_UFS_LIST_STATUS_CACHE_TTL)) @@ -103,6 +107,7 @@ public Optional loadFromUfs(String path) throws IOException { } else { put(path, fileStatus.get()); } + // TODO(elega) invalidate/update listing cache based on the load result return fileStatus; } @@ -121,10 +126,6 @@ public Optional getFromMetaStore(String path) { * @param meta the file meta */ public void put(String path, FileStatus meta) { - // TODO(elega) as an optimization, we can edit the listing cache to reflect the - // metadata change of a single file instead of invalidate it. - invalidateListingCache(getPathParent(path)); - Optional status = mMetaStore.getDoraMeta(path); if (status.isEmpty() || status.get().getFileInfo().getFolder() @@ -170,11 +171,38 @@ public Optional removeFromMetaStore(String path) { return status; } + /** + * Invalidates the listing cache of a given path. + * @param path the full ufs path + */ public void invalidateListingCache(String path) { - mListStatusCache.invalidate(path); + if (mListStatusCache != null) { + mListStatusCache.invalidate(path); + } } + /** + * Invalidates the listing cache of the parent of a given path. + * If root is specified, the listing cache of root itself will be invalidated. + * @param path the full ufs path + */ + public void invalidateListingCacheOfParent(String path) { + if (mListStatusCache != null) { + mListStatusCache.invalidate(getPathParent(path)); + } + } + + /** + * Get the cached listing result from the listing cache. + * @param path the full ufs path to list + * @param isRecursive if the list is recursive + * @return an Optional of a listStatusResult object. If the object exists but the ufs status + * is empty, it means that the directory does not exist. 
+ */ public Optional listCached(String path, boolean isRecursive) { + if (mListStatusCache == null) { + return Optional.empty(); + } // We don't cache recursive listing result as usually the number of files are too // large to cache. if (isRecursive) { @@ -196,11 +224,13 @@ public Optional listCached(String path, boolean isRecursive) { */ public Optional listFromUfsThenCache(String path, boolean isRecursive) throws IOException, ListOnNonDirectoryException { + if (mListStatusCache == null) { + return listFromUfs(path, isRecursive); + } try { var cached = mListStatusCache.get(path, (k) -> { try { Optional listResults = listFromUfs(path, isRecursive); - // Question: should we cache absent value? return listResults.map( ufsStatuses -> new ListStatusResult(System.nanoTime(), ufsStatuses)) .orElseGet(() -> new ListStatusResult(System.nanoTime(), null)); diff --git a/dora/core/server/worker/src/main/java/alluxio/worker/dora/ListStatusResult.java b/dora/core/server/worker/src/main/java/alluxio/worker/dora/ListStatusResult.java index e7b923099876..69d9dd0af88a 100644 --- a/dora/core/server/worker/src/main/java/alluxio/worker/dora/ListStatusResult.java +++ b/dora/core/server/worker/src/main/java/alluxio/worker/dora/ListStatusResult.java @@ -1,3 +1,14 @@ +/* + * The Alluxio Open Foundation licenses this work under the Apache License, version 2.0 + * (the "License"). You may not use this work except in compliance with the License, which is + * available at www.apache.org/licenses/LICENSE-2.0 + * + * This software is distributed on an "AS IS" basis, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied, as more fully set forth in the License. + * + * See the NOTICE file distributed with this work for information regarding copyright ownership. 
+ */ + package alluxio.worker.dora; import alluxio.underfs.UfsStatus; diff --git a/dora/core/server/worker/src/main/java/alluxio/worker/dora/PagedDoraWorker.java b/dora/core/server/worker/src/main/java/alluxio/worker/dora/PagedDoraWorker.java index b4ef7ed8fc77..c2cc3f75fc1f 100644 --- a/dora/core/server/worker/src/main/java/alluxio/worker/dora/PagedDoraWorker.java +++ b/dora/core/server/worker/src/main/java/alluxio/worker/dora/PagedDoraWorker.java @@ -32,7 +32,6 @@ import alluxio.exception.ListOnNonDirectoryException; import alluxio.exception.runtime.AlluxioRuntimeException; import alluxio.exception.status.NotFoundException; -import alluxio.file.FileId; import alluxio.grpc.Command; import alluxio.grpc.CommandType; import alluxio.grpc.CompleteFilePOptions; @@ -74,7 +73,6 @@ import alluxio.underfs.UnderFileSystem; import alluxio.underfs.UnderFileSystemConfiguration; import alluxio.underfs.options.CreateOptions; -import alluxio.underfs.options.ListOptions; import alluxio.util.CommonUtils; import alluxio.util.ModeUtils; import alluxio.util.executor.ExecutorServiceFactories; @@ -91,8 +89,6 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; -import com.google.common.cache.Cache; -import com.google.common.cache.CacheBuilder; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.io.Closer; @@ -105,7 +101,6 @@ import java.io.FileNotFoundException; import java.io.IOException; -import java.time.Duration; import java.util.ArrayList; import java.util.Collections; import java.util.List; @@ -139,8 +134,6 @@ public class PagedDoraWorker extends AbstractWorker implements DoraWorker { private WorkerNetAddress mAddress; - private DoraMetaStore mMetaStore; - private final UnderFileSystem mUfs; private final DoraOpenFileHandleContainer mOpenFileHandleContainer; @@ -170,11 +163,6 @@ public PagedDoraWorker( mPageSize = 
Configuration.global().getBytes(PropertyKey.WORKER_PAGE_STORE_PAGE_SIZE); mBlockMasterClientPool = new BlockMasterClientPool(); - - String dbDir = Configuration.getString(PropertyKey.DORA_WORKER_METASTORE_ROCKSDB_DIR); - Duration duration = Configuration.getDuration(PropertyKey.DORA_WORKER_METASTORE_ROCKSDB_TTL); - long ttl = (duration.isNegative() || duration.isZero()) ? -1 : duration.getSeconds(); - mMetaStore = new RocksDBDoraMetaStore(dbDir, ttl); mCacheManager = cacheManager; mMetaManager = mResourceCloser.register( new DoraMetaManager(this, mCacheManager, mUfs)); @@ -250,9 +238,6 @@ public void stop() throws IOException { @Override public void close() throws IOException { - if (mMetaStore != null) { - mMetaStore.close(); - } try (AutoCloseable ignoredCloser = mResourceCloser; AutoCloseable ignoredCacheManager = mCacheManager ) { @@ -682,6 +667,7 @@ public void completeFile(String path, CompleteFilePOptions options, String uuid) mOpenFileHandleContainer.remove(path); handle.close(); Optional status = mMetaManager.loadFromUfs(path); + mMetaManager.invalidateListingCacheOfParent(path); if (status.isEmpty()) { throw new FileNotFoundException("Cannot retrieve file metadata of " + path + " when completing the file"); @@ -721,6 +707,7 @@ public void rename(String src, String dst, RenamePOptions options) } mMetaManager.removeFromMetaStore(src); mMetaManager.loadFromUfs(dst); + mMetaManager.invalidateListingCacheOfParent(dst); } catch (IOException e) { throw new RuntimeException(e); } @@ -732,6 +719,7 @@ public void createDirectory(String path, CreateDirectoryPOptions options) try { boolean success = mUfs.mkdirs(path); mMetaManager.loadFromUfs(path); + mMetaManager.invalidateListingCacheOfParent(path); if (!success) { throw new RuntimeException( new FileAlreadyExistsException(String.format("%s already exists", path))); @@ -779,6 +767,7 @@ public void setAttribute(String path, SetAttributePOptions options) throws IOExc mUfs.setOwner(path, null, options.getGroup()); } 
mMetaManager.loadFromUfs(path); + mMetaManager.invalidateListingCacheOfParent(path); } @Override @@ -830,6 +819,11 @@ UnderFileSystem getUfs() { return mUfs; } + @VisibleForTesting + DoraMetaManager getMetaManager() { + return mMetaManager; + } + protected void checkCopyPermission(String srcPath, String dstPath) throws AccessControlException, IOException { // No-op diff --git a/dora/core/server/worker/src/test/java/alluxio/worker/dora/PagedDoraWorkerTest.java b/dora/core/server/worker/src/test/java/alluxio/worker/dora/PagedDoraWorkerTest.java index 2825478ab2ba..e1cafb6bd797 100644 --- a/dora/core/server/worker/src/test/java/alluxio/worker/dora/PagedDoraWorkerTest.java +++ b/dora/core/server/worker/src/test/java/alluxio/worker/dora/PagedDoraWorkerTest.java @@ -12,6 +12,10 @@ package alluxio.worker.dora; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNotSame; +import static org.junit.Assert.assertSame; import static org.junit.Assert.assertThrows; import static org.junit.Assert.assertTrue; @@ -23,8 +27,11 @@ import alluxio.conf.Configuration; import alluxio.conf.PropertyKey; import alluxio.exception.AccessControlException; +import alluxio.grpc.DeletePOptions; +import alluxio.grpc.FileInfo; import alluxio.grpc.FileSystemMasterCommonPOptions; import alluxio.grpc.GetStatusPOptions; +import alluxio.grpc.ListStatusPOptions; import alluxio.grpc.LoadFileFailure; import alluxio.grpc.Route; import alluxio.grpc.RouteFailure; @@ -34,6 +41,7 @@ import alluxio.security.authorization.Mode; import alluxio.underfs.Fingerprint; import alluxio.underfs.UfsStatus; +import alluxio.util.CommonUtils; import alluxio.util.io.BufferUtils; import com.google.common.base.Preconditions; @@ -169,7 +177,7 @@ public void testCopyException() throws IOException, ExecutionException, Interrup mWorker.copy(Collections.singletonList(route), null, writeOptions); List failures = 
copy.get(); assertEquals(1, failures.size()); - Assert.assertFalse(b.exists()); + assertFalse(b.exists()); } @Test @@ -267,7 +275,7 @@ public void testSingleFileMove() throws IOException, ExecutionException, Interru List failures = move.get(); assertEquals(0, failures.size()); Assert.assertTrue(b.exists()); - Assert.assertFalse(a.exists()); + assertFalse(a.exists()); try (InputStream in = Files.newInputStream(b.toPath())) { byte[] readBuffer = new byte[length]; while (in.read(readBuffer) != -1) { @@ -295,7 +303,7 @@ public void testMoveException() throws IOException, ExecutionException, Interrup mWorker.move(Collections.singletonList(route), null, writeOptions); List failures = move.get(); assertEquals(1, failures.size()); - Assert.assertFalse(b.exists()); + assertFalse(b.exists()); } @Test @@ -318,7 +326,7 @@ public void testSingleFolderMove() throws IOException, ExecutionException, Inter assertEquals(0, failures.size()); Assert.assertTrue(b.exists()); Assert.assertTrue(b.isDirectory()); - Assert.assertFalse(a.exists()); + assertFalse(a.exists()); } @Test @@ -363,9 +371,9 @@ public void testFolderWithFileMove() Assert.assertTrue(b.isDirectory()); Assert.assertTrue(dstD.exists()); Assert.assertTrue(dstD.isDirectory()); - Assert.assertFalse(a.exists()); - Assert.assertFalse(c.exists()); - Assert.assertFalse(d.exists()); + assertFalse(a.exists()); + assertFalse(c.exists()); + assertFalse(d.exists()); try (InputStream in = Files.newInputStream(dstC.toPath())) { byte[] readBuffer = new byte[length]; while (in.read(readBuffer) != -1) { @@ -500,6 +508,97 @@ private void testGetFileInfo(boolean populateFingerprint) FileSystemMasterCommonPOptions.newBuilder().setSyncIntervalMs(0)).build())); } + @Test + public void testListStatus() throws IOException, AccessControlException { + File rootFolder = mTestFolder.newFolder("root"); + String rootPath = rootFolder.getAbsolutePath(); + File f1 = mTestFolder.newFile("root/f1"); + String f1Path = f1.getAbsolutePath(); + 
mTestFolder.newFile("root/f2"); + mTestFolder.newFolder("root/d1"); + mTestFolder.newFile("root/d1/f1"); + + // Happy path: list non recursively + UfsStatus[] listResult = + mWorker.listStatus(rootPath, ListStatusPOptions.newBuilder().setRecursive(false).build()); + assertNotNull(listResult); + assertEquals(3, listResult.length); + assertSame(listResult, + mWorker.getMetaManager().listCached(rootPath, false).get().mUfsStatuses); + + // Happy path: list non recursively with a sync interval; cache should be updated. + CommonUtils.sleepMs(200); + UfsStatus[] listResult2 = + mWorker.listStatus(rootPath, + ListStatusPOptions.newBuilder().setRecursive(false) + .setCommonOptions( + FileSystemMasterCommonPOptions.newBuilder().setSyncIntervalMs(100)) + .build()); + assertNotSame(listResult, listResult2); + assertNotNull(listResult2); + assertEquals(3, listResult2.length); + + // Happy path: list recursively; cache should not be filled + UfsStatus[] listResult3 = + mWorker.listStatus(rootPath, ListStatusPOptions.newBuilder().setRecursive(true).build()); + assertNotNull(listResult3); + assertEquals(4, listResult3.length); + assertTrue(mWorker.getMetaManager().listCached(rootPath, true).isEmpty()); + + // Happy path: list a file, the file should be the only element in the ufs status array. + // The file should not be included in the listing cache. 
+ UfsStatus[] listResult4 = + mWorker.listStatus(f1Path, ListStatusPOptions.getDefaultInstance()); + assertNotNull(listResult4); + // The result only contains the file itself + assertEquals(1, listResult4.length); + assertTrue(mWorker.getMetaManager().listCached(f1Path, true).isEmpty()); + + // Error case: list a path that does not exist + assertThrows(FileNotFoundException.class, () -> { + mWorker.listStatus(rootPath + "/non-existing-path", + ListStatusPOptions.getDefaultInstance()); + }); + } + + @Test + public void testListCacheConsistency() + throws IOException, AccessControlException, ExecutionException, InterruptedException, + TimeoutException { + String fileContent = "foobar"; + File rootFolder = mTestFolder.newFolder("root"); + String rootPath = rootFolder.getAbsolutePath(); + File f = mTestFolder.newFile("root/f"); + Files.write(f.toPath(), fileContent.getBytes()); + + UfsStatus[] listResult = + mWorker.listStatus(rootPath, ListStatusPOptions.newBuilder().setRecursive(false).build()); + assertNotNull(listResult); + assertEquals(1, listResult.length); + + FileInfo fileInfo = mWorker.getGrpcFileInfo(f.getPath(), 0); + loadFileData(f.getPath()); + assertNotNull(fileInfo); + + // Assert that page cache, metadata cache & list cache all cached data properly + assertTrue(mWorker.getMetaManager().getFromMetaStore(f.getPath()).isPresent()); + assertSame(listResult, + mWorker.getMetaManager().listCached(rootPath, false).get().mUfsStatuses); + List cachedPages = + mCacheManager.getCachedPageIdsByFileId( + new AlluxioURI(f.getPath()).hash(), fileContent.length()); + assertEquals(1, cachedPages.size()); + + mWorker.delete(f.getAbsolutePath(), DeletePOptions.getDefaultInstance()); + // Assert that page cache, metadata cache & list cache all removed stale data + assertTrue(mWorker.getMetaManager().getFromMetaStore(f.getPath()).isEmpty()); + assertTrue(mWorker.getMetaManager().listCached(rootPath, false).isEmpty()); + cachedPages = + 
mCacheManager.getCachedPageIdsByFileId( + new AlluxioURI(f.getPath()).hash(), fileContent.length()); + assertEquals(0, cachedPages.size()); + } + private void testGetFileInfoDir(boolean populateFingerprint) throws AccessControlException, IOException { mWorker.setPopulateMetadataFingerprint(populateFingerprint);