From ac9229bcba8a0e07b901ea75e7285e8c34ea1674 Mon Sep 17 00:00:00 2001 From: Jan Lukavsky Date: Wed, 11 Dec 2024 14:40:48 +0100 Subject: [PATCH] [proxima-beam-core] #345 add WatermarkShift transform --- .../proxima/beam/util/FilterLatecomers.java | 36 +++---- .../o2/proxima/beam/util/WatermarkShift.java | 101 ++++++++++++++++++ .../beam/util/FilterLatecomersTest.java | 7 +- .../proxima/beam/util/WatermarkShiftTest.java | 68 ++++++++++++ 4 files changed, 185 insertions(+), 27 deletions(-) create mode 100644 beam/core/src/main/java/cz/o2/proxima/beam/util/WatermarkShift.java create mode 100644 beam/core/src/test/java/cz/o2/proxima/beam/util/WatermarkShiftTest.java diff --git a/beam/core/src/main/java/cz/o2/proxima/beam/util/FilterLatecomers.java b/beam/core/src/main/java/cz/o2/proxima/beam/util/FilterLatecomers.java index 1313cc440..293f59824 100644 --- a/beam/core/src/main/java/cz/o2/proxima/beam/util/FilterLatecomers.java +++ b/beam/core/src/main/java/cz/o2/proxima/beam/util/FilterLatecomers.java @@ -16,7 +16,6 @@ package cz.o2.proxima.beam.util; import cz.o2.proxima.beam.util.state.ExcludeExternal; -import org.apache.beam.sdk.coders.CannotProvideCoderException; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.state.TimeDomain; import org.apache.beam.sdk.state.Timer; @@ -50,25 +49,13 @@ public static FilterLatecomers of() { } @SuppressWarnings("unchecked") - public static PCollection getOnTime(PCollectionTuple tuple, TypeDescriptor type) { - Coder coder = getCoder(tuple, type); - return (PCollection) - tuple.get(ON_TIME_TAG).setTypeDescriptor((TypeDescriptor) type).setCoder(coder); + public static PCollection getOnTime(PCollectionTuple tuple) { + return (PCollection) tuple.get(ON_TIME_TAG); } @SuppressWarnings("unchecked") - public static PCollection getLate(PCollectionTuple tuple, TypeDescriptor type) { - final Coder coder = getCoder(tuple, type); - return (PCollection) - tuple.get(LATE_TAG).setTypeDescriptor((TypeDescriptor) type).setCoder(coder); - } - - private static Coder getCoder(PCollectionTuple tuple, TypeDescriptor type) { - try { - return tuple.getPipeline().getCoderRegistry().getCoder(type); - } catch (CannotProvideCoderException e) { - throw new IllegalStateException(e); - } + public static PCollection getLate(PCollectionTuple tuple) { + return (PCollection) tuple.get(LATE_TAG); } @ExcludeExternal @@ -107,16 +94,21 @@ public TypeDescriptor getOutputTypeDescriptor() { public void timer() {} } - @SuppressWarnings("unchecked") + @SuppressWarnings({"unchecked", "rawtypes"}) @Override public PCollectionTuple expand(PCollection input) { PCollection> withKeys = input.apply( WithKeys.of(Object::hashCode).withKeyType(TypeDescriptors.integers())); TupleTag mainTag = (TupleTag) ON_TIME_TAG; - return withKeys.apply( - "filter", - ParDo.of(new FilterFn<>(input.getTypeDescriptor())) - .withOutputTags(mainTag, TupleTagList.of(LATE_TAG))); + PCollectionTuple tuple = + withKeys.apply( + "filter", + ParDo.of(new FilterFn<>(input.getTypeDescriptor())) + .withOutputTags(mainTag, TupleTagList.of(LATE_TAG))); + final Coder coder = input.getCoder(); + tuple.get(LATE_TAG).setCoder((Coder) coder).setTypeDescriptor(input.getTypeDescriptor()); + tuple.get(ON_TIME_TAG).setCoder((Coder) coder).setTypeDescriptor(input.getTypeDescriptor()); + return tuple; } } diff --git a/beam/core/src/main/java/cz/o2/proxima/beam/util/WatermarkShift.java b/beam/core/src/main/java/cz/o2/proxima/beam/util/WatermarkShift.java new file mode 100644 index 000000000..101db8634 --- /dev/null +++ b/beam/core/src/main/java/cz/o2/proxima/beam/util/WatermarkShift.java @@ -0,0 +1,101 @@ +/* + * Copyright 2017-2024 O2 Czech Republic, a.s. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package cz.o2.proxima.beam.util; + +import cz.o2.proxima.beam.util.state.ExcludeExternal; +import org.apache.beam.sdk.state.TimeDomain; +import org.apache.beam.sdk.state.Timer; +import org.apache.beam.sdk.state.TimerSpec; +import org.apache.beam.sdk.state.TimerSpecs; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.Filter; +import org.apache.beam.sdk.transforms.Flatten; +import org.apache.beam.sdk.transforms.Impulse; +import org.apache.beam.sdk.transforms.MapElements; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.WithKeys; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionList; +import org.joda.time.Duration; +import org.joda.time.Instant; + +/** + * Shift watermark of input {@link PCollection} by given duration back in time. + * + * @param type parameter + */ +public class WatermarkShift extends PTransform, PCollection> { + + public static WatermarkShift of(Duration duration) { + return new WatermarkShift<>(duration); + } + + private final Duration shiftDuration; + + public WatermarkShift(Duration shiftDuration) { + this.shiftDuration = shiftDuration; + } + + @Override + public PCollection expand(PCollection input) { + PCollection impulse = input.getPipeline().apply(Impulse.create()); + // filter elements out, just take watermark + PCollection originalWatermark = + input + .apply(Filter.by(e -> false)) + .apply(MapElements.into(impulse.getTypeDescriptor()).via(e -> new byte[0])); + PCollection holdWatermark = + PCollectionList.of(impulse) + .and(originalWatermark) + .apply(Flatten.pCollections()) + .apply(WithKeys.of("")) + .apply("hold", ParDo.of(new HoldFn())) + .apply(MapElements.into(input.getTypeDescriptor()).via(e -> null)) + .setCoder(input.getCoder()); + return PCollectionList.of(input) + .and(holdWatermark) + .apply(Flatten.pCollections()) + .setCoder(input.getCoder()) + .setTypeDescriptor(input.getTypeDescriptor()); + } + + @ExcludeExternal + private class HoldFn extends DoFn, byte[]> { + + private final Instant minInstant = BoundedWindow.TIMESTAMP_MIN_VALUE.plus(shiftDuration); + + @TimerId("holder") + private final TimerSpec holderTimerSpec = TimerSpecs.timer(TimeDomain.EVENT_TIME); + + @ProcessElement + public void processImpulse(@TimerId("holder") Timer holder) { + // start the timer + holder.withOutputTimestamp(BoundedWindow.TIMESTAMP_MIN_VALUE).set(minInstant); + } + + @OnTimer("holder") + public void onTimer(@TimerId("holder") Timer holder) { + Instant current = holder.getCurrentRelativeTime(); + if (current.isBefore(BoundedWindow.TIMESTAMP_MAX_VALUE)) { + Instant shifted = current.minus(shiftDuration); + holder.withOutputTimestamp(shifted).offset(Duration.ZERO).setRelative(); + } + } + } +} diff --git a/beam/core/src/test/java/cz/o2/proxima/beam/util/FilterLatecomersTest.java b/beam/core/src/test/java/cz/o2/proxima/beam/util/FilterLatecomersTest.java index 86264b40b..b3dd3c1a5 100644 --- a/beam/core/src/test/java/cz/o2/proxima/beam/util/FilterLatecomersTest.java +++ b/beam/core/src/test/java/cz/o2/proxima/beam/util/FilterLatecomersTest.java @@ -22,7 +22,6 @@ import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionTuple; import org.apache.beam.sdk.values.TimestampedValue; -import org.apache.beam.sdk.values.TypeDescriptors; import org.joda.time.Instant; import org.junit.Test; @@ -44,10 +43,8 @@ public void testFiltering() { .addElements(TimestampedValue.of(0, now.minus(1))) .advanceWatermarkToInfinity()); PCollectionTuple result = input.apply(FilterLatecomers.of()); - PAssert.that(FilterLatecomers.getOnTime(result, TypeDescriptors.integers())) - .containsInAnyOrder(1, 2, 3); - PAssert.that(FilterLatecomers.getLate(result, TypeDescriptors.integers())) - .containsInAnyOrder(0); + PAssert.that(FilterLatecomers.getOnTime(result)).containsInAnyOrder(1, 2, 3); + PAssert.that(FilterLatecomers.getLate(result)).containsInAnyOrder(0); p.run(); } } diff --git a/beam/core/src/test/java/cz/o2/proxima/beam/util/WatermarkShiftTest.java b/beam/core/src/test/java/cz/o2/proxima/beam/util/WatermarkShiftTest.java new file mode 100644 index 000000000..243ddfe3f --- /dev/null +++ b/beam/core/src/test/java/cz/o2/proxima/beam/util/WatermarkShiftTest.java @@ -0,0 +1,68 @@ +/* + * Copyright 2017-2024 O2 Czech Republic, a.s. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package cz.o2.proxima.beam.util; + +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.coders.VarIntCoder; +import org.apache.beam.sdk.testing.PAssert; +import org.apache.beam.sdk.testing.TestStream; +import org.apache.beam.sdk.transforms.Count; +import org.apache.beam.sdk.transforms.Flatten; +import org.apache.beam.sdk.transforms.windowing.AfterWatermark; +import org.apache.beam.sdk.transforms.windowing.GlobalWindows; +import org.apache.beam.sdk.transforms.windowing.Window; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionList; +import org.apache.beam.sdk.values.PCollectionTuple; +import org.apache.beam.sdk.values.TimestampedValue; +import org.joda.time.Duration; +import org.joda.time.Instant; +import org.junit.Test; + +public class WatermarkShiftTest { + + @Test + public void testWatermarkShift() { + Pipeline p = Pipeline.create(); + Instant now = new Instant(0); + PCollection input = + p.apply( + TestStream.create(VarIntCoder.of()) + .advanceWatermarkTo(now) + .addElements( + TimestampedValue.of(1, now.plus(1)), TimestampedValue.of(2, now.plus(2))) + .advanceWatermarkTo(now.plus(5)) + .addElements(TimestampedValue.of(0, now.minus(1))) + .advanceWatermarkToInfinity()); + PCollectionTuple filtered = input.apply(FilterLatecomers.of()); + PCollection onTimeBeforeShift = FilterLatecomers.getOnTime(filtered); + filtered = input.apply(WatermarkShift.of(Duration.millis(6))).apply(FilterLatecomers.of()); + PCollection onTimeAfterShift = FilterLatecomers.getOnTime(filtered); + PCollection> counts = + PCollectionList.of(onTimeAfterShift) + .and(onTimeBeforeShift) + .apply(Flatten.pCollections()) + .apply( + Window.into(new GlobalWindows()) + .triggering(AfterWatermark.pastEndOfWindow()) + .discardingFiredPanes()) + .apply(Count.perElement()); + // assert that 1, 2 are always on time, but 0 only after the shift + PAssert.that(counts).containsInAnyOrder(KV.of(1, 2L), KV.of(2, 2L), KV.of(0, 1L)); + p.run(); + } +}