Skip to content

Commit

Permalink
Merge pull request #935: [proxima-direct] O2-Czech-Republic#341 fix t…
Browse files Browse the repository at this point in the history
…ransaction idempotency
  • Loading branch information
je-ik authored Oct 22, 2024
2 parents 93985ed + ae7cd31 commit c378f97
Show file tree
Hide file tree
Showing 4 changed files with 70 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,7 @@ private <T> CompletableFuture<T> waitForInFlight(@Nullable T result) {

private CompletableFuture<Collection<StreamElement>> runTransforms(
List<StreamElement> outputs) {

if (state != State.Flags.OPEN) {
return CompletableFuture.failedFuture(
new TransactionRejectedException(
Expand All @@ -278,7 +279,10 @@ private CompletableFuture<Response> sendCommitRequest(Collection<StreamElement>
runningUpdates.clear();
}
if (state != State.Flags.OPEN) {
return CompletableFuture.completedFuture(Response.empty().aborted());
if (state != State.Flags.COMMITTED) {
return CompletableFuture.completedFuture(Response.empty().aborted());
}
return CompletableFuture.completedFuture(Response.empty().duplicate(sequenceId));
}
log.debug("Sending commit request for transformed elements {}", transformed);
return manager.commit(transactionId, transformed);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -437,13 +437,15 @@ private void processTransactionUpdateRequest(
State currentState = manager.getCurrentState(transactionId);
State newState = transitionState(transactionId, currentState, request);
monitoringPolicy.stateUpdate(transactionId, currentState, newState);
Response response = getResponseForNewState(request, currentState, newState);
// we have successfully computed new state, produce response
log.info(
"Transaction {} transitioned to state {} from {}",
"Transaction {} transitioned due to {} from state {} to state {}, returning {}",
transactionId,
request.getFlags(),
currentState.getFlags(),
newState.getFlags(),
currentState.getFlags());
Response response = getResponseForNewState(request, currentState, newState);
response.getFlags());
manager.ensureTransactionOpen(transactionId, newState);
monitoringPolicy.outgoingResponse(transactionId, response);
manager.writeResponseAndUpdateState(
Expand All @@ -464,7 +466,8 @@ private Response getResponseForNewState(Request request, State oldState, State n
? Response.forRequest(request).open(newState.getSequentialId(), newState.getStamp())
: Response.forRequest(request).updated();
case COMMITTED:
if (request.getFlags() == Request.Flags.OPEN) {
if (request.getFlags() != Request.Flags.COMMIT
|| oldState.getFlags() == State.Flags.COMMITTED) {
return Response.forRequest(request).duplicate(newState.getSequentialId());
}
return Response.forRequest(request).committed();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import static org.junit.Assert.*;

import cz.o2.proxima.core.annotations.DeclaredThreadSafe;
import cz.o2.proxima.core.functional.Factory;
import cz.o2.proxima.core.repository.ConfigConstants;
import cz.o2.proxima.core.repository.EntityAwareAttributeDescriptor.Regular;
import cz.o2.proxima.core.repository.EntityAwareAttributeDescriptor.Wildcard;
Expand Down Expand Up @@ -270,6 +271,37 @@ public void testDeletedAttributeGet() throws InterruptedException {
verifyNumInAttributeIs(key, numWrites + 1, attrA);
}

@Test(timeout = 10_000)
public void testTransactionCommitIdempotent() throws InterruptedException {
String transactionId = UUID.randomUUID().toString();
String key = "key";
String attrA = device.toAttributePrefix() + "A";
String attrB = device.toAttributePrefix() + "B";
replicatedLatch = new CountDownLatch(1);
writeSeedValue(attrA, key);
replicatedLatch.await();
assertTrue(view.get(key, attrA, device).isPresent());
assertFalse(view.get(key, attrB, device).isPresent());
// swap with stable transaction ID
swapValueBetween(key, attrA, attrB, () -> transactionId);
{
Optional<KeyValue<byte[]>> valueA = view.get(key, attrA, device);
Optional<KeyValue<byte[]>> valueB = view.get(key, attrB, device);
assertFalse(valueA.isPresent());
assertTrue(valueB.isPresent());
}

// try reissuing the same transaction
swapValueBetween(key, attrA, attrB, () -> transactionId);
{
// same outcome as previously
Optional<KeyValue<byte[]>> valueA = view.get(key, attrA, device);
Optional<KeyValue<byte[]>> valueB = view.get(key, attrB, device);
assertFalse(valueA.isPresent());
assertTrue(valueB.isPresent());
}
}

private void writeSeedValue(String attribute, String key) {
OnlineAttributeWriter writer = Optionals.get(direct.getWriter(device));
StreamElement upsert =
Expand All @@ -286,9 +318,16 @@ private void writeSeedValue(String attribute, String key) {
private void swapValueBetween(String key, String attrA, String attrB)
throws InterruptedException {

swapValueBetween(key, attrA, attrB, () -> UUID.randomUUID().toString());
}

private void swapValueBetween(
String key, String attrA, String attrB, Factory<String> transactionIdFn)
throws InterruptedException {

long retrySleep = 1;
do {
String transactionId = UUID.randomUUID().toString();
String transactionId = transactionIdFn.apply();
BlockingQueue<Response> responses = new ArrayBlockingQueue<>(5);
Optional<KeyValue<byte[]>> valA = view.get(key, attrA, device);
Optional<KeyValue<byte[]>> valB = view.get(key, attrB, device);
Expand All @@ -306,7 +345,7 @@ private void swapValueBetween(String key, String attrA, String attrB)
client.begin(transactionId, fetched).thenApply(responses::add);

Response response = responses.take();
if (response.getFlags() != Flags.OPEN) {
if (response.getFlags() != Flags.OPEN && response.getFlags() != Flags.DUPLICATE) {
TimeUnit.MILLISECONDS.sleep(Math.min(8, retrySleep *= 2));
continue;
}
Expand All @@ -329,7 +368,8 @@ private void swapValueBetween(String key, String attrA, String attrB)
client.commit(transactionId, updates).thenApply(responses::add);

response = responses.take();
if (response.getFlags() != Flags.COMMITTED) {
boolean committed = response.getFlags() == Flags.COMMITTED;
if (response.getFlags() != Flags.COMMITTED && response.getFlags() != Flags.DUPLICATE) {
TimeUnit.MILLISECONDS.sleep(Math.min(8, retrySleep *= 2));
continue;
}
Expand All @@ -338,17 +378,19 @@ private void swapValueBetween(String key, String attrA, String attrB)
"There was error detected during transaction, it should be impossible to commit this transaction!",
errorDetected);

CountDownLatch latch = new CountDownLatch(1);
CommitCallback callback =
(succ, exc) -> {
if (!succ) {
client.rollback(transactionId);
}
latch.countDown();
};
CommitCallback multiCallback = CommitCallback.afterNumCommits(updates.size(), callback);
updates.forEach(u -> view.write(u, multiCallback));
latch.await();
if (committed) {
CountDownLatch latch = new CountDownLatch(1);
CommitCallback callback =
(succ, exc) -> {
if (!succ) {
client.rollback(transactionId);
}
latch.countDown();
};
CommitCallback multiCallback = CommitCallback.afterNumCommits(updates.size(), callback);
updates.forEach(u -> view.write(u, multiCallback));
latch.await();
}
break;
} while (true);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -802,8 +802,8 @@ public void testCreateTransactionCommitRollback() throws InterruptedException {
responseQueue.take();
clientManager.rollback(transactionId).thenAccept(responseQueue::add);
response = responseQueue.take();
// once committed the transaction must stay committed
assertEquals(Response.Flags.COMMITTED, response.getFlags());
// once committed the transaction must stay committed, but return as duplicate
assertEquals(Response.Flags.DUPLICATE, response.getFlags());
}

@Test(timeout = 10000)
Expand Down

0 comments on commit c378f97

Please sign in to comment.