Skip to content

Commit

Permalink
Merge pull request #944: [proxima-beam-core] state expander: drop ele…
Browse files Browse the repository at this point in the history
…ments with timestamp behind state write time
  • Loading branch information
je-ik authored Dec 9, 2024
2 parents 3b4a5c3 + 711d7ec commit 980338b
Show file tree
Hide file tree
Showing 2 changed files with 63 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,14 @@ public Boolean apply(Object[] args) {
// set the initial timer
flushTimer.set(stateWriteInstant);
}
if (stateWriteInstant.isAfter(ts)) {
log.debug(
"Dropping element {} with {}, which precedes stateWriteInstant {}",
elem,
ts,
stateWriteInstant);
return false;
}
boolean shouldBuffer =
nextFlush == null /* we have not finished reading state */
|| nextFlush.isBefore(ts) /* the timestamp if after next flush */;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,8 +88,14 @@ public static List<Class<? extends PipelineRunner<?>>> params() {

@Test
public void testSimpleExpand() throws IOException {
Instant now = new Instant(0);
Pipeline pipeline = createPipeline();
PCollection<String> inputs = pipeline.apply(Create.of("1", "2", "3"));
PCollection<String> inputs =
pipeline.apply(
Create.timestamped(
TimestampedValue.of("1", now),
TimestampedValue.of("2", now),
TimestampedValue.of("3", now)));
PCollection<KV<Integer, String>> withKeys =
inputs.apply(
WithKeys.<Integer, String>of(e -> Integer.parseInt(e) % 2)
Expand All @@ -100,7 +106,33 @@ public void testSimpleExpand() throws IOException {
ExternalStateExpander.expand(
pipeline,
Create.empty(KvCoder.of(StringUtf8Coder.of(), StateValue.coder())),
new Instant(0),
now,
ign -> BoundedWindow.TIMESTAMP_MAX_VALUE,
dummy());
expanded.run();
}

@Test
public void testSimpleExpandWithDrop() throws IOException {
Instant now = new Instant(0);
Pipeline pipeline = createPipeline();
PCollection<String> inputs =
pipeline.apply(
Create.timestamped(
TimestampedValue.of("1", now.minus(1)),
TimestampedValue.of("2", now.minus(1)),
TimestampedValue.of("3", now.minus(1))));
PCollection<KV<Integer, String>> withKeys =
inputs.apply(
WithKeys.<Integer, String>of(e -> Integer.parseInt(e) % 2)
.withKeyType(TypeDescriptors.integers()));
PCollection<Long> count = withKeys.apply(ParDo.of(getSumFn()));
PAssert.that(count).empty();
Pipeline expanded =
ExternalStateExpander.expand(
pipeline,
Create.empty(KvCoder.of(StringUtf8Coder.of(), StateValue.coder())),
now,
ign -> BoundedWindow.TIMESTAMP_MAX_VALUE,
dummy());
expanded.run();
Expand Down Expand Up @@ -136,7 +168,13 @@ public void testWithTimer() throws IOException {
@Test
public void testSimpleExpandMultiOutput() throws IOException {
Pipeline pipeline = createPipeline();
PCollection<String> inputs = pipeline.apply(Create.of("1", "2", "3"));
Instant now = new Instant(0);
PCollection<String> inputs =
pipeline.apply(
Create.timestamped(
TimestampedValue.of("1", now),
TimestampedValue.of("2", now),
TimestampedValue.of("3", now)));
PCollection<KV<Integer, String>> withKeys =
inputs.apply(
WithKeys.<Integer, String>of(e -> Integer.parseInt(e) % 2)
Expand All @@ -151,7 +189,7 @@ public void testSimpleExpandMultiOutput() throws IOException {
ExternalStateExpander.expand(
pipeline,
Create.empty(KvCoder.of(StringUtf8Coder.of(), StateValue.coder())),
new Instant(0),
now,
ign -> BoundedWindow.TIMESTAMP_MAX_VALUE,
dummy());
expanded.run();
Expand All @@ -170,24 +208,33 @@ public PCollection<Long> expand(PCollection<String> input) {
return withKeys.apply(ParDo.of(getSumFn()));
}
};
Instant now = new Instant(0);
Pipeline pipeline = createPipeline();
PCollection<String> inputs = pipeline.apply(Create.of("1", "2", "3"));
PCollection<String> inputs =
pipeline.apply(
Create.timestamped(
TimestampedValue.of("1", now),
TimestampedValue.of("2", now),
TimestampedValue.of("3", now)));
PCollection<Long> count = inputs.apply(transform);
PAssert.that(count).containsInAnyOrder(2L, 4L);
Pipeline expanded =
ExternalStateExpander.expand(
pipeline,
Create.empty(KvCoder.of(StringUtf8Coder.of(), StateValue.coder())),
new Instant(0),
now,
ign -> BoundedWindow.TIMESTAMP_MAX_VALUE,
dummy());
expanded.run();
}

@Test
public void testSimpleExpandWithInitialState() throws IOException {
Instant now = new Instant(0);
Pipeline pipeline = createPipeline();
PCollection<String> inputs = pipeline.apply(Create.of("3", "4"));
PCollection<String> inputs =
pipeline.apply(
Create.timestamped(TimestampedValue.of("3", now), TimestampedValue.of("4", now)));
PCollection<KV<Integer, String>> withKeys =
inputs.apply(
WithKeys.<Integer, String>of(e -> Integer.parseInt(e) % 2)
Expand All @@ -213,7 +260,7 @@ public void testSimpleExpandWithInitialState() throws IOException {
"sum",
CoderUtils.encodeToByteArray(longCoder, 1L))))
.withCoder(KvCoder.of(StringUtf8Coder.of(), StateValue.coder())),
new Instant(0),
now,
current -> BoundedWindow.TIMESTAMP_MAX_VALUE,
dummy());
expanded.run();
Expand Down

0 comments on commit 980338b

Please sign in to comment.