Skip to content
This repository has been archived by the owner on Jun 7, 2024. It is now read-only.

Commit

Permalink
Merge pull request #860 from zalando/ARUHA-1604-flush-on-stream-timeout
Browse files Browse the repository at this point in the history
ARUHA-1604: added flushing collected events when reaching stream timeout
  • Loading branch information
v-stepanov authored Apr 26, 2018
2 parents 46f9610 + 53a25fe commit 87bf767
Show file tree
Hide file tree
Showing 5 changed files with 65 additions and 16 deletions.
4 changes: 2 additions & 2 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,15 @@ and this project adheres to [Semantic Versioning](http://semver.org/).

## [Unreleased]

## [2.6.4] - 2018-04-25
## [2.6.4] - 2018-04-26

### Added
- Add optional status to the /subscriptions endpoint

### Fixed
- Fixed commit for subscriptions that use direct assignment of partitions
- Fixed OutOfMemoryError when using huge values for batch_limit and max_uncommitted_events

- Added flushing of collected events when reaching stream_timeout in subscription API

## [2.6.3] - 2018-04-10

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,24 @@ public void whenStreamTimeoutReachedPossibleToCommit() throws Exception {
Assert.assertEquals(SC_NO_CONTENT, statusCode);
}

@Test(timeout = 10000)
public void whenStreamTimeoutReachedThenEventsFlushed() throws Exception {
    // Huge batch_flush_timeout (600s) and batch_limit (1000) guarantee that only
    // the 2s stream_timeout can trigger the flush of collected events.
    final TestStreamingClient client = TestStreamingClient
            .create(URL, subscription.getId(),
                    "batch_flush_timeout=600&batch_limit=1000&stream_timeout=2&max_uncommitted_events=1000")
            .start();
    waitFor(() -> assertThat(client.getSessionId(), not(equalTo(SESSION_ID_UNKNOWN))));

    publishEvents(eventType.getName(), 4, x -> "{\"foo\":\"bar\"}");

    // when stream_timeout is reached we should get 2 batches:
    // first one containing 4 events, second one with debug message
    waitFor(() -> assertThat(client.getBatches(), hasSize(2)));
    assertThat(client.getBatches().get(0).getEvents(), hasSize(4));
    assertThat(client.getBatches().get(1).getEvents(), hasSize(0));
}

@Test(timeout = 30000)
public void whenOffsetIsCommittedNextSessionStartsFromNextEventAfterCommitted() throws Exception {
// write 4 events to event-type
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,12 +56,16 @@ class PartitionData {

@Nullable
List<ConsumedEvent> takeEventsToStream(final long currentTimeMillis, final int batchSize,
final long batchTimeoutMillis) {
final long batchTimeoutMillis, final boolean streamTimeoutReached) {
final boolean countReached = (nakadiEvents.size() >= batchSize) && batchSize > 0;
final boolean timeReached = (currentTimeMillis - lastSendMillis) >= batchTimeoutMillis;
if (countReached || timeReached) {
lastSendMillis = currentTimeMillis;
return extract(batchSize);
} else if (streamTimeoutReached) {
lastSendMillis = currentTimeMillis;
final List<ConsumedEvent> extractedEvents = extract(batchSize);
return extractedEvents.isEmpty() ? null : extractedEvents;
} else {
return null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,9 +95,10 @@ public void onEnter() {
scheduleTask(this::checkBatchTimeouts, getParameters().batchTimeoutMillis, TimeUnit.MILLISECONDS);

scheduleTask(() -> {
streamToOutput(true);
final String debugMessage = "Stream timeout reached";
this.sendMetadata(debugMessage);
this.shutdownGracefully(debugMessage);
sendMetadata(debugMessage);
shutdownGracefully(debugMessage);
}, getParameters().streamTimeoutMillis,
TimeUnit.MILLISECONDS);

Expand Down Expand Up @@ -221,6 +222,10 @@ private void checkBatchTimeouts() {
}

private void streamToOutput() {
    // Regular (periodic) flush path: stream timeout has not been reached,
    // so partitions are only flushed when batch size/timeout criteria are met.
    streamToOutput(false);
}

private void streamToOutput(final boolean streamTimeoutReached) {
final long currentTimeMillis = System.currentTimeMillis();
int messagesAllowedToSend = (int) getMessagesAllowedToSend();
final boolean wasCommitted = isEverythingCommitted();
Expand All @@ -231,7 +236,8 @@ private void streamToOutput() {
while (null != (toSend = e.getValue().takeEventsToStream(
currentTimeMillis,
Math.min(getParameters().batchLimitEvents, messagesAllowedToSend),
getParameters().batchTimeoutMillis))) {
getParameters().batchTimeoutMillis,
streamTimeoutReached))) {
sentSomething |= !toSend.isEmpty();
flushData(e.getKey(), toSend, batchesSent == 0 ? Optional.of("Stream started") : Optional.empty());
this.sentEvents += toSend.size();
Expand Down Expand Up @@ -649,6 +655,7 @@ private void removeFromStreaming(final EventTypePartition key) {
/**
 * If the stream doesn't have any partitions, start a timer that will close this session
 * after commitTimeout*2 if it still hasn't been assigned any partitions by then
*
* @param topology the new topology
*/
private void trackIdleness(final ZkSubscriptionClient.Topology topology) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ public void normalOperationShouldNotReconfigureKafkaConsumer() {
pd.addEvent(new ConsumedEvent(("test_" + i).getBytes(), createCursor(100L + i + 1)));
}
// Now say to it that it was sent
pd.takeEventsToStream(currentTimeMillis(), 1000, 0L);
pd.takeEventsToStream(currentTimeMillis(), 1000, 0L, false);
assertEquals(100L, pd.getUnconfirmed());
for (long i = 0; i < 10; ++i) {
final PartitionData.CommitResult cr = pd.onCommitOffset(createCursor(110L + i * 10L));
Expand All @@ -77,14 +77,14 @@ public void normalOperationShouldNotReconfigureKafkaConsumer() {
public void keepAliveCountShouldIncreaseOnEachEmptyCall() {
    final PartitionData pd = new PartitionData(COMP, null, createCursor(100L), System.currentTimeMillis());
    // Every empty extraction bumps the keep-alive counter by one.
    for (int i = 0; i < 100; ++i) {
        pd.takeEventsToStream(currentTimeMillis(), 10, 0L, false);
        assertEquals(i + 1, pd.getKeepAliveInARow());
    }
    // Adding an event alone does not reset the counter...
    pd.addEvent(new ConsumedEvent("".getBytes(), createCursor(101L)));
    assertEquals(100, pd.getKeepAliveInARow());
    // ...but a non-empty extraction does.
    pd.takeEventsToStream(currentTimeMillis(), 10, 0L, false);
    assertEquals(0, pd.getKeepAliveInARow());
    pd.takeEventsToStream(currentTimeMillis(), 10, 0L, false);
    assertEquals(1, pd.getKeepAliveInARow());
}

Expand All @@ -97,26 +97,26 @@ public void eventsShouldBeStreamedOnTimeout() {
for (int i = 0; i < 100; ++i) {
pd.addEvent(new ConsumedEvent("test".getBytes(), createCursor(i + 100L + 1)));
}
List<ConsumedEvent> data = pd.takeEventsToStream(currentTime, 1000, timeout);
List<ConsumedEvent> data = pd.takeEventsToStream(currentTime, 1000, timeout, false);
assertNull(data);
assertEquals(0, pd.getKeepAliveInARow());

currentTime += timeout + 1;

data = pd.takeEventsToStream(currentTime, 1000, timeout);
data = pd.takeEventsToStream(currentTime, 1000, timeout, false);
assertNotNull(data);
assertEquals(100, data.size());

for (int i = 100; i < 200; ++i) {
pd.addEvent(new ConsumedEvent("test".getBytes(), createCursor(i + 100L + 1)));
}
data = pd.takeEventsToStream(currentTime, 1000, timeout);
data = pd.takeEventsToStream(currentTime, 1000, timeout, false);
assertNull(data);
assertEquals(0, pd.getKeepAliveInARow());

currentTime += timeout + 1;

data = pd.takeEventsToStream(currentTime, 1000, timeout);
data = pd.takeEventsToStream(currentTime, 1000, timeout, false);
assertNotNull(data);
assertEquals(100, data.size());
}
Expand All @@ -128,9 +128,29 @@ public void eventsShouldBeStreamedOnBatchSize() {
for (int i = 0; i < 100; ++i) {
pd.addEvent(new ConsumedEvent("test".getBytes(), createCursor(i + 100L + 1)));
}
assertNull(pd.takeEventsToStream(currentTimeMillis(), 1000, timeout));
final List<ConsumedEvent> eventsToStream = pd.takeEventsToStream(currentTimeMillis(), 99, timeout);
assertNull(pd.takeEventsToStream(currentTimeMillis(), 1000, timeout, false));
final List<ConsumedEvent> eventsToStream = pd.takeEventsToStream(currentTimeMillis(), 99, timeout, false);
assertNotNull(eventsToStream);
assertEquals(99, eventsToStream.size());
}

@Test
public void eventsShouldBeStreamedOnStreamTimeout() {
    // Batch timeout is set far in the future so that only the stream-timeout
    // flag can cause the pending events to be flushed.
    final long batchTimeout = TimeUnit.SECONDS.toMillis(100);
    final PartitionData pd = new PartitionData(COMP, null, createCursor(100L), System.currentTimeMillis());
    int added = 0;
    while (added < 10) {
        pd.addEvent(new ConsumedEvent("test".getBytes(), createCursor(added)));
        added++;
    }
    assertEquals(10, pd.takeEventsToStream(currentTimeMillis(), 100, batchTimeout, true).size());
}

@Test
public void noEmptyBatchShouldBeStreamedOnStreamTimeoutWhenNoEvents() {
    final long timeout = TimeUnit.SECONDS.toMillis(100);
    final PartitionData pd = new PartitionData(COMP, null, createCursor(100L), System.currentTimeMillis());
    // No events pending: a stream-timeout flush must return null, not an empty batch.
    // (Matches the test name, which the original body did not exercise.)
    assertNull(pd.takeEventsToStream(currentTimeMillis(), 100, timeout, true));
    for (int i = 0; i < 10; ++i) {
        pd.addEvent(new ConsumedEvent("test".getBytes(), createCursor(i)));
    }
    // A zero batch size likewise extracts nothing, so null is still expected.
    assertNull(pd.takeEventsToStream(currentTimeMillis(), 0, timeout, true));
}
}

0 comments on commit 87bf767

Please sign in to comment.