From 6e5e851237aeabeaf4109325403e09759d037920 Mon Sep 17 00:00:00 2001 From: Jan Lukavsky <je.ik@seznam.cz> Date: Tue, 14 Jan 2025 13:46:07 +0100 Subject: [PATCH] [transactions] enforce timeout on sync() --- .../core/transaction/TransactionManager.java | 2 ++ .../transaction/TransactionResourceManager.java | 16 ++++++++++++++++ .../TransactionalOnlineAttributeWriter.java | 6 ++++-- 3 files changed, 22 insertions(+), 2 deletions(-) diff --git a/direct/core/src/main/java/cz/o2/proxima/direct/core/transaction/TransactionManager.java b/direct/core/src/main/java/cz/o2/proxima/direct/core/transaction/TransactionManager.java index 1fab33589..38815dc35 100644 --- a/direct/core/src/main/java/cz/o2/proxima/direct/core/transaction/TransactionManager.java +++ b/direct/core/src/main/java/cz/o2/proxima/direct/core/transaction/TransactionManager.java @@ -32,6 +32,8 @@ interface TransactionConfig { long getCleanupInterval(); long getTransactionTimeoutMs(); + + long getSyncTimeoutMs(); } EntityDescriptor getTransaction(); diff --git a/direct/core/src/main/java/cz/o2/proxima/direct/core/transaction/TransactionResourceManager.java b/direct/core/src/main/java/cz/o2/proxima/direct/core/transaction/TransactionResourceManager.java index 7eaf1177b..abdd07994 100644 --- a/direct/core/src/main/java/cz/o2/proxima/direct/core/transaction/TransactionResourceManager.java +++ b/direct/core/src/main/java/cz/o2/proxima/direct/core/transaction/TransactionResourceManager.java @@ -123,6 +123,11 @@ public TransactionMonitoringPolicy getTransactionMonitoringPolicy() { public int getServerTerminationTimeoutSeconds() { return serverTerminationTimeoutSeconds; } + + @Override + public long getSyncTimeoutMs() { + return syncTimeoutMs; + } } private class CachedWriters implements AutoCloseable { @@ -392,6 +397,9 @@ public void withHandle(ObserveHandle handle) { @Getter(AccessLevel.PACKAGE) private final int serverTerminationTimeoutSeconds; + @Getter(AccessLevel.PACKAGE) + private final int syncTimeoutMs; + @VisibleForTesting public TransactionResourceManager(DirectDataOperator direct, Map<String, Object> cfg) { this.direct = direct; @@ -405,6 +413,7 @@ public TransactionResourceManager(DirectDataOperator direct, Map<String, Object> this.initialSequenceIdPolicy = getInitialSequenceIdPolicy(cfg); this.transactionMonitoringPolicy = getTransactionMonitoringPolicy(cfg); this.serverTerminationTimeoutSeconds = getServerTerminationTimeout(cfg); + this.syncTimeoutMs = getSyncTimeout(cfg); log.info( "Created {} with transaction timeout {} ms", @@ -454,6 +463,13 @@ private static int getServerTerminationTimeout(Map<String, Object> cfg) { .orElse(2); } + private int getSyncTimeout(Map<String, Object> cfg) { + return Optional.ofNullable(cfg.get("sync-timeout-ms")) + .map(Object::toString) + .map(Integer::valueOf) + .orElse(5000); + } + @Override public void close() { openTransactionMap.forEach((k, v) -> v.close()); diff --git a/direct/core/src/main/java/cz/o2/proxima/direct/core/transaction/TransactionalOnlineAttributeWriter.java b/direct/core/src/main/java/cz/o2/proxima/direct/core/transaction/TransactionalOnlineAttributeWriter.java index 69653a156..e10f97873 100644 --- a/direct/core/src/main/java/cz/o2/proxima/direct/core/transaction/TransactionalOnlineAttributeWriter.java +++ b/direct/core/src/main/java/cz/o2/proxima/direct/core/transaction/TransactionalOnlineAttributeWriter.java @@ -51,6 +51,7 @@ import java.util.concurrent.CompletionException; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.stream.Collectors; import javax.annotation.Nullable; import lombok.Getter; @@ -258,6 +259,7 @@ private CompletableFuture<Collection<StreamElement>> waitForInFlight() { private <T> CompletableFuture<T> waitForInFlight(@Nullable T result) { synchronized (runningUpdates) { CompletableFuture<?>[] futures = runningUpdates.toArray(new CompletableFuture<?>[] {}); + runningUpdates.clear(); return CompletableFuture.allOf(futures).thenApply(ign -> result); } } @@ -452,11 +454,11 @@ public CompletableFuture<Void> rollback() { public void sync() throws TransactionRejectedException { try { - waitForInFlight().get(); + waitForInFlight().get(manager.getCfg().getSyncTimeoutMs(), TimeUnit.MILLISECONDS); } catch (InterruptedException e) { Thread.currentThread().interrupt(); throw new TransactionRejectedException(getTransactionId(), Flags.ABORTED); - } catch (ExecutionException e) { + } catch (ExecutionException | TimeoutException e) { if (e.getCause() instanceof TransactionRejectedException) { throw (TransactionRejectedException) e.getCause(); }