diff --git a/src/main/java/org/zalando/nakadi/service/subscription/state/StreamingState.java b/src/main/java/org/zalando/nakadi/service/subscription/state/StreamingState.java
index 70b3d570d9..fc1eac7719 100644
--- a/src/main/java/org/zalando/nakadi/service/subscription/state/StreamingState.java
+++ b/src/main/java/org/zalando/nakadi/service/subscription/state/StreamingState.java
@@ -13,7 +13,6 @@
 import org.zalando.nakadi.service.subscription.zk.ZKSubscription;
 import org.zalando.nakadi.view.SubscriptionCursor;
 
-import com.google.common.base.Preconditions;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -31,13 +30,13 @@
 
 class StreamingState extends State {
 
-    private ZKSubscription topologyChangeSubscription;
-    private Consumer<String, String> kafkaConsumer;
     private final Map<Partition.PartitionKey, PartitionData> offsets = new HashMap<>();
     // Maps partition barrier when releasing must be completed or stream will be closed.
     // The reasons for that if there are two partitions (p0, p1) and p0 is reassigned, if p1 is working
     // correctly, and p0 is not receiving any updates - reassignment won't complete.
     private final Map<Partition.PartitionKey, Long> releasingPartitions = new HashMap<>();
+    private ZKSubscription topologyChangeSubscription;
+    private Consumer<String, String> kafkaConsumer;
     private boolean pollPaused;
     private long lastCommitMillis;
     private long committedEvents;
diff --git a/src/test/java/org/zalando/nakadi/service/EventStreamTest.java b/src/test/java/org/zalando/nakadi/service/EventStreamTest.java
index 6d0fc4c776..310f14f633 100644
--- a/src/test/java/org/zalando/nakadi/service/EventStreamTest.java
+++ b/src/test/java/org/zalando/nakadi/service/EventStreamTest.java
@@ -26,14 +26,7 @@
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
-import org.junit.Assert;
-import org.junit.BeforeClass;
-import org.junit.Test;
-import org.zalando.nakadi.domain.ConsumedEvent;
-import org.zalando.nakadi.domain.NakadiCursor;
-import org.zalando.nakadi.exceptions.NakadiException;
-import org.zalando.nakadi.repository.kafka.NakadiKafkaConsumer;
-import org.zalando.nakadi.util.FeatureToggleService;
+
 import static java.util.Collections.nCopies;
 import static java.util.Optional.empty;
 import static org.hamcrest.MatcherAssert.assertThat;
@@ -64,6 +57,66 @@ public static void createCursorConverter() {
         cursorConverter = new CursorConverter(featureToggleService);
     }
 
+    private static NakadiKafkaConsumer emptyConsumer() throws NakadiException {
+        final NakadiKafkaConsumer nakadiKafkaConsumer = mock(NakadiKafkaConsumer.class);
+        when(nakadiKafkaConsumer.readEvent()).thenReturn(empty());
+        return nakadiKafkaConsumer;
+    }
+
+    private static NakadiKafkaConsumer endlessDummyConsumerForPartition(final String partition) throws NakadiException {
+        final NakadiKafkaConsumer nakadiKafkaConsumer = mock(NakadiKafkaConsumer.class);
+        when(nakadiKafkaConsumer.readEvent())
+                .thenReturn(Optional.of(new ConsumedEvent(DUMMY, new NakadiCursor(TOPIC, partition, "0"))));
+        return nakadiKafkaConsumer;
+    }
+
+    private static NakadiKafkaConsumer nCountDummyConsumerForPartition(final int eventNum, final String partition)
+            throws NakadiException {
+        final NakadiKafkaConsumer nakadiKafkaConsumer = mock(NakadiKafkaConsumer.class);
+        final AtomicInteger eventsToCreate = new AtomicInteger(eventNum);
+        when(nakadiKafkaConsumer.readEvent()).thenAnswer(invocation -> {
+            if (eventsToCreate.get() > 0) {
+                eventsToCreate.set(eventsToCreate.get() - 1);
+                return Optional.of(new ConsumedEvent(DUMMY, new NakadiCursor(TOPIC, partition, "0")));
+            } else {
+                return empty();
+            }
+        });
+        return nakadiKafkaConsumer;
+    }
+
+    private static NakadiKafkaConsumer predefinedConsumer(final Queue<ConsumedEvent> events)
+            throws NakadiException {
+        final NakadiKafkaConsumer nakadiKafkaConsumer = mock(NakadiKafkaConsumer.class);
+        when(nakadiKafkaConsumer.readEvent()).thenAnswer(invocation -> Optional.ofNullable(events.poll()));
+        return nakadiKafkaConsumer;
+    }
+
+    private static NakadiKafkaConsumer endlessDummyConsumer() throws NakadiException {
+        return endlessDummyConsumerForPartition("0");
+    }
+
+    private static String jsonBatch(final String partition, final String offset,
+                                    final Optional<List<String>> eventsOrNone) {
+        return jsonBatch(partition, offset, eventsOrNone, Optional.empty());
+    }
+
+    private static String jsonBatch(final String partition, final String offset,
+                                    final Optional<List<String>> eventsOrNone, final Optional<String> metadata) {
+        final String eventsStr = eventsOrNone
+                .map(events -> {
+                    final StringBuilder builder = new StringBuilder(",\"events\":[");
+                    events.forEach(event -> builder.append("\"").append(event).append("\","));
+                    builder.deleteCharAt(builder.length() - 1).append("]");
+                    return builder.toString();
+                })
+                .orElse("");
+        final String metadataStr = metadata.map(m -> ",\"metadata\":{\"debug\":\"" + m + "\"}").orElse("");
+
+        return String.format("{\"cursor\":{\"partition\":\"%s\",\"offset\":\"%s\"}%s%s}", partition, offset, eventsStr,
+                metadataStr);
+    }
+
     @Test(timeout = 15000)
     public void whenIOExceptionThenStreamIsClosed() throws NakadiException, InterruptedException, IOException {
         final EventStreamConfig config = EventStreamConfig
@@ -109,7 +162,7 @@ public void whenCrutchWorkedThenStreamIsClosed() throws NakadiException, Interru
         thread.start();
 
         Thread.sleep(TimeUnit.SECONDS.toMillis(1));
-        waitFor(()-> Assert.assertTrue(thread.isAlive()));
+        waitFor(() -> Assert.assertTrue(thread.isAlive()));
 
         // simulation of client closing the connection using crutch
         streamOpen.set(false);
@@ -290,64 +343,4 @@ public void whenReadFromMultiplePartitionsThenGroupedInBatchesAccordingToPartiti
         assertThat(batches[2], sameJSONAs(jsonBatch("2", "000000000000000000", Optional.of(nCopies(2, DUMMY)))));
     }
 
-    private static NakadiKafkaConsumer emptyConsumer() throws NakadiException {
-        final NakadiKafkaConsumer nakadiKafkaConsumer = mock(NakadiKafkaConsumer.class);
-        when(nakadiKafkaConsumer.readEvent()).thenReturn(empty());
-        return nakadiKafkaConsumer;
-    }
-
-    private static NakadiKafkaConsumer endlessDummyConsumerForPartition(final String partition) throws NakadiException {
-        final NakadiKafkaConsumer nakadiKafkaConsumer = mock(NakadiKafkaConsumer.class);
-        when(nakadiKafkaConsumer.readEvent())
-                .thenReturn(Optional.of(new ConsumedEvent(DUMMY, new NakadiCursor(TOPIC, partition, "0"))));
-        return nakadiKafkaConsumer;
-    }
-
-    private static NakadiKafkaConsumer nCountDummyConsumerForPartition(final int eventNum, final String partition)
-            throws NakadiException {
-        final NakadiKafkaConsumer nakadiKafkaConsumer = mock(NakadiKafkaConsumer.class);
-        final AtomicInteger eventsToCreate = new AtomicInteger(eventNum);
-        when(nakadiKafkaConsumer.readEvent()).thenAnswer(invocation -> {
-            if (eventsToCreate.get() > 0) {
-                eventsToCreate.set(eventsToCreate.get() - 1);
-                return Optional.of(new ConsumedEvent(DUMMY, new NakadiCursor(TOPIC, partition, "0")));
-            } else {
-                return empty();
-            }
-        });
-        return nakadiKafkaConsumer;
-    }
-
-    private static NakadiKafkaConsumer predefinedConsumer(final Queue<ConsumedEvent> events)
-            throws NakadiException {
-        final NakadiKafkaConsumer nakadiKafkaConsumer = mock(NakadiKafkaConsumer.class);
-        when(nakadiKafkaConsumer.readEvent()).thenAnswer(invocation -> Optional.ofNullable(events.poll()));
-        return nakadiKafkaConsumer;
-    }
-
-    private static NakadiKafkaConsumer endlessDummyConsumer() throws NakadiException {
-        return endlessDummyConsumerForPartition("0");
-    }
-
-    private static String jsonBatch(final String partition, final String offset,
-                                    final Optional<List<String>> eventsOrNone) {
-        return jsonBatch(partition, offset, eventsOrNone, Optional.empty());
-    }
-
-    private static String jsonBatch(final String partition, final String offset,
-                                    final Optional<List<String>> eventsOrNone, final Optional<String> metadata) {
-        final String eventsStr = eventsOrNone
-                .map(events -> {
-                    final StringBuilder builder = new StringBuilder(",\"events\":[");
-                    events.forEach(event -> builder.append("\"").append(event).append("\","));
-                    builder.deleteCharAt(builder.length() - 1).append("]");
-                    return builder.toString();
-                })
-                .orElse("");
-        final String metadataStr = metadata.map(m -> ",\"metadata\":{\"debug\":\"" + m + "\"}").orElse("");
-
-        return String.format("{\"cursor\":{\"partition\":\"%s\",\"offset\":\"%s\"}%s%s}", partition, offset, eventsStr,
-                metadataStr);
-    }
-
 }