Skip to content

Commit

Permalink
updates
Browse files Browse the repository at this point in the history
  • Loading branch information
vkorukanti committed Sep 12, 2024
1 parent e46bb83 commit a061e5a
Show file tree
Hide file tree
Showing 3 changed files with 93 additions and 49 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
/*
* Copyright (2023) The Delta Lake Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.delta.kernel.internal.snapshot;

import static io.delta.kernel.internal.TableConfig.ENABLE_EXPIRED_LOG_CLEANUP;
import static io.delta.kernel.internal.TableConfig.LOG_RETENTION;

import io.delta.kernel.engine.Engine;
import io.delta.kernel.internal.actions.Metadata;
import io.delta.kernel.internal.fs.Path;
import io.delta.kernel.internal.util.FileNames;
import io.delta.kernel.utils.CloseableIterator;
import io.delta.kernel.utils.FileStatus;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Utility for removing expired commit files from a table's {@code _delta_log} directory.
 *
 * <p>A commit file is removed only when both of the following hold:
 *
 * <ul>
 *   <li>its modification time is at or before the cut-off derived from the table's log
 *       retention setting, and
 *   <li>a checkpoint file whose modification time is also at or before the cut-off appears
 *       after it in the listing.
 * </ul>
 *
 * <p>NOTE(review): the algorithm assumes that {@code listFrom} returns files in ascending
 * version order, so that a checkpoint seen after a commit covers that commit - confirm against
 * the file system client contract.
 */
public class MetadataCleanup {
  // Log under this class's own name so that cleanup messages are attributed correctly.
  private static final Logger logger = LoggerFactory.getLogger(MetadataCleanup.class);

  // Kept public for backward compatibility; all functionality is exposed via static methods.
  public MetadataCleanup() {}

  /**
   * Deletes expired commit files of the given table if expired-log cleanup is enabled in the
   * table metadata.
   *
   * @param engine engine providing the file system client used to list and delete files
   * @param tablePath root path of the table; log files are listed from its {@code _delta_log}
   *     sub-directory
   * @param metadata table metadata from which the cleanup flag and log retention are read
   * @throws IOException if listing or closing the listing fails, or if the file system client
   *     propagates an error while deleting
   */
  public static void doLogCleanup(Engine engine, Path tablePath, Metadata metadata)
      throws IOException {
    Boolean enableExpireLogCleanup = ENABLE_EXPIRED_LOG_CLEANUP.fromMetadata(engine, metadata);
    if (!enableExpireLogCleanup) {
      logger.info(
          "{}: Log cleanup is disabled. Skipping the deletion of expired log files", tablePath);
      return;
    }

    // Expired commit files seen since the last checkpoint; they become deletable only once a
    // later (also expired) checkpoint is encountered.
    List<String> potentialLogFilesToDelete = new ArrayList<>();

    long retentionMillis = LOG_RETENTION.fromMetadata(engine, metadata);
    long fileCutOffTime = System.currentTimeMillis() - retentionMillis;
    logger.info("{}: Starting the deletion of log files older than {}", tablePath, fileCutOffTime);
    long numDeleted = 0;
    try (CloseableIterator<FileStatus> files = listExpiredDeltaLogs(engine, tablePath)) {
      while (files.hasNext()) {
        FileStatus nextFile = files.next();
        if (nextFile.getModificationTime() > fileCutOffTime) {
          // First file that is still within the retention period: nothing after it may be
          // deleted, so stop scanning.
          break;
        }

        if (FileNames.isCheckpointFile(nextFile.getPath())) {
          // We have encountered a checkpoint file, now we can delete all the log files
          // in the `potentialLogFilesToDelete` list.
          for (String logFile : potentialLogFilesToDelete) {
            if (engine.getFileSystemClient().delete(logFile)) {
              numDeleted++;
            }
          }
          // Forget the files just handled. Without this, every later checkpoint file (for
          // example each part of a multi-part checkpoint) would try to delete them again and
          // the "skipped" report below would count files that are already gone.
          potentialLogFilesToDelete.clear();
        } else if (FileNames.isCommitFile(nextFile.getPath())) {
          // Add it to the list of potential delta log files to delete. We can't delete these
          // files until we encounter a checkpoint later that indicates that the log files
          // are no longer needed.
          potentialLogFilesToDelete.add(nextFile.getPath());
        }
      }
    }

    // Report expired commits that had to be kept, whether the scan stopped at a newer file or
    // simply ran out of files.
    if (!potentialLogFilesToDelete.isEmpty()) {
      logger.info(
          "{}: Skipping deletion of expired log files {}, because there is no checkpoint "
              + "file that indicates that the log files are no longer needed. ",
          tablePath,
          potentialLogFilesToDelete.size());
    }
    logger.info("{}: Deleted {} log files older than {}", tablePath, numDeleted, fileCutOffTime);
  }

  /**
   * Lists the files of the table's {@code _delta_log} directory starting from the listing prefix
   * of version 0. Despite the name, no expiry filtering happens here; the caller applies the
   * cut-off.
   *
   * @param engine engine providing the file system client
   * @param tablePath root path of the table
   * @return iterator over the listed files; the caller is responsible for closing it
   * @throws IOException if the listing fails
   */
  private static CloseableIterator<FileStatus> listExpiredDeltaLogs(Engine engine, Path tablePath)
      throws IOException {
    Path logPath = new Path(tablePath, "_delta_log");
    return engine.getFileSystemClient().listFrom(FileNames.listingPrefix(logPath, 0));
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import static io.delta.kernel.internal.checkpoints.Checkpointer.findLastCompleteCheckpointBefore;
import static io.delta.kernel.internal.fs.Path.getName;
import static io.delta.kernel.internal.replay.LogReplayUtils.assertLogFilesBelongToTable;
import static io.delta.kernel.internal.snapshot.MetadataCleanup.doLogCleanup;
import static io.delta.kernel.internal.util.Preconditions.checkArgument;
import static java.lang.String.format;

Expand All @@ -31,7 +32,6 @@
import io.delta.kernel.exceptions.InvalidTableException;
import io.delta.kernel.exceptions.TableNotFoundException;
import io.delta.kernel.internal.*;
import io.delta.kernel.internal.actions.Metadata;
import io.delta.kernel.internal.checkpoints.*;
import io.delta.kernel.internal.fs.Path;
import io.delta.kernel.internal.lang.ListUtils;
Expand Down Expand Up @@ -232,45 +232,7 @@ public void checkpoint(Engine engine, long version) throws TableNotFoundExceptio
logger.info("{}: Last checkpoint metadata file is written for version: {}", tablePath, version);

logger.info("{}: Finished checkpoint for version: {}", tablePath, version);
doLogCleanup(engine, checkpointMetaData, snapshot);
}

/**
 * Deletes every file returned by {@code listExpiredDeltaLogs} when the snapshot's metadata has
 * expired-log cleanup enabled; does nothing otherwise.
 *
 * @param engine engine providing the file system client used for deletion
 * @param checkpointMetaData metadata of the checkpoint, forwarded to the listing helper
 * @param snapshot snapshot whose metadata supplies the cleanup flag and the log retention
 * @throws IOException if listing or closing the listing fails
 */
private void doLogCleanup(
    Engine engine, CheckpointMetaData checkpointMetaData, SnapshotImpl snapshot)
    throws IOException {
  Metadata tableMetadata = snapshot.getMetadata();
  Boolean cleanupEnabled =
      TableConfig.ENABLE_EXPIRED_LOG_CLEANUP.fromMetadata(engine, tableMetadata);
  if (!cleanupEnabled) {
    // Cleanup is switched off for this table: nothing to do.
    return;
  }

  Long retention = TableConfig.LOG_RETENTION.fromMetadata(engine, tableMetadata);
  Long cutOff = System.currentTimeMillis() - retention;
  logger.info("{}: Starting the deletion of log files older than {}", tablePath, cutOff);

  int deletedCount = 0;
  try (CloseableIterator<FileStatus> expired =
      listExpiredDeltaLogs(engine, checkpointMetaData, cutOff)) {
    while (expired.hasNext()) {
      // Count only the deletions the file system client reports as successful.
      boolean removed = engine.getFileSystemClient().delete(expired.next().getPath());
      if (removed) {
        deletedCount++;
      }
    }
  }
  logger.info("{}: Deleted {} log files older than {}", tablePath, deletedCount, cutOff);
}

private CloseableIterator<FileStatus> listExpiredDeltaLogs(
Engine engine, CheckpointMetaData checkpointMetaData, Long fileCutOffTime)
throws IOException {
long threshold = checkpointMetaData.version - 1;
return engine
.getFileSystemClient()
.listFrom(FileNames.checkpointPrefix(logPath, 0).toUri().getPath())
.filter(
f ->
(FileNames.isCheckpointFile(f.getPath()) || FileNames.isCommitFile(f.getPath()))
&& FileNames.getFileVersion(new Path(f.getPath())) <= threshold
&& f.getModificationTime() <= fileCutOffTime);
doLogCleanup(engine, tablePath, snapshot.getMetadata());
}

////////////////////
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,15 +109,6 @@ public static Path v2CheckpointSidecarFile(Path path, String uuid) {
return new Path(String.format("%s/_sidecars/%s.parquet", path.toString(), uuid));
}

/**
 * Builds the common name prefix shared by all checkpoint files of the given version: the version
 * zero-padded to 20 digits followed by {@code .checkpoint}, resolved against {@code path}.
 *
 * <p>Meant to be passed to listFrom in order to list all files from this version onwards. The
 * returned Path does not correspond to an existing file.
 *
 * @param path directory against which the prefix is resolved
 * @param version table version whose checkpoint prefix is wanted
 * @return the prefix as a {@link Path}
 */
public static Path checkpointPrefix(Path path, long version) {
  final String prefixName = String.format("%020d.checkpoint", version);
  return new Path(path, prefixName);
}

/**
* Returns the paths for all parts of the checkpoint up to the given version.
*
Expand Down

0 comments on commit a061e5a

Please sign in to comment.