Skip to content

Commit

Permalink
[transactions] enforce timeout on sync()
Browse files Browse the repository at this point in the history
  • Loading branch information
je-ik committed Jan 14, 2025
1 parent a22e498 commit 6e5e851
Show file tree
Hide file tree
Showing 3 changed files with 22 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ interface TransactionConfig {
long getCleanupInterval();

long getTransactionTimeoutMs();

long getSyncTimeoutMs();
}

EntityDescriptor getTransaction();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,11 @@ public TransactionMonitoringPolicy getTransactionMonitoringPolicy() {
public int getServerTerminationTimeoutSeconds() {
return serverTerminationTimeoutSeconds;
}

@Override
public long getSyncTimeoutMs() {
return syncTimeoutMs;
}
}

private class CachedWriters implements AutoCloseable {
Expand Down Expand Up @@ -392,6 +397,9 @@ public void withHandle(ObserveHandle handle) {
@Getter(AccessLevel.PACKAGE)
private final int serverTerminationTimeoutSeconds;

@Getter(AccessLevel.PACKAGE)
private final int syncTimeoutMs;

@VisibleForTesting
public TransactionResourceManager(DirectDataOperator direct, Map<String, Object> cfg) {
this.direct = direct;
Expand All @@ -405,6 +413,7 @@ public TransactionResourceManager(DirectDataOperator direct, Map<String, Object>
this.initialSequenceIdPolicy = getInitialSequenceIdPolicy(cfg);
this.transactionMonitoringPolicy = getTransactionMonitoringPolicy(cfg);
this.serverTerminationTimeoutSeconds = getServerTerminationTimeout(cfg);
this.syncTimeoutMs = getSyncTimeout(cfg);

log.info(
"Created {} with transaction timeout {} ms",
Expand Down Expand Up @@ -454,6 +463,13 @@ private static int getServerTerminationTimeout(Map<String, Object> cfg) {
.orElse(2);
}

private int getSyncTimeout(Map<String, Object> cfg) {
return Optional.ofNullable(cfg.get("sync-timeout-ms"))
.map(Object::toString)
.map(Integer::valueOf)
.orElse(5000);
}

@Override
public void close() {
openTransactionMap.forEach((k, v) -> v.close());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import lombok.Getter;
Expand Down Expand Up @@ -258,6 +259,7 @@ private CompletableFuture<Collection<StreamElement>> waitForInFlight() {
private <T> CompletableFuture<T> waitForInFlight(@Nullable T result) {
synchronized (runningUpdates) {
CompletableFuture<?>[] futures = runningUpdates.toArray(new CompletableFuture<?>[] {});
runningUpdates.clear();
return CompletableFuture.allOf(futures).thenApply(ign -> result);
}
}
Expand Down Expand Up @@ -452,11 +454,11 @@ public CompletableFuture<Void> rollback() {

public void sync() throws TransactionRejectedException {
try {
waitForInFlight().get();
waitForInFlight().get(manager.getCfg().getSyncTimeoutMs(), TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new TransactionRejectedException(getTransactionId(), Flags.ABORTED);
} catch (ExecutionException e) {
} catch (ExecutionException | TimeoutException e) {
if (e.getCause() instanceof TransactionRejectedException) {
throw (TransactionRejectedException) e.getCause();
}
Expand Down

0 comments on commit 6e5e851

Please sign in to comment.