diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/engine/FileSystemClient.java b/kernel/kernel-api/src/main/java/io/delta/kernel/engine/FileSystemClient.java index 8814a86602..5e7811e15d 100644 --- a/kernel/kernel-api/src/main/java/io/delta/kernel/engine/FileSystemClient.java +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/engine/FileSystemClient.java @@ -76,4 +76,13 @@ CloseableIterator readFiles(CloseableIterator { "needs to be a positive integer.", true); + /** + * The shortest duration we have to keep delta log files (commit and checkpoint files) around + * before deleting them. The configured value is parsed as a calendar interval and converted + * to milliseconds. + * + *

Note: expired log files are removed after a checkpoint has been written, and only when + * {@code delta.enableExpiredLogCleanup} is enabled. Only commit and checkpoint files older + * than the checkpointed version and last modified at or before the retention cut-off are deleted. + * If you are running a streaming query reading from the table, you should make sure the query + * doesn't stop longer than this value. Otherwise, the query may not be able to restart. + * + *

The value is intentionally not validated to be greater than 0. In the standalone library the + * log expiry time is day-based, so cleaning the log immediately requires configuring the value + * to "-1 days". See: + * io.delta.standalone.internal.MetadataCleanup#cleanUpExpiredLogs(). + */ + public static final TableConfig LOG_RETENTION = + new TableConfig<>( + "delta.logRetentionDuration", + "interval 30 days", + (engineOpt, v) -> IntervalParserUtils.safeParseIntervalAsMillis(v), + value -> true, + "needs to be provided as a calendar interval such as '2 weeks'. Months " + + "and years are not accepted. You may specify '365 days' for a year instead."); + + /** Whether to clean up expired checkpoints and delta logs. */ + public static final TableConfig ENABLE_EXPIRED_LOG_CLEANUP = + new TableConfig<>( + "delta.enableExpiredLogCleanup", + "true", + (engineOpt, v) -> Boolean.valueOf(v), + value -> true, + "needs to be a boolean."); + /** * This table property is used to track the enablement of the {@code inCommitTimestamps}. 
 * diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/snapshot/SnapshotManager.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/snapshot/SnapshotManager.java index 82f00bd5e5..3a6832df3f 100644 --- a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/snapshot/SnapshotManager.java +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/snapshot/SnapshotManager.java @@ -31,6 +31,7 @@ import io.delta.kernel.exceptions.InvalidTableException; import io.delta.kernel.exceptions.TableNotFoundException; import io.delta.kernel.internal.*; +import io.delta.kernel.internal.actions.Metadata; import io.delta.kernel.internal.checkpoints.*; import io.delta.kernel.internal.fs.Path; import io.delta.kernel.internal.lang.ListUtils; @@ -231,6 +232,45 @@ public void checkpoint(Engine engine, long version) throws TableNotFoundExceptio logger.info("{}: Last checkpoint metadata file is written for version: {}", tablePath, version); logger.info("{}: Finished checkpoint for version: {}", tablePath, version); + doLogCleanup(engine, checkpointMetaData, snapshot); + } + + /** + * Deletes expired delta log files; invoked at the end of {@code checkpoint} once the checkpoint + * has been written. Does nothing unless {@code TableConfig.ENABLE_EXPIRED_LOG_CLEANUP} read from + * the snapshot metadata is true. The cut-off time is the current wall-clock time in + * milliseconds minus {@code TableConfig.LOG_RETENTION}; the number of files the file system + * client reports as deleted is logged. + * + * @param engine engine providing the file system client and used to read the table configs + * @param checkpointMetaData metadata of the checkpoint; its version bounds the deletable files + * @param snapshot snapshot whose metadata supplies the table configs + * @throws IOException declared by this method; presumably raised by the listing or delete + * calls on the file system client + */ + private void doLogCleanup( + Engine engine, CheckpointMetaData checkpointMetaData, SnapshotImpl snapshot) + throws IOException { + Metadata metadata = snapshot.getMetadata(); + Boolean enableExpireLogCleanup = + TableConfig.ENABLE_EXPIRED_LOG_CLEANUP.fromMetadata(engine, metadata); + if (enableExpireLogCleanup) { + Long retentionMillis = TableConfig.LOG_RETENTION.fromMetadata(engine, metadata); + Long fileCutOffTime = System.currentTimeMillis() - retentionMillis; + logger.info( + "{}: Starting the deletion of log files older than {}", tablePath, fileCutOffTime); + int numDeleted = 0; + try (CloseableIterator files = + listExpiredDeltaLogs(engine, checkpointMetaData, fileCutOffTime)) { + while (files.hasNext()) { + /* A false return from delete() is not treated as an error; the file is just not counted. */ + if (engine.getFileSystemClient().delete(files.next().getPath())) { + numDeleted++; + } + } + } + logger.info("{}: Deleted {} log files older than {}", tablePath, 
numDeleted, fileCutOffTime); + } + } + + /** + * Lists the delta log files eligible for deletion: checkpoint or commit files whose version is + * strictly lower than the version of the given checkpoint and whose modification time is at or + * before {@code fileCutOffTime}. Listing starts from the checkpoint prefix of version 0. + * + * @param engine engine providing the file system client used for listing + * @param checkpointMetaData metadata of the checkpoint whose version bounds the result + * @param fileCutOffTime modification-time cut-off, compared against getModificationTime() + * @return iterator over the matching files; the caller in this class closes it + * @throws IOException declared by this method; presumably raised by the listing call + */ + private CloseableIterator listExpiredDeltaLogs( + Engine engine, CheckpointMetaData checkpointMetaData, Long fileCutOffTime) + throws IOException { + /* Files belonging to the checkpointed version itself are never candidates. */ + long threshold = checkpointMetaData.version - 1; + return engine + .getFileSystemClient() + .listFrom(FileNames.checkpointPrefix(logPath, 0).toUri().getPath()) + .filter( + f -> + (FileNames.isCheckpointFile(f.getPath()) || FileNames.isCommitFile(f.getPath())) + && FileNames.getFileVersion(new Path(f.getPath())) <= threshold + && f.getModificationTime() <= fileCutOffTime); } //////////////////// diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/util/FileNames.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/util/FileNames.java index 93c554c9e8..329b5e2127 100644 --- a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/util/FileNames.java +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/util/FileNames.java @@ -109,6 +109,15 @@ public static Path v2CheckpointSidecarFile(Path path, String uuid) { return new Path(String.format("%s/_sidecars/%s.parquet", path.toString(), uuid)); } + /** + * Returns the prefix of all checkpoint files for the given version. Intended for use with + * listFrom to get all files from this version onwards. The returned Path will not exist as a + * file. + * + * @param path parent path under which the prefix is resolved + * @param version version number, formatted as a zero-padded 20-digit decimal + * @return a Path under {@code path} named with the padded version followed by ".checkpoint" + */ + public static Path checkpointPrefix(Path path, long version) { + return new Path(path, String.format("%020d.checkpoint", version)); + } + /** * Returns the paths for all parts of the checkpoint up to the given version. 
 * diff --git a/kernel/kernel-api/src/test/scala/io/delta/kernel/test/MockEngineUtils.scala b/kernel/kernel-api/src/test/scala/io/delta/kernel/test/MockEngineUtils.scala index 671272bdab..c7eb9caf7f 100644 --- a/kernel/kernel-api/src/test/scala/io/delta/kernel/test/MockEngineUtils.scala +++ b/kernel/kernel-api/src/test/scala/io/delta/kernel/test/MockEngineUtils.scala @@ -159,4 +159,7 @@ trait BaseMockFileSystemClient extends FileSystemClient { override def mkdirs(path: String): Boolean = throw new UnsupportedOperationException("not supported in this test suite") + + /** Not supported by this base mock: always throws UnsupportedOperationException. */ + override def delete(path: String): Boolean = + throw new UnsupportedOperationException("not supported in this test suite") } diff --git a/kernel/kernel-defaults/src/main/java/io/delta/kernel/defaults/engine/DefaultFileSystemClient.java b/kernel/kernel-defaults/src/main/java/io/delta/kernel/defaults/engine/DefaultFileSystemClient.java index e80923554a..c226e61a5a 100644 --- a/kernel/kernel-defaults/src/main/java/io/delta/kernel/defaults/engine/DefaultFileSystemClient.java +++ b/kernel/kernel-defaults/src/main/java/io/delta/kernel/defaults/engine/DefaultFileSystemClient.java @@ -99,6 +99,13 @@ public boolean mkdirs(String path) throws IOException { return fs.mkdirs(pathObject); } + /** + * Deletes the given path using the file system resolved from the path and {@code hadoopConf}. + * The second argument passed to {@code FileSystem.delete} is {@code false}, i.e. the delete is + * requested as non-recursive. + * + * @param path path of the file to delete + * @return the value returned by {@code FileSystem.delete} + * @throws IOException if resolving the file system or the delete call fails + */ + @Override + public boolean delete(String path) throws IOException { + Path pathObject = new Path(path); + FileSystem fs = pathObject.getFileSystem(hadoopConf); + return fs.delete(pathObject, false); + } + private ByteArrayInputStream getStream(String filePath, int offset, int size) { Path path = new Path(filePath); try {