From 6fc19dcf7698840cfa47aaddc9274ed7acc2a047 Mon Sep 17 00:00:00 2001 From: Marco Lehmann Date: Mon, 4 Feb 2019 11:33:49 +0100 Subject: [PATCH 1/2] Fixed check if keep alive limit is reached for indefinitely configured limit --- .../nakadi/service/subscription/StreamParameters.java | 2 +- .../nakadi/service/subscription/StreamParametersTest.java | 8 ++++++++ 2 files changed, 9 insertions(+), 1 deletion(-) diff --git a/src/main/java/org/zalando/nakadi/service/subscription/StreamParameters.java b/src/main/java/org/zalando/nakadi/service/subscription/StreamParameters.java index 3dfb62df51..f76bd17333 100644 --- a/src/main/java/org/zalando/nakadi/service/subscription/StreamParameters.java +++ b/src/main/java/org/zalando/nakadi/service/subscription/StreamParameters.java @@ -83,7 +83,7 @@ public boolean isStreamLimitReached(final long commitedEvents) { } public boolean isKeepAliveLimitReached(final IntStream keepAlive) { - return batchKeepAliveIterations.map(it -> keepAlive.allMatch(v -> v >= it)).orElse(false); + return batchKeepAliveIterations.map(it -> keepAlive.allMatch(v -> v >= it) && it > 0).orElse(false); } public Client getConsumingClient() { diff --git a/src/test/java/org/zalando/nakadi/service/subscription/StreamParametersTest.java b/src/test/java/org/zalando/nakadi/service/subscription/StreamParametersTest.java index d876610c46..a921fd1501 100644 --- a/src/test/java/org/zalando/nakadi/service/subscription/StreamParametersTest.java +++ b/src/test/java/org/zalando/nakadi/service/subscription/StreamParametersTest.java @@ -70,6 +70,14 @@ public void checkIsKeepAliveLimitReached() throws Exception { assertThat(streamParameters.isKeepAliveLimitReached(IntStream.of(5, 7, 4, 12)), is(false)); } + @Test + public void checkIsKeepAliveLimitReachedIndefinitely() throws Exception { + final StreamParameters streamParameters = createStreamParameters(1, null, 0, null, 0, 0, 0, mock(Client.class)); + + assertThat(streamParameters.isKeepAliveLimitReached(IntStream.of(5, 7, 6, 12)), is(false)); + assertThat(streamParameters.isKeepAliveLimitReached(IntStream.of(5, 7, 4, 12)), is(false)); + } + @Test public void checkGetMessagesAllowedToSend() throws Exception { final StreamParameters streamParameters = createStreamParameters(1, 200L, 0, null, null, 0, 0, From 613be22ac4962387b3765d4a4e9458f00ca73d91 Mon Sep 17 00:00:00 2001 From: Marco Lehmann Date: Mon, 4 Feb 2019 13:21:48 +0100 Subject: [PATCH 2/2] Moved filter to constructor --- .../nakadi/service/subscription/StreamParameters.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/main/java/org/zalando/nakadi/service/subscription/StreamParameters.java b/src/main/java/org/zalando/nakadi/service/subscription/StreamParameters.java index f76bd17333..f690ee99fd 100644 --- a/src/main/java/org/zalando/nakadi/service/subscription/StreamParameters.java +++ b/src/main/java/org/zalando/nakadi/service/subscription/StreamParameters.java @@ -61,7 +61,7 @@ private StreamParameters( .filter(timeout -> timeout > 0 && timeout <= EventStreamConfig.MAX_STREAM_TIMEOUT) .orElse((long) EventStreamConfig.generateDefaultStreamTimeout())); this.maxUncommittedMessages = userParameters.getMaxUncommittedEvents().orElse(10); - this.batchKeepAliveIterations = userParameters.getStreamKeepAliveLimit(); + this.batchKeepAliveIterations = userParameters.getStreamKeepAliveLimit().filter(v -> v != 0); this.partitions = userParameters.getPartitions(); this.consumingClient = consumingClient; @@ -78,12 +78,12 @@ public long getMessagesAllowedToSend(final long limit, final long sentSoFar) { return streamLimitEvents.map(v -> Math.max(0, Math.min(limit, v - sentSoFar))).orElse(limit); } - public boolean isStreamLimitReached(final long commitedEvents) { - return streamLimitEvents.map(v -> v <= commitedEvents).orElse(false); + public boolean isStreamLimitReached(final long committedEvents) { + return streamLimitEvents.map(v -> v <= committedEvents).orElse(false); } public boolean isKeepAliveLimitReached(final IntStream keepAlive) { - return batchKeepAliveIterations.map(it -> keepAlive.allMatch(v -> v >= it) && it > 0).orElse(false); + return batchKeepAliveIterations.map(it -> keepAlive.allMatch(v -> v >= it)).orElse(false); } public Client getConsumingClient() {