Skip to content
This repository has been archived by the owner on Jun 7, 2024. It is now read-only.

Commit

Permalink
Merge pull request #836 from zalando/ARUHA-1510
Browse files Browse the repository at this point in the history
Speed up rebalance process
  • Loading branch information
antban authored Feb 15, 2018
2 parents 485927c + d660473 commit 37ef951
Show file tree
Hide file tree
Showing 19 changed files with 371 additions and 172 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -272,7 +272,8 @@ private String subscriptionPath() {

private void setPartitions(final Partition[] partitions) throws Exception {
final String topologyPath = subscriptionPath() + "/topology";
final byte[] topologyData = MAPPER.writeValueAsBytes(new NewZkSubscriptionClient.Topology(partitions, 0));
final byte[] topologyData = MAPPER.writeValueAsBytes(
new NewZkSubscriptionClient.Topology(partitions, null, 0));
if (null == CURATOR.checkExists().forPath(topologyPath)) {
CURATOR.create().forPath(topologyPath, topologyData);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import org.zalando.nakadi.view.SubscriptionCursor;
import org.zalando.nakadi.view.SubscriptionCursorWithoutToken;

import java.util.Collection;
import java.util.List;

/**
Expand All @@ -31,7 +32,7 @@ NakadiCursor convert(String eventTypeName, Cursor cursor) throws
NakadiCursor convert(SubscriptionCursorWithoutToken cursor) throws
InternalNakadiException, NoSuchEventTypeException, ServiceUnavailableException, InvalidCursorException;

List<NakadiCursor> convert(List<SubscriptionCursorWithoutToken> cursor) throws
List<NakadiCursor> convert(Collection<SubscriptionCursorWithoutToken> cursor) throws
InternalNakadiException, NoSuchEventTypeException, ServiceUnavailableException, InvalidCursorException;


Expand Down
13 changes: 9 additions & 4 deletions src/main/java/org/zalando/nakadi/service/CursorsService.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import org.zalando.nakadi.exceptions.ServiceUnavailableException;
import org.zalando.nakadi.exceptions.UnableProcessException;
import org.zalando.nakadi.exceptions.runtime.OperationTimeoutException;
import org.zalando.nakadi.exceptions.runtime.ServiceTemporarilyUnavailableException;
import org.zalando.nakadi.exceptions.runtime.ZookeeperException;
import org.zalando.nakadi.repository.TopicRepository;
import org.zalando.nakadi.repository.db.EventTypeCache;
Expand Down Expand Up @@ -107,7 +108,8 @@ private void validateStreamId(final List<NakadiCursor> cursors, final String str
throw new InvalidStreamIdException("Session with stream id " + streamId + " not found", streamId);
}

final Map<EventTypePartition, String> partitionSessions = Stream.of(subscriptionClient.listPartitions())
final Map<EventTypePartition, String> partitionSessions = Stream
.of(subscriptionClient.getTopology().getPartitions())
.collect(Collectors.toMap(Partition::getKey, Partition::getSession));
for (final NakadiCursor cursor : cursors) {
final EventTypePartition etPartition = cursor.getEventTypePartition();
Expand All @@ -124,20 +126,23 @@ private void validateStreamId(final List<NakadiCursor> cursors, final String str
}

public List<SubscriptionCursorWithoutToken> getSubscriptionCursors(final String subscriptionId)
throws NakadiException {
throws NakadiException, ServiceTemporarilyUnavailableException {
final Subscription subscription = subscriptionRepository.getSubscription(subscriptionId);
final ZkSubscriptionClient zkSubscriptionClient = zkSubscriptionFactory.createClient(
subscription, "subscription." + subscriptionId + ".get_cursors");
final ImmutableList.Builder<SubscriptionCursorWithoutToken> cursorsListBuilder = ImmutableList.builder();

Partition[] partitions;
try {
partitions = zkSubscriptionClient.listPartitions();
partitions = zkSubscriptionClient.getTopology().getPartitions();
} catch (final SubscriptionNotInitializedException ex) {
partitions = new Partition[]{};
}
final Map<EventTypePartition, SubscriptionCursorWithoutToken> positions = zkSubscriptionClient.getOffsets(
Stream.of(partitions).map(Partition::getKey).collect(Collectors.toList()));

for (final Partition p : partitions) {
cursorsListBuilder.add(zkSubscriptionClient.getOffset(p.getKey()));
cursorsListBuilder.add(positions.get(p.getKey()));
}
return cursorsListBuilder.build();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import org.zalando.nakadi.view.SubscriptionCursor;
import org.zalando.nakadi.view.SubscriptionCursorWithoutToken;

import java.util.Collection;
import java.util.EnumMap;
import java.util.LinkedHashMap;
import java.util.List;
Expand Down Expand Up @@ -48,7 +49,7 @@ public NakadiCursor convert(final SubscriptionCursorWithoutToken cursor)
}

@Override
public List<NakadiCursor> convert(final List<SubscriptionCursorWithoutToken> cursors)
public List<NakadiCursor> convert(final Collection<SubscriptionCursorWithoutToken> cursors)
throws InternalNakadiException, NoSuchEventTypeException, ServiceUnavailableException,
InvalidCursorException {
final LinkedHashMap<SubscriptionCursorWithoutToken, AtomicReference<NakadiCursor>> orderingMap =
Expand All @@ -57,7 +58,7 @@ public List<NakadiCursor> convert(final List<SubscriptionCursorWithoutToken> cur

final Map<Version, List<SubscriptionCursorWithoutToken>> mappedByVersions = cursors.stream()
.collect(Collectors.groupingBy(c -> guessVersion(c.getOffset())));
for (final Map.Entry<Version, List<SubscriptionCursorWithoutToken>> entry: mappedByVersions.entrySet()) {
for (final Map.Entry<Version, List<SubscriptionCursorWithoutToken>> entry : mappedByVersions.entrySet()) {
final List<NakadiCursor> result = converters.get(entry.getKey()).convertBatched(entry.getValue());
IntStream.range(0, entry.getValue().size())
.forEach(idx -> orderingMap.get(entry.getValue().get(idx)).set(result.get(idx)));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;
Expand All @@ -13,10 +14,10 @@
import java.util.stream.IntStream;
import java.util.stream.Stream;

class ExactWeightRebalancer implements BiFunction<Session[], Partition[], Partition[]> {
class ExactWeightRebalancer implements BiFunction<Collection<Session>, Partition[], Partition[]> {
@Override
public Partition[] apply(final Session[] sessions, final Partition[] currentPartitions) {
final Map<String, Integer> activeSessionWeights = Stream.of(sessions)
public Partition[] apply(final Collection<Session> sessions, final Partition[] currentPartitions) {
final Map<String, Integer> activeSessionWeights = sessions.stream()
.collect(Collectors.toMap(Session::getId, Session::getWeight));
// sorted session ids.
final List<String> activeSessionIds = activeSessionWeights.keySet().stream().sorted()
Expand Down Expand Up @@ -45,7 +46,7 @@ public Partition[] apply(final Session[] sessions, final Partition[] currentPart
final List<Partition> candidates = partitions.get(sessionId);
final Partition toTakeItem = candidates.stream()
.filter(p -> p.getState() == Partition.State.REASSIGNING).findAny().orElse(
candidates.get(candidates.size() - 1));
candidates.get(candidates.size() - 1));
candidates.remove(toTakeItem);
toRebalance.add(toTakeItem);
toTake -= 1;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@

import java.io.Closeable;
import java.io.IOException;
import java.util.Collection;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.BlockingQueue;
Expand All @@ -37,6 +38,7 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiFunction;
import java.util.stream.Collectors;

public class StreamingContext implements SubscriptionStreamer {

Expand All @@ -54,7 +56,7 @@ public class StreamingContext implements SubscriptionStreamer {
private final BlacklistService blacklistService;
private final ScheduledExecutorService timer;
private final BlockingQueue<Runnable> taskQueue = new LinkedBlockingQueue<>();
private final BiFunction<Session[], Partition[], Partition[]> rebalancer;
private final BiFunction<Collection<Session>, Partition[], Partition[]> rebalancer;
private final String loggingPath;
private final CursorConverter cursorConverter;
private final Subscription subscription;
Expand Down Expand Up @@ -212,7 +214,7 @@ public void switchState(final State newState) {
});
}

public void registerSession() throws Exception {
public void registerSession() throws NakadiRuntimeException {
log.info("Registering session {}", session);
// Install rebalance hook on client list change.
sessionListSubscription = zkClient.subscribeForSessionListChanges(() -> addTask(this::rebalance));
Expand Down Expand Up @@ -264,11 +266,24 @@ public ObjectMapper getObjectMapper() {
private void rebalance() {
if (null != sessionListSubscription) {
// This call is needed to renew subscription for session list changes.
sessionListSubscription.getData();
final List<String> newSessions = sessionListSubscription.getData();
final String sessionsHash = ZkSubscriptionClient.Topology.calculateSessionsHash(newSessions);
zkClient.runLocked(() -> {
final Partition[] changeset = rebalancer.apply(zkClient.listSessions(), zkClient.listPartitions());
if (changeset.length > 0) {
zkClient.updatePartitionsConfiguration(changeset);
final ZkSubscriptionClient.Topology topology = zkClient.getTopology();

if (!topology.isSameHash(sessionsHash)) {
log.info("Performing rebalance, hash changed: {}", sessionsHash);
final Collection<Session> newSessionsUnderLock = zkClient.listSessions();

// The session list may have changed by the time the lock is taken, so the hash is recalculated from the sessions read under the lock.
final Partition[] changeset = rebalancer.apply(newSessionsUnderLock, topology.getPartitions());
if (changeset.length > 0) {
final String actualHash = ZkSubscriptionClient.Topology.calculateSessionsHash(
newSessionsUnderLock.stream().map(Session::getId).collect(Collectors.toList()));
zkClient.updatePartitionsConfiguration(actualHash, changeset);
}
} else {
log.info("Skipping rebalance, because hash is the same: {}", sessionsHash);
}
});
}
Expand Down Expand Up @@ -314,7 +329,7 @@ public static final class Builder {
private Session session;
private ScheduledExecutorService timer;
private ZkSubscriptionClient zkClient;
private BiFunction<Session[], Partition[], Partition[]> rebalancer;
private BiFunction<Collection<Session>, Partition[], Partition[]> rebalancer;
private long kafkaPollTimeout;
private String loggingPath;
private AtomicBoolean connectionReady;
Expand Down Expand Up @@ -369,7 +384,7 @@ public Builder setZkClient(final ZkSubscriptionClient zkClient) {
return this;
}

public Builder setRebalancer(final BiFunction<Session[], Partition[], Partition[]> rebalancer) {
public Builder setRebalancer(final BiFunction<Collection<Session>, Partition[], Partition[]> rebalancer) {
this.rebalancer = rebalancer;
return this;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import org.springframework.web.util.UriComponents;
import org.springframework.web.util.UriComponentsBuilder;
import org.zalando.nakadi.domain.EventType;
import org.zalando.nakadi.domain.EventTypePartition;
import org.zalando.nakadi.domain.ItemsWrapper;
import org.zalando.nakadi.domain.NakadiCursor;
import org.zalando.nakadi.domain.PaginationLinks;
Expand Down Expand Up @@ -43,6 +44,7 @@
import org.zalando.nakadi.service.FeatureToggleService;
import org.zalando.nakadi.service.NakadiKpiPublisher;
import org.zalando.nakadi.service.Result;
import org.zalando.nakadi.service.subscription.model.Partition;
import org.zalando.nakadi.service.subscription.zk.SubscriptionClientFactory;
import org.zalando.nakadi.service.subscription.zk.ZkSubscriptionClient;
import org.zalando.nakadi.service.subscription.zk.ZkSubscriptionNode;
Expand All @@ -55,13 +57,13 @@
import javax.ws.rs.core.Response;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;

@Component
public class SubscriptionService {
Expand Down Expand Up @@ -242,7 +244,7 @@ private List<SubscriptionEventTypeStats> createSubscriptionStat(final Subscripti
throw new ServiceTemporarilyUnavailableException(e);
}

final ZkSubscriptionNode zkSubscriptionNode = subscriptionClient.getZkSubscriptionNodeLocked();
final Optional<ZkSubscriptionNode> zkSubscriptionNode = subscriptionClient.getZkSubscriptionNodeLocked();

return loadStats(eventTypes, zkSubscriptionNode, subscriptionClient, topicPartitions);
}
Expand All @@ -265,18 +267,15 @@ private List<PartitionEndStatistics> loadPartitionEndStatistics(final Collection

private List<SubscriptionEventTypeStats> loadStats(
final Collection<EventType> eventTypes,
final ZkSubscriptionNode subscriptionNode,
final Optional<ZkSubscriptionNode> subscriptionNode,
final ZkSubscriptionClient client,
final List<PartitionEndStatistics> stats)
throws ServiceTemporarilyUnavailableException, InconsistentStateException {
final List<SubscriptionEventTypeStats> result = new ArrayList<>(eventTypes.size());
final List<NakadiCursor> committedPositions;
try {
committedPositions = loadCommittedPositions(subscriptionNode, client);
} catch (final InternalNakadiException | NoSuchEventTypeException | InvalidCursorException |
ServiceUnavailableException e) {
throw new ServiceTemporarilyUnavailableException(e);
}

final Collection<NakadiCursor> committedPositions = subscriptionNode
.map(node -> loadCommittedPositions(node.getPartitions(), client))
.orElse(Collections.emptyList());

for (final EventType eventType : eventTypes) {
final List<SubscriptionEventTypeStats.Partition> resultPartitions = new ArrayList<>(stats.size());
Expand All @@ -298,28 +297,36 @@ private List<SubscriptionEventTypeStats> loadStats(
})
.orElse(null);

final String state = subscriptionNode.guessState(stat.getTimeline().getEventType(), stat.getPartition())
.getDescription();
final String streamId = Optional.ofNullable(subscriptionNode.guessStream(
stat.getTimeline().getEventType(), stat.getPartition())).orElse("");
final Partition.State state = subscriptionNode
.map(node -> node.guessState(stat.getTimeline().getEventType(), stat.getPartition()))
.orElse(Partition.State.UNASSIGNED);

final String streamId = subscriptionNode
.map(node -> node.guessStream(stat.getTimeline().getEventType(), stat.getPartition()))
.orElse("");

resultPartitions.add(new SubscriptionEventTypeStats.Partition(
lastPosition.getPartition(), state, distance, streamId));
lastPosition.getPartition(), state.getDescription(), distance, streamId));
}
resultPartitions.sort(Comparator.comparing(SubscriptionEventTypeStats.Partition::getPartition));
result.add(new SubscriptionEventTypeStats(eventType.getName(), resultPartitions));
}
return result;
}

private List<NakadiCursor> loadCommittedPositions(
final ZkSubscriptionNode subscriptionNode,
final ZkSubscriptionClient client) throws InternalNakadiException, InvalidCursorException,
NoSuchEventTypeException, ServiceUnavailableException {
final List<SubscriptionCursorWithoutToken> views = Stream.of(subscriptionNode.getPartitions()).map(
partition -> client.getOffset(partition.getKey()))
.collect(Collectors.toList());
return converter.convert(views);
/**
 * Reads the committed offsets of the given partitions in a single {@code getOffsets} call
 * and converts them to {@link NakadiCursor}s.
 *
 * @param partitions partitions whose keys are used to look up the offsets
 * @param client     subscription client used to fetch the offsets
 * @return the converted cursors, one per value returned by {@code getOffsets}
 * @throws ServiceTemporarilyUnavailableException wrapping any conversion or lookup failure caught below
 */
private Collection<NakadiCursor> loadCommittedPositions(
        final Collection<Partition> partitions, final ZkSubscriptionClient client)
        throws ServiceTemporarilyUnavailableException {
    try {
        // Collect all keys first so the offsets are fetched in one request.
        final List<EventTypePartition> partitionKeys = partitions.stream()
                .map(Partition::getKey)
                .collect(Collectors.toList());
        final Map<EventTypePartition, SubscriptionCursorWithoutToken> offsetsByKey =
                client.getOffsets(partitionKeys);
        return converter.convert(offsetsByKey.values());
    } catch (final InternalNakadiException | NoSuchEventTypeException | InvalidCursorException |
            ServiceUnavailableException failure) {
        throw new ServiceTemporarilyUnavailableException(failure);
    }
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

import javax.annotation.Nullable;
import java.util.Collection;
import java.util.Objects;

public class Partition {
public enum State {
Expand Down Expand Up @@ -135,4 +136,26 @@ public String getSessionOrNextSession() {
public String toString() {
return eventType + ":" + partition + "->" + state + ":" + session + "->" + nextSession;
}

/**
 * Tells whether {@code o} is an object of exactly the same class whose event type, partition,
 * session, next session and state all match this one.
 *
 * @param o object to compare with, may be {@code null}
 * @return {@code true} only when every compared field is equal
 */
@Override
public boolean equals(final Object o) {
    if (o == this) {
        return true;
    }
    if (null == o || o.getClass() != getClass()) {
        return false;
    }
    final Partition other = (Partition) o;
    // Fields are checked in a fixed order; the first mismatch ends the comparison.
    if (!Objects.equals(eventType, other.eventType)) {
        return false;
    }
    if (!Objects.equals(partition, other.partition)) {
        return false;
    }
    if (!Objects.equals(session, other.session)) {
        return false;
    }
    if (!Objects.equals(nextSession, other.nextSession)) {
        return false;
    }
    return state == other.state;
}

/**
 * Hashes only the event type and the partition; the other fields compared by
 * {@link #equals(Object)} do not take part in the hash.
 *
 * @return hash of {@code eventType} and {@code partition}
 */
@Override
public int hashCode() {
    return Objects.hash(this.eventType, this.partition);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
import org.zalando.nakadi.exceptions.runtime.AccessDeniedException;
import org.zalando.nakadi.service.CursorConverter;
import org.zalando.nakadi.service.subscription.model.Partition;
import org.zalando.nakadi.service.subscription.model.Session;
import org.zalando.nakadi.service.subscription.zk.ZkSubscriptionClient;
import org.zalando.nakadi.service.timeline.TimelineService;
import org.zalando.nakadi.view.SubscriptionCursorWithoutToken;
Expand Down Expand Up @@ -50,7 +49,7 @@ protected Response.StatusType getStatus() {
* 1. Checks, that subscription node is present in zk. If not - creates it.
* <p>
* 2. If cursor reset is in progress it will switch to cleanup state.
* <p>
* <p>
* 3. Registers session.
* <p>
* 4. Switches to streaming state.
Expand All @@ -59,9 +58,8 @@ private void createSubscriptionLocked() {
final boolean subscriptionJustInitialized = initializeSubscriptionLocked(getZk(),
getContext().getSubscription(), getContext().getTimelineService(), getContext().getCursorConverter());
if (!subscriptionJustInitialized) {
final Session[] sessions = getZk().listSessions();
final Partition[] partitions = getZk().listPartitions();
if (sessions.length >= partitions.length) {
final Partition[] partitions = getZk().getTopology().getPartitions();
if (getZk().listSessions().size() >= partitions.length) {
switchState(new CleanupState(new NoStreamingSlotsAvailable(partitions.length)));
return;
}
Expand Down
Loading

0 comments on commit 37ef951

Please sign in to comment.