diff --git a/src/acceptance-test/java/org/zalando/nakadi/webservice/CursorsServiceAT.java b/src/acceptance-test/java/org/zalando/nakadi/webservice/CursorsServiceAT.java index 956a2568db..39f0fa0734 100644 --- a/src/acceptance-test/java/org/zalando/nakadi/webservice/CursorsServiceAT.java +++ b/src/acceptance-test/java/org/zalando/nakadi/webservice/CursorsServiceAT.java @@ -272,7 +272,8 @@ private String subscriptionPath() { private void setPartitions(final Partition[] partitions) throws Exception { final String topologyPath = subscriptionPath() + "/topology"; - final byte[] topologyData = MAPPER.writeValueAsBytes(new NewZkSubscriptionClient.Topology(partitions, 0)); + final byte[] topologyData = MAPPER.writeValueAsBytes( + new NewZkSubscriptionClient.Topology(partitions, null, 0)); if (null == CURATOR.checkExists().forPath(topologyPath)) { CURATOR.create().forPath(topologyPath, topologyData); } else { diff --git a/src/main/java/org/zalando/nakadi/service/CursorConverter.java b/src/main/java/org/zalando/nakadi/service/CursorConverter.java index 86be3d8508..6388963c13 100644 --- a/src/main/java/org/zalando/nakadi/service/CursorConverter.java +++ b/src/main/java/org/zalando/nakadi/service/CursorConverter.java @@ -10,6 +10,7 @@ import org.zalando.nakadi.view.SubscriptionCursor; import org.zalando.nakadi.view.SubscriptionCursorWithoutToken; +import java.util.Collection; import java.util.List; /** @@ -31,7 +32,7 @@ NakadiCursor convert(String eventTypeName, Cursor cursor) throws NakadiCursor convert(SubscriptionCursorWithoutToken cursor) throws InternalNakadiException, NoSuchEventTypeException, ServiceUnavailableException, InvalidCursorException; - List convert(List cursor) throws + List convert(Collection cursor) throws InternalNakadiException, NoSuchEventTypeException, ServiceUnavailableException, InvalidCursorException; diff --git a/src/main/java/org/zalando/nakadi/service/CursorsService.java b/src/main/java/org/zalando/nakadi/service/CursorsService.java index 96959abf1a..3d3a84d94b 100644 --- a/src/main/java/org/zalando/nakadi/service/CursorsService.java +++ b/src/main/java/org/zalando/nakadi/service/CursorsService.java @@ -18,6 +18,7 @@ import org.zalando.nakadi.exceptions.ServiceUnavailableException; import org.zalando.nakadi.exceptions.UnableProcessException; import org.zalando.nakadi.exceptions.runtime.OperationTimeoutException; +import org.zalando.nakadi.exceptions.runtime.ServiceTemporarilyUnavailableException; import org.zalando.nakadi.exceptions.runtime.ZookeeperException; import org.zalando.nakadi.repository.TopicRepository; import org.zalando.nakadi.repository.db.EventTypeCache; @@ -107,7 +108,8 @@ private void validateStreamId(final List cursors, final String str throw new InvalidStreamIdException("Session with stream id " + streamId + " not found", streamId); } - final Map partitionSessions = Stream.of(subscriptionClient.listPartitions()) + final Map partitionSessions = Stream + .of(subscriptionClient.getTopology().getPartitions()) .collect(Collectors.toMap(Partition::getKey, Partition::getSession)); for (final NakadiCursor cursor : cursors) { final EventTypePartition etPartition = cursor.getEventTypePartition(); @@ -124,7 +126,7 @@ private void validateStreamId(final List cursors, final String str } public List getSubscriptionCursors(final String subscriptionId) - throws NakadiException { + throws NakadiException, ServiceTemporarilyUnavailableException { final Subscription subscription = subscriptionRepository.getSubscription(subscriptionId); final ZkSubscriptionClient zkSubscriptionClient = zkSubscriptionFactory.createClient( subscription, "subscription." + subscriptionId + ".get_cursors"); @@ -132,12 +134,15 @@ public List getSubscriptionCursors(final String Partition[] partitions; try { - partitions = zkSubscriptionClient.listPartitions(); + partitions = zkSubscriptionClient.getTopology().getPartitions(); } catch (final SubscriptionNotInitializedException ex) { partitions = new Partition[]{}; } + final Map positions = zkSubscriptionClient.getOffsets( + Stream.of(partitions).map(Partition::getKey).collect(Collectors.toList())); + for (final Partition p : partitions) { - cursorsListBuilder.add(zkSubscriptionClient.getOffset(p.getKey())); + cursorsListBuilder.add(positions.get(p.getKey())); } return cursorsListBuilder.build(); } diff --git a/src/main/java/org/zalando/nakadi/service/converter/CursorConverterImpl.java b/src/main/java/org/zalando/nakadi/service/converter/CursorConverterImpl.java index db16335fbb..d2d3be8116 100644 --- a/src/main/java/org/zalando/nakadi/service/converter/CursorConverterImpl.java +++ b/src/main/java/org/zalando/nakadi/service/converter/CursorConverterImpl.java @@ -15,6 +15,7 @@ import org.zalando.nakadi.view.SubscriptionCursor; import org.zalando.nakadi.view.SubscriptionCursorWithoutToken; +import java.util.Collection; import java.util.EnumMap; import java.util.LinkedHashMap; import java.util.List; @@ -48,7 +49,7 @@ public NakadiCursor convert(final SubscriptionCursorWithoutToken cursor) } @Override - public List convert(final List cursors) + public List convert(final Collection cursors) throws InternalNakadiException, NoSuchEventTypeException, ServiceUnavailableException, InvalidCursorException { final LinkedHashMap> orderingMap = @@ -57,7 +58,7 @@ public List convert(final List cur final Map> mappedByVersions = cursors.stream() .collect(Collectors.groupingBy(c -> guessVersion(c.getOffset()))); - for (final Map.Entry> entry: mappedByVersions.entrySet()) { + for (final Map.Entry> entry : mappedByVersions.entrySet()) { final List result = converters.get(entry.getKey()).convertBatched(entry.getValue()); IntStream.range(0, entry.getValue().size()) .forEach(idx -> orderingMap.get(entry.getValue().get(idx)).set(result.get(idx))); diff --git a/src/main/java/org/zalando/nakadi/service/subscription/ExactWeightRebalancer.java b/src/main/java/org/zalando/nakadi/service/subscription/ExactWeightRebalancer.java index 0227a83b73..baee1172f0 100644 --- a/src/main/java/org/zalando/nakadi/service/subscription/ExactWeightRebalancer.java +++ b/src/main/java/org/zalando/nakadi/service/subscription/ExactWeightRebalancer.java @@ -5,6 +5,7 @@ import java.util.ArrayList; import java.util.Arrays; +import java.util.Collection; import java.util.List; import java.util.Map; import java.util.Set; @@ -13,10 +14,10 @@ import java.util.stream.IntStream; import java.util.stream.Stream; -class ExactWeightRebalancer implements BiFunction { +class ExactWeightRebalancer implements BiFunction, Partition[], Partition[]> { @Override - public Partition[] apply(final Session[] sessions, final Partition[] currentPartitions) { - final Map activeSessionWeights = Stream.of(sessions) + public Partition[] apply(final Collection sessions, final Partition[] currentPartitions) { + final Map activeSessionWeights = sessions.stream() .collect(Collectors.toMap(Session::getId, Session::getWeight)); // sorted session ids. final List activeSessionIds = activeSessionWeights.keySet().stream().sorted() @@ -45,7 +46,7 @@ public Partition[] apply(final Session[] sessions, final Partition[] currentPart final List candidates = partitions.get(sessionId); final Partition toTakeItem = candidates.stream() .filter(p -> p.getState() == Partition.State.REASSIGNING).findAny().orElse( - candidates.get(candidates.size() - 1)); + candidates.get(candidates.size() - 1)); candidates.remove(toTakeItem); toRebalance.add(toTakeItem); toTake -= 1; diff --git a/src/main/java/org/zalando/nakadi/service/subscription/StreamingContext.java b/src/main/java/org/zalando/nakadi/service/subscription/StreamingContext.java index 298273c90a..1acc0e1025 100644 --- a/src/main/java/org/zalando/nakadi/service/subscription/StreamingContext.java +++ b/src/main/java/org/zalando/nakadi/service/subscription/StreamingContext.java @@ -29,6 +29,7 @@ import java.io.Closeable; import java.io.IOException; +import java.util.Collection; import java.util.Comparator; import java.util.List; import java.util.concurrent.BlockingQueue; @@ -37,6 +38,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.BiFunction; +import java.util.stream.Collectors; public class StreamingContext implements SubscriptionStreamer { @@ -54,7 +56,7 @@ public class StreamingContext implements SubscriptionStreamer { private final BlacklistService blacklistService; private final ScheduledExecutorService timer; private final BlockingQueue taskQueue = new LinkedBlockingQueue<>(); - private final BiFunction rebalancer; + private final BiFunction, Partition[], Partition[]> rebalancer; private final String loggingPath; private final CursorConverter cursorConverter; private final Subscription subscription; @@ -212,7 +214,7 @@ public void switchState(final State newState) { }); } - public void registerSession() throws Exception { + public void registerSession() throws NakadiRuntimeException { log.info("Registering session {}", session); // Install rebalance hook on client list change. sessionListSubscription = zkClient.subscribeForSessionListChanges(() -> addTask(this::rebalance)); @@ -264,11 +266,24 @@ public ObjectMapper getObjectMapper() { private void rebalance() { if (null != sessionListSubscription) { // This call is needed to renew subscription for session list changes. - sessionListSubscription.getData(); + final List newSessions = sessionListSubscription.getData(); + final String sessionsHash = ZkSubscriptionClient.Topology.calculateSessionsHash(newSessions); zkClient.runLocked(() -> { - final Partition[] changeset = rebalancer.apply(zkClient.listSessions(), zkClient.listPartitions()); - if (changeset.length > 0) { - zkClient.updatePartitionsConfiguration(changeset); + final ZkSubscriptionClient.Topology topology = zkClient.getTopology(); + + if (!topology.isSameHash(sessionsHash)) { + log.info("Performing rebalance, hash changed: {}", sessionsHash); + final Collection newSessionsUnderLock = zkClient.listSessions(); + + // after taking the lock list of sessions may change, so we need to update hash to correct value. + final Partition[] changeset = rebalancer.apply(newSessionsUnderLock, topology.getPartitions()); + if (changeset.length > 0) { + final String actualHash = ZkSubscriptionClient.Topology.calculateSessionsHash( + newSessionsUnderLock.stream().map(Session::getId).collect(Collectors.toList())); + zkClient.updatePartitionsConfiguration(actualHash, changeset); + } + } else { + log.info("Skipping rebalance, because hash is the same: {}", sessionsHash); } }); } @@ -314,7 +329,7 @@ public static final class Builder { private Session session; private ScheduledExecutorService timer; private ZkSubscriptionClient zkClient; - private BiFunction rebalancer; + private BiFunction, Partition[], Partition[]> rebalancer; private long kafkaPollTimeout; private String loggingPath; private AtomicBoolean connectionReady; @@ -369,7 +384,7 @@ public Builder setZkClient(final ZkSubscriptionClient zkClient) { return this; } - public Builder setRebalancer(final BiFunction rebalancer) { + public Builder setRebalancer(final BiFunction, Partition[], Partition[]> rebalancer) { this.rebalancer = rebalancer; return this; } diff --git a/src/main/java/org/zalando/nakadi/service/subscription/SubscriptionService.java b/src/main/java/org/zalando/nakadi/service/subscription/SubscriptionService.java index 2c2461aa05..2765b64fc1 100644 --- a/src/main/java/org/zalando/nakadi/service/subscription/SubscriptionService.java +++ b/src/main/java/org/zalando/nakadi/service/subscription/SubscriptionService.java @@ -10,6 +10,7 @@ import org.springframework.web.util.UriComponents; import org.springframework.web.util.UriComponentsBuilder; import org.zalando.nakadi.domain.EventType; +import org.zalando.nakadi.domain.EventTypePartition; import org.zalando.nakadi.domain.ItemsWrapper; import org.zalando.nakadi.domain.NakadiCursor; import org.zalando.nakadi.domain.PaginationLinks; @@ -43,6 +44,7 @@ import org.zalando.nakadi.service.FeatureToggleService; import org.zalando.nakadi.service.NakadiKpiPublisher; import org.zalando.nakadi.service.Result; +import org.zalando.nakadi.service.subscription.model.Partition; import org.zalando.nakadi.service.subscription.zk.SubscriptionClientFactory; import org.zalando.nakadi.service.subscription.zk.ZkSubscriptionClient; import org.zalando.nakadi.service.subscription.zk.ZkSubscriptionNode; @@ -55,13 +57,13 @@ import javax.ws.rs.core.Response; import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; import java.util.Comparator; import java.util.List; import java.util.Map; import java.util.Optional; import java.util.Set; import java.util.stream.Collectors; -import java.util.stream.Stream; @Component public class SubscriptionService { @@ -242,7 +244,7 @@ private List createSubscriptionStat(final Subscripti throw new ServiceTemporarilyUnavailableException(e); } - final ZkSubscriptionNode zkSubscriptionNode = subscriptionClient.getZkSubscriptionNodeLocked(); + final Optional zkSubscriptionNode = subscriptionClient.getZkSubscriptionNodeLocked(); return loadStats(eventTypes, zkSubscriptionNode, subscriptionClient, topicPartitions); } @@ -265,18 +267,15 @@ private List loadPartitionEndStatistics(final Collection private List loadStats( final Collection eventTypes, - final ZkSubscriptionNode subscriptionNode, + final Optional subscriptionNode, final ZkSubscriptionClient client, final List stats) throws ServiceTemporarilyUnavailableException, InconsistentStateException { final List result = new ArrayList<>(eventTypes.size()); - final List committedPositions; - try { - committedPositions = loadCommittedPositions(subscriptionNode, client); - } catch (final InternalNakadiException | NoSuchEventTypeException | InvalidCursorException | - ServiceUnavailableException e) { - throw new ServiceTemporarilyUnavailableException(e); - } + + final Collection committedPositions = subscriptionNode + .map(node -> loadCommittedPositions(node.getPartitions(), client)) + .orElse(Collections.emptyList()); for (final EventType eventType : eventTypes) { final List resultPartitions = new ArrayList<>(stats.size()); @@ -298,13 +297,16 @@ private List loadStats( }) .orElse(null); - final String state = subscriptionNode.guessState(stat.getTimeline().getEventType(), stat.getPartition()) - .getDescription(); - final String streamId = Optional.ofNullable(subscriptionNode.guessStream( - stat.getTimeline().getEventType(), stat.getPartition())).orElse(""); + final Partition.State state = subscriptionNode + .map(node -> node.guessState(stat.getTimeline().getEventType(), stat.getPartition())) + .orElse(Partition.State.UNASSIGNED); + + final String streamId = subscriptionNode + .map(node -> node.guessStream(stat.getTimeline().getEventType(), stat.getPartition())) + .orElse(""); resultPartitions.add(new SubscriptionEventTypeStats.Partition( - lastPosition.getPartition(), state, distance, streamId)); + lastPosition.getPartition(), state.getDescription(), distance, streamId)); } resultPartitions.sort(Comparator.comparing(SubscriptionEventTypeStats.Partition::getPartition)); result.add(new SubscriptionEventTypeStats(eventType.getName(), resultPartitions)); @@ -312,14 +314,19 @@ private List loadStats( return result; } - private List loadCommittedPositions( - final ZkSubscriptionNode subscriptionNode, - final ZkSubscriptionClient client) throws InternalNakadiException, InvalidCursorException, - NoSuchEventTypeException, ServiceUnavailableException { - final List views = Stream.of(subscriptionNode.getPartitions()).map( - partition -> client.getOffset(partition.getKey())) - .collect(Collectors.toList()); - return converter.convert(views); + private Collection loadCommittedPositions( + final Collection partitions, final ZkSubscriptionClient client) + throws ServiceTemporarilyUnavailableException { + try { + + final Map committed = client.getOffsets( + partitions.stream().map(Partition::getKey).collect(Collectors.toList())); + + return converter.convert(committed.values()); + } catch (final InternalNakadiException | NoSuchEventTypeException | InvalidCursorException | + ServiceUnavailableException e) { + throw new ServiceTemporarilyUnavailableException(e); + } } } diff --git a/src/main/java/org/zalando/nakadi/service/subscription/model/Partition.java b/src/main/java/org/zalando/nakadi/service/subscription/model/Partition.java index 40312befba..c9852a8a7a 100644 --- a/src/main/java/org/zalando/nakadi/service/subscription/model/Partition.java +++ b/src/main/java/org/zalando/nakadi/service/subscription/model/Partition.java @@ -6,6 +6,7 @@ import javax.annotation.Nullable; import java.util.Collection; +import java.util.Objects; public class Partition { public enum State { @@ -135,4 +136,26 @@ public String getSessionOrNextSession() { public String toString() { return eventType + ":" + partition + "->" + state + ":" + session + "->" + nextSession; } + + @Override + public boolean equals(final Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + final Partition partition1 = (Partition) o; + return Objects.equals(eventType, partition1.eventType) && + Objects.equals(partition, partition1.partition) && + Objects.equals(session, partition1.session) && + Objects.equals(nextSession, partition1.nextSession) && + state == partition1.state; + } + + @Override + public int hashCode() { + + return Objects.hash(eventType, partition); + } } diff --git a/src/main/java/org/zalando/nakadi/service/subscription/state/StartingState.java b/src/main/java/org/zalando/nakadi/service/subscription/state/StartingState.java index cc4714c792..5ae73a3b7b 100644 --- a/src/main/java/org/zalando/nakadi/service/subscription/state/StartingState.java +++ b/src/main/java/org/zalando/nakadi/service/subscription/state/StartingState.java @@ -12,7 +12,6 @@ import org.zalando.nakadi.exceptions.runtime.AccessDeniedException; import org.zalando.nakadi.service.CursorConverter; import org.zalando.nakadi.service.subscription.model.Partition; -import org.zalando.nakadi.service.subscription.model.Session; import org.zalando.nakadi.service.subscription.zk.ZkSubscriptionClient; import org.zalando.nakadi.service.timeline.TimelineService; import org.zalando.nakadi.view.SubscriptionCursorWithoutToken; @@ -50,7 +49,7 @@ protected Response.StatusType getStatus() { * 1. Checks, that subscription node is present in zk. If not - creates it. *

* 2. If cursor reset is in progress it will switch to cleanup state. - *

+ *

У * 3. Registers session. *

* 4. Switches to streaming state. @@ -59,9 +58,8 @@ private void createSubscriptionLocked() { final boolean subscriptionJustInitialized = initializeSubscriptionLocked(getZk(), getContext().getSubscription(), getContext().getTimelineService(), getContext().getCursorConverter()); if (!subscriptionJustInitialized) { - final Session[] sessions = getZk().listSessions(); - final Partition[] partitions = getZk().listPartitions(); - if (sessions.length >= partitions.length) { + final Partition[] partitions = getZk().getTopology().getPartitions(); + if (getZk().listSessions().size() >= partitions.length) { switchState(new CleanupState(new NoStreamingSlotsAvailable(partitions.length))); return; } diff --git a/src/main/java/org/zalando/nakadi/service/subscription/state/StreamingState.java b/src/main/java/org/zalando/nakadi/service/subscription/state/StreamingState.java index ea444cb567..0db8fafe2b 100644 --- a/src/main/java/org/zalando/nakadi/service/subscription/state/StreamingState.java +++ b/src/main/java/org/zalando/nakadi/service/subscription/state/StreamingState.java @@ -379,7 +379,7 @@ void reactOnTopologyChange() { void recheckTopology() { // Sometimes topology is not refreshed. One need to explicitly check that topology is still valid. - final Partition[] partitions = Stream.of(getZk().listPartitions()) + final Partition[] partitions = Stream.of(getZk().getTopology().getPartitions()) .filter(p -> getSessionId().equalsIgnoreCase(p.getSession())) .toArray(Partition[]::new); if (refreshTopologyUnlocked(partitions)) { @@ -433,7 +433,9 @@ boolean refreshTopologyUnlocked(final Partition[] assignedPartitions) { .collect(Collectors.toList()); if (!newAssignedPartitions.isEmpty()) { modified = true; - newAssignedPartitions.forEach(this::addToStreaming); + final Map committed = getZk().getOffsets( + newAssignedPartitions.stream().map(Partition::getKey).collect(Collectors.toList())); + newAssignedPartitions.forEach(p -> this.addToStreaming(p, committed)); } // 5. Check if something can be released right now if (modified) { @@ -564,8 +566,9 @@ private NakadiCursor createNakadiCursor(final SubscriptionCursorWithoutToken cur } } - private void addToStreaming(final Partition partition) { - final NakadiCursor cursor = createNakadiCursor(getZk().getOffset(partition.getKey())); + private void addToStreaming(final Partition partition, + final Map cursorMap) { + final NakadiCursor cursor = createNakadiCursor(cursorMap.get(partition.getKey())); getLog().info("Adding to streaming {} with start position {}", partition.getKey(), cursor); final ZkSubscription subscription = getZk().subscribeForOffsetChanges( partition.getKey(), diff --git a/src/main/java/org/zalando/nakadi/service/subscription/zk/AbstractZkSubscriptionClient.java b/src/main/java/org/zalando/nakadi/service/subscription/zk/AbstractZkSubscriptionClient.java index 9544f1e8c8..75fd4ab359 100644 --- a/src/main/java/org/zalando/nakadi/service/subscription/zk/AbstractZkSubscriptionClient.java +++ b/src/main/java/org/zalando/nakadi/service/subscription/zk/AbstractZkSubscriptionClient.java @@ -19,13 +19,14 @@ import org.zalando.nakadi.exceptions.runtime.OperationInterruptedException; import org.zalando.nakadi.exceptions.runtime.OperationTimeoutException; import org.zalando.nakadi.exceptions.runtime.RequestInProgressException; +import org.zalando.nakadi.exceptions.runtime.ServiceTemporarilyUnavailableException; import org.zalando.nakadi.exceptions.runtime.ZookeeperException; import org.zalando.nakadi.service.subscription.model.Session; import org.zalando.nakadi.view.SubscriptionCursorWithoutToken; import java.io.Closeable; import java.io.IOException; -import java.util.ArrayList; +import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.Comparator; @@ -34,8 +35,12 @@ import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.concurrent.Callable; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.BiFunction; +import java.util.function.Function; import java.util.stream.Collectors; import static com.google.common.base.Charsets.UTF_8; @@ -46,6 +51,7 @@ public abstract class AbstractZkSubscriptionClient implements ZkSubscriptionClie private static final int COMMIT_CONFLICT_RETRY_TIMES = 5; protected static final String NODE_TOPOLOGY = "/topology"; public static final int SECONDS_TO_WAIT_FOR_LOCK = 15; + private static final int MAX_ZK_RESPONSE_SECONDS = 5; private final String subscriptionId; private final CuratorFramework curatorFramework; @@ -80,7 +86,7 @@ protected Logger getLog() { } @Override - public final void runLocked(final Runnable function) { + public final T runLocked(final Callable function) { try { Exception releaseException = null; if (null == lock) { @@ -92,8 +98,9 @@ public final void runLocked(final Runnable function) { throw new ServiceUnavailableException("Failed to acquire subscription lock within " + SECONDS_TO_WAIT_FOR_LOCK + " seconds"); } + final T result; try { - function.run(); + result = function.call(); } finally { try { lock.release(); @@ -105,6 +112,7 @@ public final void runLocked(final Runnable function) { if (releaseException != null) { throw releaseException; } + return result; } catch (final NakadiRuntimeException | MyNakadiRuntimeException1 e) { throw e; } catch (final Exception e) { @@ -129,10 +137,7 @@ public final boolean isSubscriptionCreatedAndInitialized() throws NakadiRuntimeE // First step - check that state node was already written try { final String state = new String(getCurator().getData().forPath(getSubscriptionPath("/state")), UTF_8); - if (!state.equals(STATE_INITIALIZED)) { - return false; - } - return true; + return state.equals(STATE_INITIALIZED); } catch (final KeeperException.NoNodeException ex) { return false; } catch (final Exception e) { @@ -186,10 +191,60 @@ public final void unregisterSession(final Session session) { } } + protected Map loadDataAsync(final Collection keys, + final Function keyConverter, + final BiFunction valueConverter) + throws ServiceTemporarilyUnavailableException, NakadiRuntimeException { + final Map result = new HashMap<>(); + final CountDownLatch latch = new CountDownLatch(keys.size()); + try { + for (final K key : keys) { + final String zkKey = keyConverter.apply(key); + getCurator().getData().inBackground((client, event) -> { + try { + if (event.getResultCode() == KeeperException.Code.OK.intValue()) { + final V value = valueConverter.apply(key, event.getData()); + synchronized (result) { + result.put(key, value); + } + } else { + getLog().error( + "Failed to get {} data from zk. status code: {}", + zkKey, event.getResultCode()); + } + } catch (RuntimeException ex) { + getLog().error("Failed to memorize {} key value", key, ex); + } finally { + latch.countDown(); + } + }).forPath(zkKey); + } + } catch (Exception ex) { + throw new NakadiRuntimeException(ex); + } + try { + if (!latch.await(MAX_ZK_RESPONSE_SECONDS, TimeUnit.SECONDS)) { + throw new ServiceTemporarilyUnavailableException("Failed to wait for zk response", null); + } + } catch (InterruptedException ex) { + Thread.currentThread().interrupt(); + throw new ServiceTemporarilyUnavailableException("Failed to wait for zk response", ex); + } + if (result.size() != keys.size()) { + throw new ServiceTemporarilyUnavailableException("Failed to wait for keys " + + keys.stream() + .filter(v -> !result.containsKey(v)) + .map(String::valueOf) + .collect(Collectors.joining(", ")) + + " to be in response", null); + } + return result; + } + @Override - public final Session[] listSessions() { + public final Collection listSessions() + throws SubscriptionNotInitializedException, NakadiRuntimeException, ServiceTemporarilyUnavailableException { getLog().info("fetching sessions information"); - final List sessions = new ArrayList<>(); final List zkSessions; try { zkSessions = getCurator().getChildren().forPath(getSubscriptionPath("/sessions")); @@ -199,16 +254,10 @@ public final Session[] listSessions() { throw new NakadiRuntimeException(ex); } - try { - for (final String sessionId : zkSessions) { - final int weight = Integer.parseInt(new String(getCurator().getData() - .forPath(getSubscriptionPath("/sessions/" + sessionId)), UTF_8)); - sessions.add(new Session(sessionId, weight)); - } - return sessions.toArray(new Session[sessions.size()]); - } catch (final Exception e) { - throw new NakadiRuntimeException(e); - } + return loadDataAsync( + zkSessions, + key -> getSubscriptionPath("/sessions/" + key), + (key, data) -> new Session(key, Integer.parseInt(new String(data, UTF_8)))).values(); } @Override @@ -324,35 +373,23 @@ public final void resetCursors(final List cursor } @Override - public final ZkSubscription> subscribeForSessionListChanges(final Runnable listener) throws Exception { + public final ZkSubscription> subscribeForSessionListChanges(final Runnable listener) + throws NakadiRuntimeException { getLog().info("subscribeForSessionListChanges: " + listener.hashCode()); return new ZkSubscriptionImpl.ZkSubscriptionChildrenImpl( getCurator(), listener, getSubscriptionPath("/sessions")); } @Override - public final ZkSubscriptionNode getZkSubscriptionNodeLocked() throws SubscriptionNotInitializedException { - final ZkSubscriptionNode subscriptionNode = new ZkSubscriptionNode(); - try { - if (null == getCurator().checkExists().forPath(getSubscriptionPath(""))) { - return subscriptionNode; - } - } catch (final Exception e) { - // Zk communication failure - throw new NakadiRuntimeException(e); - } - - try { - runLocked(() -> { - subscriptionNode.setPartitions(listPartitions()); - subscriptionNode.setSessions(listSessions()); - }); - } catch (final NakadiRuntimeException ex) { - // this line intentionally left to have the same behavior as it was before - getLog().info("No data about provided subscription {} in ZK", getSubscriptionPath("")); + public final Optional getZkSubscriptionNodeLocked() + throws SubscriptionNotInitializedException, NakadiRuntimeException { + if (!isSubscriptionCreatedAndInitialized()) { + return Optional.empty(); } - return subscriptionNode; + return Optional.of(runLocked(() -> new ZkSubscriptionNode( + Arrays.asList(getTopology().getPartitions()), + listSessions()))); } private void forceCommitOffsets(final List cursors) throws Exception { diff --git a/src/main/java/org/zalando/nakadi/service/subscription/zk/NewZkSubscriptionClient.java b/src/main/java/org/zalando/nakadi/service/subscription/zk/NewZkSubscriptionClient.java index 9f52041756..f646a6702f 100644 --- a/src/main/java/org/zalando/nakadi/service/subscription/zk/NewZkSubscriptionClient.java +++ b/src/main/java/org/zalando/nakadi/service/subscription/zk/NewZkSubscriptionClient.java @@ -5,6 +5,7 @@ import org.apache.zookeeper.KeeperException; import org.zalando.nakadi.domain.EventTypePartition; import org.zalando.nakadi.exceptions.NakadiRuntimeException; +import org.zalando.nakadi.exceptions.runtime.ServiceTemporarilyUnavailableException; import org.zalando.nakadi.service.subscription.model.Partition; import org.zalando.nakadi.view.SubscriptionCursorWithoutToken; @@ -12,6 +13,7 @@ import java.util.ArrayList; import java.util.Collection; import java.util.List; +import java.util.Map; import java.util.stream.Stream; import static com.google.common.base.Charsets.UTF_8; @@ -81,15 +83,16 @@ protected byte[] createTopologyAndOffsets(final Collection subscribeForTopologyChanges(final Runnable } - @Override - public Partition[] listPartitions() throws NakadiRuntimeException, SubscriptionNotInitializedException { - return readTopology().getPartitions(); - } - protected String getOffsetPath(final EventTypePartition etp) { return getSubscriptionPath("/offsets/" + etp.getEventType() + "/" + etp.getPartition()); } @Override - public SubscriptionCursorWithoutToken getOffset(final EventTypePartition key) throws NakadiRuntimeException { - try { - final String offset = new String(getCurator().getData().forPath(getOffsetPath(key)), UTF_8); - return new SubscriptionCursorWithoutToken(key.getEventType(), key.getPartition(), offset); - } catch (final Exception e) { - throw new NakadiRuntimeException(e); - } + public Map getOffsets( + final Collection keys) + throws NakadiRuntimeException, ServiceTemporarilyUnavailableException { + return loadDataAsync(keys, this::getOffsetPath, (etp, value) -> + new SubscriptionCursorWithoutToken(etp.getEventType(), etp.getPartition(), new String(value, UTF_8))); } @Override public void transfer(final String sessionId, final Collection partitions) throws NakadiRuntimeException, SubscriptionNotInitializedException { getLog().info("session " + sessionId + " releases partitions " + partitions); - final Topology topology = readTopology(); + final Topology topology = getTopology(); final List changeSet = new ArrayList<>(); for (final EventTypePartition etp : partitions) { @@ -170,7 +167,10 @@ public void transfer(final String sessionId, final Collection T runLocked(Callable function); + + default void runLocked(final Runnable function) { + runLocked((Callable) () -> { + function.run(); + return null; + }); + } + /** * Creates subscription node in zookeeper on path /nakadi/subscriptions/{subscriptionId} @@ -54,7 +71,7 @@ public interface ZkSubscriptionClient { /** * Updates specified partitions in zk. */ - void updatePartitionsConfiguration(Partition[] partitions) throws NakadiRuntimeException, + void updatePartitionsConfiguration(String newSessionsHash, Partition[] partitions) throws NakadiRuntimeException, SubscriptionNotInitializedException; /** @@ -62,7 +79,8 @@ void updatePartitionsConfiguration(Partition[] partitions) throws NakadiRuntimeE * * @return List of existing sessions. */ - Session[] listSessions() throws SubscriptionNotInitializedException; + Collection listSessions() + throws SubscriptionNotInitializedException, NakadiRuntimeException, ServiceTemporarilyUnavailableException; boolean isActiveSession(String streamId) throws ServiceUnavailableException; @@ -71,7 +89,7 @@ void updatePartitionsConfiguration(Partition[] partitions) throws NakadiRuntimeE * * @return list of partitions related to this subscription. */ - Partition[] listPartitions() throws SubscriptionNotInitializedException, NakadiRuntimeException; + Topology getTopology() throws SubscriptionNotInitializedException, NakadiRuntimeException; /** * Subscribes to changes of session list in /nakadi/subscriptions/{subscriptionId}/sessions. @@ -79,7 +97,7 @@ void updatePartitionsConfiguration(Partition[] partitions) throws NakadiRuntimeE * * @param listener method to call on any change of client list. */ - ZkSubscription> subscribeForSessionListChanges(Runnable listener) throws Exception; + ZkSubscription> subscribeForSessionListChanges(Runnable listener) throws NakadiRuntimeException; /** * Subscribe for topology changes. @@ -93,13 +111,16 @@ ZkSubscription subscribeForOffsetChanges( EventTypePartition key, Runnable commitListener); /** - * Returns current offset value for specified partition key. Offset includes timeline and version data. - * The value that is stored there is a view value, so it will look like 001-0001-00000000000000000001 + * Returns committed offset values for specified partition keys. + * Offsets include timeline and version data. + * The value that is stored there is a view value, so it will look like + * 001-0001-00000000000000000001 * - * @param key Key to get offset for + * @param keys Key to get offset for * @return commit offset */ - SubscriptionCursorWithoutToken getOffset(EventTypePartition key) throws NakadiRuntimeException; + Map getOffsets(Collection keys) + throws NakadiRuntimeException; List commitOffsets(List cursors, Comparator comparator); @@ -129,7 +150,7 @@ void transfer(String sessionId, Collection partitions) * @return list of partitions and sessions wrapped in * {@link org.zalando.nakadi.service.subscription.zk.ZkSubscriptionNode} */ - ZkSubscriptionNode getZkSubscriptionNodeLocked() throws SubscriptionNotInitializedException; + Optional getZkSubscriptionNodeLocked() throws SubscriptionNotInitializedException; /** * Subscribes to cursor reset event. @@ -159,13 +180,24 @@ void resetCursors(List cursors, long timeout) throws OperationTimeoutException, ZookeeperException; class Topology { + @JsonProperty("partitions") private final Partition[] partitions; - private final int version; + // Each topology is based on a list of sessions, that it was built for. + // In case, when list of sessions wasn't changed, one should not actually perform rebalance, cause nothing have + // changed. + @Nullable + @JsonProperty("sessions_hash") + private final String sessionsHash; + @Nullable + @JsonProperty("version") + private final Integer version; public Topology( @JsonProperty("partitions") final Partition[] partitions, - @JsonProperty("version") final int version) { + @Nullable @JsonProperty("sessions_hash") final String sessionsHash, + @Nullable @JsonProperty("version") final Integer version) { this.partitions = partitions; + this.sessionsHash = sessionsHash; this.version = version; } @@ -173,7 +205,7 @@ public Partition[] getPartitions() { return partitions; } - public Topology withUpdatedPartitions(final Partition[] partitions) { + public Topology withUpdatedPartitions(final String newHash, final Partition[] partitions) { final Partition[] resultPartitions = Arrays.copyOf(this.partitions, this.partitions.length); for (final Partition newValue : partitions) { int selectedIdx = -1; @@ -188,15 +220,56 @@ public Topology withUpdatedPartitions(final Partition[] partitions) { } resultPartitions[selectedIdx] = newValue; } - return new Topology(resultPartitions, version + 1); + return new Topology(resultPartitions, newHash, Optional.ofNullable(version).orElse(0) + 1); } @Override public String toString() { return "Topology{" + "partitions=" + Arrays.toString(partitions) + + ", sessionsHash=" + sessionsHash + ", version=" + version + '}'; } + + @Override + public boolean equals(final Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + final Topology topology = (Topology) o; + return Objects.equals(version, topology.version) && + Arrays.equals(partitions, topology.partitions) && + Objects.equals(sessionsHash, topology.sessionsHash); + } + + @Override + public int hashCode() { + return Objects.hash(sessionsHash, version); + } + + public static String calculateSessionsHash(final Collection sessionIds) + throws ServiceTemporarilyUnavailableException { + final MessageDigest md; + try { + md = MessageDigest.getInstance("MD5"); + } catch (NoSuchAlgorithmException e) { + throw new ServiceTemporarilyUnavailableException("hash algorithm not found", e); + } + sessionIds.stream().sorted().map(String::getBytes).forEach(md::update); + final byte[] digest = md.digest(); + return Hex.encodeHexString(digest); + } + + public boolean isSameHash(final String newHash) { + return Objects.equals(newHash, sessionsHash); + } + + public String getSessionsHash() { + return sessionsHash; + } } } diff --git a/src/main/java/org/zalando/nakadi/service/subscription/zk/ZkSubscriptionNode.java b/src/main/java/org/zalando/nakadi/service/subscription/zk/ZkSubscriptionNode.java index 9f8f092ebd..7778cc464d 100644 --- a/src/main/java/org/zalando/nakadi/service/subscription/zk/ZkSubscriptionNode.java +++ b/src/main/java/org/zalando/nakadi/service/subscription/zk/ZkSubscriptionNode.java @@ -4,40 +4,23 @@ import org.zalando.nakadi.service.subscription.model.Session; import javax.annotation.Nullable; +import java.util.Collection; import java.util.Optional; -import java.util.stream.Stream; public final class ZkSubscriptionNode { - private Partition[] partitions; - private Session[] sessions; + private final Collection partitions; + private final Collection sessions; - public ZkSubscriptionNode() { - this.partitions = new Partition[0]; - this.sessions = new Session[0]; - } - - public ZkSubscriptionNode(final Partition[] partitions, final Session[] sessions) { - this.partitions = partitions; - this.sessions = sessions; - } - - public void setPartitions(final Partition[] partitions) { + public ZkSubscriptionNode(final Collection partitions, final Collection sessions) { this.partitions = partitions; - } - - public void setSessions(final Session[] sessions) { this.sessions = sessions; } - public Partition[] getPartitions() { + public Collection getPartitions() { return partitions; } - public Session[] getSessions() { - return sessions; - } - public Partition.State guessState(final String eventType, final String partition) { return getPartitionWithActiveSession(eventType, partition) .map(Partition::getState) @@ -45,9 +28,9 @@ public Partition.State guessState(final String eventType, final String partition } private Optional getPartitionWithActiveSession(final String eventType, final String partition) { - return Stream.of(partitions) + return partitions.stream() .filter(p -> p.getPartition().equals(partition) && p.getEventType().equals(eventType)) - .filter(p -> Stream.of(sessions).anyMatch(s -> s.getId().equalsIgnoreCase(p.getSession()))) + .filter(p -> sessions.stream().anyMatch(s -> s.getId().equalsIgnoreCase(p.getSession()))) .findAny(); } diff --git a/src/test/java/org/zalando/nakadi/controller/SubscriptionControllerTest.java b/src/test/java/org/zalando/nakadi/controller/SubscriptionControllerTest.java index a4e849752f..48adabb6aa 100644 --- a/src/test/java/org/zalando/nakadi/controller/SubscriptionControllerTest.java +++ b/src/test/java/org/zalando/nakadi/controller/SubscriptionControllerTest.java @@ -32,6 +32,7 @@ import org.zalando.nakadi.security.NakadiClient; import org.zalando.nakadi.service.CursorConverter; import org.zalando.nakadi.service.CursorOperationsService; +import org.zalando.nakadi.service.FeatureToggleService; import org.zalando.nakadi.service.NakadiKpiPublisher; import org.zalando.nakadi.service.subscription.SubscriptionService; import org.zalando.nakadi.service.subscription.model.Partition; @@ -40,7 +41,6 @@ import org.zalando.nakadi.service.subscription.zk.ZkSubscriptionClient; import org.zalando.nakadi.service.subscription.zk.ZkSubscriptionNode; import org.zalando.nakadi.service.timeline.TimelineService; -import org.zalando.nakadi.service.FeatureToggleService; import org.zalando.nakadi.utils.EventTypeTestBuilder; import org.zalando.nakadi.utils.RandomSubscriptionBuilder; import org.zalando.nakadi.utils.TestUtils; @@ -49,8 +49,12 @@ import org.zalando.problem.ThrowableProblem; import javax.ws.rs.core.Response; +import java.util.Arrays; +import java.util.Collection; import java.util.Collections; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.Optional; import java.util.Set; @@ -227,17 +231,18 @@ public void whenGetSubscriptionAndExceptionThenServiceUnavailable() throws Excep @Test public void whenGetSubscriptionStatThenOk() throws Exception { final Subscription subscription = builder().withEventType(TIMELINE.getEventType()).build(); - final Partition[] partitions = { - new Partition(TIMELINE.getEventType(), "0", "xz", null, Partition.State.ASSIGNED)}; - final ZkSubscriptionNode zkSubscriptionNode = new ZkSubscriptionNode(); - zkSubscriptionNode.setPartitions(partitions); - zkSubscriptionNode.setSessions(new Session[]{new Session("xz", 0)}); + final Collection partitions = Collections.singleton( + new Partition(TIMELINE.getEventType(), "0", "xz", null, Partition.State.ASSIGNED)); + final ZkSubscriptionNode zkSubscriptionNode = + new ZkSubscriptionNode(partitions, Arrays.asList(new Session("xz", 0))); when(subscriptionRepository.getSubscription(subscription.getId())).thenReturn(subscription); - when(zkSubscriptionClient.getZkSubscriptionNodeLocked()).thenReturn(zkSubscriptionNode); + when(zkSubscriptionClient.getZkSubscriptionNodeLocked()).thenReturn(Optional.of(zkSubscriptionNode)); final SubscriptionCursorWithoutToken currentOffset = new SubscriptionCursorWithoutToken(TIMELINE.getEventType(), "0", "3"); - when(zkSubscriptionClient.getOffset(new EventTypePartition(TIMELINE.getEventType(), "0"))) - .thenReturn(currentOffset); + final EventTypePartition etp = new EventTypePartition(TIMELINE.getEventType(), "0"); + final Map offsets = new HashMap<>(); + offsets.put(etp, currentOffset); + when(zkSubscriptionClient.getOffsets(Collections.singleton(etp))).thenReturn(offsets); when(eventTypeRepository.findByName(TIMELINE.getEventType())) .thenReturn(EventTypeTestBuilder.builder().name(TIMELINE.getEventType()).build()); final List statistics = Collections.singletonList( @@ -267,7 +272,8 @@ public void whenGetSubscriptionStatThenOk() throws Exception { public void whenGetSubscriptionNoEventTypesThenStatEmpty() throws Exception { final Subscription subscription = builder().withEventType("myET").build(); when(subscriptionRepository.getSubscription(subscription.getId())).thenReturn(subscription); - when(zkSubscriptionClient.getZkSubscriptionNodeLocked()).thenReturn(new ZkSubscriptionNode()); + when(zkSubscriptionClient.getZkSubscriptionNodeLocked()).thenReturn( + Optional.of(new ZkSubscriptionNode(Collections.emptyList(), Collections.emptyList()))); when(eventTypeRepository.findByName("myET")).thenThrow(NoSuchEventTypeException.class); getSubscriptionStats(subscription.getId()) @@ -338,7 +344,7 @@ public boolean supportsParameter(final MethodParameter parameter) { public Object resolveArgument(final MethodParameter parameter, final ModelAndViewContainer mavContainer, final NativeWebRequest webRequest, - final WebDataBinderFactory binderFactory) throws Exception { + final WebDataBinderFactory binderFactory) { return new NakadiClient("nakadiClientId", ""); } } diff --git a/src/test/java/org/zalando/nakadi/service/subscription/ExactWeightRebalancerTest.java b/src/test/java/org/zalando/nakadi/service/subscription/ExactWeightRebalancerTest.java index 576ac7757e..f9ee2154bf 100644 --- a/src/test/java/org/zalando/nakadi/service/subscription/ExactWeightRebalancerTest.java +++ b/src/test/java/org/zalando/nakadi/service/subscription/ExactWeightRebalancerTest.java @@ -5,6 +5,8 @@ import org.zalando.nakadi.service.subscription.model.Partition; import org.zalando.nakadi.service.subscription.model.Session; +import java.util.Arrays; +import java.util.List; import java.util.stream.Stream; import static org.hamcrest.MatcherAssert.assertThat; @@ -52,9 +54,7 @@ public void rebalanceShouldHaveEmptyChangesetForBalancedData() { final ExactWeightRebalancer rebalancer = new ExactWeightRebalancer(); // 1. Data contains only assigned - final Session[] sessions = new Session[]{ - new Session("0", 1), - new Session("1", 1)}; + final List sessions = Arrays.asList(new Session("0", 1), new Session("1", 1)); assertThat(rebalancer.apply(sessions, new Partition[]{ new Partition("0", "0", "0", null, ASSIGNED), @@ -85,7 +85,7 @@ public void rebalanceShouldHaveEmptyChangesetForBalancedData() { @Test public void rebalanceShouldRemoveDeadSessions() { final Partition[] changeset = new ExactWeightRebalancer().apply( - new Session[]{new Session("1", 1), new Session("2", 1)}, + Arrays.asList(new Session("1", 1), new Session("2", 1)), new Partition[]{ new Partition("0", "0", "0", null, ASSIGNED), new Partition("0", "1", "0", "1", REASSIGNING), @@ -107,7 +107,7 @@ public void rebalanceShouldRemoveDeadSessions() { @Test public void rebalanceShouldMoveToReassigningState() { final Partition[] changeset = new ExactWeightRebalancer().apply( - new Session[]{new Session("1", 1), new Session("2", 1), new Session("3", 1)}, + Arrays.asList(new Session("1", 1), new Session("2", 1), new Session("3", 1)), new Partition[]{ new Partition("0", "0", "1", null, ASSIGNED), new Partition("0", "1", "1", null, ASSIGNED), @@ -126,7 +126,7 @@ public void rebalanceShouldMoveToReassigningState() { @Test public void rebalanceShouldTakeRebalancingPartitions() { final Partition[] changeset = new ExactWeightRebalancer().apply( - new Session[]{new Session("1", 1), new Session("2", 1), new Session("3", 1)}, + Arrays.asList(new Session("1", 1), new Session("2", 1), new Session("3", 1)), new Partition[]{ new Partition("0", "0", "1", null, ASSIGNED), new Partition("0", "1", "1", null, ASSIGNED), diff --git a/src/test/java/org/zalando/nakadi/service/subscription/state/StreamingStateTest.java b/src/test/java/org/zalando/nakadi/service/subscription/state/StreamingStateTest.java index 55fd9e1a7f..0f7e060b77 100644 --- a/src/test/java/org/zalando/nakadi/service/subscription/state/StreamingStateTest.java +++ b/src/test/java/org/zalando/nakadi/service/subscription/state/StreamingStateTest.java @@ -64,10 +64,8 @@ public void prepareMocks() throws Exception { when(metricRegistry.register(any(), any())).thenReturn(null); when(contextMock.getMetricRegistry()).thenReturn(metricRegistry); - final SubscriptionCursorWithoutToken cursor = mock(SubscriptionCursorWithoutToken.class); zkMock = mock(ZkSubscriptionClient.class); when(contextMock.getZkClient()).thenReturn(zkMock); - when(zkMock.getOffset(any())).thenReturn(cursor); cursorConverter = mock(CursorConverter.class); when(contextMock.getCursorConverter()).thenReturn(cursorConverter); @@ -94,7 +92,7 @@ public void prepareMocks() throws Exception { public void ensureTopologyEventListenerRegisteredRefreshedClosed() { final ZkSubscription topologySubscription = mock(ZkSubscription.class); Mockito.when(topologySubscription.getData()) - .thenReturn(new ZkSubscriptionClient.Topology(new Partition[]{}, 1)); + .thenReturn(new ZkSubscriptionClient.Topology(new Partition[]{}, null, 1)); Mockito.when(zkMock.subscribeForTopologyChanges(Mockito.anyObject())).thenReturn(topologySubscription); state.onEnter(); diff --git a/src/test/java/org/zalando/nakadi/service/subscription/zk/ZkSubscriptionClientTest.java b/src/test/java/org/zalando/nakadi/service/subscription/zk/ZkSubscriptionClientTest.java new file mode 100644 index 0000000000..3b05d5e6e4 --- /dev/null +++ b/src/test/java/org/zalando/nakadi/service/subscription/zk/ZkSubscriptionClientTest.java @@ -0,0 +1,45 @@ +package org.zalando.nakadi.service.subscription.zk; + +import org.junit.Assert; +import org.junit.Test; +import org.zalando.nakadi.service.subscription.model.Partition; +import org.zalando.nakadi.utils.TestUtils; + +import java.io.IOException; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +public class ZkSubscriptionClientTest { + + @Test + public void testHashCalculationOrder() { + Assert.assertEquals( + ZkSubscriptionClient.Topology.calculateSessionsHash(Stream.of("1", "2").collect(Collectors.toList())), + ZkSubscriptionClient.Topology.calculateSessionsHash(Stream.of("2", "1").collect(Collectors.toList())) + ); + } + + @Test + public void testHashCalculationDifferent() { + Assert.assertNotEquals( + ZkSubscriptionClient.Topology.calculateSessionsHash(Stream.of("1", "3").collect(Collectors.toList())), + ZkSubscriptionClient.Topology.calculateSessionsHash(Stream.of("2", "1").collect(Collectors.toList())) + ); + } + + @Test + public void testSerializationDeserialization() throws IOException { + final ZkSubscriptionClient.Topology first = new ZkSubscriptionClient.Topology( + new Partition[]{new Partition("1", "2", "3", "4", Partition.State.ASSIGNED)}, + "123", + 456); + + final String serialized = TestUtils.OBJECT_MAPPER.writer().writeValueAsString(first); + + final ZkSubscriptionClient.Topology second = TestUtils.OBJECT_MAPPER.readValue( + serialized, ZkSubscriptionClient.Topology.class); + + Assert.assertEquals(first, second); + } + +} diff --git a/src/test/java/org/zalando/nakadi/service/subscription/zk/ZkSubscriptionNodeTest.java b/src/test/java/org/zalando/nakadi/service/subscription/zk/ZkSubscriptionNodeTest.java index aa66aef18f..f8ddac5fc3 100644 --- a/src/test/java/org/zalando/nakadi/service/subscription/zk/ZkSubscriptionNodeTest.java +++ b/src/test/java/org/zalando/nakadi/service/subscription/zk/ZkSubscriptionNodeTest.java @@ -6,6 +6,8 @@ import org.zalando.nakadi.service.subscription.model.Partition; import org.zalando.nakadi.service.subscription.model.Session; +import java.util.List; + import static org.hamcrest.Matchers.equalTo; import static org.junit.Assert.assertThat; @@ -15,19 +17,19 @@ public class ZkSubscriptionNodeTest { @Before public void before() { - final Partition[] partitions = ImmutableList.of( + final List partitions = ImmutableList.of( new Partition("et1", "0", "stream1", null, Partition.State.ASSIGNED), new Partition("et1", "1", "stream2", "stream4", Partition.State.REASSIGNING), new Partition("et2", "0", "stream3", null, Partition.State.UNASSIGNED), new Partition("et2", "1", null, null, null) - ).toArray(new Partition[4]); + ); - final Session[] sessions = ImmutableList.of( + final List sessions = ImmutableList.of( new Session("stream1", 1), new Session("stream2", 1), new Session("stream3", 1), new Session("stream4", 1) - ).toArray(new Session[4]); + ); zkSubscriptionNode = new ZkSubscriptionNode(partitions, sessions); }