Support cleanup of expired logs.
horizonzy committed Aug 19, 2024
1 parent e763fe9 commit cdbd5b2
Showing 6 changed files with 100 additions and 0 deletions.
@@ -76,4 +76,12 @@ CloseableIterator<ByteArrayInputStream> readFiles(CloseableIterator<FileReadRequest>
* @throws IOException for any IO error.
*/
boolean mkdirs(String path) throws IOException;

/**
 * Deletes the file at the given path.
 *
 * @param path the path of the file to delete.
 * @return true if the delete is successful, else false.
* @throws IOException for any IO error.
*/
boolean delete(String path) throws IOException;
}
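
The delete contract added above returns false rather than throwing when the delete does not succeed, which lets callers do best-effort cleanup and count successes. A minimal sketch of such a caller, assuming the interface lives in io.delta.kernel.engine (the package is not shown in this diff) and using a hypothetical helper name:

    import java.io.IOException;
    import java.util.List;
    import io.delta.kernel.engine.FileSystemClient; // package assumed

    final class LogCleanupHelper {
        // Best-effort batch delete: count files actually removed; IO errors still propagate.
        static int deleteAll(FileSystemClient fsClient, List<String> stalePaths)
                throws IOException {
            int deleted = 0;
            for (String path : stalePaths) {
                if (fsClient.delete(path)) { // false: the file could not be deleted
                    deleted++;
                }
            }
            return deleted;
        }
    }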
@@ -76,6 +76,41 @@ public class TableConfig<T> {
value -> value > 0,
"needs to be a positive integer.");

/**
* The shortest duration we have to keep logically deleted data files around before deleting
* them physically. This is to prevent failures in stale readers after compactions or partition
* overwrites.
*
* Note: this value should be large enough:
* - It should be larger than the longest possible duration of a job if you decide to run
* "VACUUM" when there are concurrent readers or writers accessing the table.
* - If you are running a streaming query reading from the table, you should make sure the query
* doesn't stop longer than this value. Otherwise, the query may not be able to restart as it
* still needs to read old files.
*
* We do not validate that the value is greater than 0. In the standalone library, log
* expiration has day granularity, so cleaning the log immediately requires setting the value
* to "-1 days"; hence no positivity check is applied here.
* See: io.delta.standalone.internal.MetadataCleanup#cleanUpExpiredLogs().
*/
public static final TableConfig<Long> LOG_RETENTION = new TableConfig<>(
"delta.logRetentionDuration",
"interval 30 days",
(engineOpt, v) -> IntervalParserUtils.safeParseIntervalAsMillis(v),
value -> true,
"needs to be provided as a calendar interval such as '2 weeks'. Months "
+ "and years are not accepted. You may specify '365 days' for a year instead."
);

/** Whether to clean up expired checkpoints and delta logs. */
public static final TableConfig<Boolean> ENABLE_EXPIRED_LOG_CLEANUP = new TableConfig<>(
"delta.enableExpiredLogCleanup",
"true",
(engineOpt, v) -> Boolean.valueOf(v),
value -> true,
"needs to be a boolean."
);
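
For orientation, a minimal sketch of what these defaults resolve to, assuming IntervalParserUtils lives at io.delta.kernel.internal.util as its usage above suggests:

    import io.delta.kernel.internal.util.IntervalParserUtils; // package assumed

    // The default "interval 30 days" parses to 30 days in milliseconds.
    long retentionMillis =
        IntervalParserUtils.safeParseIntervalAsMillis("interval 30 days");
    // 30L * 24 * 60 * 60 * 1000 == 2_592_000_000L
    // Files modified at or before this cutoff become eligible for cleanup; with
    // "-1 days" the cutoff lies in the future, so expired logs qualify immediately.
    long fileCutOffTime = System.currentTimeMillis() - retentionMillis;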

/**
* This table property is used to track the enablement of the {@code inCommitTimestamps}.
*
@@ -31,6 +31,7 @@
import io.delta.kernel.exceptions.InvalidTableException;
import io.delta.kernel.exceptions.TableNotFoundException;
import io.delta.kernel.internal.*;
import io.delta.kernel.internal.actions.Metadata;
import io.delta.kernel.internal.checkpoints.*;
import io.delta.kernel.internal.fs.Path;
import io.delta.kernel.internal.lang.ListUtils;
@@ -224,6 +225,42 @@ public void checkpoint(Engine engine, long version) throws TableNotFoundException
logger.info("{}: Last checkpoint metadata file is written for version: {}", tablePath, version);

logger.info("{}: Finished checkpoint for version: {}", tablePath, version);
doLogCleanup(engine, checkpointMetaData, snapshot);
}

private void doLogCleanup(Engine engine, CheckpointMetaData checkpointMetaData,
        SnapshotImpl snapshot) throws IOException {
    Metadata metadata = snapshot.getMetadata();
    boolean enableExpiredLogCleanup = TableConfig.ENABLE_EXPIRED_LOG_CLEANUP
        .fromMetadata(engine, metadata);
    if (enableExpiredLogCleanup) {
        long retentionMillis = TableConfig.LOG_RETENTION.fromMetadata(engine, metadata);
        // Log files modified at or before this timestamp are eligible for deletion.
        long fileCutOffTime = System.currentTimeMillis() - retentionMillis;
        logger.info("{}: Starting the deletion of log files older than {}",
            tablePath, fileCutOffTime);
        int numDeleted = 0;
        try (CloseableIterator<FileStatus> files =
                listExpiredDeltaLogs(engine, checkpointMetaData, fileCutOffTime)) {
            while (files.hasNext()) {
                // Best-effort: delete() returns false when a file cannot be removed.
                if (engine.getFileSystemClient().delete(files.next().getPath())) {
                    numDeleted++;
                }
            }
        }
        logger.info("{}: Deleted {} log files older than {}",
            tablePath, numDeleted, fileCutOffTime);
    }
}

private CloseableIterator<FileStatus> listExpiredDeltaLogs(Engine engine,
        CheckpointMetaData checkpointMetaData, long fileCutOffTime) throws IOException {
    // Versions at or after the latest checkpoint must be kept so the table
    // remains reconstructable; only strictly older versions may expire.
    long threshold = checkpointMetaData.version - 1;
    return engine.getFileSystemClient()
        .listFrom(FileNames.checkpointPrefix(logPath, 0).toUri().getPath())
        .filter(f -> (FileNames.isCheckpointFile(f.getPath()) ||
                FileNames.isCommitFile(f.getPath()))
            && FileNames.getFileVersion(new Path(f.getPath())) <= threshold
            && f.getModificationTime() <= fileCutOffTime);
}
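
A worked illustration of the filter's two conditions, with a hypothetical latest checkpoint at version 10 (file names and ages invented):

    // threshold = 10 - 1 = 9. A file is deleted only if BOTH hold:
    //   1. its version is <= 9 (it precedes the retained checkpoint), and
    //   2. its modification time is <= fileCutOffTime (it has aged past retention).
    //
    //   00000000000000000005.json                older than cutoff  -> deleted
    //   00000000000000000008.json                newer than cutoff  -> kept (condition 2)
    //   00000000000000000010.checkpoint.parquet  version 10 > 9     -> kept (condition 1)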

////////////////////
@@ -109,6 +109,16 @@ public static Path v2CheckpointSidecarFile(Path path, String uuid) {
return new Path(String.format("%s/_sidecars/%s.parquet", path.toString(), uuid));
}

/**
* Returns the prefix of all checkpoint files for the given version.
*
* Intended for use with listFrom to get all files from this version onwards. The returned Path
* will not exist as a file.
*/
public static Path checkpointPrefix(Path path, long version) {
return new Path(path, String.format("%020d.checkpoint", version));
}
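
As a sketch (table path invented), the prefix for version 0 is a 20-digit zero-padded name that sorts lexicographically before every real log file, so it is a valid listFrom starting point even though no file exists there:

    // Yields /tbl/_delta_log/00000000000000000000.checkpoint
    Path start = FileNames.checkpointPrefix(new Path("/tbl/_delta_log"), 0);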

/**
* Returns the paths for all parts of the checkpoint up to the given version.
*
@@ -162,4 +162,7 @@ trait BaseMockFileSystemClient extends FileSystemClient {

override def mkdirs(path: String): Boolean =
throw new UnsupportedOperationException("not supported in this test suite")

override def delete(path: String): Boolean =
throw new UnsupportedOperationException("not supported in this test suite")
}
@@ -99,6 +99,13 @@ public boolean mkdirs(String path) throws IOException {
return fs.mkdirs(pathObject);
}

@Override
public boolean delete(String path) throws IOException {
    Path pathObject = new Path(path);
    FileSystem fs = pathObject.getFileSystem(hadoopConf);
    // Non-recursive delete: a non-empty directory is not silently removed.
    return fs.delete(pathObject, false);
}

private ByteArrayInputStream getStream(String filePath, int offset, int size) {
Path path = new Path(filePath);
try {
