This repository has been archived by the owner on Jun 7, 2024. It is now read-only.

aruha-592: fixed checkstyle after merge
adyach committed Mar 6, 2017
1 parent 3070935 commit 64be482
Showing 2 changed files with 64 additions and 72 deletions.
5 changes: 2 additions & 3 deletions src/main/java/org/zalando/nakadi/service/subscription/state/StreamingState.java
@@ -13,7 +13,6 @@
import org.zalando.nakadi.service.subscription.zk.ZKSubscription;
import org.zalando.nakadi.view.SubscriptionCursor;

-import com.google.common.base.Preconditions;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
@@ -31,13 +30,13 @@


class StreamingState extends State {
-    private ZKSubscription topologyChangeSubscription;
-    private Consumer<String, String> kafkaConsumer;
    private final Map<Partition.PartitionKey, PartitionData> offsets = new HashMap<>();
    // Maps a partition to the barrier (deadline) by which its release must complete, or the stream will be closed.
    // The reason: with two partitions (p0, p1), if p0 is reassigned while p1 keeps working
    // correctly but p0 receives no further updates, the reassignment would otherwise never complete.
    private final Map<Partition.PartitionKey, Long> releasingPartitions = new HashMap<>();
+    private ZKSubscription topologyChangeSubscription;
+    private Consumer<String, String> kafkaConsumer;
    private boolean pollPaused;
    private long lastCommitMillis;
    private long committedEvents;
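The comment on releasingPartitions above describes per-partition release deadlines: each partition being released gets its own barrier, so a healthy partition (p1) cannot mask a stalled one (p0). The following is a minimal sketch of that idea, illustrative only; all names (startReleasing, releaseCompleted, barrierExpired, the String partition key) are hypothetical and not Nakadi's actual API.

import java.util.HashMap;
import java.util.Map;

// Illustrative sketch, not Nakadi's actual implementation.
class ReleaseBarrierSketch {
    // Partition key -> deadline (epoch millis) by which the release must have completed.
    private final Map<String, Long> releasingPartitions = new HashMap<>();

    // Invoked when a partition starts being reassigned away from this stream.
    void startReleasing(final String partitionKey, final long timeoutMs) {
        releasingPartitions.put(partitionKey, System.currentTimeMillis() + timeoutMs);
    }

    // Invoked when the partition's release completed in time.
    void releaseCompleted(final String partitionKey) {
        releasingPartitions.remove(partitionKey);
    }

    // Checked periodically: a partition that missed its deadline forces the stream to close,
    // so a partition receiving no updates cannot block reassignment forever.
    boolean barrierExpired(final long nowMillis) {
        return releasingPartitions.values().stream().anyMatch(deadline -> deadline <= nowMillis);
    }
}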
131 changes: 62 additions & 69 deletions src/test/java/org/zalando/nakadi/service/EventStreamTest.java
@@ -26,14 +26,7 @@
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
-import org.junit.Assert;
-import org.junit.BeforeClass;
-import org.junit.Test;
-import org.zalando.nakadi.domain.ConsumedEvent;
-import org.zalando.nakadi.domain.NakadiCursor;
-import org.zalando.nakadi.exceptions.NakadiException;
-import org.zalando.nakadi.repository.kafka.NakadiKafkaConsumer;
-import org.zalando.nakadi.util.FeatureToggleService;

import static java.util.Collections.nCopies;
import static java.util.Optional.empty;
import static org.hamcrest.MatcherAssert.assertThat;
@@ -64,6 +57,66 @@ public static void createCursorConverter() {
        cursorConverter = new CursorConverter(featureToggleService);
    }

+    private static NakadiKafkaConsumer emptyConsumer() throws NakadiException {
+        final NakadiKafkaConsumer nakadiKafkaConsumer = mock(NakadiKafkaConsumer.class);
+        when(nakadiKafkaConsumer.readEvent()).thenReturn(empty());
+        return nakadiKafkaConsumer;
+    }
+
+    private static NakadiKafkaConsumer endlessDummyConsumerForPartition(final String partition) throws NakadiException {
+        final NakadiKafkaConsumer nakadiKafkaConsumer = mock(NakadiKafkaConsumer.class);
+        when(nakadiKafkaConsumer.readEvent())
+                .thenReturn(Optional.of(new ConsumedEvent(DUMMY, new NakadiCursor(TOPIC, partition, "0"))));
+        return nakadiKafkaConsumer;
+    }
+
+    private static NakadiKafkaConsumer nCountDummyConsumerForPartition(final int eventNum, final String partition)
+            throws NakadiException {
+        final NakadiKafkaConsumer nakadiKafkaConsumer = mock(NakadiKafkaConsumer.class);
+        final AtomicInteger eventsToCreate = new AtomicInteger(eventNum);
+        when(nakadiKafkaConsumer.readEvent()).thenAnswer(invocation -> {
+            if (eventsToCreate.get() > 0) {
+                eventsToCreate.set(eventsToCreate.get() - 1);
+                return Optional.of(new ConsumedEvent(DUMMY, new NakadiCursor(TOPIC, partition, "0")));
+            } else {
+                return empty();
+            }
+        });
+        return nakadiKafkaConsumer;
+    }
+
+    private static NakadiKafkaConsumer predefinedConsumer(final Queue<ConsumedEvent> events)
+            throws NakadiException {
+        final NakadiKafkaConsumer nakadiKafkaConsumer = mock(NakadiKafkaConsumer.class);
+        when(nakadiKafkaConsumer.readEvent()).thenAnswer(invocation -> Optional.ofNullable(events.poll()));
+        return nakadiKafkaConsumer;
+    }
+
+    private static NakadiKafkaConsumer endlessDummyConsumer() throws NakadiException {
+        return endlessDummyConsumerForPartition("0");
+    }
+
+    private static String jsonBatch(final String partition, final String offset,
+                                    final Optional<List<String>> eventsOrNone) {
+        return jsonBatch(partition, offset, eventsOrNone, Optional.empty());
+    }
+
+    private static String jsonBatch(final String partition, final String offset,
+                                    final Optional<List<String>> eventsOrNone, final Optional<String> metadata) {
+        final String eventsStr = eventsOrNone
+                .map(events -> {
+                    final StringBuilder builder = new StringBuilder(",\"events\":[");
+                    events.forEach(event -> builder.append("\"").append(event).append("\","));
+                    builder.deleteCharAt(builder.length() - 1).append("]");
+                    return builder.toString();
+                })
+                .orElse("");
+        final String metadataStr = metadata.map(m -> ",\"metadata\":{\"debug\":\"" + m + "\"}").orElse("");
+
+        return String.format("{\"cursor\":{\"partition\":\"%s\",\"offset\":\"%s\"}%s%s}", partition, offset, eventsStr,
+                metadataStr);
+    }
+
    @Test(timeout = 15000)
    public void whenIOExceptionThenStreamIsClosed() throws NakadiException, InterruptedException, IOException {
        final EventStreamConfig config = EventStreamConfig
@@ -109,7 +162,7 @@ public void whenCrutchWorkedThenStreamIsClosed() throws NakadiException, Interru
        thread.start();

        Thread.sleep(TimeUnit.SECONDS.toMillis(1));
-        waitFor(()-> Assert.assertTrue(thread.isAlive()));
+        waitFor(() -> Assert.assertTrue(thread.isAlive()));

        // simulation of client closing the connection using crutch
        streamOpen.set(false);
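The one-line change above is the kind of violation the commit title refers to: after the merge, the lambda arrow lacked the surrounding whitespace the project's checkstyle configuration requires (presumably checkstyle's WhitespaceAround check applied to the LAMBDA token; the exact rule name is an assumption).

// flagged by checkstyle: missing whitespace around "->"
waitFor(()-> Assert.assertTrue(thread.isAlive()));
// compliant
waitFor(() -> Assert.assertTrue(thread.isAlive()));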
@@ -290,64 +343,4 @@ public void whenReadFromMultiplePartitionsThenGroupedInBatchesAccordingToPartiti
        assertThat(batches[2], sameJSONAs(jsonBatch("2", "000000000000000000", Optional.of(nCopies(2, DUMMY)))));
    }

-    private static NakadiKafkaConsumer emptyConsumer() throws NakadiException {
-        final NakadiKafkaConsumer nakadiKafkaConsumer = mock(NakadiKafkaConsumer.class);
-        when(nakadiKafkaConsumer.readEvent()).thenReturn(empty());
-        return nakadiKafkaConsumer;
-    }
-
-    private static NakadiKafkaConsumer endlessDummyConsumerForPartition(final String partition) throws NakadiException {
-        final NakadiKafkaConsumer nakadiKafkaConsumer = mock(NakadiKafkaConsumer.class);
-        when(nakadiKafkaConsumer.readEvent())
-                .thenReturn(Optional.of(new ConsumedEvent(DUMMY, new NakadiCursor(TOPIC, partition, "0"))));
-        return nakadiKafkaConsumer;
-    }
-
-    private static NakadiKafkaConsumer nCountDummyConsumerForPartition(final int eventNum, final String partition)
-            throws NakadiException {
-        final NakadiKafkaConsumer nakadiKafkaConsumer = mock(NakadiKafkaConsumer.class);
-        final AtomicInteger eventsToCreate = new AtomicInteger(eventNum);
-        when(nakadiKafkaConsumer.readEvent()).thenAnswer(invocation -> {
-            if (eventsToCreate.get() > 0) {
-                eventsToCreate.set(eventsToCreate.get() - 1);
-                return Optional.of(new ConsumedEvent(DUMMY, new NakadiCursor(TOPIC, partition, "0")));
-            } else {
-                return empty();
-            }
-        });
-        return nakadiKafkaConsumer;
-    }
-
-    private static NakadiKafkaConsumer predefinedConsumer(final Queue<ConsumedEvent> events)
-            throws NakadiException {
-        final NakadiKafkaConsumer nakadiKafkaConsumer = mock(NakadiKafkaConsumer.class);
-        when(nakadiKafkaConsumer.readEvent()).thenAnswer(invocation -> Optional.ofNullable(events.poll()));
-        return nakadiKafkaConsumer;
-    }
-
-    private static NakadiKafkaConsumer endlessDummyConsumer() throws NakadiException {
-        return endlessDummyConsumerForPartition("0");
-    }
-
-    private static String jsonBatch(final String partition, final String offset,
-                                    final Optional<List<String>> eventsOrNone) {
-        return jsonBatch(partition, offset, eventsOrNone, Optional.empty());
-    }
-
-    private static String jsonBatch(final String partition, final String offset,
-                                    final Optional<List<String>> eventsOrNone, final Optional<String> metadata) {
-        final String eventsStr = eventsOrNone
-                .map(events -> {
-                    final StringBuilder builder = new StringBuilder(",\"events\":[");
-                    events.forEach(event -> builder.append("\"").append(event).append("\","));
-                    builder.deleteCharAt(builder.length() - 1).append("]");
-                    return builder.toString();
-                })
-                .orElse("");
-        final String metadataStr = metadata.map(m -> ",\"metadata\":{\"debug\":\"" + m + "\"}").orElse("");
-
-        return String.format("{\"cursor\":{\"partition\":\"%s\",\"offset\":\"%s\"}%s%s}", partition, offset, eventsStr,
-                metadataStr);
-    }
-
}
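For reference, the jsonBatch helper (moved above the tests in this commit) produces the batch JSON that the assertions compare against via sameJSONAs. A worked example follows; the literal values "e1", "e2", and "dbg" are illustrative, and Arrays.asList is used to stay compatible with Java 8.

// jsonBatch("0", "10", Optional.of(Arrays.asList("e1", "e2")), Optional.of("dbg")) returns:
// {"cursor":{"partition":"0","offset":"10"},"events":["e1","e2"],"metadata":{"debug":"dbg"}}
//
// jsonBatch("0", "10", Optional.empty(), Optional.empty()) returns:
// {"cursor":{"partition":"0","offset":"10"}}

Note that the events branch assumes a non-empty list: deleteCharAt always strips the last character of the builder, so for an empty list it would remove the opening bracket rather than a trailing comma and emit malformed JSON.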
