[Storage] Remove file path cache tech debt from S3SingleDriverLogStore (#3685)

#### Which Delta project/connector is this regarding?

- [ ] Spark
- [ ] Standalone
- [ ] Flink
- [ ] Kernel
- [X] Other (Storage)

## Description

S3 has been strongly consistent (for `GET`, `PUT`, and `LIST`) for years
([announcement](https://aws.amazon.com/blogs/aws/amazon-s3-update-strong-read-after-write-consistency/)),
yet `S3SingleDriverLogStore` still carried code that assumed S3 is read-after-write
inconsistent: a per-JVM cache of recently written file paths that was merged into
listings and existence checks.

This PR removes that now-redundant code.
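
At its core, the change replaces the cache-merging existence check in `write` with a
direct file system call. A condensed before/after sketch (surrounding method bodies
elided; the full diff is below):

```java
// Before: exists() merged the live S3 listing with writtenPathCache, a Guava cache
// of recently written paths kept to paper over read-after-write inconsistency.
if (exists(fs, resolvedPath) && !overwrite) {
    throw new java.nio.file.FileAlreadyExistsException(resolvedPath.toUri().toString());
}

// After: S3 listing is strongly consistent, so asking the file system directly suffices.
if (fs.exists(resolvedPath) && !overwrite) {
    throw new java.nio.file.FileAlreadyExistsException(resolvedPath.toUri().toString());
}
```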

## How was this patch tested?

Existing unit tests. The two suite tests that exercised the removed cache behavior
("cache works" and "cache works correctly when writing an initial log version") are
deleted along with it.

## Does this PR introduce _any_ user-facing changes?

No.
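
For context, Delta resolves a `LogStore` per URI scheme, so this class can be selected
explicitly for S3 paths via the documented `spark.delta.logStore.<scheme>.impl` setting.
A minimal sketch (assuming an `s3a://` setup; the bucket path is hypothetical, and this
class is already the default for S3 schemes, so the setting is shown only for illustration):

```java
import org.apache.spark.sql.SparkSession;

public class DeltaS3Example {
    public static void main(String[] args) {
        // Hypothetical: pin the s3a:// scheme to S3SingleDriverLogStore.
        SparkSession spark = SparkSession.builder()
            .appName("delta-s3-logstore")
            .config("spark.delta.logStore.s3a.impl", "io.delta.storage.S3SingleDriverLogStore")
            .getOrCreate();

        // Write a small Delta table to S3 through the configured LogStore.
        spark.range(10).write().format("delta").save("s3a://my-bucket/my-delta-table");
    }
}
```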
scottsand-db authored Sep 18, 2024
1 parent afc8bf6 commit 93eef11
Showing 2 changed files with 8 additions and 225 deletions.
@@ -66,86 +66,6 @@ trait S3SingleDriverLogStoreSuiteBase extends LogStoreSuiteBase {
     }
   }
 
-  test("cache works") {
-    withTempDir { dir =>
-      val store = createLogStore(spark)
-      val deltas =
-        Seq(0, 1, 2, 3, 4).map(i => FileNames.unsafeDeltaFile(new Path(dir.toURI), i))
-      store.write(deltas(0), Iterator("zero"), overwrite = false, sessionHadoopConf)
-      store.write(deltas(1), Iterator("one"), overwrite = false, sessionHadoopConf)
-      store.write(deltas(2), Iterator("two"), overwrite = false, sessionHadoopConf)
-
-      // delete delta file 2 from file system
-      val fs = new Path(dir.getCanonicalPath).getFileSystem(sessionHadoopConf)
-      fs.delete(deltas(2), true)
-
-      // file system listing doesn't see file 2
-      checkFileSystemList(fs, deltas(0), Seq(0, 1))
-
-      // can't re-write because cache says it still exists
-      intercept[java.nio.file.FileAlreadyExistsException] {
-        store.write(deltas(2), Iterator("two"), overwrite = false, sessionHadoopConf)
-      }
-
-      // log store list still sees file 2 as it's cached
-      checkLogStoreList(store, deltas(0), Seq(0, 1, 2), sessionHadoopConf)
-
-      if (canInvalidateCache) {
-        // clear the cache
-        store.invalidateCache()
-
-        // log store list doesn't see file 2 anymore
-        checkLogStoreList(store, deltas(0), Seq(0, 1), sessionHadoopConf)
-
-        // write a new file 2
-        store.write(deltas(2), Iterator("two"), overwrite = false, sessionHadoopConf)
-      }
-
-      // add a file 3 to cache only
-      store.write(deltas(3), Iterator("three"), overwrite = false, sessionHadoopConf)
-      fs.delete(deltas(3), true)
-
-      // log store listing returns a union of:
-      // 1) file system listing: 0, 1, 2
-      // 2a) cache listing - canInvalidateCache=true: 2, 3
-      // 2b) cache listing - canInvalidateCache=false: 0, 1, 2, 3
-      checkLogStoreList(store, deltas(0), Seq(0, 1, 2, 3), sessionHadoopConf)
-    }
-  }
-
-  test("cache works correctly when writing an initial log version") {
-    withTempDir { rootDir =>
-      val dir = new File(rootDir, "_delta_log")
-      dir.mkdir()
-      val store = createLogStore(spark)
-      val deltas =
-        Seq(0, 1, 2).map(i => FileNames.unsafeDeltaFile(new Path(dir.toURI), i))
-      store.write(deltas(0), Iterator("log version 0"), overwrite = false, sessionHadoopConf)
-      store.write(deltas(1), Iterator("log version 1"), overwrite = false, sessionHadoopConf)
-      store.write(deltas(2), Iterator("log version 2"), overwrite = false, sessionHadoopConf)
-
-      val fs = new Path(dir.getCanonicalPath).getFileSystem(sessionHadoopConf)
-      // delete all log files
-      fs.delete(deltas(2), true)
-      fs.delete(deltas(1), true)
-      fs.delete(deltas(0), true)
-
-      // can't write a new version 1 as it's in cache
-      intercept[java.nio.file.FileAlreadyExistsException] {
-        store.write(deltas(1), Iterator("new log version 1"), overwrite = false, sessionHadoopConf)
-      }
-
-      // all three log files still in cache
-      checkLogStoreList(store, deltas(0), Seq(0, 1, 2), sessionHadoopConf)
-
-      // can write a new version 0 as it's the initial version of the log
-      store.write(deltas(0), Iterator("new log version 0"), overwrite = false, sessionHadoopConf)
-
-      // writing a new initial version invalidates all files in that log
-      checkLogStoreList(store, deltas(0), Seq(0), sessionHadoopConf)
-    }
-  }
-
   protected def shouldUseRenameToWriteCheckpoint: Boolean = false
 
   /**
storage/src/main/java/io/delta/storage/S3SingleDriverLogStore.java (8 additions, 145 deletions)
@@ -23,14 +23,8 @@
 import java.net.URISyntaxException;
 import java.nio.charset.StandardCharsets;
 import java.util.*;
-import java.util.concurrent.TimeUnit;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
 
-import com.google.common.cache.Cache;
-import com.google.common.cache.CacheBuilder;
 import com.google.common.io.CountingOutputStream;
-import io.delta.storage.internal.FileNameUtils;
 import io.delta.storage.internal.PathLock;
 import io.delta.storage.internal.S3LogStoreUtil;
 import org.apache.hadoop.conf.Configuration;
@@ -41,12 +35,12 @@
 import org.apache.hadoop.fs.RawLocalFileSystem;
 
 /**
- * Single Spark-driver/JVM LogStore implementation for S3.
+ * Single JVM LogStore implementation for S3.
  * <p>
  * We assume the following from S3's {@link FileSystem} implementations:
  * <ul>
  * <li>File writing on S3 is all-or-nothing, whether overwrite or not.</li>
- * <li>List-after-write can be inconsistent.</li>
+ * <li>List-after-write is strongly consistent.</li>
  * </ul>
  * <p>
  * Regarding file creation, this implementation:
@@ -55,12 +49,6 @@
  * <li>Failures during stream write may leak resources, but may never result in partial
  * writes.</li>
  * </ul>
- * <p>
- * Regarding directory listing, this implementation:
- * <ul>
- * <li>returns a list by merging the files listed from S3 and recently-written files from the
- * cache.</li>
- * </ul>
  */
 public class S3SingleDriverLogStore extends HadoopFileSystemLogStore {
 
@@ -85,16 +73,6 @@ public class S3SingleDriverLogStore extends HadoopFileSystemLogStore {
      */
     private static final PathLock pathLock = new PathLock();
 
-    /**
-     * A global cache that records the metadata of the files recently written.
-     * As list-after-write may be inconsistent on S3, we can use the files in the cache
-     * to fix the inconsistent file listing.
-     */
-    private static final Cache<Path, FileMetadata> writtenPathCache =
-        CacheBuilder.newBuilder()
-            .expireAfterAccess(120, TimeUnit.MINUTES)
-            .build();
-
     /////////////////////////////////////////////
     // Constructor and Instance Helper Methods //
     /////////////////////////////////////////////
@@ -103,13 +81,6 @@ public S3SingleDriverLogStore(Configuration hadoopConf) {
         super(hadoopConf);
     }
 
-    /**
-     * Check if the path is an initial version of a Delta log.
-     */
-    private boolean isInitialVersion(Path path) {
-        return FileNameUtils.isDeltaFile(path) && FileNameUtils.deltaVersion(path) == 0L;
-    }
-
     private Path resolvePath(FileSystem fs, Path path) {
         return stripUserInfo(fs.makeQualified(path));
     }
@@ -137,66 +108,14 @@ private Path stripUserInfo(Path path) {
         }
     }
 
-    /**
-     * Merge two lists of {@link FileStatus} into a single list ordered by file path name.
-     * In case both lists have {@link FileStatus}'s for the same file path, keep the one from
-     * `listWithPrecedence` and discard the other from `list`.
-     */
-    private Iterator<FileStatus> mergeFileLists(
-            List<FileStatus> list,
-            List<FileStatus> listWithPrecedence) {
-        final Map<Path, FileStatus> fileStatusMap = new HashMap<>();
-
-        // insert all elements from `listWithPrecedence` (highest priority)
-        // and then insert elements from `list` if and only if that key doesn't already exist
-        Stream.concat(listWithPrecedence.stream(), list.stream())
-            .forEach(fs -> fileStatusMap.putIfAbsent(fs.getPath(), fs));
-
-        return fileStatusMap
-            .values()
-            .stream()
-            .sorted(Comparator.comparing(a -> a.getPath().getName()))
-            .iterator();
-    }
 
     /**
      * List files starting from `resolvedPath` (inclusive) in the same directory.
      */
-    private List<FileStatus> listFromCache(
-            FileSystem fs,
-            Path resolvedPath) {
-        final Path pathKey = stripUserInfo(resolvedPath);
-
-        return writtenPathCache
-            .asMap()
-            .entrySet()
-            .stream()
-            .filter(e -> {
-                final Path path = e.getKey();
-                return path.getParent().equals(pathKey.getParent()) &&
-                    path.getName().compareTo(pathKey.getName()) >= 0;
-            }).map(e -> {
-                final Path path = e.getKey();
-                final FileMetadata fileMetadata = e.getValue();
-                return new FileStatus(
-                    fileMetadata.length,
-                    false, // isDir
-                    1, // block_replication
-                    fs.getDefaultBlockSize(path),
-                    fileMetadata.modificationTime,
-                    path);
-            }).collect(Collectors.toList());
-    }
 
-    /**
-     * List files starting from `resolvedPath` (inclusive) in the same directory, which merges
-     * the file system list and the cache list when `useCache` is on, otherwise
-     * use file system list only.
-     */
     private Iterator<FileStatus> listFromInternal(
             FileSystem fs,
-            Path resolvedPath,
-            boolean useCache) throws IOException {
+            Path resolvedPath) throws IOException {
         final Path parentPath = resolvedPath.getParent();
         if (!fs.exists(parentPath)) {
             throw new FileNotFoundException(
@@ -214,30 +133,11 @@ private Iterator<FileStatus> listFromInternal(
             statuses = S3LogStoreUtil.s3ListFromArray(fs, resolvedPath, parentPath);
         }
 
-        final List<FileStatus> listedFromFs = Arrays
+        return Arrays
             .stream(statuses)
             .filter(s -> s.getPath().getName().compareTo(resolvedPath.getName()) >= 0)
-            .collect(Collectors.toList());
-
-        final List<FileStatus> listedFromCache = useCache ?
-            listFromCache(fs, resolvedPath) : Collections.emptyList();
-
-        // File statuses listed from file system take precedence
-        return mergeFileLists(listedFromCache, listedFromFs);
-    }
-
-    /**
-     * Check if a path exists. Normally we check both the file system and the cache, but when the
-     * path is the first version of a Delta log, we ignore the cache.
-     */
-    private boolean exists(
-            FileSystem fs,
-            Path resolvedPath) throws IOException {
-        final boolean useCache = !isInitialVersion(resolvedPath);
-        final Iterator<FileStatus> iter = listFromInternal(fs, resolvedPath, useCache);
-        if (!iter.hasNext()) return false;
-
-        return iter.next().getPath().getName().equals(resolvedPath.getName());
+            .sorted(Comparator.comparing(a -> a.getPath().getName()))
+            .iterator();
     }
 
     ////////////////////////
@@ -255,7 +155,7 @@ public void write(
         try {
             pathLock.acquire(resolvedPath);
             try {
-                if (exists(fs, resolvedPath) && !overwrite) {
+                if (fs.exists(resolvedPath) && !overwrite) {
                     throw new java.nio.file.FileAlreadyExistsException(
                         resolvedPath.toUri().toString()
                     );
@@ -268,26 +168,6 @@ public void write(
                     stream.write((actions.next() + "\n").getBytes(StandardCharsets.UTF_8));
                 }
                 stream.close();
-
-                // When a Delta log starts afresh, all cached files in that Delta log become
-                // obsolete, so we remove them from the cache.
-                if (isInitialVersion(resolvedPath)) {
-                    final List<Path> obsoleteFiles = writtenPathCache
-                        .asMap()
-                        .keySet()
-                        .stream()
-                        .filter(p -> p.getParent().equals(resolvedPath.getParent()))
-                        .collect(Collectors.toList());
-
-                    writtenPathCache.invalidateAll(obsoleteFiles);
-                }
-
-                // Cache the information of written files to help fix the inconsistency in future
-                // listings
-                writtenPathCache.put(
-                    resolvedPath,
-                    new FileMetadata(stream.getCount(), System.currentTimeMillis())
-                );
             } catch (org.apache.hadoop.fs.FileAlreadyExistsException e) {
                 // Convert Hadoop's FileAlreadyExistsException to Java's FileAlreadyExistsException
                 throw new java.nio.file.FileAlreadyExistsException(e.getMessage());
@@ -303,28 +183,11 @@ public void write(
     public Iterator<FileStatus> listFrom(Path path, Configuration hadoopConf) throws IOException {
         final FileSystem fs = path.getFileSystem(hadoopConf);
         final Path resolvedPath = resolvePath(fs, path);
-        return listFromInternal(fs, resolvedPath, true); // useCache=true
+        return listFromInternal(fs, resolvedPath);
     }
 
     @Override
     public Boolean isPartialWriteVisible(Path path, Configuration hadoopConf) {
         return false;
     }
-
-    //////////////////
-    // Helper Class //
-    //////////////////
-
-    /**
-     * The file metadata to be stored in the cache.
-     */
-    private class FileMetadata {
-        private long length;
-        private long modificationTime;
-
-        public FileMetadata(long length, long modificationTime) {
-            this.length = length;
-            this.modificationTime = modificationTime;
-        }
-    }
 }
