Skip to content

Commit

Permalink
Merge pull request #934: [proxima-direct-core] O2-Czech-Republic#340
Browse files Browse the repository at this point in the history
…do not terminate transaction after sync()
  • Loading branch information
je-ik authored Oct 18, 2024
2 parents b262c63 + b5e91ea commit 93985ed
Showing 1 changed file with 4 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,6 @@ public class Transaction implements AutoCloseable {
private final List<CompletableFuture<?>> runningUpdates =
Collections.synchronizedList(new ArrayList<>());
private final List<Throwable> exceptions = Collections.synchronizedList(new ArrayList<>());
private boolean terminated = false;

private Transaction(String transactionId) {
this.transactionId = transactionId;
Expand Down Expand Up @@ -206,7 +205,6 @@ public void update(List<KeyAttribute> addedInputs) {
new TransactionRejectedException(transactionId, Flags.ABORTED));
}
synchronized (runningUpdates) {
Preconditions.checkState(!terminated);
// wait for receiving the response
runningUpdates.add(future);
// and for processing it
Expand All @@ -227,7 +225,7 @@ public void commitWrite(List<StreamElement> outputs, CommitCallback callback) {
waitForInFlight()
.thenCompose(ign -> runTransforms(outputs))
// need to wait for any requests added during possible transforms
.thenCompose(elements -> waitForInFlight(elements, true))
.thenCompose(this::waitForInFlight)
.thenCompose(this::sendCommitRequest)
.thenCompose(this::handleCommitResponse)
.completeOnTimeout(
Expand All @@ -249,13 +247,12 @@ public void commitWrite(List<StreamElement> outputs, CommitCallback callback) {
}

private CompletableFuture<Collection<StreamElement>> waitForInFlight() {
return waitForInFlight(null, false);
return waitForInFlight(null);
}

private <T> CompletableFuture<T> waitForInFlight(@Nullable T result, boolean isFinal) {
private <T> CompletableFuture<T> waitForInFlight(@Nullable T result) {
synchronized (runningUpdates) {
CompletableFuture<?>[] futures = runningUpdates.toArray(new CompletableFuture<?>[] {});
this.terminated = terminated || isFinal;
return CompletableFuture.allOf(futures).thenApply(ign -> result);
}
}
Expand Down Expand Up @@ -446,7 +443,7 @@ public CompletableFuture<Void> rollback() {

public void sync() throws TransactionRejectedException {
try {
waitForInFlight(null, true).get();
waitForInFlight().get();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new TransactionRejectedException(getTransactionId(), Flags.ABORTED);
Expand Down

0 comments on commit 93985ed

Please sign in to comment.