Skip to content

Commit

Permalink
fix checkstyle.
Browse files Browse the repository at this point in the history
  • Loading branch information
horizonzy committed Aug 19, 2024
1 parent de84974 commit ac54876
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -80,16 +80,16 @@ public class TableConfig<T> {
* The shortest duration we have to keep logically deleted data files around before deleting them
* physically. This is to prevent failures in stale readers after compactions or partition
* overwrites.
* <p>
* Note: this value should be large enough: - It should be larger than the longest possible
*
* <p>Note: this value should be large enough: - It should be larger than the longest possible
* duration of a job if you decide to run "VACUUM" when there are concurrent readers or writers
* accessing the table. - If you are running a streaming query reading from the table, you should
* make sure the query doesn't stop longer than this value. Otherwise, the query may not be able
* to restart as it still needs to read old files.
* <p>
* The value is deliberately not validated to be greater than 0. In the standalone library, log
* expiry is day-based, so cleaning up the log immediately requires configuring the value as
* "-1 days"; hence no such validation is performed here. See:
*
* <p>The value is deliberately not validated to be greater than 0. In the standalone library,
* log expiry is day-based, so cleaning up the log immediately requires configuring the value as
* "-1 days"; hence no such validation is performed here. See:
* io.delta.standalone.internal.MetadataCleanup#cleanUpExpiredLogs().
*/
public static final TableConfig<Long> LOG_RETENTION =
Expand All @@ -99,20 +99,16 @@ public class TableConfig<T> {
(engineOpt, v) -> IntervalParserUtils.safeParseIntervalAsMillis(v),
value -> true,
"needs to be provided as a calendar interval such as '2 weeks'. Months "
+ "and years are not accepted. You may specify '365 days' for a year instead."
);
+ "and years are not accepted. You may specify '365 days' for a year instead.");

/**
* Whether to clean up expired checkpoints and delta logs.
*/
/** Whether to clean up expired checkpoints and delta logs. */
public static final TableConfig<Boolean> ENABLE_EXPIRED_LOG_CLEANUP =
new TableConfig<>(
"delta.enableExpiredLogCleanup",
"true",
(engineOpt, v) -> Boolean.valueOf(v),
value -> true,
"needs to be a boolean."
);
"needs to be a boolean.");

/**
* This table property is used to track the enablement of the {@code inCommitTimestamps}.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -228,19 +228,20 @@ public void checkpoint(Engine engine, long version) throws TableNotFoundExceptio
doLogCleanup(engine, checkpointMetaData, snapshot);
}

private void doLogCleanup(Engine engine, CheckpointMetaData checkpointMetaData,
SnapshotImpl snapshot) throws IOException {
private void doLogCleanup(
Engine engine, CheckpointMetaData checkpointMetaData, SnapshotImpl snapshot)
throws IOException {
Metadata metadata = snapshot.getMetadata();
Boolean enableExpireLogCleanup = TableConfig.ENABLE_EXPIRED_LOG_CLEANUP.fromMetadata(engine,
metadata);
Boolean enableExpireLogCleanup =
TableConfig.ENABLE_EXPIRED_LOG_CLEANUP.fromMetadata(engine, metadata);
if (enableExpireLogCleanup) {
Long retentionMillis = TableConfig.LOG_RETENTION.fromMetadata(engine, metadata);
Long fileCutOffTime = System.currentTimeMillis() - retentionMillis;
logger.info("{}: Starting the deletion of log files older than {}", tablePath,
fileCutOffTime);
logger.info(
"{}: Starting the deletion of log files older than {}", tablePath, fileCutOffTime);
int numDeleted = 0;
try (CloseableIterator<FileStatus> files = listExpiredDeltaLogs(engine, checkpointMetaData,
fileCutOffTime)) {
try (CloseableIterator<FileStatus> files =
listExpiredDeltaLogs(engine, checkpointMetaData, fileCutOffTime)) {
while (files.hasNext()) {
if (engine.getFileSystemClient().delete(files.next().getPath())) {
numDeleted++;
Expand All @@ -251,14 +252,18 @@ private void doLogCleanup(Engine engine, CheckpointMetaData checkpointMetaData,
}
}

private CloseableIterator<FileStatus> listExpiredDeltaLogs(Engine engine,
CheckpointMetaData checkpointMetaData, Long fileCutOffTime) throws IOException {
private CloseableIterator<FileStatus> listExpiredDeltaLogs(
Engine engine, CheckpointMetaData checkpointMetaData, Long fileCutOffTime)
throws IOException {
long threshold = checkpointMetaData.version - 1;
return engine.getFileSystemClient()
.listFrom(FileNames.checkpointPrefix(logPath, 0).toUri().getPath()).filter(
f -> (FileNames.isCheckpointFile(f.getPath()) || FileNames.isCommitFile(f.getPath()))
&& FileNames.getFileVersion(new Path(f.getPath())) <= threshold
&& f.getModificationTime() <= fileCutOffTime);
return engine
.getFileSystemClient()
.listFrom(FileNames.checkpointPrefix(logPath, 0).toUri().getPath())
.filter(
f ->
(FileNames.isCheckpointFile(f.getPath()) || FileNames.isCommitFile(f.getPath()))
&& FileNames.getFileVersion(new Path(f.getPath())) <= threshold
&& f.getModificationTime() <= fileCutOffTime);
}

////////////////////
Expand Down

0 comments on commit ac54876

Please sign in to comment.