From 6e5e851237aeabeaf4109325403e09759d037920 Mon Sep 17 00:00:00 2001
From: Jan Lukavsky <je.ik@seznam.cz>
Date: Tue, 14 Jan 2025 13:46:07 +0100
Subject: [PATCH] [transactions] enforce timeout on sync()

---
 .../core/transaction/TransactionManager.java     |  2 ++
 .../transaction/TransactionResourceManager.java  | 16 ++++++++++++++++
 .../TransactionalOnlineAttributeWriter.java      |  6 ++++--
 3 files changed, 22 insertions(+), 2 deletions(-)

diff --git a/direct/core/src/main/java/cz/o2/proxima/direct/core/transaction/TransactionManager.java b/direct/core/src/main/java/cz/o2/proxima/direct/core/transaction/TransactionManager.java
index 1fab33589..38815dc35 100644
--- a/direct/core/src/main/java/cz/o2/proxima/direct/core/transaction/TransactionManager.java
+++ b/direct/core/src/main/java/cz/o2/proxima/direct/core/transaction/TransactionManager.java
@@ -32,6 +32,8 @@ interface TransactionConfig {
     long getCleanupInterval();
 
     long getTransactionTimeoutMs();
+
+    long getSyncTimeoutMs();
   }
 
   EntityDescriptor getTransaction();
diff --git a/direct/core/src/main/java/cz/o2/proxima/direct/core/transaction/TransactionResourceManager.java b/direct/core/src/main/java/cz/o2/proxima/direct/core/transaction/TransactionResourceManager.java
index 7eaf1177b..abdd07994 100644
--- a/direct/core/src/main/java/cz/o2/proxima/direct/core/transaction/TransactionResourceManager.java
+++ b/direct/core/src/main/java/cz/o2/proxima/direct/core/transaction/TransactionResourceManager.java
@@ -123,6 +123,11 @@ public TransactionMonitoringPolicy getTransactionMonitoringPolicy() {
     public int getServerTerminationTimeoutSeconds() {
       return serverTerminationTimeoutSeconds;
     }
+
+    @Override
+    public long getSyncTimeoutMs() {
+      return syncTimeoutMs;
+    }
   }
 
   private class CachedWriters implements AutoCloseable {
@@ -392,6 +397,9 @@ public void withHandle(ObserveHandle handle) {
   @Getter(AccessLevel.PACKAGE)
   private final int serverTerminationTimeoutSeconds;
 
+  @Getter(AccessLevel.PACKAGE)
+  private final int syncTimeoutMs;
+
   @VisibleForTesting
   public TransactionResourceManager(DirectDataOperator direct, Map<String, Object> cfg) {
     this.direct = direct;
@@ -405,6 +413,7 @@ public TransactionResourceManager(DirectDataOperator direct, Map<String, Object>
     this.initialSequenceIdPolicy = getInitialSequenceIdPolicy(cfg);
     this.transactionMonitoringPolicy = getTransactionMonitoringPolicy(cfg);
     this.serverTerminationTimeoutSeconds = getServerTerminationTimeout(cfg);
+    this.syncTimeoutMs = getSyncTimeout(cfg);
 
     log.info(
         "Created {} with transaction timeout {} ms",
@@ -454,6 +463,13 @@ private static int getServerTerminationTimeout(Map<String, Object> cfg) {
         .orElse(2);
   }
 
+  private int getSyncTimeout(Map<String, Object> cfg) {
+    return Optional.ofNullable(cfg.get("sync-timeout-ms"))
+        .map(Object::toString)
+        .map(Integer::valueOf)
+        .orElse(5000);
+  }
+
   @Override
   public void close() {
     openTransactionMap.forEach((k, v) -> v.close());
diff --git a/direct/core/src/main/java/cz/o2/proxima/direct/core/transaction/TransactionalOnlineAttributeWriter.java b/direct/core/src/main/java/cz/o2/proxima/direct/core/transaction/TransactionalOnlineAttributeWriter.java
index 69653a156..e10f97873 100644
--- a/direct/core/src/main/java/cz/o2/proxima/direct/core/transaction/TransactionalOnlineAttributeWriter.java
+++ b/direct/core/src/main/java/cz/o2/proxima/direct/core/transaction/TransactionalOnlineAttributeWriter.java
@@ -51,6 +51,7 @@
 import java.util.concurrent.CompletionException;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import java.util.stream.Collectors;
 import javax.annotation.Nullable;
 import lombok.Getter;
@@ -258,6 +259,7 @@ private CompletableFuture<Collection<StreamElement>> waitForInFlight() {
     private <T> CompletableFuture<T> waitForInFlight(@Nullable T result) {
       synchronized (runningUpdates) {
         CompletableFuture<?>[] futures = runningUpdates.toArray(new CompletableFuture<?>[] {});
+        runningUpdates.clear();
         return CompletableFuture.allOf(futures).thenApply(ign -> result);
       }
     }
@@ -452,11 +454,11 @@ public CompletableFuture<Void> rollback() {
 
     public void sync() throws TransactionRejectedException {
       try {
-        waitForInFlight().get();
+        waitForInFlight().get(manager.getCfg().getSyncTimeoutMs(), TimeUnit.MILLISECONDS);
       } catch (InterruptedException e) {
         Thread.currentThread().interrupt();
         throw new TransactionRejectedException(getTransactionId(), Flags.ABORTED);
-      } catch (ExecutionException e) {
+      } catch (ExecutionException | TimeoutException e) {
         if (e.getCause() instanceof TransactionRejectedException) {
           throw (TransactionRejectedException) e.getCause();
         }