Revisit behavior of multiple mutations for same record in transaction in Consensus Commit #2340
@@ -113,47 +115,55 @@ Isolation getIsolation() {

   // Although this class is not thread-safe, this method is actually thread-safe because the readSet
   // is a concurrent map
-  public void put(Key key, Optional<TransactionResult> result) {
+  public void putIntoReadSet(Key key, Optional<TransactionResult> result) {
Renamed the `put` methods in `Snapshot` to make them more explicit.
  public Optional<TransactionResult> getResult(Key key) throws CrudException {
    Optional<TransactionResult> result = readSet.getOrDefault(key, Optional.empty());
    return mergeResult(key, result);
  }

  public Optional<TransactionResult> getResult(Key key, Get get) throws CrudException {
    Optional<TransactionResult> result = getSet.getOrDefault(get, Optional.empty());
    return mergeResult(key, result, get.getConjunctions());
  }

  public Optional<Map<Snapshot.Key, TransactionResult>> getResults(Scan scan) throws CrudException {
    if (!scanSet.containsKey(scan)) {
      return Optional.empty();
    }

    Map<Key, TransactionResult> results = new LinkedHashMap<>();
    for (Entry<Snapshot.Key, TransactionResult> entry : scanSet.get(scan).entrySet()) {
      mergeResult(entry.getKey(), Optional.of(entry.getValue()))
          .ifPresent(result -> results.put(entry.getKey(), result));
    }

    return Optional.of(results);
  }
Added `getResult()` and `getResults()` methods to `Snapshot`. These methods return results that are merged with mutations in the write set, if necessary. Following this change, we can make the `mergeResult()` methods `private`.
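The merge described in this comment can be sketched roughly as follows. This is a minimal illustration, not the actual `Snapshot` code: `MergedReadSketch`, its string keys, and its column maps are hypothetical stand-ins, and the handling of deleted keys is an assumption about how a merged read would behave.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

// Hypothetical, simplified stand-in for Snapshot: keys are strings and a record
// is a plain column-name-to-value map. None of these types are ScalarDB classes.
final class MergedReadSketch {
  // Results read from storage; Optional.empty() means the record was not found.
  private final Map<String, Optional<Map<String, String>>> readSet = new HashMap<>();
  // Columns buffered by Put-like mutations in the current transaction.
  private final Map<String, Map<String, String>> writeSet = new HashMap<>();
  // Keys deleted in the current transaction (assumed to hide the record).
  private final Set<String> deleteSet = new HashSet<>();

  void putIntoReadSet(String key, Optional<Map<String, String>> result) {
    readSet.put(key, result);
  }

  void putIntoWriteSet(String key, Map<String, String> columns) {
    // Later writes to the same key overwrite earlier column values.
    writeSet.computeIfAbsent(key, k -> new HashMap<>()).putAll(columns);
  }

  void putIntoDeleteSet(String key) {
    writeSet.remove(key);
    deleteSet.add(key);
  }

  // Returns the record as the transaction should see it: the read result
  // overlaid with any buffered write, or empty if the record was deleted.
  Optional<Map<String, String>> getResult(String key) {
    if (deleteSet.contains(key)) {
      return Optional.empty();
    }
    Optional<Map<String, String>> read = readSet.getOrDefault(key, Optional.empty());
    Map<String, String> pending = writeSet.get(key);
    if (pending == null) {
      return read;
    }
    Map<String, String> merged = new HashMap<>();
    read.ifPresent(merged::putAll);
    merged.putAll(pending);
    return Optional.of(merged);
  }

  public static void main(String[] args) {
    MergedReadSketch snapshot = new MergedReadSketch();
    snapshot.putIntoReadSet("k1", Optional.of(Map.of("balance", "100", "owner", "alice")));
    snapshot.putIntoWriteSet("k1", Map.of("balance", "80"));
    // The merged view keeps "owner" from the read and takes "balance" from the write.
    System.out.println(snapshot.getResult("k1"));
  }
}
```

In this sketch, a condition evaluated against `getResult("k1")` sees the buffered balance rather than the stored one, which is the property the conditional update and delete checks rely on.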
@@ -213,10 +207,10 @@ public void put(Put put) throws CrudException {
       read(key, createGet(key));
     }
     mutationConditionsValidator.checkIfConditionIsSatisfied(
-        put, snapshot.getFromReadSet(key).orElse(null));
+        put, snapshot.getResult(key).orElse(null));
Use the added `getResult()` method so that conditional updates are checked against the result merged with any mutation for the same record in the write set.
@@ -227,10 +221,10 @@ public void delete(Delete delete) throws CrudException {
       read(key, createGet(key));
     }
     mutationConditionsValidator.checkIfConditionIsSatisfied(
-        delete, snapshot.getFromReadSet(key).orElse(null));
+        delete, snapshot.getResult(key).orElse(null));
Ditto. Use the added `getResult()` method so that conditional deletes are checked against the result merged with any mutation for the same record in the write set.
      if (put.isInsertModeEnabled()) {
        throw new IllegalArgumentException(
            CoreError.CONSENSUS_COMMIT_INSERTING_ALREADY_WRITTEN_DATA_NOT_ALLOWED.buildMessage());
      }
Add validation to ensure that Insert is not executed on an already written record.
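A rough sketch of this validation, under simplifying assumptions: `InsertValidationSketch` and `PendingPut` are hypothetical names, records are plain column maps, and the merged entry is assumed to keep the insert-mode flag of the first put.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical write set that rejects an insert-mode put for a key that
// already has a buffered write. Not the actual Snapshot implementation.
final class InsertValidationSketch {
  static final class PendingPut {
    final Map<String, String> columns;
    final boolean insertMode;

    PendingPut(Map<String, String> columns, boolean insertMode) {
      this.columns = columns;
      this.insertMode = insertMode;
    }
  }

  private final Map<String, PendingPut> writeSet = new HashMap<>();

  void put(String key, Map<String, String> columns, boolean insertMode) {
    PendingPut original = writeSet.get(key);
    if (original == null) {
      writeSet.put(key, new PendingPut(new HashMap<>(columns), insertMode));
      return;
    }
    if (insertMode) {
      // The record was already written in this transaction, so a second
      // Insert can never be semantically valid.
      throw new IllegalArgumentException("Inserting already-written data is not allowed");
    }
    // Otherwise merge the new columns into the existing entry.
    Map<String, String> merged = new HashMap<>(original.columns);
    merged.putAll(columns);
    writeSet.put(key, new PendingPut(merged, original.insertMode));
  }

  PendingPut get(String key) {
    return writeSet.get(key);
  }

  public static void main(String[] args) {
    InsertValidationSketch tx = new InsertValidationSketch();
    tx.put("k1", Map.of("payload", "a"), true); // Insert
    tx.put("k1", Map.of("payload", "b"), false); // Upsert-like put: merged
    try {
      tx.put("k1", Map.of("payload", "c"), true); // second Insert: rejected
    } catch (IllegalArgumentException e) {
      System.out.println("rejected: " + e.getMessage());
    }
  }
}
```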
      PutBuilder.BuildableFromExisting putBuilder = Put.newBuilder(originalPut);
      put.getColumns().values().forEach(putBuilder::value);

      // If the implicit pre-read is enabled for the new put, it should also be enabled for the
      // merged put. However, if the previous put is in insert mode, this doesn’t apply. This is
      // because, in insert mode, the read set is not used during the preparation phase. Therefore,
      // we only need to enable the implicit pre-read if the previous put is not in insert mode
      if (put.isImplicitPreReadEnabled() && !originalPut.isInsertModeEnabled()) {
        putBuilder.enableImplicitPreRead();
      }

      writeSet.put(key, putBuilder.build());
Updated the merge logic for the write set. We should also merge the implicit pre-read, with one exception. Please see the comments for details.
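The rule for the implicit pre-read flag can be written as a small pure function. This is only a sketch: `ImplicitPreReadMergeSketch` is a hypothetical name, and carrying over the original put's flag (as a builder seeded from the original put would presumably do) is an assumption rather than something the diff shows directly.

```java
// Hypothetical helper that isolates the flag-merging rule from the diff above.
final class ImplicitPreReadMergeSketch {
  // Returns whether the merged put should have the implicit pre-read enabled.
  static boolean mergedImplicitPreRead(
      boolean originalImplicitPreRead, boolean originalInsertMode, boolean newImplicitPreRead) {
    // Assumption: the merged put starts from the original put's setting.
    boolean merged = originalImplicitPreRead;
    // The new put's setting is only propagated when the original put is not in
    // insert mode, because insert mode does not use the read set when preparing.
    if (newImplicitPreRead && !originalInsertMode) {
      merged = true;
    }
    return merged;
  }

  public static void main(String[] args) {
    boolean[] flags = {false, true};
    for (boolean originalInsertMode : flags) {
      for (boolean newImplicitPreRead : flags) {
        System.out.println(
            "originalInsertMode=" + originalInsertMode
                + ", newImplicitPreRead=" + newImplicitPreRead
                + " -> merged="
                + mergedImplicitPreRead(false, originalInsertMode, newImplicitPreRead));
      }
    }
  }
}
```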
  CONSENSUS_COMMIT_INSERTING_ALREADY_WRITTEN_DATA_NOT_ALLOWED(
      Category.USER_ERROR, "0146", "Inserting already-written data is not allowed", "", ""),
@josh-wong Could you please review this message when you have time? 🙇
@brfrn169 I assume these 3 cases should fail if a record with the same key already exists, since the first Inserts should fail from a semantic perspective. Is my understanding correct?
@komamitsu Yes, that's correct. Strictly speaking, in Consensus Commit, when inserting a record, we don’t check whether the target record exists. As a result, users encounter a conflict error only when committing the transaction, not when inserting the record.
@brfrn169 Thanks for the details! I executed the following code with the implementation of this PR, but it didn't fail. I expected it to fail since the INSERT conflicts with the existing record... What do you think?
TransactionFactory factory = TransactionFactory.create("/tmp/local-pg.properties");
DistributedTransactionManager transactionManager = factory.getTransactionManager();
try {
  DistributedTransaction tx = transactionManager.begin();
  System.out.println(">>>> " + tx.get(
      Get.newBuilder()
          .namespace("ycsb").table("usertable")
          .partitionKey(Key.ofInt("ycsb_key", 1))
          .build())); // >>>> Optional[FilteredResult{payload=null, ycsb_key=1}]
  tx.insert(
      Insert.newBuilder()
          .namespace("ycsb").table("usertable")
          .partitionKey(Key.ofInt("ycsb_key", 1))
          .build());
  tx.delete(
      Delete.newBuilder()
          .namespace("ycsb").table("usertable")
          .partitionKey(Key.ofInt("ycsb_key", 1))
          .build());
  tx.commit();
} finally {
  transactionManager.close();
}
@komamitsu That's a really good point. As I mentioned in the previous comment, we don’t check whether the target record exists when inserting a record in Consensus Commit. As a result, if an Insert is followed by a Delete, the Insert is removed, and only the Delete is executed when committing the transaction, even if the record exists. This is a limitation of Consensus Commit. By the way, the reason we don’t check whether the target record exists during an insert is for performance optimization. Checking for the existence of a target record requires an additional read from the database, which we wanted to avoid.
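The limitation described above can be reproduced with a toy model. This is a sketch only: `InsertThenDeleteSketch` is a hypothetical class that buffers inserts and deletes in memory and checks for conflicts at commit time; it does not model the real Consensus Commit protocol.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Toy transaction: inserts are buffered without an existence check, and a
// later delete of the same key discards the buffered insert.
final class InsertThenDeleteSketch {
  private final Map<Integer, String> database; // committed records
  private final Map<Integer, String> insertSet = new HashMap<>();
  private final Set<Integer> deleteSet = new HashSet<>();

  InsertThenDeleteSketch(Map<Integer, String> database) {
    this.database = database;
  }

  void insert(int key, String payload) {
    // No read from the database here; that is the performance optimization.
    insertSet.put(key, payload);
  }

  void delete(int key) {
    // The buffered insert is dropped, so its conflict check never runs.
    insertSet.remove(key);
    deleteSet.add(key);
  }

  void commit() {
    // Conflicts with existing records surface only at commit time.
    for (Integer key : insertSet.keySet()) {
      if (database.containsKey(key)) {
        throw new IllegalStateException("conflict: record " + key + " already exists");
      }
    }
    database.putAll(insertSet);
    database.keySet().removeAll(deleteSet);
  }

  public static void main(String[] args) {
    Map<Integer, String> db = new HashMap<>();
    db.put(1, "existing");
    InsertThenDeleteSketch tx = new InsertThenDeleteSketch(db);
    tx.insert(1, "new");
    tx.delete(1);
    tx.commit(); // succeeds even though record 1 already existed
    System.out.println("record 1 present after commit: " + db.containsKey(1));
  }
}
```

In this model, an Insert on its own against an existing record fails at commit, while Insert followed by Delete commits and removes the record.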
@komamitsu One possible idea is to disallow deleting a record that has been inserted in the same transaction by adding validation. What do you think?
@brfrn169 Thanks for the explanation. It matches my understanding from the source code.
This seems better to me in terms of consistency. But it might be a bit too strict in terms of convenience? I want to hear opinions from @feeblefakie as well.
TransactionFactory factory = TransactionFactory.create("/Users/mitsunorikomatsu/tmp/local-pg.properties");
DistributedTransactionManager transactionManager = factory.getTransactionManager();
try {
  DistributedTransaction tx = transactionManager.begin();
  System.out.println(">>>> " + tx.get(
      Get.newBuilder()
          .namespace("ycsb").table("usertable")
          .partitionKey(Key.ofInt("ycsb_key", 1))
          .build()));
  tx.insert(
      Insert.newBuilder()
          .namespace("ycsb").table("usertable")
          .partitionKey(Key.ofInt("ycsb_key", 1))
          .build());
  tx.upsert(
      Upsert.newBuilder()
          .namespace("ycsb").table("usertable")
          .partitionKey(Key.ofInt("ycsb_key", 1))
          .value(TextColumn.of("payload", "hello"))
          .build());
  tx.commit();
} finally {
  transactionManager.close();
}
@komamitsu I think the error indicates that you tried to insert a record that already exists. Maybe even if you remove the Upsert from the code, you still encounter the same error, right?
@brfrn169 Oh, sorry, I forgot to call |
Description
This PR revisits the behavior of multiple mutations for the same record in a transaction in Consensus Commit.
The expected behavior for combinations of mutations on the same record is as follows:
I've added some test cases to ConsensusCommitIntegrationTestBase.java to check the behavior.
To achieve this behavior, we need to make the following changes:
Related issues and/or PRs
N/A
Changes made
Snapshot.
Checklist
Additional notes (optional)
Insert, Upsert, and Update operations are converted to Put internally. See the following for details:
scalardb/core/src/main/java/com/scalar/db/transaction/consensuscommit/ConsensusCommitUtils.java
Lines 247 to 293 in 78f3883
Release notes
Fixed the behavior of multiple mutations for the same record in a transaction in Consensus Commit.