Skip to content
This repository has been archived by the owner on Jun 7, 2024. It is now read-only.

Commit

Permalink
Merge pull request #579 from zalando/ARUHA-591-start-from-cursors
Browse files Browse the repository at this point in the history
Aruha 591 start from cursors
  • Loading branch information
v-stepanov authored Mar 2, 2017
2 parents ad61937 + 0d1b787 commit 8c750b1
Show file tree
Hide file tree
Showing 30 changed files with 1,305 additions and 577 deletions.
20 changes: 20 additions & 0 deletions api/nakadi-event-bus-api.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1533,6 +1533,18 @@ definitions:
- event_type
- cursor_token

SubscriptionCursorWithoutToken:
allOf:
- $ref: '#/definitions/Cursor'
- type: object
properties:
event_type:
type: string
description: |
The name of the event type this partition's events belong to.
required:
- event_type

CursorCommitResult:
description: |
The result of single cursor commit. Holds a cursor itself and a result value.
Expand Down Expand Up @@ -1638,8 +1650,16 @@ definitions:
Position to start reading events from. Currently supported values:
- `begin` - read from the oldest available event.
- `end` - read from the most recent offset.
- `cursors` - read from cursors provided in `initial_cursors` property.
Applied in the moment when client starts reading from a subscription.
default: 'end'
initial_cursors:
type: array
items:
$ref: '#/definitions/SubscriptionCursorWithoutToken'
description: |
List of cursors to start reading from. This property is required when `read_from` = `cursors`.
The initial cursors should cover all partitions of subscription.
required:
- owning_application
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
import org.zalando.nakadi.config.JsonConfig;
import org.zalando.nakadi.domain.Subscription;
import org.zalando.nakadi.domain.SubscriptionBase;
import org.zalando.nakadi.exceptions.DuplicatedSubscriptionException;
import org.zalando.nakadi.exceptions.runtime.DuplicatedSubscriptionException;
import org.zalando.nakadi.exceptions.NoSuchSubscriptionException;
import org.zalando.nakadi.exceptions.ServiceUnavailableException;
import org.zalando.nakadi.util.UUIDGenerator;
Expand Down Expand Up @@ -37,7 +37,7 @@ public class SubscriptionDbRepositoryTest extends AbstractDbRepositoryTest {
private SubscriptionDbRepository repository;

public SubscriptionDbRepositoryTest() {
super(new String[]{"zn_data.subscription"});
super("zn_data.subscription");
}

@Before
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,6 @@
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.jayway.restassured.response.Response;
import java.io.IOException;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.curator.framework.CuratorFramework;
import org.apache.http.HttpStatus;
import org.apache.zookeeper.data.Stat;
Expand All @@ -18,23 +14,35 @@
import org.zalando.nakadi.config.JsonConfig;
import org.zalando.nakadi.domain.EventType;
import org.zalando.nakadi.domain.EventTypeBase;
import org.zalando.nakadi.domain.EventTypePartition;
import org.zalando.nakadi.domain.PaginationLinks;
import org.zalando.nakadi.domain.PaginationWrapper;
import org.zalando.nakadi.domain.Subscription;
import org.zalando.nakadi.domain.SubscriptionBase;
import org.zalando.nakadi.utils.JsonTestHelper;
import org.zalando.nakadi.utils.RandomSubscriptionBuilder;
import org.zalando.nakadi.view.Cursor;
import org.zalando.nakadi.view.SubscriptionCursor;
import org.zalando.nakadi.view.SubscriptionCursorWithoutToken;
import org.zalando.nakadi.webservice.BaseAT;
import org.zalando.nakadi.webservice.utils.NakadiTestUtils;
import org.zalando.nakadi.webservice.utils.TestStreamingClient;
import org.zalando.nakadi.webservice.utils.ZookeeperTestUtils;
import org.zalando.problem.Problem;
import org.zalando.problem.ThrowableProblem;

import java.io.IOException;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import static com.jayway.restassured.RestAssured.get;
import static com.jayway.restassured.RestAssured.given;
import static com.jayway.restassured.RestAssured.when;
import static com.jayway.restassured.http.ContentType.JSON;
import static java.text.MessageFormat.format;
import static java.util.stream.IntStream.range;
import static javax.ws.rs.core.Response.Status.CONFLICT;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsInAnyOrder;
Expand All @@ -46,8 +54,10 @@
import static org.zalando.nakadi.utils.TestUtils.buildDefaultEventType;
import static org.zalando.nakadi.utils.TestUtils.randomUUID;
import static org.zalando.nakadi.utils.TestUtils.waitFor;
import static org.zalando.nakadi.webservice.utils.NakadiTestUtils.createBusinessEventTypeWithPartitions;
import static org.zalando.nakadi.webservice.utils.NakadiTestUtils.createSubscription;
import static org.zalando.nakadi.webservice.utils.NakadiTestUtils.createSubscriptionForEventType;
import static org.zalando.nakadi.webservice.utils.NakadiTestUtils.publishBusinessEventWithUserDefinedPartition;
import static org.zalando.nakadi.webservice.utils.TestStreamingClient.SESSION_ID_UNKNOWN;

public class SubscriptionAT extends BaseAT {
Expand Down Expand Up @@ -208,6 +218,73 @@ public void testOffsetsCommit() throws Exception {
assertThat(committedOffset, equalTo("25"));
}

@Test
public void testSubscriptionWithReadFromCursorsWithoutInitialCursors() throws Exception {
final EventType eventType = createEventType();

final SubscriptionBase subscriptionBase = RandomSubscriptionBuilder.builder()
.withEventType(eventType.getName())
.withStartFrom(SubscriptionBase.InitialPosition.CURSORS)
.buildSubscriptionBase();

given()
.body(JSON_HELPER.asJsonString(subscriptionBase))
.contentType(JSON)
.post(SUBSCRIPTIONS_URL)
.then()
.statusCode(HttpStatus.SC_UNPROCESSABLE_ENTITY)
.body("detail", equalTo("initial_cursors should contain cursors for all partitions of subscription"));
}

@Test
public void testSubscriptionWithInitialCursors() throws Exception {
final EventType et1 = createBusinessEventTypeWithPartitions(2);
final EventType et2 = createBusinessEventTypeWithPartitions(2);

// write 10 events to each partition of two event-types
range(0, 10).forEach(x -> publishBusinessEventWithUserDefinedPartition(et1.getName(), "dummy", "0"));
range(0, 10).forEach(x -> publishBusinessEventWithUserDefinedPartition(et1.getName(), "dummy", "1"));
range(0, 10).forEach(x -> publishBusinessEventWithUserDefinedPartition(et2.getName(), "dummy", "0"));
range(0, 10).forEach(x -> publishBusinessEventWithUserDefinedPartition(et2.getName(), "dummy", "1"));

// create subscription with initial cursors
final SubscriptionBase subscriptionBase = RandomSubscriptionBuilder.builder()
.withEventTypes(ImmutableSet.of(et1.getName(), et2.getName()))
.withStartFrom(SubscriptionBase.InitialPosition.CURSORS)
.withInitialCursors(ImmutableList.of(
new SubscriptionCursorWithoutToken(et1.getName(), "0", "000000000000000007"),
new SubscriptionCursorWithoutToken(et1.getName(), "1", "000000000000000002"),
new SubscriptionCursorWithoutToken(et2.getName(), "0", Cursor.BEFORE_OLDEST_OFFSET),
new SubscriptionCursorWithoutToken(et2.getName(), "1", "000000000000000009")
))
.buildSubscriptionBase();
final Subscription subscription = createSubscription(subscriptionBase);

final TestStreamingClient client = TestStreamingClient
.create(URL, subscription.getId(), "max_uncommitted_events=100")
.start();
waitFor(() -> assertThat(client.getBatches(), hasSize(19))); // we should read 19 events in total
final List<StreamBatch> batches = client.getBatches();

// check that first events of each partition have correct offsets
assertThat(getFirstBatchOffsetFor(batches, new EventTypePartition(et1.getName(), "0")),
equalTo(Optional.of("000000000000000008")));
assertThat(getFirstBatchOffsetFor(batches, new EventTypePartition(et1.getName(), "1")),
equalTo(Optional.of("000000000000000003")));
assertThat(getFirstBatchOffsetFor(batches, new EventTypePartition(et2.getName(), "0")),
equalTo(Optional.of("000000000000000000")));
assertThat(getFirstBatchOffsetFor(batches, new EventTypePartition(et2.getName(), "1")),
equalTo(Optional.empty()));
}

private Optional<String> getFirstBatchOffsetFor(final List<StreamBatch> batches,
final EventTypePartition etPartition) {
return batches.stream()
.filter(b -> etPartition.ownsCursor(b.getCursor()))
.findFirst()
.map(b -> b.getCursor().getOffset());
}

@Test
public void testGetSubscriptionCursors() throws IOException, InterruptedException {
final String etName = createEventType().getName();
Expand Down
16 changes: 16 additions & 0 deletions src/main/java/org/zalando/nakadi/controller/ExceptionHandling.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
import org.zalando.nakadi.exceptions.IllegalClientIdException;
import org.zalando.nakadi.exceptions.IllegalScopeException;
import org.zalando.nakadi.exceptions.NakadiException;
import org.zalando.nakadi.exceptions.runtime.MyNakadiRuntimeException1;
import org.zalando.nakadi.exceptions.runtime.RepositoryProblemException;
import org.zalando.problem.Problem;
import org.zalando.problem.spring.web.advice.ProblemHandling;
import org.zalando.problem.spring.web.advice.Responses;
Expand Down Expand Up @@ -85,4 +87,18 @@ public ResponseEntity<Problem> handleExceptionWrapper(final NakadiRuntimeExcepti
}
throw exception.getException();
}

@ExceptionHandler(RepositoryProblemException.class)
public ResponseEntity<Problem> handleRepositoryProblem(final RepositoryProblemException exception,
final NativeWebRequest request) {
LOG.error("Repository problem occurred", exception);
return Responses.create(Response.Status.SERVICE_UNAVAILABLE, exception.getMessage(), request);
}

@ExceptionHandler(MyNakadiRuntimeException1.class)
public ResponseEntity<Problem> handleInternalError(final MyNakadiRuntimeException1 exception,
final NativeWebRequest request) {
LOG.error("Unexpected problem occurred", exception);
return Responses.create(Response.Status.INTERNAL_SERVER_ERROR, exception.getMessage(), request);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
package org.zalando.nakadi.controller;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.validation.Errors;
import org.springframework.web.bind.annotation.ExceptionHandler;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.context.request.NativeWebRequest;
import org.springframework.web.util.UriComponents;
import org.zalando.nakadi.domain.Subscription;
import org.zalando.nakadi.domain.SubscriptionBase;
import org.zalando.nakadi.exceptions.runtime.DuplicatedSubscriptionException;
import org.zalando.nakadi.exceptions.runtime.InconsistentStateException;
import org.zalando.nakadi.exceptions.runtime.MyNakadiRuntimeException1;
import org.zalando.nakadi.exceptions.runtime.NoEventTypeException;
import org.zalando.nakadi.exceptions.runtime.NoSubscriptionException;
import org.zalando.nakadi.exceptions.runtime.TooManyPartitionsException;
import org.zalando.nakadi.exceptions.runtime.WrongInitialCursorsException;
import org.zalando.nakadi.plugin.api.ApplicationService;
import org.zalando.nakadi.problem.ValidationProblem;
import org.zalando.nakadi.security.Client;
import org.zalando.nakadi.service.subscription.SubscriptionService;
import org.zalando.nakadi.util.FeatureToggleService;
import org.zalando.problem.MoreStatus;
import org.zalando.problem.Problem;
import org.zalando.problem.spring.web.advice.Responses;

import javax.validation.Valid;
import javax.ws.rs.core.HttpHeaders;
import javax.ws.rs.core.Response;

import static org.springframework.http.HttpStatus.NOT_IMPLEMENTED;
import static org.springframework.http.HttpStatus.OK;
import static org.zalando.nakadi.util.FeatureToggleService.Feature.CHECK_OWNING_APPLICATION;
import static org.zalando.nakadi.util.FeatureToggleService.Feature.DISABLE_SUBSCRIPTION_CREATION;
import static org.zalando.nakadi.util.FeatureToggleService.Feature.HIGH_LEVEL_API;


@RestController
public class PostSubscriptionController {

private static final Logger LOG = LoggerFactory.getLogger(PostSubscriptionController.class);

private final FeatureToggleService featureToggleService;
private final ApplicationService applicationService;
private final SubscriptionService subscriptionService;

@Autowired
public PostSubscriptionController(final FeatureToggleService featureToggleService,
final ApplicationService applicationService,
final SubscriptionService subscriptionService) {
this.featureToggleService = featureToggleService;
this.applicationService = applicationService;
this.subscriptionService = subscriptionService;
}

@RequestMapping(value = "/subscriptions", method = RequestMethod.POST)
public ResponseEntity<?> createOrGetSubscription(@Valid @RequestBody final SubscriptionBase subscriptionBase,
final Errors errors,
final NativeWebRequest request,
final Client client) {
if (!featureToggleService.isFeatureEnabled(HIGH_LEVEL_API)) {
return new ResponseEntity<>(NOT_IMPLEMENTED);
}
if (errors.hasErrors()) {
return Responses.create(new ValidationProblem(errors), request);
}
if (featureToggleService.isFeatureEnabled(CHECK_OWNING_APPLICATION)
&& !applicationService.exists(subscriptionBase.getOwningApplication())) {
return Responses.create(Problem.valueOf(MoreStatus.UNPROCESSABLE_ENTITY,
"owning_application doesn't exist"), request);
}

try {
return ok(subscriptionService.getExistingSubscription(subscriptionBase));
} catch (final NoSubscriptionException e) {
if (featureToggleService.isFeatureEnabled(DISABLE_SUBSCRIPTION_CREATION)) {
return Responses.create(Response.Status.SERVICE_UNAVAILABLE,
"Subscription creation is temporarily unavailable", request);
}
try {
final Subscription subscription = subscriptionService.createSubscription(subscriptionBase, client);
return prepareLocationResponse(subscription);
} catch (final DuplicatedSubscriptionException ex) {
throw new InconsistentStateException("Unexpected problem occurred when creating subscription", ex);
}
}
}

private ResponseEntity<?> ok(final Subscription existingSubscription) {
final UriComponents location = subscriptionService.getSubscriptionUri(existingSubscription);
return ResponseEntity.status(OK).location(location.toUri()).body(existingSubscription);
}

private ResponseEntity<?> prepareLocationResponse(final Subscription subscription) {
final UriComponents location = subscriptionService.getSubscriptionUri(subscription);
return ResponseEntity.status(HttpStatus.CREATED)
.location(location.toUri())
.header(HttpHeaders.CONTENT_LOCATION, location.toString())
.body(subscription);
}

@ExceptionHandler({
NoEventTypeException.class,
WrongInitialCursorsException.class,
TooManyPartitionsException.class})
public ResponseEntity<Problem> handleUnprocessableSubscription(final MyNakadiRuntimeException1 exception,
final NativeWebRequest request) {
LOG.debug("Error occurred when working with subscriptions", exception);
return Responses.create(MoreStatus.UNPROCESSABLE_ENTITY, exception.getMessage(), request);
}

}
Loading

0 comments on commit 8c750b1

Please sign in to comment.