Skip to content

Commit

Permalink
update
Browse files Browse the repository at this point in the history
  • Loading branch information
elega committed Jun 30, 2023
1 parent b8b1d0a commit 268c342
Show file tree
Hide file tree
Showing 4 changed files with 163 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,11 @@ public class DoraMetaManager implements Closeable {
private boolean mPopulateMetadataFingerprint =
Configuration.getBoolean(PropertyKey.DORA_WORKER_POPULATE_METADATA_FINGERPRINT);

private final Cache<String, ListStatusResult> mListStatusCache = Caffeine.newBuilder()
private final long mListingCacheCapacity
= Configuration.getInt(PropertyKey.DORA_UFS_LIST_STATUS_CACHE_NR_FILES);
private final Cache<String, ListStatusResult> mListStatusCache = mListingCacheCapacity == 0
? null
: Caffeine.newBuilder()
.maximumWeight(Configuration.getInt(PropertyKey.DORA_UFS_LIST_STATUS_CACHE_NR_FILES))
.weigher((String k, ListStatusResult v) -> v == null ? 0 : v.mUfsStatuses.length)
.expireAfterWrite(Configuration.getDuration(PropertyKey.DORA_UFS_LIST_STATUS_CACHE_TTL))
Expand Down Expand Up @@ -103,6 +107,7 @@ public Optional<FileStatus> loadFromUfs(String path) throws IOException {
} else {
put(path, fileStatus.get());
}
// TODO(elega) invalidate/update listing cache based on the load result
return fileStatus;
}

Expand All @@ -121,10 +126,6 @@ public Optional<FileStatus> getFromMetaStore(String path) {
* @param meta the file meta
*/
public void put(String path, FileStatus meta) {
// TODO(elega) as an optimization, we can edit the listing cache to reflect the
// metadata change of a single file instead of invalidating it.
invalidateListingCache(getPathParent(path));

Optional<FileStatus> status = mMetaStore.getDoraMeta(path);
if (status.isEmpty()
|| status.get().getFileInfo().getFolder()
Expand Down Expand Up @@ -170,11 +171,38 @@ public Optional<FileStatus> removeFromMetaStore(String path) {
return status;
}

/**
 * Invalidates the listing cache entry of a given path.
 * This is a no-op when the listing cache is disabled, i.e. when {@code mListStatusCache}
 * is {@code null}.
 * @param path the full ufs path
 */
public void invalidateListingCache(String path) {
  // The cache field is null when the configured capacity is 0, so it must be checked
  // before every use; an unguarded invalidate() call would throw a NullPointerException.
  if (mListStatusCache != null) {
    mListStatusCache.invalidate(path);
  }
}

/**
 * Invalidates the listing cache entry of the parent of a given path.
 * If root is specified, the listing cache of root itself will be invalidated.
 * @param path the full ufs path
 */
public void invalidateListingCacheOfParent(String path) {
  if (mListStatusCache == null) {
    // The listing cache is disabled; there is nothing to invalidate.
    return;
  }
  String parentPath = getPathParent(path);
  mListStatusCache.invalidate(parentPath);
}

/**
* Get the cached listing result from the listing cache.
* @param path the full ufs path to list
* @param isRecursive if the list is recursive
* @return an Optional of a listStatusResult object. If the object exists but the ufs status
* is empty, it means that the directory does not exist.
*/
public Optional<ListStatusResult> listCached(String path, boolean isRecursive) {
if (mListStatusCache == null) {
return Optional.empty();
}
// We don't cache recursive listing results, as the number of files is usually too
// large to cache.
if (isRecursive) {
Expand All @@ -196,11 +224,13 @@ public Optional<ListStatusResult> listCached(String path, boolean isRecursive) {
*/
public Optional<UfsStatus[]> listFromUfsThenCache(String path, boolean isRecursive)
throws IOException, ListOnNonDirectoryException {
if (mListStatusCache == null) {
return listFromUfs(path, isRecursive);
}
try {
var cached = mListStatusCache.get(path, (k) -> {
try {
Optional<UfsStatus[]> listResults = listFromUfs(path, isRecursive);
// Question: should we cache absent value?
return listResults.map(
ufsStatuses -> new ListStatusResult(System.nanoTime(), ufsStatuses))
.orElseGet(() -> new ListStatusResult(System.nanoTime(), null));
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,14 @@
/*
* The Alluxio Open Foundation licenses this work under the Apache License, version 2.0
* (the "License"). You may not use this work except in compliance with the License, which is
* available at www.apache.org/licenses/LICENSE-2.0
*
* This software is distributed on an "AS IS" basis, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
* either express or implied, as more fully set forth in the License.
*
* See the NOTICE file distributed with this work for information regarding copyright ownership.
*/

package alluxio.worker.dora;

import alluxio.underfs.UfsStatus;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
import alluxio.exception.ListOnNonDirectoryException;
import alluxio.exception.runtime.AlluxioRuntimeException;
import alluxio.exception.status.NotFoundException;
import alluxio.file.FileId;
import alluxio.grpc.Command;
import alluxio.grpc.CommandType;
import alluxio.grpc.CompleteFilePOptions;
Expand Down Expand Up @@ -74,7 +73,6 @@
import alluxio.underfs.UnderFileSystem;
import alluxio.underfs.UnderFileSystemConfiguration;
import alluxio.underfs.options.CreateOptions;
import alluxio.underfs.options.ListOptions;
import alluxio.util.CommonUtils;
import alluxio.util.ModeUtils;
import alluxio.util.executor.ExecutorServiceFactories;
Expand All @@ -91,8 +89,6 @@

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.io.Closer;
Expand All @@ -105,7 +101,6 @@

import java.io.FileNotFoundException;
import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
Expand Down Expand Up @@ -139,8 +134,6 @@ public class PagedDoraWorker extends AbstractWorker implements DoraWorker {

private WorkerNetAddress mAddress;

private DoraMetaStore mMetaStore;

private final UnderFileSystem mUfs;

private final DoraOpenFileHandleContainer mOpenFileHandleContainer;
Expand Down Expand Up @@ -170,11 +163,6 @@ public PagedDoraWorker(

mPageSize = Configuration.global().getBytes(PropertyKey.WORKER_PAGE_STORE_PAGE_SIZE);
mBlockMasterClientPool = new BlockMasterClientPool();

String dbDir = Configuration.getString(PropertyKey.DORA_WORKER_METASTORE_ROCKSDB_DIR);
Duration duration = Configuration.getDuration(PropertyKey.DORA_WORKER_METASTORE_ROCKSDB_TTL);
long ttl = (duration.isNegative() || duration.isZero()) ? -1 : duration.getSeconds();
mMetaStore = new RocksDBDoraMetaStore(dbDir, ttl);
mCacheManager = cacheManager;
mMetaManager = mResourceCloser.register(
new DoraMetaManager(this, mCacheManager, mUfs));
Expand Down Expand Up @@ -250,9 +238,6 @@ public void stop() throws IOException {

@Override
public void close() throws IOException {
if (mMetaStore != null) {
mMetaStore.close();
}
try (AutoCloseable ignoredCloser = mResourceCloser;
AutoCloseable ignoredCacheManager = mCacheManager
) {
Expand Down Expand Up @@ -682,6 +667,7 @@ public void completeFile(String path, CompleteFilePOptions options, String uuid)
mOpenFileHandleContainer.remove(path);
handle.close();
Optional<DoraMeta.FileStatus> status = mMetaManager.loadFromUfs(path);
mMetaManager.invalidateListingCacheOfParent(path);
if (status.isEmpty()) {
throw new FileNotFoundException("Cannot retrieve file metadata of "
+ path + " when completing the file");
Expand Down Expand Up @@ -721,6 +707,7 @@ public void rename(String src, String dst, RenamePOptions options)
}
mMetaManager.removeFromMetaStore(src);
mMetaManager.loadFromUfs(dst);
mMetaManager.invalidateListingCacheOfParent(dst);
} catch (IOException e) {
throw new RuntimeException(e);
}
Expand All @@ -732,6 +719,7 @@ public void createDirectory(String path, CreateDirectoryPOptions options)
try {
boolean success = mUfs.mkdirs(path);
mMetaManager.loadFromUfs(path);
mMetaManager.invalidateListingCacheOfParent(path);
if (!success) {
throw new RuntimeException(
new FileAlreadyExistsException(String.format("%s already exists", path)));
Expand Down Expand Up @@ -779,6 +767,7 @@ public void setAttribute(String path, SetAttributePOptions options) throws IOExc
mUfs.setOwner(path, null, options.getGroup());
}
mMetaManager.loadFromUfs(path);
mMetaManager.invalidateListingCacheOfParent(path);
}

@Override
Expand Down Expand Up @@ -830,6 +819,11 @@ UnderFileSystem getUfs() {
return mUfs;
}

/**
 * Returns the metadata manager held by this worker; exposed for tests only.
 * @return the {@code mMetaManager} field
 */
@VisibleForTesting
DoraMetaManager getMetaManager() {
return mMetaManager;
}

protected void checkCopyPermission(String srcPath, String dstPath)
throws AccessControlException, IOException {
// No-op
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,10 @@
package alluxio.worker.dora;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNotSame;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;

Expand All @@ -23,8 +27,11 @@
import alluxio.conf.Configuration;
import alluxio.conf.PropertyKey;
import alluxio.exception.AccessControlException;
import alluxio.grpc.DeletePOptions;
import alluxio.grpc.FileInfo;
import alluxio.grpc.FileSystemMasterCommonPOptions;
import alluxio.grpc.GetStatusPOptions;
import alluxio.grpc.ListStatusPOptions;
import alluxio.grpc.LoadFileFailure;
import alluxio.grpc.Route;
import alluxio.grpc.RouteFailure;
Expand All @@ -34,6 +41,7 @@
import alluxio.security.authorization.Mode;
import alluxio.underfs.Fingerprint;
import alluxio.underfs.UfsStatus;
import alluxio.util.CommonUtils;
import alluxio.util.io.BufferUtils;

import com.google.common.base.Preconditions;
Expand Down Expand Up @@ -169,7 +177,7 @@ public void testCopyException() throws IOException, ExecutionException, Interrup
mWorker.copy(Collections.singletonList(route), null, writeOptions);
List<RouteFailure> failures = copy.get();
assertEquals(1, failures.size());
Assert.assertFalse(b.exists());
assertFalse(b.exists());
}

@Test
Expand Down Expand Up @@ -267,7 +275,7 @@ public void testSingleFileMove() throws IOException, ExecutionException, Interru
List<RouteFailure> failures = move.get();
assertEquals(0, failures.size());
Assert.assertTrue(b.exists());
Assert.assertFalse(a.exists());
assertFalse(a.exists());
try (InputStream in = Files.newInputStream(b.toPath())) {
byte[] readBuffer = new byte[length];
while (in.read(readBuffer) != -1) {
Expand Down Expand Up @@ -295,7 +303,7 @@ public void testMoveException() throws IOException, ExecutionException, Interrup
mWorker.move(Collections.singletonList(route), null, writeOptions);
List<RouteFailure> failures = move.get();
assertEquals(1, failures.size());
Assert.assertFalse(b.exists());
assertFalse(b.exists());
}

@Test
Expand All @@ -318,7 +326,7 @@ public void testSingleFolderMove() throws IOException, ExecutionException, Inter
assertEquals(0, failures.size());
Assert.assertTrue(b.exists());
Assert.assertTrue(b.isDirectory());
Assert.assertFalse(a.exists());
assertFalse(a.exists());
}

@Test
Expand Down Expand Up @@ -363,9 +371,9 @@ public void testFolderWithFileMove()
Assert.assertTrue(b.isDirectory());
Assert.assertTrue(dstD.exists());
Assert.assertTrue(dstD.isDirectory());
Assert.assertFalse(a.exists());
Assert.assertFalse(c.exists());
Assert.assertFalse(d.exists());
assertFalse(a.exists());
assertFalse(c.exists());
assertFalse(d.exists());
try (InputStream in = Files.newInputStream(dstC.toPath())) {
byte[] readBuffer = new byte[length];
while (in.read(readBuffer) != -1) {
Expand Down Expand Up @@ -500,6 +508,97 @@ private void testGetFileInfo(boolean populateFingerprint)
FileSystemMasterCommonPOptions.newBuilder().setSyncIntervalMs(0)).build()));
}

/**
 * Exercises {@code listStatus} for a non-recursive listing, a listing with a sync interval,
 * a recursive listing, a listing on a file and a listing on a missing path, checking the
 * listing cache exposed by the meta manager along the way.
 */
@Test
public void testListStatus() throws IOException, AccessControlException {
  // Layout: root/f1, root/f2, root/d1/ and root/d1/f1
  File root = mTestFolder.newFolder("root");
  String rootDir = root.getAbsolutePath();
  File firstFile = mTestFolder.newFile("root/f1");
  String firstFilePath = firstFile.getAbsolutePath();
  mTestFolder.newFile("root/f2");
  mTestFolder.newFolder("root/d1");
  mTestFolder.newFile("root/d1/f1");

  ListStatusPOptions nonRecursive =
      ListStatusPOptions.newBuilder().setRecursive(false).build();
  ListStatusPOptions recursive =
      ListStatusPOptions.newBuilder().setRecursive(true).build();

  // Happy path: a non-recursive listing; the cache must hold the very same array.
  UfsStatus[] initialListing = mWorker.listStatus(rootDir, nonRecursive);
  assertNotNull(initialListing);
  assertEquals(3, initialListing.length);
  assertSame(initialListing,
      mWorker.getMetaManager().listCached(rootDir, false).get().mUfsStatuses);

  // Happy path: a non-recursive listing with a sync interval shorter than the time slept;
  // a different array is expected, i.e. the cached listing should have been refreshed.
  CommonUtils.sleepMs(200);
  ListStatusPOptions nonRecursiveWithSync =
      ListStatusPOptions.newBuilder()
          .setRecursive(false)
          .setCommonOptions(
              FileSystemMasterCommonPOptions.newBuilder().setSyncIntervalMs(100))
          .build();
  UfsStatus[] refreshedListing = mWorker.listStatus(rootDir, nonRecursiveWithSync);
  assertNotSame(initialListing, refreshedListing);
  assertNotNull(refreshedListing);
  assertEquals(3, refreshedListing.length);

  // Happy path: a recursive listing; the cache should not be filled.
  UfsStatus[] recursiveListing = mWorker.listStatus(rootDir, recursive);
  assertNotNull(recursiveListing);
  assertEquals(4, recursiveListing.length);
  assertTrue(mWorker.getMetaManager().listCached(rootDir, true).isEmpty());

  // Happy path: listing a file yields an array whose only element is the file itself,
  // and the file should not be included in the listing cache.
  UfsStatus[] fileListing =
      mWorker.listStatus(firstFilePath, ListStatusPOptions.getDefaultInstance());
  assertNotNull(fileListing);
  assertEquals(1, fileListing.length);
  // NOTE(review): this lookup is recursive while the listing above used default options;
  // presumably recursive lookups always miss, so confirm this assertion is meaningful.
  assertTrue(mWorker.getMetaManager().listCached(firstFilePath, true).isEmpty());

  // Error case: listing a path that does not exist.
  String missingPath = rootDir + "/non-existing-path";
  assertThrows(FileNotFoundException.class,
      () -> mWorker.listStatus(missingPath, ListStatusPOptions.getDefaultInstance()));
}

/**
 * Verifies that the metadata cache, the listing cache and the page cache are all populated
 * for a file after it is listed, stat-ed and loaded, and that all three are cleared once
 * the file is deleted through the worker.
 */
@Test
public void testListCacheConsistency()
    throws IOException, AccessControlException, ExecutionException, InterruptedException,
    TimeoutException {
  String content = "foobar";
  File root = mTestFolder.newFolder("root");
  String rootDir = root.getAbsolutePath();
  File target = mTestFolder.newFile("root/f");
  Files.write(target.toPath(), content.getBytes());
  String targetPath = target.getPath();

  // Populate the listing cache.
  ListStatusPOptions nonRecursive =
      ListStatusPOptions.newBuilder().setRecursive(false).build();
  UfsStatus[] listing = mWorker.listStatus(rootDir, nonRecursive);
  assertNotNull(listing);
  assertEquals(1, listing.length);

  // Populate the metadata cache and the page cache.
  FileInfo info = mWorker.getGrpcFileInfo(targetPath, 0);
  loadFileData(targetPath);
  assertNotNull(info);

  // Assert that page cache, metadata cache & list cache all cached data properly
  assertTrue(mWorker.getMetaManager().getFromMetaStore(targetPath).isPresent());
  assertSame(listing,
      mWorker.getMetaManager().listCached(rootDir, false).get().mUfsStatuses);
  List<PageId> pagesBeforeDelete = mCacheManager.getCachedPageIdsByFileId(
      new AlluxioURI(targetPath).hash(), content.length());
  assertEquals(1, pagesBeforeDelete.size());

  mWorker.delete(target.getAbsolutePath(), DeletePOptions.getDefaultInstance());

  // Assert that page cache, metadata cache & list cache all removed stale data
  assertTrue(mWorker.getMetaManager().getFromMetaStore(targetPath).isEmpty());
  assertTrue(mWorker.getMetaManager().listCached(rootDir, false).isEmpty());
  List<PageId> pagesAfterDelete = mCacheManager.getCachedPageIdsByFileId(
      new AlluxioURI(targetPath).hash(), content.length());
  assertEquals(0, pagesAfterDelete.size());
}

private void testGetFileInfoDir(boolean populateFingerprint)
throws AccessControlException, IOException {
mWorker.setPopulateMetadataFingerprint(populateFingerprint);
Expand Down

0 comments on commit 268c342

Please sign in to comment.