diff --git a/beam/tools/src/main/java/cz/o2/proxima/beam/tools/groovy/BeamStream.java b/beam/tools/src/main/java/cz/o2/proxima/beam/tools/groovy/BeamStream.java index e4a13c4cd9..69e0438f07 100644 --- a/beam/tools/src/main/java/cz/o2/proxima/beam/tools/groovy/BeamStream.java +++ b/beam/tools/src/main/java/cz/o2/proxima/beam/tools/groovy/BeamStream.java @@ -1381,6 +1381,9 @@ public TypeDescriptor> getOutputTypeDescriptor() { @VisibleForTesting static class ReduceValueStateByKey extends DoFn, Pair> { + private static final Instant MAX_ACCEPTABLE_STAMP = + BoundedWindow.TIMESTAMP_MAX_VALUE.minus(Duration.standardDays(300)); + static ReduceValueStateByKey of( Closure initialState, Closure stateUpdate, @@ -1458,16 +1461,19 @@ public void processElement( @OnTimer("earlyTimer") public void onTimer( - OnTimerContext context, + @Timestamp Instant ts, @StateId("value") ValueState> valueState, - @TimerId("earlyTimer") Timer earlyTimer) { + @TimerId("earlyTimer") Timer earlyTimer, + OutputReceiver> collector) { Pair current = Objects.requireNonNull(valueState.read()); O outputElem = output.call(current.getSecond(), null); if (outputElem != null) { - context.output(Pair.of(current.getFirst(), outputElem)); + collector.output(Pair.of(current.getFirst(), outputElem)); + } + if (ts.isBefore(MAX_ACCEPTABLE_STAMP)) { + earlyTimer.offset(earlyEmitting).setRelative(); } - earlyTimer.offset(earlyEmitting).setRelative(); } @SuppressWarnings("unchecked") diff --git a/pom.xml b/pom.xml index c05d0a5339..e86667d1b4 100644 --- a/pom.xml +++ b/pom.xml @@ -657,12 +657,30 @@ ${grpc.version} + + io.grpc + grpc-api + ${grpc.version} + + io.grpc grpc-protobuf ${grpc.version} + + io.grpc + grpc-context + ${grpc.version} + + + + io.grpc + grpc-census + ${grpc.version} + + io.grpc grpc-stub