
Update S3SingleDriverLogStore.java
scottsand-db committed Sep 17, 2024
1 parent e1dd987 commit 5e12b9f
Showing 1 changed file with 8 additions and 139 deletions.
storage/src/main/java/io/delta/storage/S3SingleDriverLogStore.java (8 additions, 139 deletions)
@@ -41,12 +41,12 @@
 import org.apache.hadoop.fs.RawLocalFileSystem;
 
 /**
- * Single Spark-driver/JVM LogStore implementation for S3.
+ * Single JVM LogStore implementation for S3.
  * <p>
  * We assume the following from S3's {@link FileSystem} implementations:
  * <ul>
  * <li>File writing on S3 is all-or-nothing, whether overwrite or not.</li>
- * <li>List-after-write can be inconsistent.</li>
+ * <li>List-after-write is strongly consistent.</li>
  * </ul>
  * <p>
  * Regarding file creation, this implementation:
@@ -55,12 +55,6 @@
  * <li>Failures during stream write may leak resources, but may never result in partial
  * writes.</li>
  * </ul>
- * <p>
- * Regarding directory listing, this implementation:
- * <ul>
- * <li>returns a list by merging the files listed from S3 and recently-written files from the
- * cache.</li>
- * </ul>
  */
 public class S3SingleDriverLogStore extends HadoopFileSystemLogStore {
 
@@ -85,16 +79,6 @@ public class S3SingleDriverLogStore extends HadoopFileSystemLogStore {
      */
     private static final PathLock pathLock = new PathLock();
 
-    /**
-     * A global cache that records the metadata of the files recently written.
-     * As list-after-write may be inconsistent on S3, we can use the files in the cache
-     * to fix the inconsistent file listing.
-     */
-    private static final Cache<Path, FileMetadata> writtenPathCache =
-        CacheBuilder.newBuilder()
-            .expireAfterAccess(120, TimeUnit.MINUTES)
-            .build();
-
     /////////////////////////////////////////////
     // Constructor and Instance Helper Methods //
     /////////////////////////////////////////////
@@ -103,13 +87,6 @@ public S3SingleDriverLogStore(Configuration hadoopConf) {
         super(hadoopConf);
     }
 
-    /**
-     * Check if the path is an initial version of a Delta log.
-     */
-    private boolean isInitialVersion(Path path) {
-        return FileNameUtils.isDeltaFile(path) && FileNameUtils.deltaVersion(path) == 0L;
-    }
-
     private Path resolvePath(FileSystem fs, Path path) {
         return stripUserInfo(fs.makeQualified(path));
     }
@@ -137,66 +114,14 @@ private Path stripUserInfo(Path path) {
         }
     }
 
-    /**
-     * Merge two lists of {@link FileStatus} into a single list ordered by file path name.
-     * In case both lists have {@link FileStatus}'s for the same file path, keep the one from
-     * `listWithPrecedence` and discard the other from `list`.
-     */
-    private Iterator<FileStatus> mergeFileLists(
-            List<FileStatus> list,
-            List<FileStatus> listWithPrecedence) {
-        final Map<Path, FileStatus> fileStatusMap = new HashMap<>();
-
-        // insert all elements from `listWithPrecedence` (highest priority)
-        // and then insert elements from `list` if and only if that key doesn't already exist
-        Stream.concat(listWithPrecedence.stream(), list.stream())
-            .forEach(fs -> fileStatusMap.putIfAbsent(fs.getPath(), fs));
-
-        return fileStatusMap
-            .values()
-            .stream()
-            .sorted(Comparator.comparing(a -> a.getPath().getName()))
-            .iterator();
-    }
-
     /**
      * List files starting from `resolvedPath` (inclusive) in the same directory.
      */
-    private List<FileStatus> listFromCache(
-            FileSystem fs,
-            Path resolvedPath) {
-        final Path pathKey = stripUserInfo(resolvedPath);
-
-        return writtenPathCache
-            .asMap()
-            .entrySet()
-            .stream()
-            .filter(e -> {
-                final Path path = e.getKey();
-                return path.getParent().equals(pathKey.getParent()) &&
-                    path.getName().compareTo(pathKey.getName()) >= 0;
-            }).map(e -> {
-                final Path path = e.getKey();
-                final FileMetadata fileMetadata = e.getValue();
-                return new FileStatus(
-                    fileMetadata.length,
-                    false, // isDir
-                    1, // block_replication
-                    fs.getDefaultBlockSize(path),
-                    fileMetadata.modificationTime,
-                    path);
-            }).collect(Collectors.toList());
-    }
-
-    /**
-     * List files starting from `resolvedPath` (inclusive) in the same directory, which merges
-     * the file system list and the cache list when `useCache` is on, otherwise
-     * use file system list only.
-     */
     private Iterator<FileStatus> listFromInternal(
             FileSystem fs,
-            Path resolvedPath,
-            boolean useCache) throws IOException {
+            Path resolvedPath) throws IOException {
         final Path parentPath = resolvedPath.getParent();
         if (!fs.exists(parentPath)) {
             throw new FileNotFoundException(
@@ -214,30 +139,11 @@ private Iterator<FileStatus> listFromInternal(
             statuses = S3LogStoreUtil.s3ListFromArray(fs, resolvedPath, parentPath);
         }
 
-        final List<FileStatus> listedFromFs = Arrays
+        return Arrays
             .stream(statuses)
             .filter(s -> s.getPath().getName().compareTo(resolvedPath.getName()) >= 0)
-            .collect(Collectors.toList());
-
-        final List<FileStatus> listedFromCache = useCache ?
-            listFromCache(fs, resolvedPath) : Collections.emptyList();
-
-        // File statuses listed from file system take precedence
-        return mergeFileLists(listedFromCache, listedFromFs);
-    }
-
-    /**
-     * Check if a path exists. Normally we check both the file system and the cache, but when the
-     * path is the first version of a Delta log, we ignore the cache.
-     */
-    private boolean exists(
-            FileSystem fs,
-            Path resolvedPath) throws IOException {
-        final boolean useCache = !isInitialVersion(resolvedPath);
-        final Iterator<FileStatus> iter = listFromInternal(fs, resolvedPath, useCache);
-        if (!iter.hasNext()) return false;
-
-        return iter.next().getPath().getName().equals(resolvedPath.getName());
+            .sorted(Comparator.comparing(a -> a.getPath().getName()))
+            .iterator();
     }
 
     ////////////////////////
@@ -255,7 +161,7 @@ public void write(
         try {
             pathLock.acquire(resolvedPath);
             try {
-                if (exists(fs, resolvedPath) && !overwrite) {
+                if (fs.exists(resolvedPath) && !overwrite) {
                     throw new java.nio.file.FileAlreadyExistsException(
                         resolvedPath.toUri().toString()
                     );
@@ -268,26 +174,6 @@ public void write(
                     stream.write((actions.next() + "\n").getBytes(StandardCharsets.UTF_8));
                 }
                 stream.close();
-
-                // When a Delta log starts afresh, all cached files in that Delta log become
-                // obsolete, so we remove them from the cache.
-                if (isInitialVersion(resolvedPath)) {
-                    final List<Path> obsoleteFiles = writtenPathCache
-                        .asMap()
-                        .keySet()
-                        .stream()
-                        .filter(p -> p.getParent().equals(resolvedPath.getParent()))
-                        .collect(Collectors.toList());
-
-                    writtenPathCache.invalidateAll(obsoleteFiles);
-                }
-
-                // Cache the information of written files to help fix the inconsistency in future
-                // listings
-                writtenPathCache.put(
-                    resolvedPath,
-                    new FileMetadata(stream.getCount(), System.currentTimeMillis())
-                );
             } catch (org.apache.hadoop.fs.FileAlreadyExistsException e) {
                 // Convert Hadoop's FileAlreadyExistsException to Java's FileAlreadyExistsException
                 throw new java.nio.file.FileAlreadyExistsException(e.getMessage());
@@ -303,28 +189,11 @@ public void write(
     public Iterator<FileStatus> listFrom(Path path, Configuration hadoopConf) throws IOException {
         final FileSystem fs = path.getFileSystem(hadoopConf);
         final Path resolvedPath = resolvePath(fs, path);
-        return listFromInternal(fs, resolvedPath, true); // useCache=true
+        return listFromInternal(fs, resolvedPath);
     }
 
     @Override
     public Boolean isPartialWriteVisible(Path path, Configuration hadoopConf) {
         return false;
     }
-
-    //////////////////
-    // Helper Class //
-    //////////////////
-
-    /**
-     * The file metadata to be stored in the cache.
-     */
-    private class FileMetadata {
-        private long length;
-        private long modificationTime;
-
-        public FileMetadata(long length, long modificationTime) {
-            this.length = length;
-            this.modificationTime = modificationTime;
-        }
-    }
 }
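
For context, a minimal usage sketch, not part of this commit: Delta's documented scheme-based LogStore key (spark.delta.logStore.<scheme>.impl) routes paths for a given URI scheme through this class. The session setup, bucket, and table path below are illustrative placeholders, and the snippet assumes delta-spark and hadoop-aws are on the classpath.

// Illustrative sketch only; bucket and table path are placeholders.
import org.apache.spark.sql.SparkSession;

public class S3DeltaExample {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
            .appName("delta-on-s3")
            // Standard Delta Lake session configuration.
            .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
            .config("spark.sql.catalog.spark_catalog",
                    "org.apache.spark.sql.delta.catalog.DeltaCatalog")
            // Scheme-based LogStore lookup: use this class for s3a:// paths.
            .config("spark.delta.logStore.s3a.impl",
                    "io.delta.storage.S3SingleDriverLogStore")
            .getOrCreate();

        // After this commit, listFrom() and the pre-write existence check go
        // straight to the FileSystem; S3's strong read-after-write and list
        // consistency makes the old written-path cache unnecessary. The
        // "single driver" caveat still applies: pathLock serializes writes
        // only within one JVM, so concurrent writers to a table must share
        // a single driver.
        spark.range(10).write().format("delta").save("s3a://my-bucket/my-table");
    }
}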
