From b8a8694da4a61d59dab2b472ac545f655962bb0e Mon Sep 17 00:00:00 2001 From: Jan Lukavsky Date: Mon, 27 Jun 2022 17:25:26 +0200 Subject: [PATCH] [proxima-beam] update dependencies and loop timer --- .../proxima/beam/tools/groovy/BeamStream.java | 14 ++++++++++---- pom.xml | 18 ++++++++++++++++++ 2 files changed, 28 insertions(+), 4 deletions(-) diff --git a/beam/tools/src/main/java/cz/o2/proxima/beam/tools/groovy/BeamStream.java b/beam/tools/src/main/java/cz/o2/proxima/beam/tools/groovy/BeamStream.java index e4a13c4cd..69e0438f0 100644 --- a/beam/tools/src/main/java/cz/o2/proxima/beam/tools/groovy/BeamStream.java +++ b/beam/tools/src/main/java/cz/o2/proxima/beam/tools/groovy/BeamStream.java @@ -1381,6 +1381,9 @@ public TypeDescriptor> getOutputTypeDescriptor() { @VisibleForTesting static class ReduceValueStateByKey extends DoFn, Pair> { + private static final Instant MAX_ACCEPTABLE_STAMP = + BoundedWindow.TIMESTAMP_MAX_VALUE.minus(Duration.standardDays(300)); + static ReduceValueStateByKey of( Closure initialState, Closure stateUpdate, @@ -1458,16 +1461,19 @@ public void processElement( @OnTimer("earlyTimer") public void onTimer( - OnTimerContext context, + @Timestamp Instant ts, @StateId("value") ValueState> valueState, - @TimerId("earlyTimer") Timer earlyTimer) { + @TimerId("earlyTimer") Timer earlyTimer, + OutputReceiver> collector) { Pair current = Objects.requireNonNull(valueState.read()); O outputElem = output.call(current.getSecond(), null); if (outputElem != null) { - context.output(Pair.of(current.getFirst(), outputElem)); + collector.output(Pair.of(current.getFirst(), outputElem)); + } + if (ts.isBefore(MAX_ACCEPTABLE_STAMP)) { + earlyTimer.offset(earlyEmitting).setRelative(); } - earlyTimer.offset(earlyEmitting).setRelative(); } @SuppressWarnings("unchecked") diff --git a/pom.xml b/pom.xml index c05d0a533..e86667d1b 100644 --- a/pom.xml +++ b/pom.xml @@ -657,12 +657,30 @@ ${grpc.version} + + io.grpc + grpc-api + ${grpc.version} + + io.grpc grpc-protobuf ${grpc.version} + + io.grpc + grpc-context + ${grpc.version} + + + + io.grpc + grpc-census + ${grpc.version} + + io.grpc grpc-stub