From 9b93435eb455a3f06dea9b701a1bf07cc92954b1 Mon Sep 17 00:00:00 2001
From: Robert Stupp
Date: Wed, 2 Oct 2024 14:10:59 +0200
Subject: [PATCH] Persistence: purge unreferenced `Obj`s
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

This is an attempt to implement the algorithm mentioned in PR #9401.

The `Obj.referenced()` attribute contains the timestamp when the object was last "referenced" (aka: attempted to be written). It is ...
* set when an object is first persisted via `storeObj()`
* updated in the database when an object was not persisted via `storeObj()` because it already exists
* set/updated via `upsertObj()`
* updated via `updateConditional()`

Let's assume that there is a mechanism to identify the IDs of all referenced objects (it would be very similar to what the export functionality does).

The algorithm to purge unreferenced objects must never delete an object that is referenced at any point in time, and it must handle the case of an object that was unreferenced when the purge routine started but became referenced while it was running.

An approach could work as follows:
1. Memoize the current timestamp (minus some wall-clock drift adjustment).
2. Identify the IDs of all referenced objects. We could leverage a bloom filter if the set of IDs is big.
3. Then scan all objects in the repository. Objects can be purged if ...
   * the ID is not in the set (or bloom filter) generated in step 2 ...
   * _AND_ the `referenced` timestamp is less than the memoized timestamp.

Any deletion in the backing database would follow the meaning of this pseudo SQL: `DELETE FROM objs WHERE obj_id = :objId AND referenced < :memoizedTimestamp`.

Note that the `referenced` attribute is rather imprecise when retrieved from the objects cache (aka: during normal operations), which is not a problem, because that attribute is irrelevant for production accesses.

There are two edge cases / race conditions:
* (for some backends): A `storeObj()` operation detects that the object already exists - then the purge routine deletes that object - and then the `storeObj()` tries to update the `referenced` attribute. The result is the loss of that object. This race condition can only occur if the object existed but was not referenced.
* While the referenced objects are being identified, a new named reference (branch / tag) is created that points to commit(s) that would be identified as unreferenced and later purged.
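For illustration only (not part of this patch), a minimal sketch of the purge decision described in steps 1-3 above, assuming a hypothetical String-based object ID and a pre-built Guava bloom filter of referenced IDs; the names `PurgeDecisionSketch`, `newReferencedIdsFilter` and `isPurgeable` are illustrative and do not exist in the code base, which uses `ObjId` and `ReferencedObjectsFilterImpl` instead:

    // Illustrative sketch of the purge decision; not the actual implementation.
    import com.google.common.hash.BloomFilter;
    import com.google.common.hash.Funnels;
    import java.nio.charset.StandardCharsets;
    import java.time.Instant;

    final class PurgeDecisionSketch {
      private final BloomFilter<CharSequence> referencedIds; // step 2: IDs of referenced objects
      private final long maxObjReferencedMillis;             // step 1: memoized cutoff timestamp

      PurgeDecisionSketch(BloomFilter<CharSequence> referencedIds, Instant memoized) {
        this.referencedIds = referencedIds;
        this.maxObjReferencedMillis = memoized.toEpochMilli();
      }

      // step 2: bloom filter sized for the expected number of referenced objects
      static BloomFilter<CharSequence> newReferencedIdsFilter(long expectedObjCount, double fpp) {
        return BloomFilter.create(
            Funnels.stringFunnel(StandardCharsets.UTF_8), expectedObjCount, fpp);
      }

      // step 3: purge only objects that are neither (probably) referenced
      // nor re-referenced at/after the memoized timestamp
      boolean isPurgeable(String objId, long referencedTimestampMillis) {
        return !referencedIds.mightContain(objId)
            && referencedTimestampMillis < maxObjReferencedMillis;
      }
    }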
--- bom/build.gradle.kts | 1 + gradle/projects.main.properties | 1 + versioned/storage/cleanup/build.gradle.kts | 53 +++ .../versioned/storage/cleanup/Cleanup.java | 78 ++++ .../storage/cleanup/CleanupParams.java | 119 ++++++ .../MustRestartWithBiggerFilterException.java | 27 ++ .../storage/cleanup/PurgeFilter.java | 66 ++++ .../storage/cleanup/PurgeObjects.java | 23 ++ .../storage/cleanup/PurgeObjectsContext.java | 49 +++ .../storage/cleanup/PurgeObjectsImpl.java | 101 +++++ .../storage/cleanup/PurgeResult.java | 23 ++ .../versioned/storage/cleanup/PurgeStats.java | 33 ++ .../storage/cleanup/PurgeStatsBuilder.java | 34 ++ .../versioned/storage/cleanup/RateLimit.java | 50 +++ .../cleanup/ReferencedObjectsContext.java | 55 +++ .../cleanup/ReferencedObjectsFilter.java | 41 ++ .../cleanup/ReferencedObjectsFilterImpl.java | 168 +++++++++ .../cleanup/ReferencedObjectsResolver.java | 23 ++ .../ReferencedObjectsResolverImpl.java | 349 ++++++++++++++++++ .../storage/cleanup/ResolveResult.java | 25 ++ .../storage/cleanup/ResolveStats.java | 49 +++ .../storage/cleanup/ResolveStatsBuilder.java | 54 +++ .../storage/cleanup/VisitedCommitFilter.java | 46 +++ .../cleanup/VisitedCommitFilterImpl.java | 74 ++++ .../TestReferencedObjectsFilterImpl.java | 106 ++++++ .../TestReferencedObjectsResolverImpl.java | 220 +++++++++++ 26 files changed, 1868 insertions(+) create mode 100644 versioned/storage/cleanup/build.gradle.kts create mode 100644 versioned/storage/cleanup/src/main/java/org/projectnessie/versioned/storage/cleanup/Cleanup.java create mode 100644 versioned/storage/cleanup/src/main/java/org/projectnessie/versioned/storage/cleanup/CleanupParams.java create mode 100644 versioned/storage/cleanup/src/main/java/org/projectnessie/versioned/storage/cleanup/MustRestartWithBiggerFilterException.java create mode 100644 versioned/storage/cleanup/src/main/java/org/projectnessie/versioned/storage/cleanup/PurgeFilter.java create mode 100644 versioned/storage/cleanup/src/main/java/org/projectnessie/versioned/storage/cleanup/PurgeObjects.java create mode 100644 versioned/storage/cleanup/src/main/java/org/projectnessie/versioned/storage/cleanup/PurgeObjectsContext.java create mode 100644 versioned/storage/cleanup/src/main/java/org/projectnessie/versioned/storage/cleanup/PurgeObjectsImpl.java create mode 100644 versioned/storage/cleanup/src/main/java/org/projectnessie/versioned/storage/cleanup/PurgeResult.java create mode 100644 versioned/storage/cleanup/src/main/java/org/projectnessie/versioned/storage/cleanup/PurgeStats.java create mode 100644 versioned/storage/cleanup/src/main/java/org/projectnessie/versioned/storage/cleanup/PurgeStatsBuilder.java create mode 100644 versioned/storage/cleanup/src/main/java/org/projectnessie/versioned/storage/cleanup/RateLimit.java create mode 100644 versioned/storage/cleanup/src/main/java/org/projectnessie/versioned/storage/cleanup/ReferencedObjectsContext.java create mode 100644 versioned/storage/cleanup/src/main/java/org/projectnessie/versioned/storage/cleanup/ReferencedObjectsFilter.java create mode 100644 versioned/storage/cleanup/src/main/java/org/projectnessie/versioned/storage/cleanup/ReferencedObjectsFilterImpl.java create mode 100644 versioned/storage/cleanup/src/main/java/org/projectnessie/versioned/storage/cleanup/ReferencedObjectsResolver.java create mode 100644 versioned/storage/cleanup/src/main/java/org/projectnessie/versioned/storage/cleanup/ReferencedObjectsResolverImpl.java create mode 100644 
versioned/storage/cleanup/src/main/java/org/projectnessie/versioned/storage/cleanup/ResolveResult.java create mode 100644 versioned/storage/cleanup/src/main/java/org/projectnessie/versioned/storage/cleanup/ResolveStats.java create mode 100644 versioned/storage/cleanup/src/main/java/org/projectnessie/versioned/storage/cleanup/ResolveStatsBuilder.java create mode 100644 versioned/storage/cleanup/src/main/java/org/projectnessie/versioned/storage/cleanup/VisitedCommitFilter.java create mode 100644 versioned/storage/cleanup/src/main/java/org/projectnessie/versioned/storage/cleanup/VisitedCommitFilterImpl.java create mode 100644 versioned/storage/cleanup/src/test/java/org/projectnessie/versioned/storage/cleanup/TestReferencedObjectsFilterImpl.java create mode 100644 versioned/storage/cleanup/src/test/java/org/projectnessie/versioned/storage/cleanup/TestReferencedObjectsResolverImpl.java diff --git a/bom/build.gradle.kts b/bom/build.gradle.kts index 7560a54b459..200d7bd9056 100644 --- a/bom/build.gradle.kts +++ b/bom/build.gradle.kts @@ -88,6 +88,7 @@ dependencies { api(project(":nessie-versioned-storage-cassandra-tests")) api(project(":nessie-versioned-storage-cassandra2")) api(project(":nessie-versioned-storage-cassandra2-tests")) + api(project(":nessie-versioned-storage-cleanup")) api(project(":nessie-versioned-storage-common")) api(project(":nessie-versioned-storage-common-proto")) api(project(":nessie-versioned-storage-common-serialize")) diff --git a/gradle/projects.main.properties b/gradle/projects.main.properties index 558a5a261b1..1d7c24b90c1 100644 --- a/gradle/projects.main.properties +++ b/gradle/projects.main.properties @@ -71,6 +71,7 @@ nessie-versioned-storage-cassandra=versioned/storage/cassandra nessie-versioned-storage-cassandra-tests=versioned/storage/cassandra-tests nessie-versioned-storage-cassandra2=versioned/storage/cassandra2 nessie-versioned-storage-cassandra2-tests=versioned/storage/cassandra2-tests +nessie-versioned-storage-cleanup=versioned/storage/cleanup nessie-versioned-storage-common=versioned/storage/common nessie-versioned-storage-common-proto=versioned/storage/common-proto nessie-versioned-storage-common-serialize=versioned/storage/common-serialize diff --git a/versioned/storage/cleanup/build.gradle.kts b/versioned/storage/cleanup/build.gradle.kts new file mode 100644 index 00000000000..6b782dd6b46 --- /dev/null +++ b/versioned/storage/cleanup/build.gradle.kts @@ -0,0 +1,53 @@ +/* + * Copyright (C) 2022 Dremio + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +plugins { id("nessie-conventions-server") } + +publishingHelper { mavenName = "Nessie - Storage - Cleanup unreferenced objects" } + +description = "Identify and purge unreferenced objects in the Nessie repository." 
+ +dependencies { + implementation(project(":nessie-model")) + implementation(project(":nessie-versioned-storage-common")) + implementation(project(":nessie-versioned-spi")) + implementation(project(":nessie-versioned-transfer-related")) + + compileOnly(libs.jakarta.validation.api) + compileOnly(libs.jakarta.annotation.api) + compileOnly(libs.microprofile.openapi) + + compileOnly(platform(libs.jackson.bom)) + compileOnly("com.fasterxml.jackson.core:jackson-annotations") + + compileOnly(libs.errorprone.annotations) + implementation(libs.guava) + implementation(libs.agrona) + implementation(libs.slf4j.api) + + compileOnly(project(":nessie-versioned-storage-testextension")) + + compileOnly(project(":nessie-immutables")) + annotationProcessor(project(":nessie-immutables", configuration = "processor")) + + testImplementation(project(":nessie-versioned-storage-testextension")) + testImplementation(project(":nessie-versioned-storage-inmemory")) + testImplementation(project(":nessie-versioned-tests")) + testImplementation(project(path = ":nessie-protobuf-relocated", configuration = "shadow")) + testImplementation(platform(libs.junit.bom)) + testImplementation(libs.bundles.junit.testing) + testRuntimeOnly(libs.logback.classic) +} diff --git a/versioned/storage/cleanup/src/main/java/org/projectnessie/versioned/storage/cleanup/Cleanup.java b/versioned/storage/cleanup/src/main/java/org/projectnessie/versioned/storage/cleanup/Cleanup.java new file mode 100644 index 00000000000..905c83295f3 --- /dev/null +++ b/versioned/storage/cleanup/src/main/java/org/projectnessie/versioned/storage/cleanup/Cleanup.java @@ -0,0 +1,78 @@ +/* + * Copyright (C) 2024 Dremio + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.projectnessie.versioned.storage.cleanup; + +import static org.projectnessie.versioned.storage.cleanup.PurgeFilter.ReferencedObjectsPurgeFilter.referencedObjectsPurgeFilter; +import static org.projectnessie.versioned.storage.cleanup.ReferencedObjectsContext.objectsResolverContext; + +import org.projectnessie.versioned.storage.common.persist.Obj; +import org.projectnessie.versioned.storage.common.persist.Persist; + +public class Cleanup { + private final CleanupParams objectsResolverParams; + + private Cleanup(CleanupParams cleanupParams) { + this.objectsResolverParams = cleanupParams; + } + + public static Cleanup createCleanup(CleanupParams params) { + return new Cleanup(params); + } + + /** + * Create the context holder used when identifying referenced objects and purging unreferenced + * objects. + * + *
<p>
Choosing an appropriate value for {@code maxObjReferenced} is crucial. Technically, this + * value must be at max the current timestamp - but logically {@code maxObjReferenced} should be + * the timestamp of a few days ago to not delete unreferenced objects too early and give users a + * chance to reset branches to another commit ID in case some table/view metadata is broken. + * + * @param persist the persistence/repository to run against + * @param maxObjReferenced only {@link Obj}s with a {@link Obj#referenced()} older than {@code + * maxObjReferenced} will be deleted. Production workloads should set this to something like + * "now minus 7 days" to have the chance to reset branches, just in case. Technically, this + * value must not be greater than "now". "Now" should be inquired using {@code + * Persist.config().clock().instant()}. + */ + public ReferencedObjectsContext buildReferencedObjectsContext( + Persist persist, long maxObjReferenced) { + var referencedObjects = new ReferencedObjectsFilterImpl(objectsResolverParams); + var purgeFilter = referencedObjectsPurgeFilter(referencedObjects, maxObjReferenced); + return objectsResolverContext(persist, objectsResolverParams, referencedObjects, purgeFilter); + } + + /** + * Creates a new objects-resolver instance to identify referenced objects, which must be + * retained. + * + * @param objectsResolverContext context, preferably created using {@link + * #buildReferencedObjectsContext(Persist, long)} + */ + public ReferencedObjectsResolver createReferencedObjectsResolver( + ReferencedObjectsContext objectsResolverContext) { + return new ReferencedObjectsResolverImpl(objectsResolverContext); + } + + /** + * Creates a new objects-purger instance to delete unreferenced objects. + * + * @param purgeObjectsContext return value of {@link ReferencedObjectsResolver#resolve()}. + */ + public PurgeObjects createPurgeObjects(PurgeObjectsContext purgeObjectsContext) { + return new PurgeObjectsImpl(purgeObjectsContext); + } +} diff --git a/versioned/storage/cleanup/src/main/java/org/projectnessie/versioned/storage/cleanup/CleanupParams.java b/versioned/storage/cleanup/src/main/java/org/projectnessie/versioned/storage/cleanup/CleanupParams.java new file mode 100644 index 00000000000..b89521cdb67 --- /dev/null +++ b/versioned/storage/cleanup/src/main/java/org/projectnessie/versioned/storage/cleanup/CleanupParams.java @@ -0,0 +1,119 @@ +/* + * Copyright (C) 2024 Dremio + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.projectnessie.versioned.storage.cleanup; + +import static org.projectnessie.versioned.storage.common.logic.InternalRef.REF_REFS; +import static org.projectnessie.versioned.storage.common.logic.InternalRef.REF_REPO; + +import java.util.List; +import java.util.function.IntFunction; +import org.immutables.value.Value; +import org.projectnessie.nessie.immutables.NessieImmutable; +import org.projectnessie.versioned.storage.common.persist.ObjId; +import org.projectnessie.versioned.transfer.related.TransferRelatedObjects; + +@NessieImmutable +public interface CleanupParams { + // Following defaults result in a serialized bloom filter size of about 3000000 bytes. + long DEFAULT_EXPECTED_OBJ_COUNT = 1_000_000L; + double DEFAULT_FALSE_POSITIVE_PROBABILITY = 0.00001d; + double DEFAULT_ALLOWED_FALSE_POSITIVE_PROBABILITY = 0.0001d; + boolean DEFAULT_ALLOW_DUPLICATE_COMMIT_TRAVERSALS = false; + int DEFAULT_PENDING_OBJS_BATCH_SIZE = 20; + int DEFAULT_RECENT_OBJ_IDS_FILTER_SIZE = 100_000; + + static ImmutableCleanupParams.Builder builder() { + return ImmutableCleanupParams.builder(); + } + + @Value.Default + default long expectedObjCount() { + return DEFAULT_EXPECTED_OBJ_COUNT; + } + + @Value.Default + default double falsePositiveProbability() { + return DEFAULT_FALSE_POSITIVE_PROBABILITY; + } + + @Value.Default + default double allowedFalsePositiveProbability() { + return DEFAULT_ALLOWED_FALSE_POSITIVE_PROBABILITY; + } + + @Value.Default + default TransferRelatedObjects relatedObjects() { + return new TransferRelatedObjects() {}; + } + + @Value.Default + default boolean allowDuplicateCommitTraversals() { + return DEFAULT_ALLOW_DUPLICATE_COMMIT_TRAVERSALS; + } + + @Value.Default + default int resolveCommitRatePerSecond() { + return Integer.MAX_VALUE; + } + + @Value.Default + default int resolveObjRatePerSecond() { + return Integer.MAX_VALUE; + } + + @Value.Default + default int purgeScanObjRatePerSecond() { + return Integer.MAX_VALUE; + } + + @Value.Default + default int purgeDeleteObjRatePerSecond() { + return Integer.MAX_VALUE; + } + + @Value.Default + default int pendingObjsBatchSize() { + return DEFAULT_PENDING_OBJS_BATCH_SIZE; + } + + /** + * Size of the "recent object IDs" filter to prevent processing the same {@link ObjId}s. This * + * happens, when the values referenced from the commit index are iterated, because it iterates * + * over all keys, not only the keys added by a particular commit. + * + *
<p>
The value defaults to {@value #DEFAULT_RECENT_OBJ_IDS_FILTER_SIZE}. It should be higher than + * the maximum number of keys in a commit. + */ + @Value.Default + default int recentObjIdsFilterSize() { + return DEFAULT_RECENT_OBJ_IDS_FILTER_SIZE; + } + + @Value.Default + default IntFunction rateLimitFactory() { + return RateLimit::create; + } + + @Value.Default + default List internalReferenceNames() { + return List.of(REF_REFS.name(), REF_REPO.name()); + } + + @Value.Default + default boolean dryRun() { + return false; + } +} diff --git a/versioned/storage/cleanup/src/main/java/org/projectnessie/versioned/storage/cleanup/MustRestartWithBiggerFilterException.java b/versioned/storage/cleanup/src/main/java/org/projectnessie/versioned/storage/cleanup/MustRestartWithBiggerFilterException.java new file mode 100644 index 00000000000..8762fb740cb --- /dev/null +++ b/versioned/storage/cleanup/src/main/java/org/projectnessie/versioned/storage/cleanup/MustRestartWithBiggerFilterException.java @@ -0,0 +1,27 @@ +/* + * Copyright (C) 2024 Dremio + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.projectnessie.versioned.storage.cleanup; + +/** + * Thrown when the bloom filter's FPP is above the configured threshold when adding IDs. If this + * exception is encountered, the current garbage-collection run must be aborted and + * restarted with a bigger {@link CleanupParams#expectedObjCount()} value. + */ +public class MustRestartWithBiggerFilterException extends RuntimeException { + public MustRestartWithBiggerFilterException(String msg) { + super(msg); + } +} diff --git a/versioned/storage/cleanup/src/main/java/org/projectnessie/versioned/storage/cleanup/PurgeFilter.java b/versioned/storage/cleanup/src/main/java/org/projectnessie/versioned/storage/cleanup/PurgeFilter.java new file mode 100644 index 00000000000..99f59602fc4 --- /dev/null +++ b/versioned/storage/cleanup/src/main/java/org/projectnessie/versioned/storage/cleanup/PurgeFilter.java @@ -0,0 +1,66 @@ +/* + * Copyright (C) 2024 Dremio + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.projectnessie.versioned.storage.cleanup; + +import jakarta.validation.constraints.NotNull; +import java.util.List; +import org.projectnessie.nessie.immutables.NessieImmutable; +import org.projectnessie.versioned.storage.common.persist.Obj; + +public interface PurgeFilter { + boolean mustKeep(@NotNull Obj obj); + + @NessieImmutable + interface CompositePurgeFilter extends PurgeFilter { + List filters(); + + static CompositePurgeFilter compositePurgeFilter(PurgeFilter... filters) { + return ImmutableCompositePurgeFilter.of(List.of(filters)); + } + + static CompositePurgeFilter compositePurgeFilter(List filters) { + return ImmutableCompositePurgeFilter.of(filters); + } + + @Override + default boolean mustKeep(Obj obj) { + for (PurgeFilter filter : filters()) { + if (filter.mustKeep(obj)) { + return true; + } + } + return false; + } + } + + @NessieImmutable + interface ReferencedObjectsPurgeFilter extends PurgeFilter { + ReferencedObjectsFilter referencedObjects(); + + long maxObjReferenced(); + + static ReferencedObjectsPurgeFilter referencedObjectsPurgeFilter( + ReferencedObjectsFilter referencedObjects, long maxObjReferenced) { + return ImmutableReferencedObjectsPurgeFilter.of(referencedObjects, maxObjReferenced); + } + + @Override + default boolean mustKeep(Obj obj) { + return obj.referenced() > maxObjReferenced() + || referencedObjects().isProbablyReferenced(obj.id()); + } + } +} diff --git a/versioned/storage/cleanup/src/main/java/org/projectnessie/versioned/storage/cleanup/PurgeObjects.java b/versioned/storage/cleanup/src/main/java/org/projectnessie/versioned/storage/cleanup/PurgeObjects.java new file mode 100644 index 00000000000..fd2cc9f9fe0 --- /dev/null +++ b/versioned/storage/cleanup/src/main/java/org/projectnessie/versioned/storage/cleanup/PurgeObjects.java @@ -0,0 +1,23 @@ +/* + * Copyright (C) 2024 Dremio + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.projectnessie.versioned.storage.cleanup; + +public interface PurgeObjects { + PurgeResult purge(); + + /** Return the current statistics, works even if {@link #purge()} throws an exception. */ + PurgeStats getStats(); +} diff --git a/versioned/storage/cleanup/src/main/java/org/projectnessie/versioned/storage/cleanup/PurgeObjectsContext.java b/versioned/storage/cleanup/src/main/java/org/projectnessie/versioned/storage/cleanup/PurgeObjectsContext.java new file mode 100644 index 00000000000..19d166b7c7f --- /dev/null +++ b/versioned/storage/cleanup/src/main/java/org/projectnessie/versioned/storage/cleanup/PurgeObjectsContext.java @@ -0,0 +1,49 @@ +/* + * Copyright (C) 2024 Dremio + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.projectnessie.versioned.storage.cleanup; + +import jakarta.validation.constraints.NotNull; +import org.projectnessie.nessie.immutables.NessieImmutable; +import org.projectnessie.versioned.storage.common.persist.Persist; + +@NessieImmutable +public interface PurgeObjectsContext { + @NotNull + Persist persist(); + + @NotNull + ReferencedObjectsFilter referencedObjects(); + + @NotNull + PurgeFilter purgeFilter(); + + int scanObjRatePerSecond(); + + int deleteObjRatePerSecond(); + + static PurgeObjectsContext purgeObjectsContext( + ReferencedObjectsContext referencedObjectsContext) { + return ImmutablePurgeObjectsContext.of( + referencedObjectsContext.persist(), + referencedObjectsContext.referencedObjects(), + referencedObjectsContext.purgeFilter(), + referencedObjectsContext.params().purgeScanObjRatePerSecond(), + referencedObjectsContext.params().purgeDeleteObjRatePerSecond(), + referencedObjectsContext.params().dryRun()); + } + + boolean dryRun(); +} diff --git a/versioned/storage/cleanup/src/main/java/org/projectnessie/versioned/storage/cleanup/PurgeObjectsImpl.java b/versioned/storage/cleanup/src/main/java/org/projectnessie/versioned/storage/cleanup/PurgeObjectsImpl.java new file mode 100644 index 00000000000..7bdbcb03297 --- /dev/null +++ b/versioned/storage/cleanup/src/main/java/org/projectnessie/versioned/storage/cleanup/PurgeObjectsImpl.java @@ -0,0 +1,101 @@ +/* + * Copyright (C) 2024 Dremio + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.projectnessie.versioned.storage.cleanup; + +import static com.google.common.base.Preconditions.checkState; + +import java.util.Set; +import java.util.concurrent.atomic.AtomicBoolean; +import org.projectnessie.versioned.storage.common.persist.CloseableIterator; +import org.projectnessie.versioned.storage.common.persist.Obj; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +final class PurgeObjectsImpl implements PurgeObjects { + private static final Logger LOGGER = LoggerFactory.getLogger(PurgeObjectsImpl.class); + + private final PurgeObjectsContext purgeObjectsContext; + private final PurgeStatsBuilder stats; + private final AtomicBoolean used = new AtomicBoolean(); + + public PurgeObjectsImpl(PurgeObjectsContext purgeObjectsContext) { + this.purgeObjectsContext = purgeObjectsContext; + this.stats = new PurgeStatsBuilder(); + } + + @Override + public PurgeResult purge() { + checkState(used.compareAndSet(false, true), "resolve() has already been called."); + + var scanRateLimiter = RateLimit.create(purgeObjectsContext.scanObjRatePerSecond()); + var purgeRateLimiter = RateLimit.create(purgeObjectsContext.deleteObjRatePerSecond()); + var purgeFilter = purgeObjectsContext.purgeFilter(); + var persist = purgeObjectsContext.persist(); + var clock = persist.config().clock(); + + LOGGER.debug( + "Purging unreferenced objects in repository '{}', scanning {} objects per second, deleting {} objects per second", + persist.config().repositoryId(), + scanRateLimiter, + purgeRateLimiter); + + stats.started = clock.instant(); + + try (CloseableIterator iter = persist.scanAllObjects(Set.of())) { + scanRateLimiter.acquire(); + + while (iter.hasNext()) { + stats.numScannedObjs++; + var obj = iter.next(); + if (purgeFilter.mustKeep(obj)) { + continue; + } + + purgeRateLimiter.acquire(); + purgeObj(obj); + } + } catch (RuntimeException e) { + stats.failure = e; + throw e; + } finally { + stats.ended = clock.instant(); + LOGGER.debug( + "Finished purging unreferenced objects in repository '{}', {} of {} objects deleted", + persist.config().repositoryId(), + stats.numScannedObjs, + stats.numPurgedObjs); + } + + return ImmutablePurgeResult.of(stats.build()); + } + + @Override + public PurgeStats getStats() { + return stats.build(); + } + + private void purgeObj(Obj obj) { + // TODO delete in parallel (multiple threads) + stats.numPurgedObjs++; + + LOGGER.trace( + "Deleting obj {} of type {}/{}", obj.id(), obj.type().name(), obj.type().shortName()); + + if (!purgeObjectsContext.dryRun()) { + purgeObjectsContext.persist().deleteWithReferenced(obj); + } + } +} diff --git a/versioned/storage/cleanup/src/main/java/org/projectnessie/versioned/storage/cleanup/PurgeResult.java b/versioned/storage/cleanup/src/main/java/org/projectnessie/versioned/storage/cleanup/PurgeResult.java new file mode 100644 index 00000000000..2e94a6e237b --- /dev/null +++ b/versioned/storage/cleanup/src/main/java/org/projectnessie/versioned/storage/cleanup/PurgeResult.java @@ -0,0 +1,23 @@ +/* + * Copyright (C) 2024 Dremio + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.projectnessie.versioned.storage.cleanup; + +import org.projectnessie.nessie.immutables.NessieImmutable; + +@NessieImmutable +public interface PurgeResult { + PurgeStats stats(); +} diff --git a/versioned/storage/cleanup/src/main/java/org/projectnessie/versioned/storage/cleanup/PurgeStats.java b/versioned/storage/cleanup/src/main/java/org/projectnessie/versioned/storage/cleanup/PurgeStats.java new file mode 100644 index 00000000000..5fbcfe1f404 --- /dev/null +++ b/versioned/storage/cleanup/src/main/java/org/projectnessie/versioned/storage/cleanup/PurgeStats.java @@ -0,0 +1,33 @@ +/* + * Copyright (C) 2024 Dremio + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.projectnessie.versioned.storage.cleanup; + +import java.time.Instant; +import java.util.Optional; +import org.projectnessie.nessie.immutables.NessieImmutable; + +@NessieImmutable +public interface PurgeStats { + Instant started(); + + Instant ended(); + + Optional failure(); + + long numScannedObjs(); + + long numPurgedObjs(); +} diff --git a/versioned/storage/cleanup/src/main/java/org/projectnessie/versioned/storage/cleanup/PurgeStatsBuilder.java b/versioned/storage/cleanup/src/main/java/org/projectnessie/versioned/storage/cleanup/PurgeStatsBuilder.java new file mode 100644 index 00000000000..eb07a42f646 --- /dev/null +++ b/versioned/storage/cleanup/src/main/java/org/projectnessie/versioned/storage/cleanup/PurgeStatsBuilder.java @@ -0,0 +1,34 @@ +/* + * Copyright (C) 2024 Dremio + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.projectnessie.versioned.storage.cleanup; + +import java.time.Instant; +import java.util.Optional; + +final class PurgeStatsBuilder { + Instant started; + Instant ended; + + Exception failure; + + long numScannedObjs; + long numPurgedObjs; + + PurgeStats build() { + return ImmutablePurgeStats.of( + started, ended, Optional.ofNullable(failure), numScannedObjs, numPurgedObjs); + } +} diff --git a/versioned/storage/cleanup/src/main/java/org/projectnessie/versioned/storage/cleanup/RateLimit.java b/versioned/storage/cleanup/src/main/java/org/projectnessie/versioned/storage/cleanup/RateLimit.java new file mode 100644 index 00000000000..d942c8c6f0e --- /dev/null +++ b/versioned/storage/cleanup/src/main/java/org/projectnessie/versioned/storage/cleanup/RateLimit.java @@ -0,0 +1,50 @@ +/* + * Copyright (C) 2024 Dremio + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.projectnessie.versioned.storage.cleanup; + +import com.google.common.util.concurrent.RateLimiter; + +public interface RateLimit { + void acquire(); + + @SuppressWarnings("UnstableApiUsage") + static RateLimit create(int ratePerSecond) { + if (ratePerSecond <= 0 || ratePerSecond == Integer.MAX_VALUE) { + return new RateLimit() { + @Override + public void acquire() {} + + @Override + public String toString() { + return "unlimited"; + } + }; + } + return new RateLimit() { + final RateLimiter limiter = RateLimiter.create(ratePerSecond); + + @Override + public void acquire() { + limiter.acquire(); + } + + @Override + public String toString() { + return "up to " + ratePerSecond; + } + }; + } +} diff --git a/versioned/storage/cleanup/src/main/java/org/projectnessie/versioned/storage/cleanup/ReferencedObjectsContext.java b/versioned/storage/cleanup/src/main/java/org/projectnessie/versioned/storage/cleanup/ReferencedObjectsContext.java new file mode 100644 index 00000000000..2eb7ed9fb10 --- /dev/null +++ b/versioned/storage/cleanup/src/main/java/org/projectnessie/versioned/storage/cleanup/ReferencedObjectsContext.java @@ -0,0 +1,55 @@ +/* + * Copyright (C) 2024 Dremio + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.projectnessie.versioned.storage.cleanup; + +import static org.projectnessie.versioned.storage.cleanup.VisitedCommitFilter.ALLOW_DUPLICATE_TRAVERSALS; + +import jakarta.validation.constraints.NotNull; +import org.projectnessie.nessie.immutables.NessieImmutable; +import org.projectnessie.versioned.storage.common.persist.Persist; + +@NessieImmutable +public interface ReferencedObjectsContext { + @NotNull + Persist persist(); + + @NotNull + ReferencedObjectsFilter referencedObjects(); + + @NotNull + CleanupParams params(); + + @NotNull + PurgeFilter purgeFilter(); + + @NotNull + VisitedCommitFilter visitedCommitFilter(); + + static ReferencedObjectsContext objectsResolverContext( + Persist persist, + CleanupParams params, + ReferencedObjectsFilter referencedObjects, + PurgeFilter purgeFilter) { + return ImmutableReferencedObjectsContext.of( + persist, + referencedObjects, + params, + purgeFilter, + params.allowDuplicateCommitTraversals() + ? ALLOW_DUPLICATE_TRAVERSALS + : new VisitedCommitFilterImpl()); + } +} diff --git a/versioned/storage/cleanup/src/main/java/org/projectnessie/versioned/storage/cleanup/ReferencedObjectsFilter.java b/versioned/storage/cleanup/src/main/java/org/projectnessie/versioned/storage/cleanup/ReferencedObjectsFilter.java new file mode 100644 index 00000000000..81cff05fbb8 --- /dev/null +++ b/versioned/storage/cleanup/src/main/java/org/projectnessie/versioned/storage/cleanup/ReferencedObjectsFilter.java @@ -0,0 +1,41 @@ +/* + * Copyright (C) 2024 Dremio + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.projectnessie.versioned.storage.cleanup; + +import jakarta.validation.constraints.NotNull; +import org.projectnessie.versioned.storage.common.persist.ObjId; + +/** + * Mark {@linkplain ObjId object IDs} as referenced and allow checking whether object IDs are marked + * as referenced. + * + *
<p>
The implementation is usually backed by a probabilistic data structure (bloom filter), which + * means that there is a {@linkplain #expectedFpp() chance} that an unreferenced object is not + * collected, but all referenced objects are guaranteed to remain. + */ +public interface ReferencedObjectsFilter { + boolean markReferenced(@NotNull ObjId objId) throws MustRestartWithBiggerFilterException; + + boolean isProbablyReferenced(@NotNull ObjId objId); + + boolean withinExpectedFpp(); + + long approximateElementCount(); + + double expectedFpp(); + + long estimatedHeapPressure(); +} diff --git a/versioned/storage/cleanup/src/main/java/org/projectnessie/versioned/storage/cleanup/ReferencedObjectsFilterImpl.java b/versioned/storage/cleanup/src/main/java/org/projectnessie/versioned/storage/cleanup/ReferencedObjectsFilterImpl.java new file mode 100644 index 00000000000..7b0b5bca457 --- /dev/null +++ b/versioned/storage/cleanup/src/main/java/org/projectnessie/versioned/storage/cleanup/ReferencedObjectsFilterImpl.java @@ -0,0 +1,168 @@ +/* + * Copyright (C) 2024 Dremio + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.projectnessie.versioned.storage.cleanup; + +import com.google.common.hash.BloomFilter; +import com.google.common.hash.PrimitiveSink; +import java.util.concurrent.atomic.AtomicLong; +import org.projectnessie.versioned.storage.common.persist.ObjId; + +@SuppressWarnings("UnstableApiUsage") +final class ReferencedObjectsFilterImpl implements ReferencedObjectsFilter { + + private final BloomFilter filter; + private final double allowedFalsePositiveProbability; + private final AtomicLong remainingElements; + private final long estimatedHeapPressure; + + ReferencedObjectsFilterImpl(CleanupParams params) { + this.filter = createBloomFilter(params); + this.remainingElements = new AtomicLong(params.expectedObjCount()); + this.allowedFalsePositiveProbability = params.allowedFalsePositiveProbability(); + this.estimatedHeapPressure = calculateEstimatedHeapPressure(params); + } + + static BloomFilter createBloomFilter(CleanupParams params) { + return BloomFilter.create( + ReferencedObjectsFilterImpl::funnel, + params.expectedObjCount(), + params.falsePositiveProbability()); + } + + private static void funnel(ObjId id, PrimitiveSink primitiveSink) { + var idSize = id.size(); + var i = 0; + for (; idSize >= 8; idSize -= 8) { + primitiveSink.putLong(id.longAt(i++)); + } + i <<= 3; + for (; idSize > 0; idSize--) { + primitiveSink.putByte(id.byteAt(i++)); + } + } + + @Override + public boolean markReferenced(ObjId objId) throws MustRestartWithBiggerFilterException { + if (filter.put(objId)) { + if (remainingElements.decrementAndGet() >= 0L || withinExpectedFpp()) { + return true; + } + throw new MustRestartWithBiggerFilterException( + "Bloom filter exceeded the configured expected FPP"); + } + return false; + } + + @Override + public boolean isProbablyReferenced(ObjId objId) { + return filter.mightContain(objId); + } + + @Override + public boolean withinExpectedFpp() { + return 
expectedFpp() <= allowedFalsePositiveProbability; + } + + @Override + public long approximateElementCount() { + return filter.approximateElementCount(); + } + + @Override + public double expectedFpp() { + return filter.expectedFpp(); + } + + @Override + public long estimatedHeapPressure() { + return estimatedHeapPressure; + } + + private static long calculateEstimatedHeapPressure(CleanupParams params) { + var bits = optimalNumOfBits(params.expectedObjCount(), params.falsePositiveProbability()); + var arrayLen = bits / 64 + 1; + return HEAP_SIZE_BLOOM_FILTER + + HEAP_SIZE_BIT_ARRAY + + HEAP_SIZE_LONG_ADDER + + HEAP_SIZE_ATOMIC_LONG_ARRAY + + HEAP_SIZE_PRIMITIVE_LONG_ARRAY * arrayLen; + } + + // See com.google.common.hash.BloomFilter.optimalNumOfBits + private static long optimalNumOfBits(long expectedInsertions, double fpp) { + if (fpp == 0) { + fpp = Double.MIN_VALUE; + } + return (long) (-expectedInsertions * Math.log(fpp) / (Math.log(2) * Math.log(2))); + } + + /* + com.google.common.hash.BloomFilter object internals: + OFF SZ TYPE DESCRIPTION VALUE + 0 8 (object header: mark) N/A + 8 8 (object header: class) N/A + 16 4 int BloomFilter.numHashFunctions N/A + 20 4 (alignment/padding gap) + 24 8 com.google.common.hash.BloomFilterStrategies.LockFreeBitArray BloomFilter.bits N/A + 32 8 com.google.common.hash.Funnel BloomFilter.funnel N/A + 40 8 com.google.common.hash.BloomFilter.Strategy BloomFilter.strategy N/A + Instance size: 48 bytes + Space losses: 4 bytes internal + 0 bytes external = 4 bytes total + */ + private static final long HEAP_SIZE_BLOOM_FILTER = 48L; + /* + com.google.common.hash.BloomFilterStrategies$LockFreeBitArray object internals: + OFF SZ TYPE DESCRIPTION VALUE + 0 8 (object header: mark) N/A + 8 8 (object header: class) N/A + 16 8 java.util.concurrent.atomic.AtomicLongArray LockFreeBitArray.data N/A + 24 8 com.google.common.hash.LongAddable LockFreeBitArray.bitCount N/A + Instance size: 32 bytes + Space losses: 0 bytes internal + 0 bytes external = 0 bytes total + */ + private static final long HEAP_SIZE_BIT_ARRAY = 32L; + /* + We assume that com.google.common.hash.LongAddables uses the pure-Java implementation, not Guava's + heap-expensive LongAdder implementation based on its Striped64 with 144 bytes per cell. 
+ + java.util.concurrent.atomic.AtomicLong object internals (com.google.common.hash.LongAddables.PureJavaLongAddable): + OFF SZ TYPE DESCRIPTION VALUE + 0 8 (object header: mark) N/A + 8 4 (object header: class) N/A + 12 4 (alignment/padding gap) + 16 8 long AtomicLong.value N/A + 24 8 (object alignment gap) + Instance size: 32 bytes + Space losses: 4 bytes internal + 8 bytes external = 12 bytes total + */ + private static final long HEAP_SIZE_LONG_ADDER = 40L; + /* + java.util.concurrent.atomic.AtomicLongArray object internals: + OFF SZ TYPE DESCRIPTION VALUE + 0 8 (object header: mark) N/A + 8 4 (object header: class) N/A + 12 4 (alignment/padding gap) + 16 8 long[] AtomicLongArray.array N/A + 24 8 (object alignment gap) + Instance size: 32 bytes + Space losses: 4 bytes internal + 8 bytes external = 12 bytes total + */ + private static final long HEAP_SIZE_ATOMIC_LONG_ARRAY = 32L; + /* + long[] : 16 + 8*length + */ + private static final long HEAP_SIZE_PRIMITIVE_LONG_ARRAY = 16L; +} diff --git a/versioned/storage/cleanup/src/main/java/org/projectnessie/versioned/storage/cleanup/ReferencedObjectsResolver.java b/versioned/storage/cleanup/src/main/java/org/projectnessie/versioned/storage/cleanup/ReferencedObjectsResolver.java new file mode 100644 index 00000000000..5757ea1e29d --- /dev/null +++ b/versioned/storage/cleanup/src/main/java/org/projectnessie/versioned/storage/cleanup/ReferencedObjectsResolver.java @@ -0,0 +1,23 @@ +/* + * Copyright (C) 2024 Dremio + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.projectnessie.versioned.storage.cleanup; + +public interface ReferencedObjectsResolver { + ResolveResult resolve() throws MustRestartWithBiggerFilterException; + + /** Return the current statistics, works even if {@link #resolve()} throws an exception. */ + ResolveStats getStats(); +} diff --git a/versioned/storage/cleanup/src/main/java/org/projectnessie/versioned/storage/cleanup/ReferencedObjectsResolverImpl.java b/versioned/storage/cleanup/src/main/java/org/projectnessie/versioned/storage/cleanup/ReferencedObjectsResolverImpl.java new file mode 100644 index 00000000000..2382adfeaf3 --- /dev/null +++ b/versioned/storage/cleanup/src/main/java/org/projectnessie/versioned/storage/cleanup/ReferencedObjectsResolverImpl.java @@ -0,0 +1,349 @@ +/* + * Copyright (C) 2024 Dremio + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.projectnessie.versioned.storage.cleanup; + +import static com.google.common.base.Preconditions.checkState; +import static org.projectnessie.versioned.storage.cleanup.PurgeObjectsContext.purgeObjectsContext; +import static org.projectnessie.versioned.storage.common.logic.CommitLogQuery.commitLogQuery; +import static org.projectnessie.versioned.storage.common.logic.Logics.commitLogic; +import static org.projectnessie.versioned.storage.common.logic.Logics.indexesLogic; +import static org.projectnessie.versioned.storage.common.logic.Logics.referenceLogic; +import static org.projectnessie.versioned.storage.common.logic.Logics.repositoryLogic; +import static org.projectnessie.versioned.storage.common.logic.ReferencesQuery.referencesQuery; +import static org.projectnessie.versioned.storage.common.objtypes.StandardObjType.COMMIT; +import static org.projectnessie.versioned.storage.common.objtypes.StandardObjType.VALUE; + +import com.google.common.annotations.VisibleForTesting; +import java.util.LinkedHashMap; +import java.util.Map; +import java.util.concurrent.atomic.AtomicBoolean; +import org.agrona.collections.ObjectHashSet; +import org.projectnessie.model.Content; +import org.projectnessie.versioned.storage.common.indexes.StoreIndexElement; +import org.projectnessie.versioned.storage.common.objtypes.CommitObj; +import org.projectnessie.versioned.storage.common.objtypes.CommitOp; +import org.projectnessie.versioned.storage.common.objtypes.ContentValueObj; +import org.projectnessie.versioned.storage.common.persist.Obj; +import org.projectnessie.versioned.storage.common.persist.ObjId; +import org.projectnessie.versioned.storage.common.persist.Reference; +import org.projectnessie.versioned.store.DefaultStoreWorker; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +final class ReferencedObjectsResolverImpl implements ReferencedObjectsResolver { + private static final Logger LOGGER = LoggerFactory.getLogger(ReferencedObjectsResolverImpl.class); + + private final ObjectHashSet pendingObjs = new ObjectHashSet<>(); + private final ObjectHashSet pendingCommits = new ObjectHashSet<>(); + + /** + * Set of recently handled 'ObjId's to prevent re-processing the same objects multiple times. This + * happens, when the values referenced from the commit index are iterated, because it iterates + * over all keys, not only the keys added by a particular commit. 
+ */ + private final LinkedHashMap recentObjIds; + + private final ReferencedObjectsContext referencedObjectsContext; + + private final ResolveStatsBuilder stats; + private final RateLimit commitRateLimiter; + private final RateLimit objRateLimiter; + + private final AtomicBoolean used = new AtomicBoolean(); + + ReferencedObjectsResolverImpl(ReferencedObjectsContext referencedObjectsContext) { + this.referencedObjectsContext = referencedObjectsContext; + this.stats = new ResolveStatsBuilder(); + this.commitRateLimiter = + RateLimit.create(referencedObjectsContext.params().resolveCommitRatePerSecond()); + this.objRateLimiter = + RateLimit.create(referencedObjectsContext.params().resolveObjRatePerSecond()); + + var recentObjIdsFilterSize = referencedObjectsContext.params().recentObjIdsFilterSize(); + this.recentObjIds = + new LinkedHashMap<>() { + @Override + protected boolean removeEldestEntry(Map.Entry eldest) { + return size() >= recentObjIdsFilterSize; + } + }; + } + + @Override + public ResolveResult resolve() throws MustRestartWithBiggerFilterException { + checkState(used.compareAndSet(false, true), "resolve() has already been called."); + + LOGGER.debug( + "Identifying referenced objects in repository '{}', processing {} commits per second, processing {} objects per second", + referencedObjectsContext.persist().config().repositoryId(), + commitRateLimiter, + objRateLimiter); + + var persist = referencedObjectsContext.persist(); + var clock = persist.config().clock(); + var params = referencedObjectsContext.params(); + + stats.started = clock.instant(); + try { + + checkState( + repositoryLogic(persist).repositoryExists(), + "The provided repository has not been initialized."); + + params.relatedObjects().repositoryRelatedObjects().forEach(this::pendingObj); + + for (String internalReferenceName : params.internalReferenceNames()) { + var intRef = persist.fetchReference(internalReferenceName); + checkState(intRef != null, "Internal reference %s not found!", internalReferenceName); + walkReference(intRef); + } + + var referenceLogic = referenceLogic(persist); + referenceLogic.queryReferences(referencesQuery()).forEachRemaining(this::walkReference); + + while (!pendingCommits.isEmpty() || !pendingObjs.isEmpty()) { + if (!pendingCommits.isEmpty()) { + processPendingCommits(); + } + if (!pendingObjs.isEmpty()) { + processPendingObjs(); + } + } + } catch (MustRestartWithBiggerFilterException mustRestart) { + LOGGER.warn( + "Must restart identifying referenced objects for repository '{}', current parameters: expected object count: {}, FPP: {}, allowed FPP: {}", + persist.config().repositoryId(), + params.expectedObjCount(), + params.falsePositiveProbability(), + params.allowedFalsePositiveProbability()); + stats.mustRestart = true; + stats.failure = mustRestart; + throw mustRestart; + } catch (RuntimeException e) { + stats.failure = e; + throw e; + } finally { + stats.ended = clock.instant(); + LOGGER.debug( + "Finished identifying referenced objects in repository '{}', processed {} commits in {} references over {} objects", + persist.config().repositoryId(), + stats.numCommits, + stats.numReferences, + stats.numObjs); + } + + return ImmutableResolveResult.of(stats.build(), purgeObjectsContext(referencedObjectsContext)); + } + + @Override + public ResolveStats getStats() { + return stats.build(); + } + + @VisibleForTesting + void walkReference(Reference reference) { + stats.numReferences++; + + var persist = referencedObjectsContext.persist(); + + if (reference.deleted()) { + LOGGER.trace( + 
"Skipping deleted reference {} in repository '{}'", + reference.name(), + persist.config().repositoryId()); + return; + } + + LOGGER.debug( + "Walking reference {} in repository '{}' starting at commit {}", + reference.name(), + persist.config().repositoryId(), + reference.pointer()); + + referencedObjectsContext + .params() + .relatedObjects() + .referenceRelatedObjects(reference) + .forEach(this::pendingObj); + + commitLogic(persist) + .commitLog(commitLogQuery(reference.pointer())) + .forEachRemaining(this::handleCommit); + + var extendedInfo = reference.extendedInfoObj(); + if (extendedInfo != null) { + referencedObjectsContext.referencedObjects().markReferenced(extendedInfo); + } + } + + @VisibleForTesting + void handleCommit(CommitObj commit) { + if (referencedObjectsContext.visitedCommitFilter().visited(commit.id())) { + // Prevent visiting the same commit more often than once + return; + } + + commitRateLimiter.acquire(); + + stats.numCommits++; + + var persist = referencedObjectsContext.persist(); + + LOGGER.debug( + "Handling commit {} in repository '{}'", commit.id(), persist.config().repositoryId()); + + stats.numUniqueCommits++; + + referencedObjectsContext.referencedObjects().markReferenced(commit.id()); + + referencedObjectsContext + .params() + .relatedObjects() + .commitRelatedObjects(commit) + .forEach(this::pendingObj); + + var indexesLogic = indexesLogic(referencedObjectsContext.persist()); + var index = indexesLogic.buildCompleteIndexOrEmpty(commit); + for (StoreIndexElement indexElement : index) { + var content = indexElement.content(); + if (content.action().exists()) { + var value = content.value(); + pendingObj(value); + } + } + + commit.secondaryParents().forEach(this::pendingCommit); + } + + private void pendingCommit(ObjId commitObjId) { + if (referencedObjectsContext.visitedCommitFilter().visited(commitObjId)) { + // Prevent visiting the same commit more often than once + return; + } + + if (!pendingCommits.add(commitObjId)) { + return; + } + + stats.numPendingCommits++; + + if (pendingCommits.size() >= referencedObjectsContext.params().pendingObjsBatchSize()) { + processPendingCommits(); + } + } + + @VisibleForTesting + void processPendingCommits() { + stats.numPendingCommitBulkFetches++; + + var persist = referencedObjectsContext.persist(); + + LOGGER.trace( + "Fetching {} pending commits in repository '{}'", + pendingCommits.size(), + persist.config().repositoryId()); + + var commits = + persist.fetchTypedObjsIfExist( + pendingCommits.toArray(ObjId[]::new), COMMIT, CommitObj.class); + + for (CommitObj commit : commits) { + handleCommit(commit); + } + + pendingCommits.clear(); + } + + private void pendingObj(ObjId objId) { + if (recentObjIds.containsKey(objId)) { + return; + } + + if (!pendingObjs.add(objId)) { + return; + } + + stats.numPendingObjs++; + + if (pendingObjs.size() >= referencedObjectsContext.params().pendingObjsBatchSize()) { + processPendingObjs(); + } + } + + @VisibleForTesting + void processPendingObjs() { + stats.numPendingObjsBulkFetches++; + + var persist = referencedObjectsContext.persist(); + + LOGGER.debug( + "Fetching {} pending objects in repository '{}'", + pendingObjs.size(), + persist.config().repositoryId()); + + var objs = persist.fetchObjsIfExist(pendingObjs.toArray(ObjId[]::new)); + + for (Obj obj : objs) { + if (obj != null) { + handleObj(obj); + } + } + + pendingObjs.clear(); + } + + @VisibleForTesting + void handleObj(Obj obj) { + objRateLimiter.acquire(); + + recentObjIds.put(obj.id(), obj.id()); + + stats.numObjs++; + + 
var persist = referencedObjectsContext.persist(); + + LOGGER.debug( + "Handling obj {} of type {}/{} in repository '{}'", + obj.id(), + obj.type().name(), + obj.type().shortName(), + persist.config().repositoryId()); + + referencedObjectsContext.referencedObjects().markReferenced(obj.id()); + + var type = obj.type(); + + if (VALUE.equals(type)) { + var contentValueObj = (ContentValueObj) obj; + var content = + DefaultStoreWorker.instance() + .valueFromStore(contentValueObj.payload(), contentValueObj.data()); + + handleContent(content); + } + } + + @VisibleForTesting + void handleContent(Content content) { + stats.numContents++; + + referencedObjectsContext + .params() + .relatedObjects() + .contentRelatedObjects(content) + .forEach(this::pendingObj); + } +} diff --git a/versioned/storage/cleanup/src/main/java/org/projectnessie/versioned/storage/cleanup/ResolveResult.java b/versioned/storage/cleanup/src/main/java/org/projectnessie/versioned/storage/cleanup/ResolveResult.java new file mode 100644 index 00000000000..a92b8ec1697 --- /dev/null +++ b/versioned/storage/cleanup/src/main/java/org/projectnessie/versioned/storage/cleanup/ResolveResult.java @@ -0,0 +1,25 @@ +/* + * Copyright (C) 2024 Dremio + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.projectnessie.versioned.storage.cleanup; + +import org.projectnessie.nessie.immutables.NessieImmutable; + +@NessieImmutable +public interface ResolveResult { + ResolveStats stats(); + + PurgeObjectsContext purgeObjectsContext(); +} diff --git a/versioned/storage/cleanup/src/main/java/org/projectnessie/versioned/storage/cleanup/ResolveStats.java b/versioned/storage/cleanup/src/main/java/org/projectnessie/versioned/storage/cleanup/ResolveStats.java new file mode 100644 index 00000000000..603c7361560 --- /dev/null +++ b/versioned/storage/cleanup/src/main/java/org/projectnessie/versioned/storage/cleanup/ResolveStats.java @@ -0,0 +1,49 @@ +/* + * Copyright (C) 2024 Dremio + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package org.projectnessie.versioned.storage.cleanup;
+
+import java.time.Instant;
+import java.util.Optional;
+import org.projectnessie.nessie.immutables.NessieImmutable;
+
+@NessieImmutable
+public interface ResolveStats {
+  Instant started();
+
+  Instant ended();
+
+  boolean mustRestart();
+
+  Optional<Exception> failure();
+
+  long numReferences();
+
+  long numCommits();
+
+  long numUniqueCommits();
+
+  long numPendingCommits();
+
+  long numPendingObjs();
+
+  long numObjs();
+
+  long numContents();
+
+  long numPendingCommitBulkFetches();
+
+  long numPendingObjsBulkFetches();
+}
diff --git a/versioned/storage/cleanup/src/main/java/org/projectnessie/versioned/storage/cleanup/ResolveStatsBuilder.java b/versioned/storage/cleanup/src/main/java/org/projectnessie/versioned/storage/cleanup/ResolveStatsBuilder.java
new file mode 100644
index 00000000000..a7c37b78453
--- /dev/null
+++ b/versioned/storage/cleanup/src/main/java/org/projectnessie/versioned/storage/cleanup/ResolveStatsBuilder.java
@@ -0,0 +1,54 @@
+/*
+ * Copyright (C) 2024 Dremio
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.projectnessie.versioned.storage.cleanup;
+
+import java.time.Instant;
+import java.util.Optional;
+
+final class ResolveStatsBuilder {
+  Instant started;
+  Instant ended;
+
+  boolean mustRestart;
+  Exception failure;
+
+  long numReferences;
+  long numCommits;
+  long numUniqueCommits;
+  long numPendingCommits;
+  long numPendingObjs;
+  long numObjs;
+  long numContents;
+  long numPendingCommitBulkFetches;
+  long numPendingObjsBulkFetches;
+
+  ResolveStats build() {
+    return ImmutableResolveStats.of(
+        started,
+        ended,
+        mustRestart,
+        Optional.ofNullable(failure),
+        numReferences,
+        numCommits,
+        numUniqueCommits,
+        numPendingCommits,
+        numPendingObjs,
+        numObjs,
+        numContents,
+        numPendingCommitBulkFetches,
+        numPendingObjsBulkFetches);
+  }
+}
diff --git a/versioned/storage/cleanup/src/main/java/org/projectnessie/versioned/storage/cleanup/VisitedCommitFilter.java b/versioned/storage/cleanup/src/main/java/org/projectnessie/versioned/storage/cleanup/VisitedCommitFilter.java
new file mode 100644
index 00000000000..7a647245941
--- /dev/null
+++ b/versioned/storage/cleanup/src/main/java/org/projectnessie/versioned/storage/cleanup/VisitedCommitFilter.java
@@ -0,0 +1,46 @@
+/*
+ * Copyright (C) 2024 Dremio
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.projectnessie.versioned.storage.cleanup;
+
+import org.projectnessie.versioned.storage.common.objtypes.CommitObj;
+import org.projectnessie.versioned.storage.common.persist.ObjId;
+
+/**
+ * Filter to prevent processing the same {@linkplain CommitObj Nessie commit} more than once.
+ *
+ * <p>There are two implementations of this interface: {@linkplain #ALLOW_DUPLICATE_TRAVERSALS one}
+ * that does not prevent duplicate processing, and {@linkplain VisitedCommitFilterImpl the default
+ * one} that does. The parameter {@link CleanupParams#allowDuplicateCommitTraversals()} decides
+ * which implementation is used.
+ */
+public interface VisitedCommitFilter {
+  boolean visited(ObjId commitObjId);
+
+  long estimatedHeapPressure();
+
+  VisitedCommitFilter ALLOW_DUPLICATE_TRAVERSALS =
+      new VisitedCommitFilter() {
+        @Override
+        public boolean visited(ObjId commitObjId) {
+          return false;
+        }
+
+        @Override
+        public long estimatedHeapPressure() {
+          return 0;
+        }
+      };
+}
diff --git a/versioned/storage/cleanup/src/main/java/org/projectnessie/versioned/storage/cleanup/VisitedCommitFilterImpl.java b/versioned/storage/cleanup/src/main/java/org/projectnessie/versioned/storage/cleanup/VisitedCommitFilterImpl.java
new file mode 100644
index 00000000000..265a6d8b3ae
--- /dev/null
+++ b/versioned/storage/cleanup/src/main/java/org/projectnessie/versioned/storage/cleanup/VisitedCommitFilterImpl.java
@@ -0,0 +1,74 @@
+/*
+ * Copyright (C) 2024 Dremio
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.projectnessie.versioned.storage.cleanup;
+
+import static org.agrona.collections.Hashing.DEFAULT_LOAD_FACTOR;
+
+import org.agrona.collections.ObjectHashSet;
+import org.projectnessie.versioned.storage.common.persist.ObjId;
+
+final class VisitedCommitFilterImpl implements VisitedCommitFilter {
+  private final ObjectHashSet<ObjId> visited = new ObjectHashSet<>(64, DEFAULT_LOAD_FACTOR);
+
+  @Override
+  public boolean visited(ObjId commitObjId) {
+    return !visited.add(commitObjId);
+  }
+
+  @Override
+  public long estimatedHeapPressure() {
+    var sz = visited.size();
+    var cap = visited.capacity();
+    return HEAP_SIZE_OBJECT_HASH_SET + HEAP_SIZE_PRIMITIVE_OBJ_ARRAY * cap + HEAP_SIZE_OBJ_ID * sz;
+  }
+
+  /*
+  org.agrona.collections.ObjectHashSet object internals:
+  OFF  SZ  TYPE                                                 DESCRIPTION                          VALUE
+    0   8                                                       (object header: mark)                N/A
+    8   4                                                       (object header: class)               N/A
+   12   4  float                                                ObjectHashSet.loadFactor             N/A
+   16   4  int                                                  ObjectHashSet.resizeThreshold        N/A
+   20   4  int                                                  ObjectHashSet.size                   N/A
+   24   1  boolean                                              ObjectHashSet.shouldAvoidAllocation  N/A
+   25   7                                                       (alignment/padding gap)
+   32   8  java.lang.Object[]                                   ObjectHashSet.values                 N/A
+   40   8  org.agrona.collections.ObjectHashSet.ObjectIterator  ObjectHashSet.iterator               N/A
+   48   8  java.util.function.IntConsumer                       ObjectHashSet.resizeNotifier         N/A
+   56   8                                                       (object alignment gap)
+  Instance size: 64 bytes
+  Space losses: 7 bytes internal + 8 bytes external = 15 bytes total
+  */
+  private static final long HEAP_SIZE_OBJECT_HASH_SET = 64L;
+  /*
+  org.projectnessie.versioned.storage.common.persist.ObjId$ObjId256 object internals:
+  OFF  SZ  TYPE  DESCRIPTION              VALUE
+    0   8        (object header: mark)    N/A
+    8   4        (object header: class)   N/A
+   12   4        (alignment/padding gap)
+   16   8  long  ObjId256.l0              N/A
+   24   8  long  ObjId256.l1              N/A
+   32   8  long  ObjId256.l2              N/A
+   40   8  long  ObjId256.l3              N/A
+  Instance size: 48 bytes
+  Space losses: 4 bytes internal + 0 bytes external = 4 bytes total
+  */
+  private static final long HEAP_SIZE_OBJ_ID = 48L;
+  /*
+  long[] : 16 + 8*length
+  */
+  private static final long HEAP_SIZE_PRIMITIVE_OBJ_ARRAY = 16L;
+}
diff --git a/versioned/storage/cleanup/src/test/java/org/projectnessie/versioned/storage/cleanup/TestReferencedObjectsFilterImpl.java b/versioned/storage/cleanup/src/test/java/org/projectnessie/versioned/storage/cleanup/TestReferencedObjectsFilterImpl.java
new file mode 100644
index 00000000000..76c3a326303
--- /dev/null
+++ b/versioned/storage/cleanup/src/test/java/org/projectnessie/versioned/storage/cleanup/TestReferencedObjectsFilterImpl.java
@@ -0,0 +1,106 @@
+/*
+ * Copyright (C) 2024 Dremio
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.projectnessie.versioned.storage.cleanup;
+
+import static org.projectnessie.versioned.storage.common.persist.ObjId.objIdFromByteArray;
+import static org.projectnessie.versioned.storage.common.persist.ObjId.randomObjId;
+
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.ThreadLocalRandom;
+import org.assertj.core.api.SoftAssertions;
+import org.assertj.core.api.junit.jupiter.InjectSoftAssertions;
+import org.assertj.core.api.junit.jupiter.SoftAssertionsExtension;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+import org.projectnessie.versioned.storage.common.persist.ObjId;
+
+@ExtendWith(SoftAssertionsExtension.class)
+public class TestReferencedObjectsFilterImpl {
+  @InjectSoftAssertions SoftAssertions soft;
+
+  @Test
+  public void emptyFilterContainsNothing() {
+    ReferencedObjectsFilterImpl filter =
+        new ReferencedObjectsFilterImpl(CleanupParams.builder().build());
+    soft.assertThat(filter.isProbablyReferenced(ObjId.EMPTY_OBJ_ID)).isFalse();
+    for (int i = 0; i < 100; i++) {
+      ObjId id = randomObjId();
+      soft.assertThat(filter.isProbablyReferenced(id)).describedAs("id = %s", id).isFalse();
+    }
+  }
+
+  @Test
+  public void filterContainsAdded() {
+    ReferencedObjectsFilterImpl filter =
+        new ReferencedObjectsFilterImpl(CleanupParams.builder().build());
+
+    soft.assertThat(filter.markReferenced(ObjId.EMPTY_OBJ_ID)).isTrue();
+
+    Set<ObjId> ids = new HashSet<>(3000);
+    for (int i = 0; i < 1000; i++) {
+      ids.add(randomObjId());
+    }
+
+    for (int i = 0; i < 1000; i++) {
+      byte[] bytes = new byte[4 + ThreadLocalRandom.current().nextInt(33)];
+      ThreadLocalRandom.current().nextBytes(bytes);
+      ids.add(objIdFromByteArray(bytes));
+    }
+
+    for (ObjId id : ids) {
+      // There is a theoretical chance that this assertion fails, but that chance is extremely low.
+      // (We're adding 2000 object IDs to a bloom filter with an expected object count of 1M and a
+      // low FPP.)
+ soft.assertThat(filter.markReferenced(id)).isTrue(); + } + + soft.assertThat(filter.isProbablyReferenced(ObjId.EMPTY_OBJ_ID)).isTrue(); + for (ObjId id : ids) { + soft.assertThat(filter.isProbablyReferenced(id)).describedAs("id = %s", id).isTrue(); + } + } + + @ParameterizedTest + @ValueSource(ints = {100, 1_000, 10_000}) + public void withinExpectedFpp(int expected) { + ReferencedObjectsFilterImpl filter = + new ReferencedObjectsFilterImpl(CleanupParams.builder().expectedObjCount(expected).build()); + + for (int i = 0; i < expected; i++) { + ObjId id = randomObjId(); + soft.assertThatCode(() -> filter.markReferenced(id)).doesNotThrowAnyException(); + soft.assertThat(filter.withinExpectedFpp()).isTrue(); + } + + // "withinExpectedFpp" should trigger at some point + boolean thrown = false; + for (int i = 0; i < expected / 2; i++) { + ObjId id = randomObjId(); + try { + filter.markReferenced(id); + soft.assertThat(filter.withinExpectedFpp()).isTrue(); + } catch (MustRestartWithBiggerFilterException e) { + soft.assertThat(filter.withinExpectedFpp()).isFalse(); + thrown = true; + break; + } + } + soft.assertThat(thrown).isTrue(); + } +} diff --git a/versioned/storage/cleanup/src/test/java/org/projectnessie/versioned/storage/cleanup/TestReferencedObjectsResolverImpl.java b/versioned/storage/cleanup/src/test/java/org/projectnessie/versioned/storage/cleanup/TestReferencedObjectsResolverImpl.java new file mode 100644 index 00000000000..47b43442c6c --- /dev/null +++ b/versioned/storage/cleanup/src/test/java/org/projectnessie/versioned/storage/cleanup/TestReferencedObjectsResolverImpl.java @@ -0,0 +1,220 @@ +/* + * Copyright (C) 2024 Dremio + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.projectnessie.versioned.storage.cleanup; + +import static java.util.UUID.randomUUID; +import static org.projectnessie.nessie.relocated.protobuf.ByteString.copyFromUtf8; +import static org.projectnessie.versioned.storage.cleanup.Cleanup.createCleanup; +import static org.projectnessie.versioned.storage.common.indexes.StoreKey.key; +import static org.projectnessie.versioned.storage.common.logic.CreateCommit.Add.commitAdd; +import static org.projectnessie.versioned.storage.common.logic.CreateCommit.newCommitBuilder; +import static org.projectnessie.versioned.storage.common.logic.Logics.commitLogic; +import static org.projectnessie.versioned.storage.common.logic.Logics.referenceLogic; +import static org.projectnessie.versioned.storage.common.logic.Logics.repositoryLogic; +import static org.projectnessie.versioned.storage.common.objtypes.CommitHeaders.newCommitHeaders; +import static org.projectnessie.versioned.storage.common.objtypes.CommitType.NORMAL; +import static org.projectnessie.versioned.storage.common.objtypes.ContentValueObj.contentValue; +import static org.projectnessie.versioned.storage.common.objtypes.StringObj.stringData; +import static org.projectnessie.versioned.storage.common.persist.ObjId.EMPTY_OBJ_ID; +import static org.projectnessie.versioned.testworker.OnRefOnly.onRef; + +import java.util.ArrayList; +import java.util.List; +import java.util.Optional; +import org.assertj.core.api.SoftAssertions; +import org.assertj.core.api.junit.jupiter.InjectSoftAssertions; +import org.assertj.core.api.junit.jupiter.SoftAssertionsExtension; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.projectnessie.versioned.storage.common.objtypes.Compression; +import org.projectnessie.versioned.storage.common.persist.ObjId; +import org.projectnessie.versioned.storage.common.persist.Persist; +import org.projectnessie.versioned.storage.common.persist.Reference; +import org.projectnessie.versioned.storage.testextension.NessiePersist; +import org.projectnessie.versioned.storage.testextension.PersistExtension; +import org.projectnessie.versioned.store.DefaultStoreWorker; + +@ExtendWith({PersistExtension.class, SoftAssertionsExtension.class}) +public class TestReferencedObjectsResolverImpl { + @InjectSoftAssertions protected SoftAssertions soft; + + @NessiePersist protected Persist persist; + + @Test + void againstEmptyRepository() { + soft.assertThat(repositoryLogic(persist).repositoryExists()).isTrue(); + + var cleanup = createCleanup(CleanupParams.builder().build()); + var referencedObjectsContext = + cleanup.buildReferencedObjectsContext(persist, persist.config().currentTimeMicros()); + var referencedObjectsResolver = + cleanup.createReferencedObjectsResolver(referencedObjectsContext); + var resolveResult = referencedObjectsResolver.resolve(); + soft.assertThat(resolveResult.stats()) + .extracting( + ResolveStats::failure, + ResolveStats::numReferences, + ResolveStats::numCommits, + ResolveStats::numUniqueCommits, + ResolveStats::numPendingObjs, + ResolveStats::numObjs) + .containsExactly(Optional.empty(), 3L, 3L, 3L, 2L, 2L); + var purgeObjects = cleanup.createPurgeObjects(resolveResult.purgeObjectsContext()); + var purgeResult = purgeObjects.purge(); + soft.assertThat(purgeResult.stats()) + .extracting(PurgeStats::failure, PurgeStats::numScannedObjs, PurgeStats::numPurgedObjs) + .containsExactly(Optional.empty(), 5L, 0L); + } + + @Test + void againstEmptyRepositoryWithGarbage() throws Exception { + 
soft.assertThat(repositoryLogic(persist).repositoryExists()).isTrue();
+
+    var referenceLogic = referenceLogic(persist);
+    var commitLogic = commitLogic(persist);
+
+    var unreferenced = new ArrayList<ObjId>();
+    var refs = new ArrayList<Reference>();
+    var keptUnreferenced = new ArrayList<ObjId>();
+    var referencedCommits = new ArrayList<ObjId>();
+    var referenced = new ArrayList<ObjId>();
+
+    for (int i = 0; i < 25; i++) {
+      var obj =
+          stringData("foo/bar", Compression.NONE, null, List.of(), copyFromUtf8("string " + i));
+      soft.assertThat(persist.storeObj(obj)).isTrue();
+      unreferenced.add(obj.id());
+    }
+    for (int i = 0; i < 25; i++) {
+      var cid = randomUUID();
+      var obj =
+          contentValue(
+              cid.toString(),
+              127,
+              DefaultStoreWorker.instance()
+                  .toStoreOnReferenceState(onRef("dummy " + i, cid.toString())));
+      soft.assertThat(persist.storeObj(obj)).isTrue();
+      unreferenced.add(obj.id());
+    }
+
+    // 10 new references
+    // 10 new RefObj
+    for (int i = 0; i < 10; i++) {
+      var head = EMPTY_OBJ_ID;
+      for (int i1 = 0; i1 < 20; i1++) {
+        var cid1 = randomUUID();
+        var cid2 = randomUUID();
+        var obj1 =
+            contentValue(
+                cid1.toString(),
+                127,
+                DefaultStoreWorker.instance()
+                    .toStoreOnReferenceState(onRef("obj " + i + " " + i1 + " 1", cid1.toString())));
+        var obj2 =
+            contentValue(
+                cid2.toString(),
+                127,
+                DefaultStoreWorker.instance()
+                    .toStoreOnReferenceState(onRef("obj " + i + " " + i1 + " 2", cid2.toString())));
+        var commit =
+            commitLogic.doCommit(
+                newCommitBuilder()
+                    .commitType(NORMAL)
+                    .parentCommitId(head)
+                    .addAdds(
+                        commitAdd(
+                            key("store", "key", Integer.toString(i), Integer.toString(i1), "1"),
+                            42,
+                            obj1.id(),
+                            null,
+                            cid1))
+                    .addAdds(
+                        commitAdd(
+                            key("store", "key", Integer.toString(i), Integer.toString(i1), "2"),
+                            42,
+                            obj2.id(),
+                            null,
+                            cid2))
+                    .headers(newCommitHeaders().add("created", "foo-" + i + "-" + i1).build())
+                    .message("commit " + i1 + " on " + i)
+                    .build(),
+                List.of(obj1, obj2));
+        head = commit.id();
+
+        referencedCommits.add(head);
+        referenced.add(obj1.id());
+        referenced.add(obj2.id());
+      }
+
+      var extendedInfo =
+          stringData("ref/foo", Compression.NONE, null, List.of(), copyFromUtf8("ext-info " + i));
+      soft.assertThat(persist.storeObj(extendedInfo)).isTrue();
+      referenced.add(extendedInfo.id());
+
+      var ref = referenceLogic.createReference("refs/heads/myref-" + i, head, extendedInfo.id());
+      refs.add(ref);
+    }
+
+    var maxObjReferenced = persist.config().currentTimeMicros();
+
+    // Unreferenced, but newer than 'maxObjReferenced'
+    for (int i = 100; i < 125; i++) {
+      var obj =
+          stringData("foo/bar", Compression.NONE, null, List.of(), copyFromUtf8("string " + i));
+      soft.assertThat(persist.storeObj(obj)).isTrue();
+      keptUnreferenced.add(obj.id());
+    }
+    for (int i = 100; i < 125; i++) {
+      var obj = contentValue("cid-" + i, 42, copyFromUtf8("string " + i));
+      soft.assertThat(persist.storeObj(obj)).isTrue();
+      keptUnreferenced.add(obj.id());
+    }
+
+    var cleanup = createCleanup(CleanupParams.builder().build());
+    var referencedObjectsContext = cleanup.buildReferencedObjectsContext(persist, maxObjReferenced);
+    var referencedObjectsResolver =
+        cleanup.createReferencedObjectsResolver(referencedObjectsContext);
+    var resolveResult = referencedObjectsResolver.resolve();
+    soft.assertThat(resolveResult.stats())
+        .extracting(
+            ResolveStats::failure,
+            ResolveStats::numReferences,
+            ResolveStats::numCommits,
+            ResolveStats::numUniqueCommits,
+            ResolveStats::numPendingObjs,
+            ResolveStats::numObjs)
+        .containsExactly(
+            Optional.empty(),
+            3L + 10L,
+            3L + 10L + referencedCommits.size(),
+            3L + 10L + referencedCommits.size(),
+            2L + referenced.size(),
+            2L + referenced.size());
+    var purgeObjects = cleanup.createPurgeObjects(resolveResult.purgeObjectsContext());
+    var purgeResult = purgeObjects.purge();
+    soft.assertThat(purgeResult.stats())
+        .extracting(PurgeStats::failure, PurgeStats::numScannedObjs, PurgeStats::numPurgedObjs)
+        .containsExactly(
+            Optional.empty(), 5L + 100L + 20L + referencedCommits.size() + referenced.size(), 50L);
+
+    soft.assertThat(persist.fetchObjsIfExist(unreferenced.toArray(new ObjId[0])))
+        .containsOnlyNulls();
+    soft.assertThat(persist.fetchObjsIfExist(keptUnreferenced.toArray(new ObjId[0])))
+        .doesNotContainNull();
+    soft.assertThat(persist.fetchObjsIfExist(referenced.toArray(new ObjId[0])))
+        .doesNotContainNull();
+  }
+}
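
For illustration only (not part of the diff): a minimal driver sketch showing how the pieces introduced here fit together, mirroring the calls exercised in `TestReferencedObjectsResolverImpl` above. The `Persist` instance is assumed to be supplied by the caller; the class and method names of this sketch are made up for the example, and error handling is reduced to propagating exceptions.

    import static org.projectnessie.versioned.storage.cleanup.Cleanup.createCleanup;

    import org.projectnessie.versioned.storage.cleanup.CleanupParams;
    import org.projectnessie.versioned.storage.common.persist.Persist;

    // Hypothetical driver, not part of this change.
    final class CleanupDriverSketch {
      static void purgeUnreferencedObjs(Persist persist) throws Exception {
        var cleanup = createCleanup(CleanupParams.builder().build());

        // Memoize the cutoff before resolving referenced objects, so objects written while
        // the scan runs have a 'referenced' timestamp at or after the cutoff and are kept.
        var maxObjReferenced = persist.config().currentTimeMicros();

        var context = cleanup.buildReferencedObjectsContext(persist, maxObjReferenced);
        var resolver = cleanup.createReferencedObjectsResolver(context);
        // May throw MustRestartWithBiggerFilterException, in which case a caller would retry
        // with a larger expected object count / relaxed FPP.
        var resolveResult = resolver.resolve();

        var purge = cleanup.createPurgeObjects(resolveResult.purgeObjectsContext());
        var purgeResult = purge.purge();

        System.out.printf(
            "scanned=%d purged=%d%n",
            purgeResult.stats().numScannedObjs(),
            purgeResult.stats().numPurgedObjs());
      }
    }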