Support cleanup of expired logs.
horizonzy authored and vkorukanti committed Sep 12, 2024
1 parent 920f185 commit e46bb83
Showing 6 changed files with 102 additions and 0 deletions.
@@ -76,4 +76,13 @@ CloseableIterator<ByteArrayInputStream> readFiles(CloseableIterator<FileReadRequ
* @throws IOException for any IO error.
*/
boolean mkdirs(String path) throws IOException;

/**
* Delete a file.
*
* @param path the path to delete.
* @return true if the delete is successful, else false.
* @throws IOException for any IO error.
*/
boolean delete(String path) throws IOException;
}
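
A minimal usage sketch of the new method, assuming some FileSystemClient implementation is in hand; the variable name and path below are hypothetical:

    // `client` is any FileSystemClient implementation; the path is illustrative only.
    String stalePath = "/data/tbl/_delta_log/00000000000000000001.json";
    if (client.delete(stalePath)) {
      // The file existed and was removed.
    }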
@@ -78,6 +78,40 @@ public class TableConfig<T> {
"needs to be a positive integer.",
true);

/**
 * The shortest duration we have to keep logically deleted data files around before deleting them
 * physically. This is to prevent failures in stale readers after compactions or partition
 * overwrites.
 *
 * <p>Note: this value should be large enough:
 *
 * <ul>
 *   <li>It should be larger than the longest possible duration of a job if you decide to run
 *       "VACUUM" while there are concurrent readers or writers accessing the table.
 *   <li>If you are running a streaming query that reads from the table, make sure the query does
 *       not stop for longer than this value. Otherwise, the query may not be able to restart, as
 *       it still needs to read old files.
 * </ul>
 *
 * <p>We do not validate that the value is greater than 0. In the standalone library, log
 * expiration is day-based, so cleaning the log immediately requires setting the value to "-1
 * days"; to keep that possible, no positivity check is applied here. See:
 * io.delta.standalone.internal.MetadataCleanup#cleanUpExpiredLogs().
 */
public static final TableConfig<Long> LOG_RETENTION =
new TableConfig<>(
"delta.logRetentionDuration",
"interval 30 days",
(engineOpt, v) -> IntervalParserUtils.safeParseIntervalAsMillis(v),
value -> true,
"needs to be provided as a calendar interval such as '2 weeks'. Months "
+ "and years are not accepted. You may specify '365 days' for a year instead.");

/** Whether to clean up expired checkpoints and delta logs. */
public static final TableConfig<Boolean> ENABLE_EXPIRED_LOG_CLEANUP =
new TableConfig<>(
"delta.enableExpiredLogCleanup",
"true",
(engineOpt, v) -> Boolean.valueOf(v),
value -> true,
"needs to be a boolean.");

/**
* This table property is used to track the enablement of the {@code inCommitTimestamps}.
*
@@ -31,6 +31,7 @@
import io.delta.kernel.exceptions.InvalidTableException;
import io.delta.kernel.exceptions.TableNotFoundException;
import io.delta.kernel.internal.*;
import io.delta.kernel.internal.actions.Metadata;
import io.delta.kernel.internal.checkpoints.*;
import io.delta.kernel.internal.fs.Path;
import io.delta.kernel.internal.lang.ListUtils;
@@ -231,6 +232,45 @@ public void checkpoint(Engine engine, long version) throws TableNotFoundExceptio
logger.info("{}: Last checkpoint metadata file is written for version: {}", tablePath, version);

logger.info("{}: Finished checkpoint for version: {}", tablePath, version);
doLogCleanup(engine, checkpointMetaData, snapshot);
}

  private void doLogCleanup(
      Engine engine, CheckpointMetaData checkpointMetaData, SnapshotImpl snapshot)
      throws IOException {
    Metadata metadata = snapshot.getMetadata();
    boolean enableExpiredLogCleanup =
        TableConfig.ENABLE_EXPIRED_LOG_CLEANUP.fromMetadata(engine, metadata);
    if (enableExpiredLogCleanup) {
      long retentionMillis = TableConfig.LOG_RETENTION.fromMetadata(engine, metadata);
      // Files modified at or before this timestamp are eligible for deletion.
      long fileCutOffTime = System.currentTimeMillis() - retentionMillis;
      logger.info(
          "{}: Starting the deletion of log files older than {}", tablePath, fileCutOffTime);
      int numDeleted = 0;
      try (CloseableIterator<FileStatus> files =
          listExpiredDeltaLogs(engine, checkpointMetaData, fileCutOffTime)) {
        while (files.hasNext()) {
          if (engine.getFileSystemClient().delete(files.next().getPath())) {
            numDeleted++;
          }
        }
      }
      logger.info("{}: Deleted {} log files older than {}", tablePath, numDeleted, fileCutOffTime);
    }
  }

  private CloseableIterator<FileStatus> listExpiredDeltaLogs(
      Engine engine, CheckpointMetaData checkpointMetaData, long fileCutOffTime)
      throws IOException {
    // Only versions strictly before the latest checkpoint are candidates; the
    // checkpoint itself (and everything after it) must be retained.
    long threshold = checkpointMetaData.version - 1;
    return engine
        .getFileSystemClient()
        .listFrom(FileNames.checkpointPrefix(logPath, 0).toUri().getPath())
        .filter(
            f ->
                (FileNames.isCheckpointFile(f.getPath()) || FileNames.isCommitFile(f.getPath()))
                    && FileNames.getFileVersion(new Path(f.getPath())) <= threshold
                    && f.getModificationTime() <= fileCutOffTime);
  }
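
To make the filter concrete, a small worked example; the versions are hypothetical, and the file names follow the zero-padded naming convention used by FileNames:

    // With the latest checkpoint at version 10, threshold = 10 - 1 = 9. These are
    // delete candidates once their modification time is <= fileCutOffTime:
    //   00000000000000000008.json
    //   00000000000000000009.json
    //   00000000000000000009.checkpoint.parquet
    // Files at version 10 and later are always retained, as is anything that is
    // neither a commit file nor a checkpoint file.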

////////////////////
@@ -109,6 +109,15 @@ public static Path v2CheckpointSidecarFile(Path path, String uuid) {
return new Path(String.format("%s/_sidecars/%s.parquet", path.toString(), uuid));
}

/**
 * Returns the prefix of all checkpoint files for the given version. Intended for use with
 * {@code listFrom} to get all files from this version onwards. The returned Path will not exist
 * as a file.
 */
public static Path checkpointPrefix(Path path, long version) {
return new Path(path, String.format("%020d.checkpoint", version));
}
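
For example (table path hypothetical), since %020d zero-pads the version to 20 digits:

    // checkpointPrefix(new Path("/data/tbl/_delta_log"), 0) yields
    //   /data/tbl/_delta_log/00000000000000000000.checkpoint
    // which sorts lexicographically before every real commit and checkpoint file,
    // making it a safe starting point for listFrom.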

/**
* Returns the paths for all parts of the checkpoint up to the given version.
*
@@ -159,4 +159,7 @@ trait BaseMockFileSystemClient extends FileSystemClient {

override def mkdirs(path: String): Boolean =
throw new UnsupportedOperationException("not supported in this test suite")

override def delete(path: String): Boolean =
throw new UnsupportedOperationException("not supported in this test suite")
}
@@ -99,6 +99,13 @@ public boolean mkdirs(String path) throws IOException {
return fs.mkdirs(pathObject);
}

@Override
public boolean delete(String path) throws IOException {
Path pathObject = new Path(path);
FileSystem fs = pathObject.getFileSystem(hadoopConf);
return fs.delete(pathObject, false);
}
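
The second argument to Hadoop's FileSystem#delete controls recursion; false is passed here because log cleanup only ever removes individual files. A summary of the contract, hedged since exact behavior can vary by FileSystem implementation:

    // fs.delete(path, false):
    //   - returns true if the file (or an empty directory) was deleted
    //   - returns false if the path does not exist
    //   - throws IOException when the path is a non-empty directory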

private ByteArrayInputStream getStream(String filePath, int offset, int size) {
Path path = new Path(filePath);
try {