Skip to content
This repository has been archived by the owner on Jun 7, 2024. It is now read-only.

Commit

Permalink
Merge pull request #724 from zalando/aruha-961-check-reset-cursors
Browse files Browse the repository at this point in the history
ARUHA-961: added validation of cursors availability when resetting cursors
  • Loading branch information
v-stepanov authored Aug 11, 2017
2 parents 6fd208b + c41e4e6 commit e040eed
Show file tree
Hide file tree
Showing 5 changed files with 76 additions and 14 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@ and this project adheres to [Semantic Versioning](http://semver.org/).

## [Unreleased]

### Fixed
- Added validation of offsets availability when resetting subscription cursors.

## [2.0.0] - 2017-08-09

### Changed
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,14 @@
import org.zalando.nakadi.utils.RandomSubscriptionBuilder;
import org.zalando.nakadi.view.Cursor;
import org.zalando.nakadi.view.SubscriptionCursor;
import org.zalando.nakadi.view.SubscriptionCursorWithoutToken;
import org.zalando.nakadi.webservice.BaseAT;
import org.zalando.nakadi.webservice.SettingsControllerAT;
import org.zalando.nakadi.webservice.utils.NakadiTestUtils;
import org.zalando.nakadi.webservice.utils.TestStreamingClient;
import org.zalando.problem.MoreStatus;
import org.zalando.problem.Problem;
import org.zalando.problem.ThrowableProblem;

import java.io.IOException;
import java.util.Collections;
Expand All @@ -38,6 +42,7 @@
import static org.apache.http.HttpStatus.SC_CONFLICT;
import static org.apache.http.HttpStatus.SC_NO_CONTENT;
import static org.apache.http.HttpStatus.SC_OK;
import static org.apache.http.HttpStatus.SC_UNPROCESSABLE_ENTITY;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
Expand Down Expand Up @@ -435,4 +440,23 @@ public void whenResetCursorsThenStreamFromResetCursorOffset() throws Exception {

Assert.assertEquals("000000000000000005", client2.getBatches().get(0).getCursor().getOffset());
}

@Test(timeout = 15000)
public void whenResetCursorsWithOffsetOverflowThen422() throws Exception {
publishEvents(eventType.getName(), 5, i -> "{\"foo\":\"bar\"}");

final SubscriptionCursorWithoutToken resetCursor =
new SubscriptionCursorWithoutToken(eventType.getName(), "0", "000000000000000007");

final ThrowableProblem expectedProblem = Problem.valueOf(MoreStatus.UNPROCESSABLE_ENTITY,
"offset 000000000000000007 for partition 0 is unavailable");

given()
.body(MAPPER.writeValueAsString(new ItemsWrapper<>(ImmutableList.of(resetCursor))))
.contentType(JSON)
.patch("/subscriptions/{id}/cursors", subscription.getId())
.then()
.statusCode(SC_UNPROCESSABLE_ENTITY)
.body(JSON_TEST_HELPER.matchesObject(expectedProblem));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.zalando.nakadi.exceptions.NoSuchEventTypeException;
import org.zalando.nakadi.exceptions.ServiceUnavailableException;
import org.zalando.nakadi.exceptions.UnableProcessException;
import org.zalando.nakadi.exceptions.runtime.CursorUnavailableException;
import org.zalando.nakadi.exceptions.runtime.FeatureNotAvailableException;
import org.zalando.nakadi.exceptions.runtime.RequestInProgressException;
import org.zalando.nakadi.problem.ValidationProblem;
Expand Down Expand Up @@ -169,8 +170,10 @@ public ResponseEntity<Problem> handleInvalidStreamId(final InvalidStreamIdExcept
return Responses.create(MoreStatus.UNPROCESSABLE_ENTITY, ex.getMessage(), request);
}

@ExceptionHandler(UnableProcessException.class)
public ResponseEntity<Problem> handleUnableProcessException(final UnableProcessException ex,
@ExceptionHandler({
UnableProcessException.class,
CursorUnavailableException.class})
public ResponseEntity<Problem> handleUnableProcessException(final RuntimeException ex,
final NativeWebRequest request) {
LOG.debug(ex.getMessage(), ex);
return Responses.create(MoreStatus.UNPROCESSABLE_ENTITY, ex.getMessage(), request);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package org.zalando.nakadi.exceptions.runtime;

public class CursorUnavailableException extends MyNakadiRuntimeException1 {
public CursorUnavailableException(final String message, final Exception e) {
super(message, e);
}
}
49 changes: 37 additions & 12 deletions src/main/java/org/zalando/nakadi/service/CursorsService.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,11 @@
import org.zalando.nakadi.exceptions.NoSuchSubscriptionException;
import org.zalando.nakadi.exceptions.ServiceUnavailableException;
import org.zalando.nakadi.exceptions.UnableProcessException;
import org.zalando.nakadi.exceptions.runtime.CursorUnavailableException;
import org.zalando.nakadi.exceptions.runtime.OperationTimeoutException;
import org.zalando.nakadi.exceptions.runtime.ZookeeperException;
import org.zalando.nakadi.repository.EventTypeRepository;
import org.zalando.nakadi.repository.TopicRepository;
import org.zalando.nakadi.repository.db.SubscriptionDbRepository;
import org.zalando.nakadi.security.Client;
import org.zalando.nakadi.service.subscription.model.Partition;
Expand Down Expand Up @@ -77,7 +79,7 @@ public List<Boolean> commitCursors(final String streamId, final String subscript
final Subscription subscription = subscriptionRepository.getSubscription(subscriptionId);

TimeLogger.addMeasure("validateSubscriptionCursors");
validateSubscriptionCursors(subscription, cursors);
validateSubscriptionCommitCursors(subscription, cursors);

TimeLogger.addMeasure("createSubscriptionClient");
final ZkSubscriptionClient zkClient = zkSubscriptionFactory.createClient(
Expand Down Expand Up @@ -137,14 +139,14 @@ public List<SubscriptionCursorWithoutToken> getSubscriptionCursors(final String
}

public void resetCursors(final String subscriptionId, final List<NakadiCursor> cursors, final Client client)
throws ServiceUnavailableException, NoSuchSubscriptionException,
throws ServiceUnavailableException, NoSuchSubscriptionException, CursorUnavailableException,
UnableProcessException, OperationTimeoutException, ZookeeperException,
InternalNakadiException, NoSuchEventTypeException {
if (cursors.isEmpty()) {
throw new UnableProcessException("Cursors are absent");
}
final Subscription subscription = subscriptionRepository.getSubscription(subscriptionId);
validateSubscriptionCursors(subscription, cursors);
validateSubscriptionResetCursors(subscription, cursors);

final ZkSubscriptionClient zkClient = zkSubscriptionFactory.createClient(
subscription, "subscription." + subscriptionId + ".reset_cursors");
Expand All @@ -156,15 +158,9 @@ public void resetCursors(final String subscriptionId, final List<NakadiCursor> c
timeout);
}

private void validateSubscriptionCursors(final Subscription subscription, final List<NakadiCursor> cursors)
throws ServiceUnavailableException, NoSuchSubscriptionException, UnableProcessException {
final List<String> wrongEventTypes = cursors.stream()
.map(NakadiCursor::getEventType)
.filter(et -> !subscription.getEventTypes().contains(et))
.collect(Collectors.toList());
if (!wrongEventTypes.isEmpty()) {
throw new UnableProcessException("Event type does not belong to subscription: " + wrongEventTypes);
}
private void validateSubscriptionCommitCursors(final Subscription subscription, final List<NakadiCursor> cursors)
throws ServiceUnavailableException, UnableProcessException {
validateCursorsBelongToSubscription(subscription, cursors);

cursors.forEach(cursor -> {
try {
Expand All @@ -175,6 +171,35 @@ private void validateSubscriptionCursors(final Subscription subscription, final
});
}

private void validateSubscriptionResetCursors(final Subscription subscription, final List<NakadiCursor> cursors)
throws ServiceUnavailableException, CursorUnavailableException {
validateCursorsBelongToSubscription(subscription, cursors);

final Map<TopicRepository, List<NakadiCursor>> cursorsByRepo = cursors.stream()
.collect(Collectors.groupingBy(c -> timelineService.getTopicRepository(c.getTimeline())));

for (final Map.Entry<TopicRepository, List<NakadiCursor>> repoEntry : cursorsByRepo.entrySet()) {
final TopicRepository topicRepository = repoEntry.getKey();
final List<NakadiCursor> cursorsForRepo = repoEntry.getValue();
try {
topicRepository.validateReadCursors(cursorsForRepo);
} catch (final InvalidCursorException e) {
throw new CursorUnavailableException(e.getMessage(), e);
}
}
}

private void validateCursorsBelongToSubscription(final Subscription subscription, final List<NakadiCursor> cursors)
throws UnableProcessException {
final List<String> wrongEventTypes = cursors.stream()
.map(NakadiCursor::getEventType)
.filter(et -> !subscription.getEventTypes().contains(et))
.collect(Collectors.toList());
if (!wrongEventTypes.isEmpty()) {
throw new UnableProcessException("Event type does not belong to subscription: " + wrongEventTypes);
}
}

private class SubscriptionCursorComparator implements Comparator<SubscriptionCursorWithoutToken> {
private final Map<SubscriptionCursorWithoutToken, NakadiCursor> cached = new HashMap<>();

Expand Down

0 comments on commit e040eed

Please sign in to comment.