From a061e5adb529377ab0287ef25b5904c24afa4349 Mon Sep 17 00:00:00 2001
From: Venki Korukanti
Date: Thu, 12 Sep 2024 14:04:14 -0700
Subject: [PATCH] updates

---
 .../internal/snapshot/MetadataCleanup.java    | 112 ++++++++++++++++++
 .../internal/snapshot/SnapshotManager.java    |  42 +------
 .../delta/kernel/internal/util/FileNames.java |   9 --
 3 files changed, 114 insertions(+), 49 deletions(-)
 create mode 100644 kernel/kernel-api/src/main/java/io/delta/kernel/internal/snapshot/MetadataCleanup.java

diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/snapshot/MetadataCleanup.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/snapshot/MetadataCleanup.java
new file mode 100644
index 0000000000..8556260c1a
--- /dev/null
+++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/snapshot/MetadataCleanup.java
@@ -0,0 +1,112 @@
+/*
+ * Copyright (2023) The Delta Lake Project Authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.delta.kernel.internal.snapshot;
+
+import static io.delta.kernel.internal.TableConfig.ENABLE_EXPIRED_LOG_CLEANUP;
+import static io.delta.kernel.internal.TableConfig.LOG_RETENTION;
+
+import io.delta.kernel.engine.Engine;
+import io.delta.kernel.internal.actions.Metadata;
+import io.delta.kernel.internal.fs.Path;
+import io.delta.kernel.internal.util.FileNames;
+import io.delta.kernel.utils.CloseableIterator;
+import io.delta.kernel.utils.FileStatus;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Utilities for deleting expired commit files from a table's {@code _delta_log} directory. */
+public class MetadataCleanup {
+  private static final Logger logger = LoggerFactory.getLogger(MetadataCleanup.class);
+
+  private MetadataCleanup() {}
+
+  /**
+   * Deletes expired commit files from the table's {@code _delta_log} directory. A commit file is
+   * deleted only when its modification time is not after the retention cutoff and a checkpoint
+   * file that is also not after the cutoff is listed after it. Does nothing when expired log
+   * cleanup is disabled in the table metadata.
+   *
+   * @param engine engine whose file system client is used to list and delete log files
+   * @param tablePath root path of the table; the log directory is resolved relative to it
+   * @param metadata table metadata from which the cleanup flag and retention period are read
+   * @throws IOException if the file system client fails while listing or deleting log files
+   */
+  public static void doLogCleanup(Engine engine, Path tablePath, Metadata metadata)
+      throws IOException {
+    Boolean enableExpireLogCleanup = ENABLE_EXPIRED_LOG_CLEANUP.fromMetadata(engine, metadata);
+    if (!enableExpireLogCleanup) {
+      logger.info(
+          "{}: Log cleanup is disabled. Skipping the deletion of expired log files", tablePath);
+      return;
+    }
+
+    // Commit files seen so far that are not yet covered by a checkpoint file.
+    List<String> potentialLogFilesToDelete = new ArrayList<>();
+
+    long retentionMillis = LOG_RETENTION.fromMetadata(engine, metadata);
+    long fileCutOffTime = System.currentTimeMillis() - retentionMillis;
+    logger.info("{}: Starting the deletion of log files older than {}", tablePath, fileCutOffTime);
+    long numDeleted = 0;
+    try (CloseableIterator<FileStatus> files = listExpiredDeltaLogs(engine, tablePath)) {
+      while (files.hasNext()) {
+        FileStatus nextFile = files.next();
+        if (nextFile.getModificationTime() > fileCutOffTime) {
+          // First file newer than the cutoff: stop here and keep whatever is still pending.
+          if (!potentialLogFilesToDelete.isEmpty()) {
+            logger.info(
+                "{}: Skipping deletion of expired log files {}, because there is no checkpoint "
+                    + "file that indicates that the log files are no longer needed. ",
+                tablePath,
+                potentialLogFilesToDelete.size());
+          }
+          break;
+        }
+
+        if (FileNames.isCheckpointFile(nextFile.getPath())) {
+          // we have encountered a checkpoint file, now we can delete all the log files
+          // in `potentialLogFilesToDelete` list
+          for (String logFile : potentialLogFilesToDelete) {
+            if (engine.getFileSystemClient().delete(logFile)) {
+              numDeleted++;
+            }
+          }
+          // Drop the handled entries so that a later checkpoint file (for example another part
+          // of a multi-part checkpoint) does not attempt to delete the same files again.
+          potentialLogFilesToDelete.clear();
+        } else if (FileNames.isCommitFile(nextFile.getPath())) {
+          // Add it to the potential delta log files to delete list. We can't delete these
+          // files until we encounter a checkpoint later that indicates that the log files
+          // are no longer needed.
+          potentialLogFilesToDelete.add(nextFile.getPath());
+        }
+      }
+    }
+    logger.info("{}: Deleted {} log files older than {}", tablePath, numDeleted, fileCutOffTime);
+  }
+
+  /**
+   * Lists files in the table's {@code _delta_log} directory, starting from the listing prefix
+   * for version 0. No expiry filtering is applied here; the caller checks modification times.
+   */
+  private static CloseableIterator<FileStatus> listExpiredDeltaLogs(Engine engine, Path tablePath)
+      throws IOException {
+    Path logPath = new Path(tablePath, "_delta_log");
+    return engine.getFileSystemClient().listFrom(FileNames.listingPrefix(logPath, 0));
+  }
+}
diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/snapshot/SnapshotManager.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/snapshot/SnapshotManager.java
index 3a6832df3f..e4e5e3fb54 100644
--- a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/snapshot/SnapshotManager.java
+++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/snapshot/SnapshotManager.java
@@ -21,6 +21,7 @@
 import static io.delta.kernel.internal.checkpoints.Checkpointer.findLastCompleteCheckpointBefore;
 import static io.delta.kernel.internal.fs.Path.getName;
 import static io.delta.kernel.internal.replay.LogReplayUtils.assertLogFilesBelongToTable;
+import static io.delta.kernel.internal.snapshot.MetadataCleanup.doLogCleanup;
 import static io.delta.kernel.internal.util.Preconditions.checkArgument;
 import static java.lang.String.format;
 
@@ -31,7 +32,6 @@
 import io.delta.kernel.exceptions.InvalidTableException;
 import io.delta.kernel.exceptions.TableNotFoundException;
 import io.delta.kernel.internal.*;
-import io.delta.kernel.internal.actions.Metadata;
 import io.delta.kernel.internal.checkpoints.*;
 import io.delta.kernel.internal.fs.Path;
 import io.delta.kernel.internal.lang.ListUtils;
@@ -232,45 +232,7 @@ public void checkpoint(Engine engine, long version) throws TableNotFoundExceptio
     logger.info("{}: Last checkpoint metadata file is written for version: {}", tablePath, version);
 
     logger.info("{}: Finished checkpoint for version: {}", tablePath, version);
-    doLogCleanup(engine, checkpointMetaData, snapshot);
-  }
-
-  private void doLogCleanup(
-      Engine engine, CheckpointMetaData checkpointMetaData, SnapshotImpl snapshot)
-      throws IOException {
-    Metadata metadata = snapshot.getMetadata();
-    Boolean enableExpireLogCleanup =
-        TableConfig.ENABLE_EXPIRED_LOG_CLEANUP.fromMetadata(engine, metadata);
-    if (enableExpireLogCleanup) {
-      Long retentionMillis = TableConfig.LOG_RETENTION.fromMetadata(engine, metadata);
-      Long fileCutOffTime = System.currentTimeMillis() - retentionMillis;
-      logger.info(
-          "{}: Starting the deletion of log files older than {}", tablePath, fileCutOffTime);
-      int numDeleted = 0;
-      try (CloseableIterator<FileStatus> files =
-          listExpiredDeltaLogs(engine, checkpointMetaData, fileCutOffTime)) {
-        while (files.hasNext()) {
-          if (engine.getFileSystemClient().delete(files.next().getPath())) {
-            numDeleted++;
-          }
-        }
-      }
-      logger.info("{}: Deleted {} log files older than {}", tablePath, numDeleted, fileCutOffTime);
-    }
-  }
-
-  private CloseableIterator<FileStatus> listExpiredDeltaLogs(
-      Engine engine, CheckpointMetaData checkpointMetaData, Long fileCutOffTime)
-      throws IOException {
-    long threshold = checkpointMetaData.version - 1;
-    return engine
-        .getFileSystemClient()
-        .listFrom(FileNames.checkpointPrefix(logPath, 0).toUri().getPath())
-        .filter(
-            f ->
-                (FileNames.isCheckpointFile(f.getPath()) || FileNames.isCommitFile(f.getPath()))
-                    && FileNames.getFileVersion(new Path(f.getPath())) <= threshold
-                    && f.getModificationTime() <= fileCutOffTime);
+    doLogCleanup(engine, tablePath, snapshot.getMetadata());
   }
 
   ////////////////////
diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/util/FileNames.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/util/FileNames.java
index 329b5e2127..93c554c9e8 100644
--- a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/util/FileNames.java
+++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/util/FileNames.java
@@ -109,15 +109,6 @@ public static Path v2CheckpointSidecarFile(Path path, String uuid) {
     return new Path(String.format("%s/_sidecars/%s.parquet", path.toString(), uuid));
   }
 
-  /**
-   * Returns the prefix of all checkpoint files for the given version. Intended for use with
-   * listFrom to get all files from this version onwards. The returned Path will not exist as a
-   * file.
-   */
-  public static Path checkpointPrefix(Path path, long version) {
-    return new Path(path, String.format("%020d.checkpoint", version));
-  }
-
   /**
    * Returns the paths for all parts of the checkpoint up to the given version.
    *