Skip to content

Commit

Permalink
Persist: introduce deleteWithReferenced(Obj)
Browse files Browse the repository at this point in the history
... to delete an `Obj` only if its `referenced()` timestamp has the expected value.
  • Loading branch information
snazy committed Oct 9, 2024
1 parent 136b790 commit 15a37be
Show file tree
Hide file tree
Showing 24 changed files with 317 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -329,6 +329,11 @@ public void deleteObjs(@Nonnull @javax.annotation.Nonnull ObjId[] ids) {
}
}

@Override
public boolean deleteWithReferenced(@Nonnull Obj obj) {
throw new UnsupportedOperationException();
}

@Override
public boolean deleteConditional(@Nonnull UpdateableObj obj) {
throw new UnsupportedOperationException();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -566,6 +566,20 @@ public void upsertObjs(@Nonnull Obj[] objs) throws ObjTooLargeException {
}
}

@Override
public boolean deleteWithReferenced(@Nonnull Obj obj) {
Filter condition =
FILTERS
.chain()
.filter(FILTERS.qualifier().exactMatch(QUALIFIER_OBJ_REFERENCED))
.filter(FILTERS.value().exactMatch(copyFromUtf8(Long.toString(obj.referenced()))));

ConditionalRowMutation conditionalRowMutation =
conditionalRowMutation(obj, condition, Mutation.create().deleteRow());

return backend.client().checkAndMutateRow(conditionalRowMutation);
}

@Override
public boolean deleteConditional(@Nonnull UpdateableObj obj) {
ConditionalRowMutation conditionalRowMutation =
Expand All @@ -591,9 +605,6 @@ public boolean updateConditional(@Nonnull UpdateableObj expected, @Nonnull Updat
@Nonnull
private ConditionalRowMutation mutationForConditional(
@Nonnull UpdateableObj obj, Mutation mutation) {
checkArgument(obj.id() != null, "Obj to store must have a non-null ID");
ByteString key = dbKey(obj.id());

Filters.ChainFilter objTypeFilter =
FILTERS
.chain()
Expand All @@ -607,6 +618,15 @@ private ConditionalRowMutation mutationForConditional(
Filter condition =
FILTERS.condition(objTypeFilter).then(objVersionFilter).otherwise(FILTERS.block());

return conditionalRowMutation(obj, condition, mutation);
}

@Nonnull
private ConditionalRowMutation conditionalRowMutation(
@Nonnull Obj obj, @Nonnull Filter condition, @Nonnull Mutation mutation) {
checkArgument(obj.id() != null, "Obj to store must have a non-null ID");
ByteString key = dbKey(obj.id());

return ConditionalRowMutation.create(backend.tableObjsId, key)
.condition(condition)
.then(mutation);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -302,6 +302,15 @@ public void deleteObjs(@Nonnull ObjId[] ids) {
}
}

@Override
public boolean deleteWithReferenced(@Nonnull Obj obj) {
try {
return persist.deleteWithReferenced(obj);
} finally {
cache.remove(obj.id());
}
}

@Override
public boolean deleteConditional(@Nonnull UpdateableObj obj) {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,17 @@ public final class CassandraConstants {
+ COL_OBJ_VERS
+ "=?";

static final String DELETE_OBJ_REFERENCED =
"DELETE FROM %s."
+ TABLE_OBJS
+ " WHERE "
+ COL_REPO_ID
+ "=? AND "
+ COL_OBJ_ID
+ "=? IF "
+ COL_OBJ_REFERENCED
+ "=?";

public static final String INSERT_OBJ_PREFIX =
"INSERT INTO %s."
+ TABLE_OBJS
Expand Down Expand Up @@ -124,7 +135,10 @@ public final class CassandraConstants {
+ " AND "
+ COL_OBJ_ID
+ "=:"
+ COL_OBJ_ID;
+ COL_OBJ_ID
// IF EXISTS is necessary to prevent writing just the referenced timestamp after an object
// has been deleted.
+ " IF EXISTS";

static final Set<CqlColumn> COLS_OBJS_ALL =
Stream.concat(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import static org.projectnessie.versioned.storage.cassandra.CassandraConstants.COL_REPO_ID;
import static org.projectnessie.versioned.storage.cassandra.CassandraConstants.DELETE_OBJ;
import static org.projectnessie.versioned.storage.cassandra.CassandraConstants.DELETE_OBJ_CONDITIONAL;
import static org.projectnessie.versioned.storage.cassandra.CassandraConstants.DELETE_OBJ_REFERENCED;
import static org.projectnessie.versioned.storage.cassandra.CassandraConstants.EXPECTED_SUFFIX;
import static org.projectnessie.versioned.storage.cassandra.CassandraConstants.FETCH_OBJ_TYPE;
import static org.projectnessie.versioned.storage.cassandra.CassandraConstants.FIND_OBJS;
Expand Down Expand Up @@ -342,6 +343,18 @@ public void upsertObjs(@Nonnull Obj[] objs) throws ObjTooLargeException {
persistObjs(objs, referenced, true);
}

@Override
public boolean deleteWithReferenced(@Nonnull Obj obj) {
BoundStatement stmt =
backend.buildStatement(
DELETE_OBJ_REFERENCED,
false,
config.repositoryId(),
serializeObjId(obj.id()),
obj.referenced());
return backend.executeCas(stmt);
}

@Override
public boolean deleteConditional(@Nonnull UpdateableObj obj) {
BoundStatement stmt =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,17 @@ public final class Cassandra2Constants {
+ COL_OBJ_VERS
+ "=?";

static final String DELETE_OBJ_REFERENCED =
"DELETE FROM %s."
+ TABLE_OBJS
+ " WHERE "
+ COL_REPO_ID
+ "=? AND "
+ COL_OBJ_ID
+ "=? IF "
+ COL_OBJ_REFERENCED
+ "=?";

static final String CREATE_TABLE_OBJS =
"CREATE TABLE %s."
+ TABLE_OBJS
Expand Down Expand Up @@ -376,7 +387,10 @@ public final class Cassandra2Constants {
+ " AND "
+ COL_OBJ_ID
+ "=:"
+ COL_OBJ_ID;
+ COL_OBJ_ID
// IF EXISTS is necessary to prevent writing just the referenced timestamp after an object
// has been deleted.
+ " IF EXISTS";

private Cassandra2Constants() {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import static org.projectnessie.versioned.storage.cassandra2.Cassandra2Constants.COL_REPO_ID;
import static org.projectnessie.versioned.storage.cassandra2.Cassandra2Constants.DELETE_OBJ;
import static org.projectnessie.versioned.storage.cassandra2.Cassandra2Constants.DELETE_OBJ_CONDITIONAL;
import static org.projectnessie.versioned.storage.cassandra2.Cassandra2Constants.DELETE_OBJ_REFERENCED;
import static org.projectnessie.versioned.storage.cassandra2.Cassandra2Constants.EXPECTED_SUFFIX;
import static org.projectnessie.versioned.storage.cassandra2.Cassandra2Constants.FETCH_OBJ_TYPE;
import static org.projectnessie.versioned.storage.cassandra2.Cassandra2Constants.FIND_OBJS;
Expand Down Expand Up @@ -342,6 +343,18 @@ public void upsertObjs(@Nonnull Obj[] objs) throws ObjTooLargeException {
persistObjs(objs, referenced, true);
}

@Override
public boolean deleteWithReferenced(@Nonnull Obj obj) {
BoundStatement stmt =
backend.buildStatement(
DELETE_OBJ_REFERENCED,
false,
config.repositoryId(),
serializeObjId(obj.id()),
obj.referenced());
return backend.executeCas(stmt);
}

@Override
public boolean deleteConditional(@Nonnull UpdateableObj obj) {
BoundStatement stmt =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -821,6 +821,27 @@ public void referencedLifecycleForUpdateable() throws Exception {
soft.assertThat(updated.referenced()).isGreaterThan(upserted.referenced());
}

@ParameterizedTest
@MethodSource("allObjectTypeSamples")
public void referencedDelete(Obj obj) throws Exception {
assumeThat(persist.isCaching())
.describedAs("'referenced' not tested against a caching Persist")
.isFalse();

soft.assertThat(persist.storeObj(obj)).isTrue();
Obj stored = persist.fetchObj(obj.id());
Thread.sleep(0, 1000);
persist.storeObj(stored);
Obj stored2 = persist.fetchObj(obj.id());
soft.assertThat(stored.referenced()).isNotEqualTo(stored2.referenced());

soft.assertThat(persist.deleteWithReferenced(stored)).isFalse();
soft.assertThatCode(() -> persist.fetchObj(obj.id())).doesNotThrowAnyException();
soft.assertThat(persist.deleteWithReferenced(stored2)).isTrue();
soft.assertThatExceptionOfType(ObjNotFoundException.class)
.isThrownBy(() -> persist.fetchObj(obj.id()));
}

@ParameterizedTest
@MethodSource("allObjectTypeSamples")
public void singleObjectCreateDelete(Obj obj) throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,14 @@ public void upsertObjs(@Nonnull Obj[] objs) throws ObjTooLargeException {
delegate.upsertObjs(objs);
}

@WithSpan
@Override
@Counted(PREFIX)
@Timed(value = PREFIX, histogram = true)
public boolean deleteWithReferenced(@Nonnull Obj obj) {
return delegate.deleteWithReferenced(obj);
}

@WithSpan
@Override
@Counted(PREFIX)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -378,6 +378,14 @@ boolean storeObj(@Nonnull Obj obj, boolean ignoreSoftSizeRestrictions)
*/
void deleteObjs(@Nonnull ObjId[] ids);

/**
* Deletes the given object, if its {@link Obj#referenced()} value is equal to the persisted
* object's value. Since a caching {@link Persist} does not guarantee that the {@link
* Obj#referenced()} value is up-to-date, callers must ensure that they read the uncached object
* state, for example via a {@link #scanAllObjects(Set)}.
*/
boolean deleteWithReferenced(@Nonnull Obj obj);

/**
* Deletes the object, if the current state in the database is equal to the given state, comparing
* the {@link UpdateableObj#versionToken()}.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -545,6 +545,28 @@ public void upsertObjs(@Nonnull Obj[] objs) throws ObjTooLargeException {
}
}

@Override
public boolean deleteWithReferenced(@Nonnull Obj obj) {
ObjId id = obj.id();

Map<String, ExpectedAttributeValue> expectedValues =
Map.of(
COL_OBJ_REFERENCED,
ExpectedAttributeValue.builder().value(fromS(Long.toString(obj.referenced()))).build());

try {
backend
.client()
.deleteItem(
b -> b.tableName(backend.tableObjs).key(objKeyMap(id)).expected(expectedValues));
return true;
} catch (ConditionalCheckFailedException checkFailedException) {
return false;
} catch (RuntimeException e) {
throw unhandledException(e);
}
}

@Override
public boolean deleteConditional(@Nonnull UpdateableObj obj) {
ObjId id = obj.id();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -546,6 +546,28 @@ public void upsertObjs(@Nonnull Obj[] objs) throws ObjTooLargeException {
}
}

@Override
public boolean deleteWithReferenced(@Nonnull Obj obj) {
ObjId id = obj.id();

Map<String, ExpectedAttributeValue> expectedValues =
Map.of(
COL_OBJ_REFERENCED,
ExpectedAttributeValue.builder().value(fromS(Long.toString(obj.referenced()))).build());

try {
backend
.client()
.deleteItem(
b -> b.tableName(backend.tableObjs).key(objKeyMap(id)).expected(expectedValues));
return true;
} catch (ConditionalCheckFailedException checkFailedException) {
return false;
} catch (RuntimeException e) {
throw unhandledException(e);
}
}

@Override
public boolean deleteConditional(@Nonnull UpdateableObj obj) {
ObjId id = obj.id();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -287,6 +287,24 @@ public void upsertObjs(@Nonnull Obj[] objs) throws ObjTooLargeException {
}
}

@Override
public boolean deleteWithReferenced(@Nonnull Obj obj) {
AtomicBoolean result = new AtomicBoolean();
inmemory.objects.compute(
compositeKey(obj.id()),
(k, v) -> {
if (v == null) {
// not present
return null;
} else if (v.referenced() != obj.referenced()) {
return v;
}
result.set(true);
return null;
});
return result.get();
}

@Override
public boolean deleteConditional(@Nonnull UpdateableObj obj) {
return updateDeleteConditional(obj, null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import static org.projectnessie.versioned.storage.jdbc.SqlConstants.COL_REPO_ID;
import static org.projectnessie.versioned.storage.jdbc.SqlConstants.DELETE_OBJ;
import static org.projectnessie.versioned.storage.jdbc.SqlConstants.DELETE_OBJ_CONDITIONAL;
import static org.projectnessie.versioned.storage.jdbc.SqlConstants.DELETE_OBJ_REFERENCED;
import static org.projectnessie.versioned.storage.jdbc.SqlConstants.FETCH_OBJ_TYPE;
import static org.projectnessie.versioned.storage.jdbc.SqlConstants.FIND_OBJS;
import static org.projectnessie.versioned.storage.jdbc.SqlConstants.FIND_OBJS_TYPED;
Expand Down Expand Up @@ -442,6 +443,17 @@ protected final Void updateObjs(@Nonnull Connection conn, @Nonnull Obj[] objs)
return null;
}

protected final boolean deleteWithReferenced(@Nonnull Connection conn, @Nonnull Obj obj) {
try (PreparedStatement ps = conn.prepareStatement(DELETE_OBJ_REFERENCED)) {
ps.setString(1, config.repositoryId());
serializeObjId(ps, 2, obj.id(), databaseSpecific);
ps.setLong(3, obj.referenced());
return ps.executeUpdate() == 1;
} catch (SQLException e) {
throw unhandledSQLException(e);
}
}

protected final boolean deleteConditional(@Nonnull Connection conn, @Nonnull UpdateableObj obj) {
try (PreparedStatement ps = conn.prepareStatement(DELETE_OBJ_CONDITIONAL)) {
ps.setString(1, config.repositoryId());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,11 @@ public void upsertObjs(@Nonnull Obj[] objs) throws ObjTooLargeException {
withConnectionException(false, conn -> super.updateObjs(conn, objs));
}

@Override
public boolean deleteWithReferenced(@Nonnull Obj obj) {
return withConnectionException(false, conn -> super.deleteWithReferenced(conn, obj));
}

@Override
public boolean deleteConditional(@Nonnull UpdateableObj obj) {
return withConnectionException(false, conn -> super.deleteConditional(conn, obj));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,16 @@ final class SqlConstants {
+ "=? AND "
+ COL_OBJ_ID
+ "=?";
static final String DELETE_OBJ_REFERENCED =
"DELETE FROM "
+ TABLE_OBJS
+ " WHERE "
+ COL_REPO_ID
+ "=? AND "
+ COL_OBJ_ID
+ "=? AND "
+ COL_OBJ_REFERENCED
+ "=?";

static final String COL_REFS_NAME = "ref_name";
static final String COL_REFS_POINTER = "pointer";
Expand Down
Loading

0 comments on commit 15a37be

Please sign in to comment.