Skip to content

Commit

Permalink
Persistence: purge unreferenced Objs
Browse files Browse the repository at this point in the history
This is an attempt to implement the algorithm mentioned in the PR #9401.

The `Obj.referenced()` attribute contains the timestamp when the object was last "referenced" (aka: attempted to be written). It is ...
* set when an object is first persisted via a `storeObj()`
* updated in the database, when an object was not persisted via `storeObj()`
* set/updated via `upsertObj()`
* updated via `updateConditional()`

Let's assume that there is a mechanism to identify the IDs of all referenced objects (it would be very similar to what the export functionality does). The algorithm to purge unreferenced objects must never delete an object that is referenced at any point of time, and must consider the case that an object that was unreferenced when a purge-unreferenced-objects routine started, but became referenced while it is running.

An approach could work as follows:

1. Memoize the current timestamp (minus some wall-clock drift adjustment).
2. Identify the IDs of all referenced objects. We could leverage a bloom filter, if the set of IDs is big.
3. Then scan all objects in the repository. Objects can be purged, if ...
    * the ID is not in the set (or bloom filter) generated in step 2 ...
    * _AND_ have a `referenced` timestamp less than the memoized timestamp.

Any deletion in the backing database would follow the meaning of this pseudo SQL: `DELETE FROM objs WHERE obj_id = :objId AND referenced < :memoizedTimestamp`.

Noting, that the `referenced` attribute is rather incorrect when retrieved from the objects cache (aka: during normal operations), which is not a problem, because that `referenced` attribute is irrelevant for production accesses.

There are two edge cases / race conditions:
* (for some backends): A `storeObj()` operation detected that the object already exists - then the purge routine deletes that object - and then the `storeObj()` tries to upddate the `referenced` attribute. The result is the loss of that object. This race condition can only occur, if the object existed but was not referenced.
* While the referenced objects are being identified, create a new named reference (branch / tag) pointing to commit(s) that would be identified as unreferenced and being later purged.
  • Loading branch information
snazy committed Oct 30, 2024
1 parent 10abbfb commit 86478ac
Show file tree
Hide file tree
Showing 33 changed files with 2,978 additions and 0 deletions.
1 change: 1 addition & 0 deletions bom/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ dependencies {
api(project(":nessie-versioned-storage-cassandra-tests"))
api(project(":nessie-versioned-storage-cassandra2"))
api(project(":nessie-versioned-storage-cassandra2-tests"))
api(project(":nessie-versioned-storage-cleanup"))
api(project(":nessie-versioned-storage-common"))
api(project(":nessie-versioned-storage-common-proto"))
api(project(":nessie-versioned-storage-common-serialize"))
Expand Down
1 change: 1 addition & 0 deletions gradle/projects.main.properties
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ nessie-versioned-storage-cassandra=versioned/storage/cassandra
nessie-versioned-storage-cassandra-tests=versioned/storage/cassandra-tests
nessie-versioned-storage-cassandra2=versioned/storage/cassandra2
nessie-versioned-storage-cassandra2-tests=versioned/storage/cassandra2-tests
nessie-versioned-storage-cleanup=versioned/storage/cleanup
nessie-versioned-storage-common=versioned/storage/common
nessie-versioned-storage-common-proto=versioned/storage/common-proto
nessie-versioned-storage-common-serialize=versioned/storage/common-serialize
Expand Down
61 changes: 61 additions & 0 deletions versioned/storage/cleanup/build.gradle.kts
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/*
* Copyright (C) 2022 Dremio
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

plugins { id("nessie-conventions-server") }

publishingHelper { mavenName = "Nessie - Storage - Cleanup unreferenced objects" }

description = "Identify and purge unreferenced objects in the Nessie repository."

dependencies {
implementation(project(":nessie-model"))
implementation(project(":nessie-versioned-storage-common"))
implementation(project(":nessie-versioned-spi"))
implementation(project(":nessie-versioned-transfer-related"))

compileOnly(libs.jakarta.validation.api)
compileOnly(libs.jakarta.annotation.api)
compileOnly(libs.microprofile.openapi)

compileOnly(platform(libs.jackson.bom))
compileOnly("com.fasterxml.jackson.core:jackson-annotations")

compileOnly(libs.errorprone.annotations)
implementation(libs.guava)
implementation(libs.agrona)
implementation(libs.slf4j.api)

compileOnly(project(":nessie-versioned-storage-testextension"))

compileOnly(project(":nessie-immutables"))
annotationProcessor(project(":nessie-immutables", configuration = "processor"))

testImplementation(project(":nessie-versioned-storage-testextension"))
testImplementation(project(":nessie-versioned-storage-inmemory"))
testImplementation(project(":nessie-versioned-tests"))
testImplementation(project(path = ":nessie-protobuf-relocated", configuration = "shadow"))
testImplementation(platform(libs.junit.bom))
testImplementation(libs.bundles.junit.testing)
testRuntimeOnly(libs.logback.classic)

testCompileOnly(project(":nessie-immutables"))
testAnnotationProcessor(project(":nessie-immutables", configuration = "processor"))

testCompileOnly(libs.microprofile.openapi)

testCompileOnly(platform(libs.jackson.bom))
testCompileOnly("com.fasterxml.jackson.core:jackson-annotations")
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
/*
* Copyright (C) 2024 Dremio
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.projectnessie.versioned.storage.cleanup;

import static org.projectnessie.versioned.storage.cleanup.PurgeFilter.ReferencedObjectsPurgeFilter.referencedObjectsPurgeFilter;
import static org.projectnessie.versioned.storage.cleanup.ReferencedObjectsContext.objectsResolverContext;

import org.projectnessie.versioned.storage.common.persist.Obj;
import org.projectnessie.versioned.storage.common.persist.Persist;

/**
* Primary point of entry to remove unreferenced objects from Nessie's backend database.
*
* <p>Simplified example code flow: <code><pre>
* var params =
* CleanupParams.builder().build();
* var cleanup =
* createCleanup(params);
*
* var referencedObjectsContext =
* cleanup.buildReferencedObjectsContext(persist,
* TimeUnit.MILLISECONDS.toMicros(
* Instant.now().minus(3, ChronoUnit.DAYS)
* .toEpochMilli()));
* var referencedObjectsResolver =
* cleanup.createReferencedObjectsResolver(referencedObjectsContext);
*
* // Must handle MustRestartWithBiggerFilterException
* var resolveResult =
* referencedObjectsResolver.resolve();
*
* var purgeObjects =
* cleanup.createPurgeObjects(resolveResult.purgeObjectsContext());
* var purgeResult =
* purgeObjects.purge();
* </pre></code>
*/
public class Cleanup {
private final CleanupParams cleanupParams;

private Cleanup(CleanupParams cleanupParams) {
this.cleanupParams = cleanupParams;
}

public static Cleanup createCleanup(CleanupParams params) {
return new Cleanup(params);
}

/**
* Create the context holder used when identifying referenced objects and purging unreferenced
* objects.
*
* <p>Choosing an appropriate value for {@code maxObjReferenced} is crucial. Technically, this
* value must be at max the current timestamp - but logically {@code maxObjReferenced} should be
* the timestamp of a few days ago to not delete unreferenced objects too early and give users a
* chance to reset branches to another commit ID in case some table/view metadata is broken.
*
* <p>Uses an instance of {@link
* org.projectnessie.versioned.storage.cleanup.PurgeFilter.ReferencedObjectsPurgeFilter} using a
* bloom filter based {@link ReferencedObjectsFilter}, both configured using {@link
* CleanupParams}'s attributes.
*
* @param persist the persistence/repository to run against
* @param maxObjReferenced only {@link Obj}s with a {@link Obj#referenced()} older than {@code
* maxObjReferenced} will be deleted. Production workloads should set this to something like
* "now minus 7 days" to have the chance to reset branches, just in case. Technically, this
* value must not be greater than "now". "Now" should be inquired using {@code
* Persist.config().clock().instant()}.
*/
public ReferencedObjectsContext buildReferencedObjectsContext(
Persist persist, long maxObjReferenced) {
var referencedObjects = new ReferencedObjectsFilterImpl(cleanupParams);
var purgeFilter = referencedObjectsPurgeFilter(referencedObjects, maxObjReferenced);
return objectsResolverContext(persist, cleanupParams, referencedObjects, purgeFilter);
}

/**
* Creates a new objects-resolver instance to identify <em>referenced</em> objects, which must be
* retained.
*
* @param objectsResolverContext context, preferably created using {@link
* #buildReferencedObjectsContext(Persist, long)}
*/
public ReferencedObjectsResolver createReferencedObjectsResolver(
ReferencedObjectsContext objectsResolverContext) {
return new ReferencedObjectsResolverImpl(
objectsResolverContext, cleanupParams.rateLimitFactory());
}

/**
* Creates a new objects-purger instance to delete <em>unreferenced</em> objects.
*
* @param purgeObjectsContext return value of {@link ReferencedObjectsResolver#resolve()}.
*/
public PurgeObjects createPurgeObjects(PurgeObjectsContext purgeObjectsContext) {
return new PurgeObjectsImpl(purgeObjectsContext, cleanupParams.rateLimitFactory());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,195 @@
/*
* Copyright (C) 2024 Dremio
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.projectnessie.versioned.storage.cleanup;

import static org.projectnessie.versioned.storage.common.logic.InternalRef.REF_REFS;
import static org.projectnessie.versioned.storage.common.logic.InternalRef.REF_REPO;
import static org.projectnessie.versioned.transfer.related.CompositeTransferRelatedObjects.createCompositeTransferRelatedObjects;

import java.util.List;
import java.util.function.IntFunction;
import org.immutables.value.Value;
import org.projectnessie.nessie.immutables.NessieImmutable;
import org.projectnessie.versioned.storage.common.objtypes.CommitObj;
import org.projectnessie.versioned.storage.common.persist.Obj;
import org.projectnessie.versioned.storage.common.persist.ObjId;
import org.projectnessie.versioned.transfer.related.TransferRelatedObjects;

/**
* Technically and implementation oriented parameters for Nessie's backend database cleanup,
* considered for internal use only.
*
* <p>Any API or functionality that exposes Nessie's backend database cleanup must provide a
* functionally oriented way for configuration and generate a {@link CleanupParams} from it.
*/
@NessieImmutable
public interface CleanupParams {
// Following defaults result in a serialized bloom filter size of about 3000000 bytes.
long DEFAULT_EXPECTED_OBJ_COUNT = 1_000_000L;
double DEFAULT_FALSE_POSITIVE_PROBABILITY = 0.00001d;
double DEFAULT_ALLOWED_FALSE_POSITIVE_PROBABILITY = 0.0001d;
boolean DEFAULT_ALLOW_DUPLICATE_COMMIT_TRAVERSALS = false;
int DEFAULT_PENDING_OBJS_BATCH_SIZE = 20;
int DEFAULT_RECENT_OBJ_IDS_FILTER_SIZE = 100_000;

static ImmutableCleanupParams.Builder builder() {
return ImmutableCleanupParams.builder();
}

/**
* Number of expected {@link Obj}s, defaults to {@value #DEFAULT_EXPECTED_OBJ_COUNT}, used to size
* the bloom filter identifying the referenced {@link Obj}s. If {@link
* ReferencedObjectsResolver#resolve()} throws {@link MustRestartWithBiggerFilterException}, it is
* recommended to increase this value.
*/
@Value.Default
default long expectedObjCount() {
return DEFAULT_EXPECTED_OBJ_COUNT;
}

/**
* Returns an updated instance of {@code this} value with {@link #expectedObjCount()} increased by
* {@value #DEFAULT_EXPECTED_OBJ_COUNT} as a convenience function to handle {@link
* MustRestartWithBiggerFilterException} thrown by {@link ReferencedObjectsResolver#resolve()} .
*/
default CleanupParams withIncreasedExpectedObjCount() {
return builder()
.from(this)
.expectedObjCount(expectedObjCount() + DEFAULT_EXPECTED_OBJ_COUNT)
.build();
}

/**
* Related to {@link #expectedObjCount()}, used to size the bloom filter identifying the
* referenced {@link Obj}s, defaults to {@value #DEFAULT_FALSE_POSITIVE_PROBABILITY}.
*/
@Value.Default
default double falsePositiveProbability() {
return DEFAULT_FALSE_POSITIVE_PROBABILITY;
}

/**
* Maximum allowed FPP, checked when adding to the bloom filter identifying the referenced {@link
* Obj}s, defaults to {@value #DEFAULT_ALLOWED_FALSE_POSITIVE_PROBABILITY}. If this value is
* exceeded, a {@link MustRestartWithBiggerFilterException} will be thrown from {@link
* ReferencedObjectsResolver#resolve()}.
*/
@Value.Default
default double allowedFalsePositiveProbability() {
return DEFAULT_ALLOWED_FALSE_POSITIVE_PROBABILITY;
}

/** Helper functionality to identify related {@link Obj}s, see {@link TransferRelatedObjects}. */
@Value.Default
default TransferRelatedObjects relatedObjects() {
return createCompositeTransferRelatedObjects();
}

/**
* {@link ReferencedObjectsResolver} tries to not walk a commit more than once by memoizing the
* visited {@link CommitObj#id() commit IDs}, default is {@link
* #DEFAULT_ALLOW_DUPLICATE_COMMIT_TRAVERSALS}. Setting this to {@code true} disables this
* optimization.
*/
@Value.Default
default boolean allowDuplicateCommitTraversals() {
return DEFAULT_ALLOW_DUPLICATE_COMMIT_TRAVERSALS;
}

/**
* Rate limit for commit objects per second during {@link ReferencedObjectsResolver#resolve()},
* default is unlimited. Any positive value enables rate limiting, any value {@code <=0} disables
* rate limiting.
*/
@Value.Default
default int resolveCommitRatePerSecond() {
return 0;
}

/**
* Rate limit for (non commit) objects per second during {@link
* ReferencedObjectsResolver#resolve()}, default is unlimited. Any positive value enables rate
* limiting, any value {@code <=0} disables rate limiting.
*/
@Value.Default
default int resolveObjRatePerSecond() {
return 0;
}

/**
* Rate limit for scanning objects per second during {@link PurgeObjects#purge()}, default is
* unlimited. Any positive value enables rate limiting, any value {@code <=0} disables rate
* limiting.
*/
@Value.Default
default int purgeScanObjRatePerSecond() {
return 0;
}

/**
* Rate limit for purging objects per second during {@link PurgeObjects#purge()}, default is
* unlimited. Any positive value enables rate limiting, any value {@code <=0} disables rate
* limiting.
*/
@Value.Default
default int purgeDeleteObjRatePerSecond() {
return 0;
}

/**
* {@link ReferencedObjectsResolver} attempts to fetch objects from the backend database in
* batches, this parameter defines the batch size, defaults to {@link
* #DEFAULT_PENDING_OBJS_BATCH_SIZE}.
*/
@Value.Default
default int pendingObjsBatchSize() {
return DEFAULT_PENDING_OBJS_BATCH_SIZE;
}

/**
* Size of the "recent object IDs" filter to prevent processing the same {@link ObjId}s. This *
* happens, when the values referenced from the commit index are iterated, because it iterates *
* over all keys, not only the keys added by a particular commit.
*
* <p>The value defaults to {@value #DEFAULT_RECENT_OBJ_IDS_FILTER_SIZE}. It should be higher than
* the maximum number of keys in a commit.
*/
@Value.Default
default int recentObjIdsFilterSize() {
return DEFAULT_RECENT_OBJ_IDS_FILTER_SIZE;
}

/** Rate limiter factory for the rate limits defined above, useful for testing purposes. */
@Value.Default
default IntFunction<RateLimit> rateLimitFactory() {
return RateLimit::create;
}

/** Defines the names of the Nessie internal references, do not change. */
@Value.Default
default List<String> internalReferenceNames() {
return List.of(REF_REFS.name(), REF_REPO.name());
}

/**
* Optionally enable a dry-run mode, which does not delete any objects from the backend database,
* defaults to {@code false}.
*/
@Value.Default
default boolean dryRun() {
return false;
}
}
Loading

0 comments on commit 86478ac

Please sign in to comment.