Skip to content
This repository has been archived by the owner on Jun 7, 2024. It is now read-only.

Commit

Permalink
Merge pull request #690 from zalando/ARUHA-874
Browse files Browse the repository at this point in the history
ARUHA-874 Optimize subscription stats endpoint
  • Loading branch information
v-stepanov authored Aug 3, 2017
2 parents 82a3e2b + c35db03 commit c3c359b
Show file tree
Hide file tree
Showing 23 changed files with 635 additions and 377 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,10 @@ and this project adheres to [Semantic Versioning](http://semver.org/).

## [Unreleased]

### Fixed
- Fixed bug with incorrect lag calculation for subscirption.
- Optimized subscription stats endpoint for subscriptions with many event types inside.

### Changed
- Now it's possible to have a digit after the dot in event-type name.

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,16 @@
import com.fasterxml.jackson.core.JsonProcessingException;
import com.jayway.restassured.RestAssured;
import com.jayway.restassured.http.ContentType;
import static org.hamcrest.core.IsEqual.equalTo;
import org.junit.Before;
import org.junit.Test;
import static org.springframework.http.HttpStatus.OK;
import static org.springframework.http.HttpStatus.UNPROCESSABLE_ENTITY;
import org.zalando.nakadi.domain.EventType;
import org.zalando.nakadi.utils.EventTypeTestBuilder;
import static org.zalando.nakadi.utils.TestUtils.randomTextString;
import org.zalando.nakadi.webservice.utils.NakadiTestUtils;

import static org.hamcrest.core.IsEqual.equalTo;
import static org.springframework.http.HttpStatus.OK;
import static org.springframework.http.HttpStatus.UNPROCESSABLE_ENTITY;
import static org.zalando.nakadi.utils.TestUtils.randomTextString;
import static org.zalando.nakadi.webservice.utils.NakadiTestUtils.postEvents;

public class CursorOperationsAT {
Expand Down Expand Up @@ -199,7 +200,10 @@ public void unshiftCursor() {
.when()
.post("/event-types/" + eventType.getName() + "/shifted-cursors")
.then()
.statusCode(UNPROCESSABLE_ENTITY.value());
.statusCode(OK.value())
.body("size()", equalTo(1))
.body("offset[0]", equalTo("001-0001-000000000000000002"))
.body("partition[0]", equalTo("0"));

NakadiTestUtils.switchTimelineDefaultStorage(eventType);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,26 +4,7 @@
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import static com.jayway.restassured.RestAssured.given;
import static com.jayway.restassured.RestAssured.when;
import static com.jayway.restassured.http.ContentType.JSON;
import java.io.IOException;
import static java.text.MessageFormat.format;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.http.HttpStatus;
import static org.apache.http.HttpStatus.SC_CONFLICT;
import static org.apache.http.HttpStatus.SC_NO_CONTENT;
import static org.apache.http.HttpStatus.SC_OK;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.not;
import org.hamcrest.core.StringContains;
import org.junit.Assert;
import org.junit.Before;
Expand All @@ -33,26 +14,46 @@
import org.zalando.nakadi.domain.ItemsWrapper;
import org.zalando.nakadi.domain.Subscription;
import org.zalando.nakadi.domain.SubscriptionBase;
import static org.zalando.nakadi.domain.SubscriptionBase.InitialPosition.BEGIN;
import static org.zalando.nakadi.domain.SubscriptionBase.InitialPosition.END;
import org.zalando.nakadi.domain.SubscriptionEventTypeStats;
import org.zalando.nakadi.service.BlacklistService;
import org.zalando.nakadi.utils.JsonTestHelper;
import org.zalando.nakadi.utils.RandomSubscriptionBuilder;
import static org.zalando.nakadi.utils.TestUtils.waitFor;
import org.zalando.nakadi.view.Cursor;
import org.zalando.nakadi.view.SubscriptionCursor;
import org.zalando.nakadi.webservice.BaseAT;
import org.zalando.nakadi.webservice.SettingsControllerAT;
import org.zalando.nakadi.webservice.utils.NakadiTestUtils;
import org.zalando.nakadi.webservice.utils.TestStreamingClient;

import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

import static com.jayway.restassured.RestAssured.given;
import static com.jayway.restassured.RestAssured.when;
import static com.jayway.restassured.http.ContentType.JSON;
import static java.text.MessageFormat.format;
import static org.apache.http.HttpStatus.SC_CONFLICT;
import static org.apache.http.HttpStatus.SC_NO_CONTENT;
import static org.apache.http.HttpStatus.SC_OK;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.not;
import static org.zalando.nakadi.domain.SubscriptionBase.InitialPosition.BEGIN;
import static org.zalando.nakadi.domain.SubscriptionBase.InitialPosition.END;
import static org.zalando.nakadi.utils.TestUtils.waitFor;
import static org.zalando.nakadi.webservice.hila.StreamBatch.MatcherIgnoringToken.equalToBatchIgnoringToken;
import static org.zalando.nakadi.webservice.hila.StreamBatch.singleEventBatch;
import org.zalando.nakadi.webservice.utils.NakadiTestUtils;
import static org.zalando.nakadi.webservice.utils.NakadiTestUtils.commitCursors;
import static org.zalando.nakadi.webservice.utils.NakadiTestUtils.createEventType;
import static org.zalando.nakadi.webservice.utils.NakadiTestUtils.createSubscription;
import static org.zalando.nakadi.webservice.utils.NakadiTestUtils.publishEvent;
import static org.zalando.nakadi.webservice.utils.NakadiTestUtils.publishEvents;
import org.zalando.nakadi.webservice.utils.TestStreamingClient;
import static org.zalando.nakadi.webservice.utils.TestStreamingClient.SESSION_ID_UNKNOWN;

public class HilaAT extends BaseAT {
Expand Down Expand Up @@ -289,7 +290,7 @@ public void testGetSubscriptionStat() throws Exception {
List<SubscriptionEventTypeStats> subscriptionStats =
Collections.singletonList(new SubscriptionEventTypeStats(
eventType.getName(),
Collections.singleton(
Collections.singletonList(
new SubscriptionEventTypeStats.Partition("0", "assigned", 15L, client.getSessionId())))
);
NakadiTestUtils.getSubscriptionStat(subscription)
Expand All @@ -303,7 +304,7 @@ public void testGetSubscriptionStat() throws Exception {
subscriptionStats =
Collections.singletonList(new SubscriptionEventTypeStats(
eventType.getName(),
Collections.singleton(
Collections.singletonList(
new SubscriptionEventTypeStats.Partition("0", "assigned", 5L, client.getSessionId())))
);
NakadiTestUtils.getSubscriptionStat(subscription)
Expand Down Expand Up @@ -335,15 +336,15 @@ public void testSubscriptionStatsMultiET() throws IOException {
.then()
.content(new StringContains(JSON_TEST_HELPER.asJsonString(new SubscriptionEventTypeStats(
eventTypes.get(0).getName(),
Sets.newHashSet(new SubscriptionEventTypeStats.Partition(
Collections.singletonList(new SubscriptionEventTypeStats.Partition(
"0",
"assigned",
1L,
client.getSessionId()
))))))
.content(new StringContains(JSON_TEST_HELPER.asJsonString(new SubscriptionEventTypeStats(
eventTypes.get(1).getName(),
Sets.newHashSet(new SubscriptionEventTypeStats.Partition(
Collections.singletonList(new SubscriptionEventTypeStats.Partition(
"0",
"assigned",
2L,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ public ResponseEntity<?> moveCursors(@PathVariable("eventTypeName") final String
}

@RequestMapping(path = "/event-types/{eventTypeName}/cursors-lag", method = RequestMethod.POST)
public ResponseEntity<?> cursorsLag(@PathVariable("eventTypeName") final String eventTypeName,
public List<CursorLag> cursorsLag(@PathVariable("eventTypeName") final String eventTypeName,
@Valid @RequestBody final ValidListWrapper<Cursor> cursors,
final Client client) throws InternalNakadiException, NoSuchEventTypeException {
// TODO: remove once new authorization is in place
Expand All @@ -137,10 +137,8 @@ public ResponseEntity<?> cursorsLag(@PathVariable("eventTypeName") final String
final List<NakadiCursorLag> lagResult = cursorOperationsService
.cursorsLag(eventTypeName, domainCursor);

final List<CursorLag> viewResult = lagResult.stream().map(this::toCursorLag)
return lagResult.stream().map(this::toCursorLag)
.collect(Collectors.toList());

return status(OK).body(viewResult);
}

@ExceptionHandler(InvalidCursorOperation.class)
Expand All @@ -156,6 +154,7 @@ private String clientErrorMessage(final InvalidCursorOperation.Reason reason) {
case TIMELINE_NOT_FOUND: return "Timeline not found. It might happen in case the cursor refers to a " +
"timeline that has already expired.";
case PARTITION_NOT_FOUND: return "Partition not found.";
case CURSOR_FORMAT_EXCEPTION: return "Сursor format is not supported.";
case CURSORS_WITH_DIFFERENT_PARTITION: return "Cursors with different partition. Pairs of cursors should " +
"have matching partitions.";
default:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,25 +7,6 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import java.io.Closeable;
import java.io.IOException;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import javax.ws.rs.core.Response;
import static javax.ws.rs.core.Response.Status.BAD_REQUEST;
import static javax.ws.rs.core.Response.Status.FORBIDDEN;
import static javax.ws.rs.core.Response.Status.INTERNAL_SERVER_ERROR;
import static javax.ws.rs.core.Response.Status.NOT_FOUND;
import static javax.ws.rs.core.Response.Status.PRECONDITION_FAILED;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
Expand Down Expand Up @@ -54,7 +35,6 @@
import org.zalando.nakadi.exceptions.runtime.AccessDeniedException;
import org.zalando.nakadi.exceptions.runtime.ServiceTemporarilyUnavailableException;
import org.zalando.nakadi.metrics.MetricUtils;
import static org.zalando.nakadi.metrics.MetricUtils.metricNameFor;
import org.zalando.nakadi.repository.EventConsumer;
import org.zalando.nakadi.repository.EventTypeRepository;
import org.zalando.nakadi.repository.TopicRepository;
Expand All @@ -71,10 +51,32 @@
import org.zalando.nakadi.service.EventTypeChangeListener;
import org.zalando.nakadi.service.timeline.TimelineService;
import org.zalando.nakadi.util.FeatureToggleService;
import static org.zalando.nakadi.util.FeatureToggleService.Feature.LIMIT_CONSUMERS_NUMBER;
import org.zalando.nakadi.view.Cursor;
import org.zalando.problem.Problem;

import javax.annotation.Nullable;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import javax.ws.rs.core.Response;
import java.io.Closeable;
import java.io.IOException;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;

import static javax.ws.rs.core.Response.Status.BAD_REQUEST;
import static javax.ws.rs.core.Response.Status.FORBIDDEN;
import static javax.ws.rs.core.Response.Status.INTERNAL_SERVER_ERROR;
import static javax.ws.rs.core.Response.Status.NOT_FOUND;
import static javax.ws.rs.core.Response.Status.PRECONDITION_FAILED;
import static org.zalando.nakadi.metrics.MetricUtils.metricNameFor;
import static org.zalando.nakadi.util.FeatureToggleService.Feature.LIMIT_CONSUMERS_NUMBER;

@RestController
public class EventStreamController {

Expand Down Expand Up @@ -135,6 +137,14 @@ List<NakadiCursor> getStreamingStart(final EventType eventType, final String cur
} catch (final IOException ex) {
throw new UnparseableCursorException("incorrect syntax of X-nakadi-cursors header", ex, cursorsStr);
}
// Unfortunately, In order to have consistent exception checking, one can not just call validator
for (final Cursor cursor: cursors) {
if (null == cursor.getPartition()) {
throw new InvalidCursorException(CursorError.NULL_PARTITION, cursor);
} else if (null == cursor.getOffset()) {
throw new InvalidCursorException(CursorError.NULL_OFFSET, cursor);
}
}
}
final Timeline latestTimeline = timelineService.getTimeline(eventType);
final TopicRepository latestTopicRepository = timelineService.getTopicRepository(latestTimeline);
Expand All @@ -148,20 +158,17 @@ List<NakadiCursor> getStreamingStart(final EventType eventType, final String cur
throw new InvalidCursorException(CursorError.INVALID_FORMAT);
}

final Map<TopicRepository, List<NakadiCursor>> timelineToCursors = new HashMap<>();
final Map<TopicRepository, List<NakadiCursor>> topicRepoToCursors = new HashMap<>();
for (final NakadiCursor c: result) {
if (c.getTimeline().isDeleted()) {
throw new InvalidCursorException(CursorError.UNAVAILABLE, c);
}
final TopicRepository topicRepository = timelineService.getTopicRepository(c.getTimeline());
List<NakadiCursor> cursorList = timelineToCursors.get(topicRepository);
if (cursorList == null) {
cursorList = new ArrayList<>();
timelineToCursors.put(topicRepository, cursorList);
}
cursorList.add(c);
topicRepoToCursors
.computeIfAbsent(topicRepository, k -> new ArrayList<>())
.add(c);
}
for (final Map.Entry<TopicRepository, List<NakadiCursor>> entry : timelineToCursors.entrySet()) {
for (final Map.Entry<TopicRepository, List<NakadiCursor>> entry : topicRepoToCursors.entrySet()) {
entry.getKey().validateReadCursors(entry.getValue());
}
return result;
Expand Down
8 changes: 3 additions & 5 deletions src/main/java/org/zalando/nakadi/domain/NakadiCursorLag.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,12 @@
public class NakadiCursorLag {
private final NakadiCursor firstCursor;
private final NakadiCursor lastCursor;
private long lag;
private final long lag;

public NakadiCursorLag(final NakadiCursor first, final NakadiCursor last) {
public NakadiCursorLag(final NakadiCursor first, final NakadiCursor last, final long lag) {
this.firstCursor = first;
this.lastCursor = last;
this.lag = lag;
}

public String getTopic() {
Expand All @@ -30,7 +31,4 @@ public long getLag() {
return lag;
}

public void setLag(final long lag) {
this.lag = lag;
}
}
12 changes: 11 additions & 1 deletion src/main/java/org/zalando/nakadi/domain/ShiftedNakadiCursor.java
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package org.zalando.nakadi.domain;

public class ShiftedNakadiCursor extends NakadiCursor {
private long shift = 0;
private final long shift;

public ShiftedNakadiCursor(final Timeline timeline, final String partition, final String offset, final long shift) {
super(timeline, partition, offset);
Expand All @@ -11,4 +11,14 @@ public ShiftedNakadiCursor(final Timeline timeline, final String partition, fina
public long getShift() {
return shift;
}

@Override
public String toString() {
return "ShiftedNakadiCursor{" +
"partition='" + getPartition() + '\'' +
", offset='" + getOffset() + '\'' +
", shift='" + shift + '\'' +
", timeline='" + getTimeline() + '\'' +
'}';
}
}
2 changes: 2 additions & 0 deletions src/main/java/org/zalando/nakadi/domain/SubscriptionBase.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import org.zalando.nakadi.view.SubscriptionCursorWithoutToken;

import javax.annotation.Nullable;
import javax.validation.Valid;
import javax.validation.constraints.NotNull;
import javax.validation.constraints.Size;
import java.util.Collections;
Expand Down Expand Up @@ -36,6 +37,7 @@ public enum InitialPosition {
@NotNull
private InitialPosition readFrom = InitialPosition.END;

@Valid
private List<SubscriptionCursorWithoutToken> initialCursors = ImmutableList.of();

public SubscriptionBase() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,17 +6,17 @@
import javax.annotation.Nullable;
import javax.annotation.concurrent.Immutable;
import java.util.Collections;
import java.util.Set;
import java.util.List;

@Immutable
public class SubscriptionEventTypeStats {

private final String eventType;
private final Set<Partition> partitions;
private final List<Partition> partitions;

public SubscriptionEventTypeStats(
@JsonProperty("event_type") final String eventType,
@JsonProperty("partitions") final Set<Partition> partitions) {
@JsonProperty("partitions") final List<Partition> partitions) {
this.eventType = eventType;
this.partitions = partitions;
}
Expand All @@ -25,8 +25,8 @@ public String getEventType() {
return eventType;
}

public Set<Partition> getPartitions() {
return Collections.unmodifiableSet(partitions);
public List<Partition> getPartitions() {
return Collections.unmodifiableList(partitions);
}

@Immutable
Expand Down
Loading

0 comments on commit c3c359b

Please sign in to comment.