Revisit behavior of multiple mutations for same record in transaction in Consensus Commit #2340

Open · wants to merge 1 commit into master
Conversation

@brfrn169 (Collaborator) commented Nov 14, 2024

Description

This PR revisits the behavior of multiple mutations for the same record in a transaction in Consensus Commit.

The expected behavior for combinations of mutations on the same record is as follows:

  1. Insert then Insert: the second Insert fails.
  2. Insert then Upsert: merge Insert and Upsert (with implicit pre-read disabled), then execute.
  3. Insert then Update: merge Insert and Update (with implicit pre-read disabled), then execute.
  4. Insert then Delete: only execute Delete.
  5. Upsert then Insert: Insert fails.
  6. Upsert then Upsert: merge the two Upserts, then execute.
  7. Upsert then Update: merge Upsert and Update, then execute.
  8. Upsert then Delete: only execute Delete.
  9. Update then Insert: if the target record exists, Insert fails; if not, only execute Insert.
  10. Update then Upsert: if the target record exists, only execute Upsert; if not, merge Update and Upsert, then execute.
  11. Update then Update: if the target record exists, merge the two Updates; if not, do nothing.
  12. Update then Delete: only execute Delete.
  13. Delete then Insert: not allowed in Consensus Commit.
  14. Delete then Upsert: not allowed in Consensus Commit.
  15. Delete then Update: do nothing.
  16. Delete then Delete: only execute the second Delete.

I've added some test cases to ConsensusCommitIntegrationTestBase.java to check the behavior.
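
For illustration, here is a minimal sketch of cases 4 and 7 at the transaction API level (an illustrative helper, not one of the actual integration tests; it assumes the ycsb.usertable table with an INT partition key ycsb_key and a TEXT column payload that appears later in this thread):

void multipleMutationsSketch(DistributedTransactionManager transactionManager)
    throws TransactionException {
  DistributedTransaction tx = transactionManager.begin();

  // Case 4: Insert then Delete -> only the Delete is executed.
  tx.insert(
      Insert.newBuilder()
          .namespace("ycsb").table("usertable")
          .partitionKey(Key.ofInt("ycsb_key", 1))
          .value(TextColumn.of("payload", "inserted"))
          .build());
  tx.delete(
      Delete.newBuilder()
          .namespace("ycsb").table("usertable")
          .partitionKey(Key.ofInt("ycsb_key", 1))
          .build());

  // Case 7: Upsert then Update -> the two mutations are merged into a single write.
  tx.upsert(
      Upsert.newBuilder()
          .namespace("ycsb").table("usertable")
          .partitionKey(Key.ofInt("ycsb_key", 2))
          .value(TextColumn.of("payload", "upserted"))
          .build());
  tx.update(
      Update.newBuilder()
          .namespace("ycsb").table("usertable")
          .partitionKey(Key.ofInt("ycsb_key", 2))
          .value(TextColumn.of("payload", "updated"))
          .build());

  tx.commit();
}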

To achieve this behavior, we need to make the following changes:

  1. Consider the write set for conditional updates.
  2. Add validation to ensure that Insert is not executed on an already written record.
  3. Update the merge logic for the write set. Please see the inline comments for details.

Related issues and/or PRs

N/A

Changes made

  • Used the result merged with mutations in the write set for conditional updates.
  • Added validation to ensure that Insert is not executed on an already-written record.
  • Updated the merge logic for the write set.
  • Refactored Snapshot.

Checklist

  • I have commented my code, particularly in hard-to-understand areas.
  • I have updated the documentation to reflect the changes.
  • Any remaining open issues linked to this PR are documented and up-to-date (Jira, GitHub, etc.).
  • Tests (unit, integration, etc.) have been added for the changes.
  • My changes generate no new warnings.
  • Any dependent changes in other PRs have been merged and published.

Additional notes (optional)

Insert, Upsert, and Update are converted to Put internally. See the following for details:

static Put createPutForInsert(Insert insert) {
  PutBuilder.Buildable buildable =
      Put.newBuilder()
          .namespace(insert.forNamespace().orElse(null))
          .table(insert.forTable().orElse(null))
          .partitionKey(insert.getPartitionKey());
  insert.getClusteringKey().ifPresent(buildable::clusteringKey);
  insert.getColumns().values().forEach(buildable::value);
  buildable.enableInsertMode();
  return buildable.build();
}

static Put createPutForUpsert(Upsert upsert) {
  PutBuilder.Buildable buildable =
      Put.newBuilder()
          .namespace(upsert.forNamespace().orElse(null))
          .table(upsert.forTable().orElse(null))
          .partitionKey(upsert.getPartitionKey());
  upsert.getClusteringKey().ifPresent(buildable::clusteringKey);
  upsert.getColumns().values().forEach(buildable::value);
  buildable.enableImplicitPreRead();
  return buildable.build();
}

static Put createPutForUpdate(Update update) {
  PutBuilder.Buildable buildable =
      Put.newBuilder()
          .namespace(update.forNamespace().orElse(null))
          .table(update.forTable().orElse(null))
          .partitionKey(update.getPartitionKey());
  update.getClusteringKey().ifPresent(buildable::clusteringKey);
  update.getColumns().values().forEach(buildable::value);
  if (update.getCondition().isPresent()) {
    if (update.getCondition().get() instanceof UpdateIf) {
      update
          .getCondition()
          .ifPresent(c -> buildable.condition(ConditionBuilder.putIf(c.getExpressions())));
    } else {
      assert update.getCondition().get() instanceof UpdateIfExists;
      buildable.condition(ConditionBuilder.putIfExists());
    }
  } else {
    buildable.condition(ConditionBuilder.putIfExists());
  }
  buildable.enableImplicitPreRead();
  return buildable.build();
}

Release notes

Fixed the behavior of multiple mutations for the same record in a transaction in Consensus Commit.

@brfrn169 self-assigned this Nov 14, 2024
@@ -113,47 +115,55 @@ Isolation getIsolation() {

// Although this class is not thread-safe, this method is actually thread-safe because the readSet
// is a concurrent map
public void put(Key key, Optional<TransactionResult> result) {
public void putIntoReadSet(Key key, Optional<TransactionResult> result) {

Renamed the put methods in Snapshot to make them more explicit.

Comment on lines +187 to +209
public Optional<TransactionResult> getResult(Key key) throws CrudException {
  Optional<TransactionResult> result = readSet.getOrDefault(key, Optional.empty());
  return mergeResult(key, result);
}

public Optional<TransactionResult> getResult(Key key, Get get) throws CrudException {
  Optional<TransactionResult> result = getSet.getOrDefault(get, Optional.empty());
  return mergeResult(key, result, get.getConjunctions());
}

public Optional<Map<Snapshot.Key, TransactionResult>> getResults(Scan scan) throws CrudException {
  if (!scanSet.containsKey(scan)) {
    return Optional.empty();
  }

  Map<Key, TransactionResult> results = new LinkedHashMap<>();
  for (Entry<Snapshot.Key, TransactionResult> entry : scanSet.get(scan).entrySet()) {
    mergeResult(entry.getKey(), Optional.of(entry.getValue()))
        .ifPresent(result -> results.put(entry.getKey(), result));
  }

  return Optional.of(results);
}

Added getResult() and getResults() methods to Snapshot. These methods return results that are merged with mutations in the write set, if necessary. Following this change, we can make the mergeResult() methods private.
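
As a rough sketch of what "merged with mutations in the write set" is meant to provide at the transaction API level (an illustrative helper, not code from this PR; it assumes the ycsb.usertable schema used later in this thread and that the record already exists in storage):

void readYourWriteSketch(DistributedTransactionManager transactionManager)
    throws TransactionException {
  DistributedTransaction tx = transactionManager.begin();

  tx.upsert(
      Upsert.newBuilder()
          .namespace("ycsb").table("usertable")
          .partitionKey(Key.ofInt("ycsb_key", 1))
          .value(TextColumn.of("payload", "buffered"))
          .build());

  // With the merged results described above, the snapshot result for ycsb_key = 1
  // reflects the buffered Upsert, so "payload" reads as "buffered" even though
  // nothing has been committed yet.
  Optional<Result> result = tx.get(
      Get.newBuilder()
          .namespace("ycsb").table("usertable")
          .partitionKey(Key.ofInt("ycsb_key", 1))
          .build());

  tx.commit();
}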

@@ -213,10 +207,10 @@ public void put(Put put) throws CrudException {
read(key, createGet(key));
}
mutationConditionsValidator.checkIfConditionIsSatisfied(
put, snapshot.getFromReadSet(key).orElse(null));
put, snapshot.getResult(key).orElse(null));

Use the added getResult() method so that conditional updates are checked against the result merged with a mutation for the same record in the write set.
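
For example (an illustrative sketch with hypothetical values, assuming the ycsb.usertable schema from this thread), an UpdateIf condition later in the same transaction is evaluated against the result merged with the earlier buffered write:

void conditionalUpdateSketch(DistributedTransactionManager transactionManager)
    throws TransactionException {
  DistributedTransaction tx = transactionManager.begin();

  // Buffer a write that sets payload to "hello".
  tx.upsert(
      Upsert.newBuilder()
          .namespace("ycsb").table("usertable")
          .partitionKey(Key.ofInt("ycsb_key", 1))
          .value(TextColumn.of("payload", "hello"))
          .build());

  // The UpdateIf condition is checked against the result merged with the buffered
  // Upsert above, so it can be satisfied by the uncommitted value "hello".
  tx.update(
      Update.newBuilder()
          .namespace("ycsb").table("usertable")
          .partitionKey(Key.ofInt("ycsb_key", 1))
          .value(TextColumn.of("payload", "world"))
          .condition(
              ConditionBuilder.updateIf(
                      ConditionBuilder.column("payload").isEqualToText("hello"))
                  .build())
          .build());

  tx.commit();
}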

@@ -227,10 +221,10 @@ public void delete(Delete delete) throws CrudException {
read(key, createGet(key));
}
mutationConditionsValidator.checkIfConditionIsSatisfied(
delete, snapshot.getFromReadSet(key).orElse(null));
delete, snapshot.getResult(key).orElse(null));

Ditto. Use the added getResult() method so that conditional deletes are checked against the result merged with a mutation for the same record in the write set.

Comment on lines +138 to +141
if (put.isInsertModeEnabled()) {
  throw new IllegalArgumentException(
      CoreError.CONSENSUS_COMMIT_INSERTING_ALREADY_WRITTEN_DATA_NOT_ALLOWED.buildMessage());
}

Added validation to ensure that Insert is not executed on an already-written record.
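
A sketch of a case this rejects (an illustrative helper, same assumed ycsb.usertable schema), corresponding to combinations 1 and 5 in the description:

void insertAfterWriteSketch(DistributedTransactionManager transactionManager)
    throws TransactionException {
  DistributedTransaction tx = transactionManager.begin();

  tx.upsert(
      Upsert.newBuilder()
          .namespace("ycsb").table("usertable")
          .partitionKey(Key.ofInt("ycsb_key", 1))
          .value(TextColumn.of("payload", "hello"))
          .build());

  // The same record is already in the write set, so this Insert is rejected with the
  // new CONSENSUS_COMMIT_INSERTING_ALREADY_WRITTEN_DATA_NOT_ALLOWED error (0146).
  tx.insert(
      Insert.newBuilder()
          .namespace("ycsb").table("usertable")
          .partitionKey(Key.ofInt("ycsb_key", 1))
          .value(TextColumn.of("payload", "world"))
          .build());
}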

Comment on lines +145 to +156
PutBuilder.BuildableFromExisting putBuilder = Put.newBuilder(originalPut);
put.getColumns().values().forEach(putBuilder::value);

// If the implicit pre-read is enabled for the new put, it should also be enabled for the
// merged put. However, if the previous put is in insert mode, this doesn't apply. This is
// because, in insert mode, the read set is not used during the preparation phase. Therefore,
// we only need to enable the implicit pre-read if the previous put is not in insert mode
if (put.isImplicitPreReadEnabled() && !originalPut.isInsertModeEnabled()) {
  putBuilder.enableImplicitPreRead();
}

writeSet.put(key, putBuilder.build());

Updated the merge logic for the write set. We should also merge the implicit pre-read setting, with one exception. Please see the code comments for details.

Comment on lines +669 to +670
CONSENSUS_COMMIT_INSERTING_ALREADY_WRITTEN_DATA_NOT_ALLOWED(
    Category.USER_ERROR, "0146", "Inserting already-written data is not allowed", "", ""),

@josh-wong Could you please review this message when you have time? 🙇

@komamitsu (Contributor) commented:

Insert then Upsert: merge Insert and Upsert (with implicit pre-read disabled), then execute.
Insert then Update: merge Insert and Update (with implicit pre-read disabled), then execute.
Insert then Delete: only execute Delete.

@brfrn169 I assume these 3 cases should fail if a record with the same key already exists, since the first Inserts should fail from a semantic perspective. Is my understanding correct?

@brfrn169 (Collaborator, Author) commented:

Insert then Upsert: merge Insert and Upsert (with implicit pre-read disabled), then execute.
Insert then Update: merge Insert and Update (with implicit pre-read disabled), then execute.
Insert then Delete: only execute Delete.

@brfrn169 I assume these 3 cases should fail if a record with the same key already exists, since the first Inserts should fail from a semantic perspective. Is my understanding correct?

@komamitsu Yes, that's correct.

Strictly speaking, in Consensus Commit, when inserting a record, we don’t check if the target record exists or not. As a result, users encounter a conflict error only when committing the transaction, not when inserting the record.
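
For example (an illustrative sketch; it assumes a record with ycsb_key = 1 already exists, as in the snippets later in this thread):

void insertConflictAtCommitSketch(DistributedTransactionManager transactionManager)
    throws TransactionException {
  DistributedTransaction tx = transactionManager.begin();
  try {
    // The existence of ycsb_key = 1 is not checked here, so this call succeeds.
    tx.insert(
        Insert.newBuilder()
            .namespace("ycsb").table("usertable")
            .partitionKey(Key.ofInt("ycsb_key", 1))
            .value(TextColumn.of("payload", "hello"))
            .build());
    // The conflict with the already-committed record surfaces only here, at commit time.
    tx.commit();
  } catch (CommitConflictException e) {
    tx.rollback();
    throw e;
  }
}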

@komamitsu (Contributor) commented:

@brfrn169 Thanks for the details!

I executed this code with the implementation of this PR, but it didn't fail. I expected it to fail since the INSERT conflicted with the existing record... What do you think?

    TransactionFactory factory = TransactionFactory.create("/tmp/local-pg.properties");
    DistributedTransactionManager transactionManager = factory.getTransactionManager();
    try {
      DistributedTransaction tx = transactionManager.begin();
      System.out.println(">>>> " + tx.get(
          Get.newBuilder()
              .namespace("ycsb").table("usertable")
              .partitionKey(Key.ofInt("ycsb_key", 1))
              .build()));          // >>>> Optional[FilteredResult{payload=null, ycsb_key=1}]
      tx.insert(
          Insert.newBuilder()
              .namespace("ycsb").table("usertable")
              .partitionKey(Key.ofInt("ycsb_key", 1))
              .build());
      tx.delete(
          Delete.newBuilder()
              .namespace("ycsb").table("usertable")
              .partitionKey(Key.ofInt("ycsb_key", 1))
              .build());
      tx.commit();
    }
    finally{
      transactionManager.close();
    }

@brfrn169 (Collaborator, Author) commented Nov 15, 2024

I executed this code with the implementation of this PR, but it didn't fail. I expected it to fail since the INSERT conflicted with the existing record... What do you think?

@komamitsu That's a really good point.

As I mentioned in the previous comment, we don’t check whether the target record exists when inserting a record in Consensus Commit. As a result, if an Insert is followed by a Delete, the Insert is removed, and only the Delete is executed when committing the transaction, even if the record exists. This is a limitation of Consensus Commit.

By the way, the reason we don’t check whether the target record exists during an insert is for performance optimization. Checking for the existence of a target record requires an additional read from the database, which we wanted to avoid.

@brfrn169 (Collaborator, Author) commented:

@komamitsu One possible idea is to disallow deleting a record that has been inserted in the same transaction by adding validation. What do you think?

@komamitsu (Contributor) commented Nov 15, 2024

@brfrn169 Thanks for the explanation. It matches my understanding from the source code.

One possible idea is to disallow deleting a record that has been inserted in the same transaction by adding validation

This seems better to me in terms of consistency. But it might be a bit too strict in terms of convenience? I want to hear opinions from @feeblefakie as well.

BTW, I also tried Upsert and Update after Insert as follows.

    TransactionFactory factory = TransactionFactory.create("/Users/mitsunorikomatsu/tmp/local-pg.properties");
    DistributedTransactionManager transactionManager = factory.getTransactionManager();
    try {
      DistributedTransaction tx = transactionManager.begin();
      System.out.println(">>>> " + tx.get(
          Get.newBuilder()
              .namespace("ycsb").table("usertable")
              .partitionKey(Key.ofInt("ycsb_key", 1))
              .build()));
      tx.insert(
          Insert.newBuilder()
              .namespace("ycsb").table("usertable")
              .partitionKey(Key.ofInt("ycsb_key", 1))
              .build());
      tx.upsert(
          Upsert.newBuilder()
              .namespace("ycsb").table("usertable")
              .partitionKey(Key.ofInt("ycsb_key", 1))
              .value(TextColumn.of("payload", "hello"))
              .build());
      tx.commit();
    }
    finally{
      transactionManager.close();
    }

But it failed and the error message "The record being prepared already exists" seemed a bit odd to me. It also happens with the current master branch, though.

Exception in thread "main" com.scalar.db.exception.transaction.CommitConflictException: CORE-20013: The record being prepared already exists. Transaction ID: GMiUdvxZCcPyxLLaJt5yKnfR$70452dbf-7653-46d7-af8c-85b4f3337d65
	at com.scalar.db.transaction.consensuscommit.CommitHandler.commit(CommitHandler.java:124)
	at com.scalar.db.transaction.consensuscommit.ConsensusCommit.commit(ConsensusCommit.java:227)
	at com.scalar.db.common.DecoratedDistributedTransaction.commit(DecoratedDistributedTransaction.java:129)
	at com.scalar.db.common.TransactionDecorationDistributedTransactionManager$StateManagedTransaction.commit(TransactionDecorationDistributedTransactionManager.java:147)
	at com.scalar.db.common.DecoratedDistributedTransaction.commit(DecoratedDistributedTransaction.java:129)
	at com.scalar.db.common.ActiveTransactionManagedDistributedTransactionManager$ActiveTransaction.commit(ActiveTransactionManagedDistributedTransactionManager.java:168)
	at com.scalar.db.transaction.consensuscommit.CommitHandler.main(CommitHandler.java:361)
Caused by: com.scalar.db.exception.transaction.PreparationConflictException: CORE-20013: The record being prepared already exists. Transaction ID: GMiUdvxZCcPyxLLaJt5yKnfR$70452dbf-7653-46d7-af8c-85b4f3337d65
	at com.scalar.db.transaction.consensuscommit.CommitHandler.prepare(CommitHandler.java:184)
	at com.scalar.db.transaction.consensuscommit.CommitHandler.commit(CommitHandler.java:119)
	... 6 more
Caused by: com.scalar.db.exception.storage.NoMutationException: CORE-20000: No mutation was applied
	at com.scalar.db.storage.jdbc.JdbcDatabase.put(JdbcDatabase.java:115)
	at com.scalar.db.storage.multistorage.MultiStorage.put(MultiStorage.java:105)
	at com.scalar.db.storage.multistorage.MultiStorage.mutate(MultiStorage.java:130)
	at com.scalar.db.transaction.consensuscommit.CommitHandler.lambda$prepareRecords$0(CommitHandler.java:206)
	at com.scalar.db.transaction.consensuscommit.ParallelExecutor.lambda$executeTasksInParallel$0(ParallelExecutor.java:181)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
Caused by: com.scalar.db.exception.transaction.PreparationConflictException: CORE-20013: The record being prepared already exists. Transaction ID: GMiUdvxZCcPyxLLaJt5yKnfR$70452dbf-7653-46d7-af8c-85b4f3337d65

	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
Caused by: com.scalar.db.exception.storage.NoMutationException: CORE-20000: No mutation was applied

	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:750)

@brfrn169 (Collaborator, Author) commented:

But it failed and the error message "The record being prepared already exists" seemed a bit odd to me. It also happens with the current master branch, though.

@komamitsu I think the error indicates that you tried to insert a record that already exists. Maybe even if you remove the Upsert from the code, you still encounter the same error, right?

@komamitsu (Contributor) commented:

@brfrn169 Oh, sorry, I forgot to call rollback() for error handling. That's probably the cause 🙇 Please ignore the comment about Upsert and Update.
