Skip to content

Commit

Permalink
Refactor dora meta manager
Browse files Browse the repository at this point in the history
  • Loading branch information
elega committed Jun 29, 2023
1 parent 42e9ce8 commit b8b1d0a
Show file tree
Hide file tree
Showing 7 changed files with 257 additions and 130 deletions.
4 changes: 4 additions & 0 deletions dora/core/common/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,10 @@
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
</dependency>
<dependency>
<groupId>com.github.ben-manes.caffeine</groupId>
<artifactId>caffeine</artifactId>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
Expand Down
14 changes: 8 additions & 6 deletions dora/core/common/src/main/java/alluxio/conf/PropertyKey.java
Original file line number Diff line number Diff line change
Expand Up @@ -7927,10 +7927,12 @@ public String toString() {
.setScope(Scope.WORKER)
.build();

public static final PropertyKey DORA_UFS_LIST_STATUS_CACHE_NR_DIRS =
intBuilder(Name.DORA_UFS_LIST_STATUS_CACHE_NR_DIRS)
.setDefaultValue(50)
.setDescription("Number of the file/dir cache of UFS list status results")
public static final PropertyKey DORA_UFS_LIST_STATUS_CACHE_NR_FILES =
intBuilder(Name.DORA_UFS_LIST_STATUS_CACHE_NR_FILES)
.setDefaultValue(100000)
.setDescription("Number of the file ufs statuses of UFS listing cache. "
+ "The more files a directory contain, the more weight it is counted "
+ "when the cache capacity is calculated.")
.setConsistencyCheckLevel(ConsistencyCheckLevel.ENFORCE)
.setScope(Scope.WORKER)
.build();
Expand Down Expand Up @@ -9585,8 +9587,8 @@ public static final class Name {
public static final String DORA_UFS_LIST_STATUS_CACHE_TTL =
"alluxio.dora.ufs.list.status.cache.ttl";

public static final String DORA_UFS_LIST_STATUS_CACHE_NR_DIRS =
"alluxio.dora.ufs.list.status.cache.nr.dirs";
public static final String DORA_UFS_LIST_STATUS_CACHE_NR_FILES =
"alluxio.dora.ufs.list.status.cache.nr.files";

//
// Extra class to be loaded
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/*
* The Alluxio Open Foundation licenses this work under the Apache License, version 2.0
* (the "License"). You may not use this work except in compliance with the License, which is
* available at www.apache.org/licenses/LICENSE-2.0
*
* This software is distributed on an "AS IS" basis, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
* either express or implied, as more fully set forth in the License.
*
* See the NOTICE file distributed with this work for information regarding copyright ownership.
*/

package alluxio.exception;

import javax.annotation.concurrent.ThreadSafe;

/**
 * Thrown when a listing is attempted on a path that is a non-directory file or object.
 */
@ThreadSafe
public class ListOnNonDirectoryException extends AlluxioException {
  private static final long serialVersionUID = -98364579823401L;

  /**
   * Creates the exception from a plain detail message.
   *
   * @param message the detail message
   */
  public ListOnNonDirectoryException(String message) {
    super(message);
  }

  /**
   * Creates the exception from a plain detail message and the underlying cause.
   *
   * @param message the detail message
   * @param cause the cause
   */
  public ListOnNonDirectoryException(String message, Throwable cause) {
    super(message, cause);
  }

  /**
   * Creates the exception from an exception message template and its parameters.
   *
   * @param message the exception message
   * @param params the parameters used to render the message
   */
  public ListOnNonDirectoryException(ExceptionMessage message, Object... params) {
    super(message.getMessage(params));
  }

  /**
   * Creates the exception from an exception message template, the underlying cause and the
   * template parameters.
   *
   * @param message the exception message
   * @param cause the cause
   * @param params the parameters used to render the message
   */
  public ListOnNonDirectoryException(ExceptionMessage message, Throwable cause,
      Object... params) {
    super(message.getMessage(params), cause);
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,20 +13,27 @@

import alluxio.AlluxioURI;
import alluxio.client.file.cache.CacheManager;
import alluxio.client.file.cache.PageId;
import alluxio.conf.Configuration;
import alluxio.conf.PropertyKey;
import alluxio.exception.InvalidPathException;
import alluxio.exception.ListOnNonDirectoryException;
import alluxio.file.FileId;
import alluxio.proto.meta.DoraMeta;
import alluxio.proto.meta.DoraMeta.FileStatus;
import alluxio.underfs.Fingerprint;
import alluxio.underfs.UfsStatus;
import alluxio.underfs.UnderFileSystem;
import alluxio.underfs.options.ListOptions;
import alluxio.util.io.PathUtils;

import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.google.common.annotations.VisibleForTesting;

import java.io.Closeable;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.time.Duration;
import java.util.Optional;

/**
Expand All @@ -35,25 +42,33 @@
* TODO(elega) Invalidating the page cache synchronously causes performance issues, and it
* currently also lacks concurrency control. Address these problems in the future.
*/
public class DoraMetaManager {
private final DoraMetaStore mMetastore;
public class DoraMetaManager implements Closeable {
private final DoraMetaStore mMetaStore;
private final CacheManager mCacheManager;
private final PagedDoraWorker mDoraWorker;
private final UnderFileSystem mUfs;
private boolean mPopulateMetadataFingerprint =
Configuration.getBoolean(PropertyKey.DORA_WORKER_POPULATE_METADATA_FINGERPRINT);

// Cache of UFS listing results keyed by path. Capacity is bounded by the total number of
// cached UFS statuses (DORA_UFS_LIST_STATUS_CACHE_NR_FILES) rather than by the number of
// entries, and every entry expires DORA_UFS_LIST_STATUS_CACHE_TTL after it was written.
private final Cache<String, ListStatusResult> mListStatusCache = Caffeine.newBuilder()
    .maximumWeight(Configuration.getInt(PropertyKey.DORA_UFS_LIST_STATUS_CACHE_NR_FILES))
    // A result with a null status array represents a path that does not exist in UFS.
    // It holds no statuses, so it weighs 0; dereferencing the array here would throw a
    // NullPointerException from inside the cache computation.
    .weigher((String k, ListStatusResult v) ->
        (v == null || v.mUfsStatuses == null) ? 0 : v.mUfsStatuses.length)
    .expireAfterWrite(Configuration.getDuration(PropertyKey.DORA_UFS_LIST_STATUS_CACHE_TTL))
    .build();

/**
* Creates a dora meta manager.
* @param doraWorker the dora worker instance
* @param metaStore the dora meta store
* @param cacheManger the cache manager to manage the page cache
* @param ufs the associated ufs
*/
public DoraMetaManager(
PagedDoraWorker doraWorker, DoraMetaStore metaStore, CacheManager cacheManger,
PagedDoraWorker doraWorker, CacheManager cacheManger,
UnderFileSystem ufs) {
mMetastore = metaStore;
String dbDir = Configuration.getString(PropertyKey.DORA_WORKER_METASTORE_ROCKSDB_DIR);
Duration duration = Configuration.getDuration(PropertyKey.DORA_WORKER_METASTORE_ROCKSDB_TTL);
long ttl = (duration.isNegative() || duration.isZero()) ? -1 : duration.getSeconds();
mMetaStore = new RocksDBDoraMetaStore(dbDir, ttl);
mCacheManager = cacheManger;
mDoraWorker = doraWorker;
mUfs = ufs;
Expand Down Expand Up @@ -97,7 +112,7 @@ public Optional<FileStatus> loadFromUfs(String path) throws IOException {
* @return the file status, or empty optional if not found
*/
public Optional<FileStatus> getFromMetaStore(String path) {
return mMetastore.getDoraMeta(path);
return mMetaStore.getDoraMeta(path);
}

/**
Expand All @@ -106,11 +121,15 @@ public Optional<FileStatus> getFromMetaStore(String path) {
* @param meta the file meta
*/
public void put(String path, FileStatus meta) {
Optional<FileStatus> status = mMetastore.getDoraMeta(path);
// TODO(elega) as an optimization, we can edit the listing cache to reflect the
// metadata change of a single file instead of invalidate it.
invalidateListingCache(getPathParent(path));

Optional<FileStatus> status = mMetaStore.getDoraMeta(path);
if (status.isEmpty()
|| status.get().getFileInfo().getFolder()
|| status.get().getFileInfo().getLength() == 0) {
mMetastore.putDoraMeta(path, meta);
mMetaStore.putDoraMeta(path, meta);
return;
}
if (mPopulateMetadataFingerprint) {
Expand All @@ -128,12 +147,12 @@ public void put(String path, FileStatus meta) {
&& newFingerprint.isValid()
&& originalFingerprint.matchContent(newFingerprint);
if (!contentMatched) {
invalidateCachedFile(path, status.get().getFileInfo().getLength());
invalidateCachedFile(path);
}
} else {
invalidateCachedFile(path, status.get().getFileInfo().getLength());
invalidateCachedFile(path);
}
mMetastore.putDoraMeta(path, meta);
mMetaStore.putDoraMeta(path, meta);
}

/**
Expand All @@ -142,24 +161,112 @@ public void put(String path, FileStatus meta) {
* @return the removed file meta, if exists
*/
public Optional<FileStatus> removeFromMetaStore(String path) {
Optional<FileStatus> status = mMetastore.getDoraMeta(path);
invalidateListingCache(getPathParent(path));
Optional<FileStatus> status = mMetaStore.getDoraMeta(path);
if (status.isPresent()) {
mMetastore.removeDoraMeta(path);
invalidateCachedFile(path, status.get().getFileInfo().getLength());
mMetaStore.removeDoraMeta(path);
}
invalidateCachedFile(path);
return status;
}

private void invalidateCachedFile(String path, long length) {
FileId fileId = FileId.of(AlluxioURI.hash(path));
mCacheManager.deleteFile(fileId.toString());
for (PageId page: mCacheManager.getCachedPageIdsByFileId(fileId.toString(), length)) {
mCacheManager.delete(page);
/**
* Drops the cached listing result stored under the given path, if there is one.
* @param path the key of the listing cache entry to invalidate
*/
public void invalidateListingCache(String path) {
mListStatusCache.invalidate(path);
}

/**
 * Looks up a listing result in the listing cache without touching the UFS.
 * @param path the path to look up
 * @param isRecursive if the listing is recursive
 * @return the cached result, or an empty optional if the listing is recursive or nothing is
 *         cached for the path
 */
public Optional<ListStatusResult> listCached(String path, boolean isRecursive) {
  // Recursive listings are never served from the cache: they usually cover too many
  // files to be worth caching.
  return isRecursive
      ? Optional.empty()
      : Optional.ofNullable(mListStatusCache.getIfPresent(path));
}

/**
 * Lists a directory from UFS and, for non-recursive listings, stores the result in the
 * listing cache.
 *
 * <p>Recursive listings go straight to the UFS and are neither read from nor written to the
 * cache. The cache is keyed by path only, so mixing recursive and non-recursive results under
 * the same key would hand out the wrong listing to later callers.
 *
 * @param path the ufs path
 * @param isRecursive if the listing is recursive
 * @return an empty optional if the directory does not exist, otherwise an optional containing
 *         the ufs status array
 * @throws IOException if the UFS call failed
 * @throws ListOnNonDirectoryException if the target path exists but is not a valid listing
 *         target, e.g. the path is a file or object
 */
public Optional<UfsStatus[]> listFromUfsThenCache(String path, boolean isRecursive)
    throws IOException, ListOnNonDirectoryException {
  if (isRecursive) {
    return listFromUfs(path, true);
  }
  try {
    ListStatusResult cached = mListStatusCache.get(path, (k) -> {
      try {
        // A missing directory is recorded as a result whose status array is null.
        return new ListStatusResult(System.nanoTime(), listFromUfs(k, false).orElse(null));
      } catch (IOException | ListOnNonDirectoryException e) {
        // The mapping function cannot throw checked exceptions, so tunnel them through an
        // unchecked wrapper and unwrap them below.
        throw new RuntimeException(e);
      }
    });
    return cached == null ? Optional.empty() : Optional.ofNullable(cached.mUfsStatuses);
  } catch (RuntimeException e) {
    Throwable cause = e.getCause();
    if (cause instanceof IOException) {
      throw (IOException) cause;
    }
    if (cause instanceof ListOnNonDirectoryException) {
      throw (ListOnNonDirectoryException) cause;
    }
    // Not one of the tunnelled checked exceptions: rethrow as is instead of wrapping again.
    throw e;
  }
}

/**
 * Lists a path directly from the UFS, without consulting or updating the listing cache.
 * @param path the ufs path
 * @param isRecursive if the listing is recursive
 * @return an empty optional if the UFS reports the path as not found, otherwise an optional
 *         containing the ufs status array
 * @throws IOException if the UFS call failed
 * @throws ListOnNonDirectoryException if the UFS returns no status array for the path, which
 *         is treated as the path not being a directory
 */
public Optional<UfsStatus[]> listFromUfs(String path, boolean isRecursive)
    throws IOException, ListOnNonDirectoryException {
  final ListOptions options = ListOptions.defaults().setRecursive(isRecursive);
  final UfsStatus[] statuses;
  try {
    statuses = mUfs.listStatus(path, options);
  } catch (FileNotFoundException notFound) {
    return Optional.empty();
  }
  if (statuses != null) {
    return Optional.of(statuses);
  }
  throw new ListOnNonDirectoryException(path + " is not a directory");
}

/**
* Overrides the populate-metadata-fingerprint flag, which is otherwise initialized from
* {@link PropertyKey#DORA_WORKER_POPULATE_METADATA_FINGERPRINT}. Intended for tests only.
* @param value the new value of the flag
*/
@VisibleForTesting
void setPopulateMetadataFingerprint(boolean value) {
mPopulateMetadataFingerprint = value;
}

/**
 * Asks the cache manager to delete the file whose id is derived from the hash of the path.
 * @param path the path of the file to invalidate
 */
private void invalidateCachedFile(String path) {
  final String cachedFileId = FileId.of(AlluxioURI.hash(path)).toString();
  mCacheManager.deleteFile(cachedFileId);
}

/**
 * Resolves the parent of a path; the root path "/" is returned unchanged.
 * @param path the path to resolve the parent of
 * @return the parent path, or the path itself if it is the root
 */
private String getPathParent(String path) {
  final boolean isRoot = path.equals("/");
  if (!isRoot) {
    try {
      return PathUtils.getParent(path);
    } catch (InvalidPathException invalidPath) {
      // Surface an invalid path as an unchecked failure.
      throw new RuntimeException(invalidPath);
    }
  }
  return path;
}

/**
* Closes the meta store owned by this manager.
* @throws IOException if closing the meta store fails
*/
@Override
public void close() throws IOException {
mMetaStore.close();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package alluxio.worker.dora;

import alluxio.underfs.UfsStatus;

/**
 * The value type stored in the list status cache: an array of UFS statuses paired with the
 * timestamp supplied when the result was created.
 */
public class ListStatusResult {
  /** The timestamp supplied by the creator of this result. */
  public long mTimeStamp;
  /** The listed UFS statuses; the constructor accepts whatever array it is given. */
  public UfsStatus[] mUfsStatuses;

  /**
   * Creates a list status result.
   * @param timeStamp the timestamp to associate with the result
   * @param ufsStatuses the listed UFS statuses
   */
  ListStatusResult(long timeStamp, UfsStatus[] ufsStatuses) {
    this.mTimeStamp = timeStamp;
    this.mUfsStatuses = ufsStatuses;
  }
}
Loading

0 comments on commit b8b1d0a

Please sign in to comment.