Flush buffered events when a subscription stream reaches stream_timeout.

PartitionData.takeEventsToStream() gains a streamTimeoutReached flag: when it is set,
buffered events are returned even though neither the batch size nor the batch timeout
has been reached, and null is returned when there is nothing to hand over.
StreamingState calls streamToOutput(true) from the stream-timeout task before it sends
the "Stream timeout reached" metadata and shuts down.

diff --git a/CHANGELOG.md b/CHANGELOG.md
index df0f2299a7..f81191c6f2 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -6,7 +6,7 @@ and this project adheres to [Semantic Versioning](http://semver.org/).
 
 ## [Unreleased]
 
-## [2.6.4] - 2018-04-25
+## [2.6.4] - 2018-04-26
 
 ### Added
 - Add optional status to the /subscriptions endpoint
@@ -14,7 +14,7 @@ and this project adheres to [Semantic Versioning](http://semver.org/).
 ### Fixed
 - Fixed commit for subscriptions that use direct assignment of partitions
 - Fixed OutOfMemoryError when using huge values for batch_limit and max_uncommitted_events
-
+- Added flushing of collected events when reaching stream_timeout in subscription API
 
 ## [2.6.3] - 2018-04-10
 
diff --git a/src/acceptance-test/java/org/zalando/nakadi/webservice/hila/HilaAT.java b/src/acceptance-test/java/org/zalando/nakadi/webservice/hila/HilaAT.java
index fbe206c818..c04ecfaf7b 100644
--- a/src/acceptance-test/java/org/zalando/nakadi/webservice/hila/HilaAT.java
+++ b/src/acceptance-test/java/org/zalando/nakadi/webservice/hila/HilaAT.java
@@ -101,6 +101,23 @@ public void whenStreamTimeoutReachedPossibleToCommit() throws Exception {
         Assert.assertEquals(SC_NO_CONTENT, statusCode);
     }
 
+    @Test(timeout = 10000)
+    public void whenStreamTimeoutReachedThenEventsFlushed() throws Exception {
+        final TestStreamingClient client = TestStreamingClient
+                .create(URL, subscription.getId(),
+                        "batch_flush_timeout=600&batch_limit=1000&stream_timeout=2&max_uncommitted_events=1000")
+                .start();
+        waitFor(() -> assertThat(client.getSessionId(), not(equalTo(SESSION_ID_UNKNOWN))));
+
+        publishEvents(eventType.getName(), 4, x -> "{\"foo\":\"bar\"}");
+
+        // when stream_timeout is reached we should get 2 batches:
+        // first one containing 4 events, second one with debug message
+        waitFor(() -> assertThat(client.getBatches(), hasSize(2)));
+        assertThat(client.getBatches().get(0).getEvents(), hasSize(4));
+        assertThat(client.getBatches().get(1).getEvents(), hasSize(0));
+    }
+
     @Test(timeout = 30000)
     public void whenOffsetIsCommittedNextSessionStartsFromNextEventAfterCommitted() throws Exception {
         // write 4 events to event-type
diff --git a/src/main/java/org/zalando/nakadi/service/subscription/state/PartitionData.java b/src/main/java/org/zalando/nakadi/service/subscription/state/PartitionData.java
index fd6bc1f67a..ec2a3b588e 100644
--- a/src/main/java/org/zalando/nakadi/service/subscription/state/PartitionData.java
+++ b/src/main/java/org/zalando/nakadi/service/subscription/state/PartitionData.java
@@ -56,12 +56,18 @@ class PartitionData {
 
     @Nullable
     List takeEventsToStream(final long currentTimeMillis, final int batchSize,
-                            final long batchTimeoutMillis) {
+                            final long batchTimeoutMillis, final boolean streamTimeoutReached) {
         final boolean countReached = (nakadiEvents.size() >= batchSize) && batchSize > 0;
         final boolean timeReached = (currentTimeMillis - lastSendMillis) >= batchTimeoutMillis;
         if (countReached || timeReached) {
             lastSendMillis = currentTimeMillis;
             return extract(batchSize);
+        } else if (streamTimeoutReached) {
+            // The stream is closing: hand over whatever is buffered right away, but
+            // answer null instead of an empty batch so the caller's send loop ends.
+            lastSendMillis = currentTimeMillis;
+            final List extractedEvents = extract(batchSize);
+            return extractedEvents.isEmpty() ? null : extractedEvents;
         } else {
             return null;
         }
diff --git a/src/main/java/org/zalando/nakadi/service/subscription/state/StreamingState.java b/src/main/java/org/zalando/nakadi/service/subscription/state/StreamingState.java
index b07d6076ed..25c0a4bab7 100644
--- a/src/main/java/org/zalando/nakadi/service/subscription/state/StreamingState.java
+++ b/src/main/java/org/zalando/nakadi/service/subscription/state/StreamingState.java
@@ -95,9 +95,11 @@ public void onEnter() {
         scheduleTask(this::checkBatchTimeouts, getParameters().batchTimeoutMillis, TimeUnit.MILLISECONDS);
 
         scheduleTask(() -> {
+                    // Flush events collected so far before announcing the timeout and shutting down.
+                    streamToOutput(true);
                     final String debugMessage = "Stream timeout reached";
-                    this.sendMetadata(debugMessage);
-                    this.shutdownGracefully(debugMessage);
+                    sendMetadata(debugMessage);
+                    shutdownGracefully(debugMessage);
                 },
                 getParameters().streamTimeoutMillis,
                 TimeUnit.MILLISECONDS);
@@ -221,6 +223,10 @@ private void checkBatchTimeouts() {
     }
 
     private void streamToOutput() {
+        streamToOutput(false);
+    }
+
+    private void streamToOutput(final boolean streamTimeoutReached) {
         final long currentTimeMillis = System.currentTimeMillis();
         int messagesAllowedToSend = (int) getMessagesAllowedToSend();
         final boolean wasCommitted = isEverythingCommitted();
@@ -231,7 +237,8 @@ private void streamToOutput() {
             while (null != (toSend = e.getValue().takeEventsToStream(
                     currentTimeMillis,
                     Math.min(getParameters().batchLimitEvents, messagesAllowedToSend),
-                    getParameters().batchTimeoutMillis))) {
+                    getParameters().batchTimeoutMillis,
+                    streamTimeoutReached))) {
                 sentSomething |= !toSend.isEmpty();
                 flushData(e.getKey(), toSend, batchesSent == 0 ? Optional.of("Stream started") : Optional.empty());
                 this.sentEvents += toSend.size();
@@ -649,6 +656,7 @@ private void removeFromStreaming(final EventTypePartition key) {
     /**
      * If stream doesn't have any partitions - start timer that will close this session
      * in commitTimeout*2 if it doesn't get any partitions during that time
+     *
      * @param topology the new topology
      */
     private void trackIdleness(final ZkSubscriptionClient.Topology topology) {
diff --git a/src/test/java/org/zalando/nakadi/service/subscription/state/PartitionDataTest.java b/src/test/java/org/zalando/nakadi/service/subscription/state/PartitionDataTest.java
index 78f8f1405e..af2324ff42 100644
--- a/src/test/java/org/zalando/nakadi/service/subscription/state/PartitionDataTest.java
+++ b/src/test/java/org/zalando/nakadi/service/subscription/state/PartitionDataTest.java
@@ -63,7 +63,7 @@ public void normalOperationShouldNotReconfigureKafkaConsumer() {
             pd.addEvent(new ConsumedEvent(("test_" + i).getBytes(), createCursor(100L + i + 1)));
         }
         // Now say to it that it was sent
-        pd.takeEventsToStream(currentTimeMillis(), 1000, 0L);
+        pd.takeEventsToStream(currentTimeMillis(), 1000, 0L, false);
         assertEquals(100L, pd.getUnconfirmed());
         for (long i = 0; i < 10; ++i) {
             final PartitionData.CommitResult cr = pd.onCommitOffset(createCursor(110L + i * 10L));
@@ -77,14 +77,14 @@ public void normalOperationShouldNotReconfigureKafkaConsumer() {
     public void keepAliveCountShouldIncreaseOnEachEmptyCall() {
         final PartitionData pd = new PartitionData(COMP, null, createCursor(100L), System.currentTimeMillis());
         for (int i = 0; i < 100; ++i) {
-            pd.takeEventsToStream(currentTimeMillis(), 10, 0L);
+            pd.takeEventsToStream(currentTimeMillis(), 10, 0L, false);
             assertEquals(i + 1, pd.getKeepAliveInARow());
         }
         pd.addEvent(new ConsumedEvent("".getBytes(), createCursor(101L)));
         assertEquals(100, pd.getKeepAliveInARow());
-        pd.takeEventsToStream(currentTimeMillis(), 10, 0L);
+        pd.takeEventsToStream(currentTimeMillis(), 10, 0L, false);
         assertEquals(0, pd.getKeepAliveInARow());
-        pd.takeEventsToStream(currentTimeMillis(), 10, 0L);
+        pd.takeEventsToStream(currentTimeMillis(), 10, 0L, false);
         assertEquals(1, pd.getKeepAliveInARow());
     }
 
@@ -97,26 +97,26 @@ public void eventsShouldBeStreamedOnTimeout() {
         for (int i = 0; i < 100; ++i) {
             pd.addEvent(new ConsumedEvent("test".getBytes(), createCursor(i + 100L + 1)));
         }
-        List data = pd.takeEventsToStream(currentTime, 1000, timeout);
+        List data = pd.takeEventsToStream(currentTime, 1000, timeout, false);
         assertNull(data);
         assertEquals(0, pd.getKeepAliveInARow());
 
         currentTime += timeout + 1;
 
-        data = pd.takeEventsToStream(currentTime, 1000, timeout);
+        data = pd.takeEventsToStream(currentTime, 1000, timeout, false);
         assertNotNull(data);
         assertEquals(100, data.size());
 
         for (int i = 100; i < 200; ++i) {
             pd.addEvent(new ConsumedEvent("test".getBytes(), createCursor(i + 100L + 1)));
         }
-        data = pd.takeEventsToStream(currentTime, 1000, timeout);
+        data = pd.takeEventsToStream(currentTime, 1000, timeout, false);
         assertNull(data);
         assertEquals(0, pd.getKeepAliveInARow());
 
         currentTime += timeout + 1;
 
-        data = pd.takeEventsToStream(currentTime, 1000, timeout);
+        data = pd.takeEventsToStream(currentTime, 1000, timeout, false);
         assertNotNull(data);
         assertEquals(100, data.size());
     }
@@ -128,9 +128,38 @@ public void eventsShouldBeStreamedOnBatchSize() {
         for (int i = 0; i < 100; ++i) {
             pd.addEvent(new ConsumedEvent("test".getBytes(), createCursor(i + 100L + 1)));
         }
-        assertNull(pd.takeEventsToStream(currentTimeMillis(), 1000, timeout));
-        final List eventsToStream = pd.takeEventsToStream(currentTimeMillis(), 99, timeout);
+        assertNull(pd.takeEventsToStream(currentTimeMillis(), 1000, timeout, false));
+        final List eventsToStream = pd.takeEventsToStream(currentTimeMillis(), 99, timeout, false);
         assertNotNull(eventsToStream);
         assertEquals(99, eventsToStream.size());
     }
+
+    @Test
+    public void eventsShouldBeStreamedOnStreamTimeout() {
+        final long timeout = TimeUnit.SECONDS.toMillis(100);
+        final PartitionData pd = new PartitionData(COMP, null, createCursor(100L), System.currentTimeMillis());
+        for (int i = 0; i < 10; ++i) {
+            pd.addEvent(new ConsumedEvent("test".getBytes(), createCursor(i)));
+        }
+        assertEquals(10, pd.takeEventsToStream(currentTimeMillis(), 100, timeout, true).size());
+    }
+
+    @Test
+    public void noEmptyBatchShouldBeStreamedOnStreamTimeoutWhenNoEvents() {
+        final long timeout = TimeUnit.SECONDS.toMillis(100);
+        final PartitionData pd = new PartitionData(COMP, null, createCursor(100L), System.currentTimeMillis());
+        // Nothing is buffered, so the forced flush must yield null rather than an empty batch.
+        assertNull(pd.takeEventsToStream(currentTimeMillis(), 100, timeout, true));
+    }
+
+    @Test
+    public void noEmptyBatchShouldBeStreamedOnStreamTimeoutWhenBatchSizeIsZero() {
+        final long timeout = TimeUnit.SECONDS.toMillis(100);
+        final PartitionData pd = new PartitionData(COMP, null, createCursor(100L), System.currentTimeMillis());
+        for (int i = 0; i < 10; ++i) {
+            pd.addEvent(new ConsumedEvent("test".getBytes(), createCursor(i)));
+        }
+        // Events are buffered, but a batch size of 0 must still yield null, not an empty batch.
+        assertNull(pd.takeEventsToStream(currentTimeMillis(), 0, timeout, true));
+    }
 }