diff --git a/CHANGELOG.md b/CHANGELOG.md index 41d527e4be..17c9126b5b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,9 @@ and this project adheres to [Semantic Versioning](http://semver.org/). ## [Unreleased] +### Fixed +- Added validation of offsets availability when resetting subscription cursors. + ## [2.0.0] - 2017-08-09 ### Changed diff --git a/src/acceptance-test/java/org/zalando/nakadi/webservice/hila/HilaAT.java b/src/acceptance-test/java/org/zalando/nakadi/webservice/hila/HilaAT.java index 850358a474..ae37f08f89 100644 --- a/src/acceptance-test/java/org/zalando/nakadi/webservice/hila/HilaAT.java +++ b/src/acceptance-test/java/org/zalando/nakadi/webservice/hila/HilaAT.java @@ -20,10 +20,14 @@ import org.zalando.nakadi.utils.RandomSubscriptionBuilder; import org.zalando.nakadi.view.Cursor; import org.zalando.nakadi.view.SubscriptionCursor; +import org.zalando.nakadi.view.SubscriptionCursorWithoutToken; import org.zalando.nakadi.webservice.BaseAT; import org.zalando.nakadi.webservice.SettingsControllerAT; import org.zalando.nakadi.webservice.utils.NakadiTestUtils; import org.zalando.nakadi.webservice.utils.TestStreamingClient; +import org.zalando.problem.MoreStatus; +import org.zalando.problem.Problem; +import org.zalando.problem.ThrowableProblem; import java.io.IOException; import java.util.Collections; @@ -38,6 +42,7 @@ import static org.apache.http.HttpStatus.SC_CONFLICT; import static org.apache.http.HttpStatus.SC_NO_CONTENT; import static org.apache.http.HttpStatus.SC_OK; +import static org.apache.http.HttpStatus.SC_UNPROCESSABLE_ENTITY; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.MatcherAssert.assertThat; @@ -435,4 +440,23 @@ public void whenResetCursorsThenStreamFromResetCursorOffset() throws Exception { Assert.assertEquals("000000000000000005", client2.getBatches().get(0).getCursor().getOffset()); } + + @Test(timeout = 15000) + public void whenResetCursorsWithOffsetOverflowThen422() throws Exception { + publishEvents(eventType.getName(), 5, i -> "{\"foo\":\"bar\"}"); + + final SubscriptionCursorWithoutToken resetCursor = + new SubscriptionCursorWithoutToken(eventType.getName(), "0", "000000000000000007"); + + final ThrowableProblem expectedProblem = Problem.valueOf(MoreStatus.UNPROCESSABLE_ENTITY, + "offset 000000000000000007 for partition 0 is unavailable"); + + given() + .body(MAPPER.writeValueAsString(new ItemsWrapper<>(ImmutableList.of(resetCursor)))) + .contentType(JSON) + .patch("/subscriptions/{id}/cursors", subscription.getId()) + .then() + .statusCode(SC_UNPROCESSABLE_ENTITY) + .body(JSON_TEST_HELPER.matchesObject(expectedProblem)); + } } diff --git a/src/main/java/org/zalando/nakadi/controller/CursorsController.java b/src/main/java/org/zalando/nakadi/controller/CursorsController.java index 4e81f28dba..251776df08 100644 --- a/src/main/java/org/zalando/nakadi/controller/CursorsController.java +++ b/src/main/java/org/zalando/nakadi/controller/CursorsController.java @@ -22,6 +22,7 @@ import org.zalando.nakadi.exceptions.NoSuchEventTypeException; import org.zalando.nakadi.exceptions.ServiceUnavailableException; import org.zalando.nakadi.exceptions.UnableProcessException; +import org.zalando.nakadi.exceptions.runtime.CursorUnavailableException; import org.zalando.nakadi.exceptions.runtime.FeatureNotAvailableException; import org.zalando.nakadi.exceptions.runtime.RequestInProgressException; import org.zalando.nakadi.problem.ValidationProblem; @@ -169,8 +170,10 @@ public ResponseEntity handleInvalidStreamId(final InvalidStreamIdExcept return Responses.create(MoreStatus.UNPROCESSABLE_ENTITY, ex.getMessage(), request); } - @ExceptionHandler(UnableProcessException.class) - public ResponseEntity handleUnableProcessException(final UnableProcessException ex, + @ExceptionHandler({ + UnableProcessException.class, + CursorUnavailableException.class}) + public ResponseEntity handleUnableProcessException(final RuntimeException ex, final NativeWebRequest request) { LOG.debug(ex.getMessage(), ex); return Responses.create(MoreStatus.UNPROCESSABLE_ENTITY, ex.getMessage(), request); diff --git a/src/main/java/org/zalando/nakadi/exceptions/runtime/CursorUnavailableException.java b/src/main/java/org/zalando/nakadi/exceptions/runtime/CursorUnavailableException.java new file mode 100644 index 0000000000..71eec5f76a --- /dev/null +++ b/src/main/java/org/zalando/nakadi/exceptions/runtime/CursorUnavailableException.java @@ -0,0 +1,7 @@ +package org.zalando.nakadi.exceptions.runtime; + +public class CursorUnavailableException extends MyNakadiRuntimeException1 { + public CursorUnavailableException(final String message, final Exception e) { + super(message, e); + } +} diff --git a/src/main/java/org/zalando/nakadi/service/CursorsService.java b/src/main/java/org/zalando/nakadi/service/CursorsService.java index 376d704d3d..0f2bbf4334 100644 --- a/src/main/java/org/zalando/nakadi/service/CursorsService.java +++ b/src/main/java/org/zalando/nakadi/service/CursorsService.java @@ -17,9 +17,11 @@ import org.zalando.nakadi.exceptions.NoSuchSubscriptionException; import org.zalando.nakadi.exceptions.ServiceUnavailableException; import org.zalando.nakadi.exceptions.UnableProcessException; +import org.zalando.nakadi.exceptions.runtime.CursorUnavailableException; import org.zalando.nakadi.exceptions.runtime.OperationTimeoutException; import org.zalando.nakadi.exceptions.runtime.ZookeeperException; import org.zalando.nakadi.repository.EventTypeRepository; +import org.zalando.nakadi.repository.TopicRepository; import org.zalando.nakadi.repository.db.SubscriptionDbRepository; import org.zalando.nakadi.security.Client; import org.zalando.nakadi.service.subscription.model.Partition; @@ -77,7 +79,7 @@ public List commitCursors(final String streamId, final String subscript final Subscription subscription = subscriptionRepository.getSubscription(subscriptionId); TimeLogger.addMeasure("validateSubscriptionCursors"); - validateSubscriptionCursors(subscription, cursors); + validateSubscriptionCommitCursors(subscription, cursors); TimeLogger.addMeasure("createSubscriptionClient"); final ZkSubscriptionClient zkClient = zkSubscriptionFactory.createClient( @@ -137,14 +139,14 @@ public List getSubscriptionCursors(final String } public void resetCursors(final String subscriptionId, final List cursors, final Client client) - throws ServiceUnavailableException, NoSuchSubscriptionException, + throws ServiceUnavailableException, NoSuchSubscriptionException, CursorUnavailableException, UnableProcessException, OperationTimeoutException, ZookeeperException, InternalNakadiException, NoSuchEventTypeException { if (cursors.isEmpty()) { throw new UnableProcessException("Cursors are absent"); } final Subscription subscription = subscriptionRepository.getSubscription(subscriptionId); - validateSubscriptionCursors(subscription, cursors); + validateSubscriptionResetCursors(subscription, cursors); final ZkSubscriptionClient zkClient = zkSubscriptionFactory.createClient( subscription, "subscription." + subscriptionId + ".reset_cursors"); @@ -156,15 +158,9 @@ public void resetCursors(final String subscriptionId, final List c timeout); } - private void validateSubscriptionCursors(final Subscription subscription, final List cursors) - throws ServiceUnavailableException, NoSuchSubscriptionException, UnableProcessException { - final List wrongEventTypes = cursors.stream() - .map(NakadiCursor::getEventType) - .filter(et -> !subscription.getEventTypes().contains(et)) - .collect(Collectors.toList()); - if (!wrongEventTypes.isEmpty()) { - throw new UnableProcessException("Event type does not belong to subscription: " + wrongEventTypes); - } + private void validateSubscriptionCommitCursors(final Subscription subscription, final List cursors) + throws ServiceUnavailableException, UnableProcessException { + validateCursorsBelongToSubscription(subscription, cursors); cursors.forEach(cursor -> { try { @@ -175,6 +171,35 @@ private void validateSubscriptionCursors(final Subscription subscription, final }); } + private void validateSubscriptionResetCursors(final Subscription subscription, final List cursors) + throws ServiceUnavailableException, CursorUnavailableException { + validateCursorsBelongToSubscription(subscription, cursors); + + final Map> cursorsByRepo = cursors.stream() + .collect(Collectors.groupingBy(c -> timelineService.getTopicRepository(c.getTimeline()))); + + for (final Map.Entry> repoEntry : cursorsByRepo.entrySet()) { + final TopicRepository topicRepository = repoEntry.getKey(); + final List cursorsForRepo = repoEntry.getValue(); + try { + topicRepository.validateReadCursors(cursorsForRepo); + } catch (final InvalidCursorException e) { + throw new CursorUnavailableException(e.getMessage(), e); + } + } + } + + private void validateCursorsBelongToSubscription(final Subscription subscription, final List cursors) + throws UnableProcessException { + final List wrongEventTypes = cursors.stream() + .map(NakadiCursor::getEventType) + .filter(et -> !subscription.getEventTypes().contains(et)) + .collect(Collectors.toList()); + if (!wrongEventTypes.isEmpty()) { + throw new UnableProcessException("Event type does not belong to subscription: " + wrongEventTypes); + } + } + private class SubscriptionCursorComparator implements Comparator { private final Map cached = new HashMap<>();