From 711d7eccd6027679c4d11fc2cdd4a188514da835 Mon Sep 17 00:00:00 2001
From: Jan Lukavsky <je.ik@seznam.cz>
Date: Mon, 9 Dec 2024 09:55:00 +0100
Subject: [PATCH] [proxima-beam-core] state expander: drop elements with
 timestamp behind state write time

---
 .../ProcessElementParameterExpander.java      |  8 +++
 .../util/state/ExternalStateExpanderTest.java | 63 ++++++++++++++++---
 2 files changed, 63 insertions(+), 8 deletions(-)

diff --git a/beam/core/src/main/java/cz/o2/proxima/beam/util/state/ProcessElementParameterExpander.java b/beam/core/src/main/java/cz/o2/proxima/beam/util/state/ProcessElementParameterExpander.java
index a9dc32318..8e45ac317 100644
--- a/beam/core/src/main/java/cz/o2/proxima/beam/util/state/ProcessElementParameterExpander.java
+++ b/beam/core/src/main/java/cz/o2/proxima/beam/util/state/ProcessElementParameterExpander.java
@@ -209,6 +209,14 @@ public Boolean apply(Object[] args) {
         // set the initial timer
         flushTimer.set(stateWriteInstant);
       }
+      if (stateWriteInstant.isAfter(ts)) {
+        log.debug(
+            "Dropping element {} with {}, which precedes stateWriteInstant {}",
+            elem,
+            ts,
+            stateWriteInstant);
+        return false;
+      }
       boolean shouldBuffer =
           nextFlush == null /* we have not finished reading state */
               || nextFlush.isBefore(ts) /* the timestamp if after next flush */;
diff --git a/beam/core/src/test/java/cz/o2/proxima/beam/util/state/ExternalStateExpanderTest.java b/beam/core/src/test/java/cz/o2/proxima/beam/util/state/ExternalStateExpanderTest.java
index 8223337be..82cbcf5a1 100644
--- a/beam/core/src/test/java/cz/o2/proxima/beam/util/state/ExternalStateExpanderTest.java
+++ b/beam/core/src/test/java/cz/o2/proxima/beam/util/state/ExternalStateExpanderTest.java
@@ -88,8 +88,14 @@ public static List<Class<? extends PipelineRunner<?>>> params() {
 
   @Test
   public void testSimpleExpand() throws IOException {
+    Instant now = new Instant(0);
     Pipeline pipeline = createPipeline();
-    PCollection<String> inputs = pipeline.apply(Create.of("1", "2", "3"));
+    PCollection<String> inputs =
+        pipeline.apply(
+            Create.timestamped(
+                TimestampedValue.of("1", now),
+                TimestampedValue.of("2", now),
+                TimestampedValue.of("3", now)));
     PCollection<KV<Integer, String>> withKeys =
         inputs.apply(
             WithKeys.<Integer, String>of(e -> Integer.parseInt(e) % 2)
@@ -100,7 +106,33 @@ public void testSimpleExpand() throws IOException {
         ExternalStateExpander.expand(
             pipeline,
             Create.empty(KvCoder.of(StringUtf8Coder.of(), StateValue.coder())),
-            new Instant(0),
+            now,
+            ign -> BoundedWindow.TIMESTAMP_MAX_VALUE,
+            dummy());
+    expanded.run();
+  }
+
+  @Test
+  public void testSimpleExpandWithDrop() throws IOException {
+    Instant now = new Instant(0);
+    Pipeline pipeline = createPipeline();
+    PCollection<String> inputs =
+        pipeline.apply(
+            Create.timestamped(
+                TimestampedValue.of("1", now.minus(1)),
+                TimestampedValue.of("2", now.minus(1)),
+                TimestampedValue.of("3", now.minus(1))));
+    PCollection<KV<Integer, String>> withKeys =
+        inputs.apply(
+            WithKeys.<Integer, String>of(e -> Integer.parseInt(e) % 2)
+                .withKeyType(TypeDescriptors.integers()));
+    PCollection<Long> count = withKeys.apply(ParDo.of(getSumFn()));
+    PAssert.that(count).empty();
+    Pipeline expanded =
+        ExternalStateExpander.expand(
+            pipeline,
+            Create.empty(KvCoder.of(StringUtf8Coder.of(), StateValue.coder())),
+            now,
             ign -> BoundedWindow.TIMESTAMP_MAX_VALUE,
             dummy());
     expanded.run();
@@ -136,7 +168,13 @@ public void testWithTimer() throws IOException {
   @Test
   public void testSimpleExpandMultiOutput() throws IOException {
     Pipeline pipeline = createPipeline();
-    PCollection<String> inputs = pipeline.apply(Create.of("1", "2", "3"));
+    Instant now = new Instant(0);
+    PCollection<String> inputs =
+        pipeline.apply(
+            Create.timestamped(
+                TimestampedValue.of("1", now),
+                TimestampedValue.of("2", now),
+                TimestampedValue.of("3", now)));
     PCollection<KV<Integer, String>> withKeys =
         inputs.apply(
             WithKeys.<Integer, String>of(e -> Integer.parseInt(e) % 2)
@@ -151,7 +189,7 @@ public void testSimpleExpandMultiOutput() throws IOException {
         ExternalStateExpander.expand(
             pipeline,
             Create.empty(KvCoder.of(StringUtf8Coder.of(), StateValue.coder())),
-            new Instant(0),
+            now,
             ign -> BoundedWindow.TIMESTAMP_MAX_VALUE,
             dummy());
     expanded.run();
@@ -170,15 +208,21 @@ public PCollection<Long> expand(PCollection<String> input) {
             return withKeys.apply(ParDo.of(getSumFn()));
           }
         };
+    Instant now = new Instant(0);
     Pipeline pipeline = createPipeline();
-    PCollection<String> inputs = pipeline.apply(Create.of("1", "2", "3"));
+    PCollection<String> inputs =
+        pipeline.apply(
+            Create.timestamped(
+                TimestampedValue.of("1", now),
+                TimestampedValue.of("2", now),
+                TimestampedValue.of("3", now)));
     PCollection<Long> count = inputs.apply(transform);
     PAssert.that(count).containsInAnyOrder(2L, 4L);
     Pipeline expanded =
         ExternalStateExpander.expand(
             pipeline,
             Create.empty(KvCoder.of(StringUtf8Coder.of(), StateValue.coder())),
-            new Instant(0),
+            now,
             ign -> BoundedWindow.TIMESTAMP_MAX_VALUE,
             dummy());
     expanded.run();
@@ -186,8 +230,11 @@ public PCollection<Long> expand(PCollection<String> input) {
 
   @Test
   public void testSimpleExpandWithInitialState() throws IOException {
+    Instant now = new Instant(0);
     Pipeline pipeline = createPipeline();
-    PCollection<String> inputs = pipeline.apply(Create.of("3", "4"));
+    PCollection<String> inputs =
+        pipeline.apply(
+            Create.timestamped(TimestampedValue.of("3", now), TimestampedValue.of("4", now)));
     PCollection<KV<Integer, String>> withKeys =
         inputs.apply(
             WithKeys.<Integer, String>of(e -> Integer.parseInt(e) % 2)
@@ -213,7 +260,7 @@ public void testSimpleExpandWithInitialState() throws IOException {
                             "sum",
                             CoderUtils.encodeToByteArray(longCoder, 1L))))
                 .withCoder(KvCoder.of(StringUtf8Coder.of(), StateValue.coder())),
-            new Instant(0),
+            now,
             current -> BoundedWindow.TIMESTAMP_MAX_VALUE,
             dummy());
     expanded.run();