From 6cf632b636d2c915352ccf693b3906d3b1703e16 Mon Sep 17 00:00:00 2001 From: Vyacheslav Stepanov Date: Thu, 3 Aug 2017 11:20:14 +0200 Subject: [PATCH 1/5] ARUHA-961: added validation of cursors availability when resetting cursors; --- .../nakadi/webservice/hila/HilaAT.java | 72 +++++++++++++------ .../nakadi/service/CursorsService.java | 44 +++++++++--- 2 files changed, 83 insertions(+), 33 deletions(-) diff --git a/src/acceptance-test/java/org/zalando/nakadi/webservice/hila/HilaAT.java b/src/acceptance-test/java/org/zalando/nakadi/webservice/hila/HilaAT.java index 9972b350db..8a7c57442e 100644 --- a/src/acceptance-test/java/org/zalando/nakadi/webservice/hila/HilaAT.java +++ b/src/acceptance-test/java/org/zalando/nakadi/webservice/hila/HilaAT.java @@ -5,25 +5,7 @@ import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; import com.google.common.collect.Sets; -import static com.jayway.restassured.RestAssured.given; -import static com.jayway.restassured.RestAssured.when; -import static com.jayway.restassured.http.ContentType.JSON; -import java.io.IOException; -import static java.text.MessageFormat.format; -import java.util.Collections; -import java.util.List; -import java.util.concurrent.TimeUnit; -import java.util.stream.Collectors; import org.apache.http.HttpStatus; -import static org.apache.http.HttpStatus.SC_CONFLICT; -import static org.apache.http.HttpStatus.SC_NO_CONTENT; -import static org.apache.http.HttpStatus.SC_OK; -import static org.hamcrest.CoreMatchers.equalTo; -import static org.hamcrest.CoreMatchers.is; -import static org.hamcrest.MatcherAssert.assertThat; -import static org.hamcrest.Matchers.empty; -import static org.hamcrest.Matchers.hasSize; -import static org.hamcrest.Matchers.not; import org.hamcrest.core.StringContains; import org.junit.Assert; import org.junit.Before; @@ -33,26 +15,51 @@ import org.zalando.nakadi.domain.ItemsWrapper; import org.zalando.nakadi.domain.Subscription; import org.zalando.nakadi.domain.SubscriptionBase; -import static org.zalando.nakadi.domain.SubscriptionBase.InitialPosition.BEGIN; -import static org.zalando.nakadi.domain.SubscriptionBase.InitialPosition.END; import org.zalando.nakadi.domain.SubscriptionEventTypeStats; import org.zalando.nakadi.service.BlacklistService; import org.zalando.nakadi.utils.JsonTestHelper; import org.zalando.nakadi.utils.RandomSubscriptionBuilder; -import static org.zalando.nakadi.utils.TestUtils.waitFor; import org.zalando.nakadi.view.Cursor; import org.zalando.nakadi.view.SubscriptionCursor; +import org.zalando.nakadi.view.SubscriptionCursorWithoutToken; import org.zalando.nakadi.webservice.BaseAT; import org.zalando.nakadi.webservice.SettingsControllerAT; +import org.zalando.nakadi.webservice.utils.NakadiTestUtils; +import org.zalando.nakadi.webservice.utils.TestStreamingClient; +import org.zalando.problem.MoreStatus; +import org.zalando.problem.Problem; +import org.zalando.problem.ThrowableProblem; + +import java.io.IOException; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +import static com.jayway.restassured.RestAssured.given; +import static com.jayway.restassured.RestAssured.when; +import static com.jayway.restassured.http.ContentType.JSON; +import static java.text.MessageFormat.format; +import static org.apache.http.HttpStatus.SC_CONFLICT; +import static org.apache.http.HttpStatus.SC_NO_CONTENT; +import static org.apache.http.HttpStatus.SC_OK; +import static org.apache.http.HttpStatus.SC_UNPROCESSABLE_ENTITY; +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.empty; +import static org.hamcrest.Matchers.hasSize; +import static org.hamcrest.Matchers.not; +import static org.zalando.nakadi.domain.SubscriptionBase.InitialPosition.BEGIN; +import static org.zalando.nakadi.domain.SubscriptionBase.InitialPosition.END; +import static org.zalando.nakadi.utils.TestUtils.waitFor; import static org.zalando.nakadi.webservice.hila.StreamBatch.MatcherIgnoringToken.equalToBatchIgnoringToken; import static org.zalando.nakadi.webservice.hila.StreamBatch.singleEventBatch; -import org.zalando.nakadi.webservice.utils.NakadiTestUtils; import static org.zalando.nakadi.webservice.utils.NakadiTestUtils.commitCursors; import static org.zalando.nakadi.webservice.utils.NakadiTestUtils.createEventType; import static org.zalando.nakadi.webservice.utils.NakadiTestUtils.createSubscription; import static org.zalando.nakadi.webservice.utils.NakadiTestUtils.publishEvent; import static org.zalando.nakadi.webservice.utils.NakadiTestUtils.publishEvents; -import org.zalando.nakadi.webservice.utils.TestStreamingClient; import static org.zalando.nakadi.webservice.utils.TestStreamingClient.SESSION_ID_UNKNOWN; public class HilaAT extends BaseAT { @@ -434,4 +441,23 @@ public void whenResetCursorsThenStreamFromResetCursorOffset() throws Exception { Assert.assertEquals("000000000000000005", client2.getBatches().get(0).getCursor().getOffset()); } + + @Test(timeout = 15000) + public void whenResetCursorsWithOffsetOverflowThen422() throws Exception { + publishEvents(eventType.getName(), 5, i -> "{\"foo\":\"bar\"}"); + + final SubscriptionCursorWithoutToken resetCursor = + new SubscriptionCursorWithoutToken(eventType.getName(), "0", "000000000000000007"); + + final ThrowableProblem expectedProblem = Problem.valueOf(MoreStatus.UNPROCESSABLE_ENTITY, + "offset 000000000000000007 for partition 0 is unavailable"); + + given() + .body(MAPPER.writeValueAsString(new ItemsWrapper<>(ImmutableList.of(resetCursor)))) + .contentType(JSON) + .patch("/subscriptions/{id}/cursors", subscription.getId()) + .then() + .statusCode(SC_UNPROCESSABLE_ENTITY) + .body(JSON_TEST_HELPER.matchesObject(expectedProblem)); + } } diff --git a/src/main/java/org/zalando/nakadi/service/CursorsService.java b/src/main/java/org/zalando/nakadi/service/CursorsService.java index 253e3e940c..1d1ab1e888 100644 --- a/src/main/java/org/zalando/nakadi/service/CursorsService.java +++ b/src/main/java/org/zalando/nakadi/service/CursorsService.java @@ -22,6 +22,7 @@ import org.zalando.nakadi.exceptions.runtime.OperationTimeoutException; import org.zalando.nakadi.exceptions.runtime.ZookeeperException; import org.zalando.nakadi.repository.EventTypeRepository; +import org.zalando.nakadi.repository.TopicRepository; import org.zalando.nakadi.repository.db.SubscriptionDbRepository; import org.zalando.nakadi.security.Client; import org.zalando.nakadi.service.subscription.model.Partition; @@ -80,7 +81,7 @@ public List commitCursors(final String streamId, final String subscript final Subscription subscription = subscriptionRepository.getSubscription(subscriptionId); TimeLogger.addMeasure("validateSubscriptionCursors"); - validateSubscriptionCursors(subscription, cursors); + validateSubscriptionCommitCursors(subscription, cursors); TimeLogger.addMeasure("validateSubscriptionReadScopes"); validateSubscriptionReadScopes(subscription, client); @@ -151,7 +152,7 @@ public void resetCursors(final String subscriptionId, final List c throw new UnableProcessException("Cursors are absent"); } final Subscription subscription = subscriptionRepository.getSubscription(subscriptionId); - validateSubscriptionCursors(subscription, cursors); + validateSubscriptionResetCursors(subscription, cursors); validateSubscriptionReadScopes(subscription, client); final ZkSubscriptionClient zkClient = zkSubscriptionFactory.createClient( @@ -164,15 +165,9 @@ public void resetCursors(final String subscriptionId, final List c timeout); } - private void validateSubscriptionCursors(final Subscription subscription, final List cursors) + private void validateSubscriptionCommitCursors(final Subscription subscription, final List cursors) throws ServiceUnavailableException, NoSuchSubscriptionException, UnableProcessException { - final List wrongEventTypes = cursors.stream() - .map(NakadiCursor::getEventType) - .filter(et -> !subscription.getEventTypes().contains(et)) - .collect(Collectors.toList()); - if (!wrongEventTypes.isEmpty()) { - throw new UnableProcessException("Event type does not belong to subscription: " + wrongEventTypes); - } + validateCursorsBelongToSubscription(subscription, cursors); cursors.forEach(cursor -> { try { @@ -183,6 +178,35 @@ private void validateSubscriptionCursors(final Subscription subscription, final }); } + private void validateSubscriptionResetCursors(final Subscription subscription, final List cursors) + throws ServiceUnavailableException, NoSuchSubscriptionException, UnableProcessException { + validateCursorsBelongToSubscription(subscription, cursors); + + final Map> cursorsByRepo = cursors.stream() + .collect(Collectors.groupingBy(c -> timelineService.getTopicRepository(c.getTimeline()))); + + for (final Map.Entry> repoEntry : cursorsByRepo.entrySet()) { + final TopicRepository topicRepository = repoEntry.getKey(); + final List cursorsForRepo = repoEntry.getValue(); + try { + topicRepository.validateReadCursors(cursorsForRepo); + } catch (final InvalidCursorException e) { + throw new UnableProcessException(e.getMessage(), e); + } + } + } + + private void validateCursorsBelongToSubscription(final Subscription subscription, final List cursors) + throws UnableProcessException { + final List wrongEventTypes = cursors.stream() + .map(NakadiCursor::getEventType) + .filter(et -> !subscription.getEventTypes().contains(et)) + .collect(Collectors.toList()); + if (!wrongEventTypes.isEmpty()) { + throw new UnableProcessException("Event type does not belong to subscription: " + wrongEventTypes); + } + } + private void validateSubscriptionReadScopes(final Subscription subscription, final Client client) throws ServiceUnavailableException, NoSuchSubscriptionException, IllegalScopeException { subscription.getEventTypes().stream().map(Try.wrap(eventTypeRepository::findByName)) From 69849d351bc198d3e71cdfc1851f94df2c2538e8 Mon Sep 17 00:00:00 2001 From: Vyacheslav Stepanov Date: Thu, 3 Aug 2017 12:36:28 +0200 Subject: [PATCH 2/5] ARUHA-961: updated changelog; --- CHANGELOG.md | 3 +++ 1 file changed, 3 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 2e5d16d5e9..157251b126 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,9 @@ and this project adheres to [Semantic Versioning](http://semver.org/). ## [Unreleased] +### Fixed +- Added validation of offsets availability when resetting subscription cursors. + ## [1.1.1] - 2017-07-26 ### Fixed From 6ae58153c46834ec1246604bec6cb86369cbfe49 Mon Sep 17 00:00:00 2001 From: Vyacheslav Stepanov Date: Fri, 4 Aug 2017 15:05:24 +0200 Subject: [PATCH 3/5] ARUHA-961: changed exception type; --- .../org/zalando/nakadi/controller/CursorsController.java | 5 ++++- .../exceptions/runtime/CursorUnavailableException.java | 7 +++++++ .../java/org/zalando/nakadi/service/CursorsService.java | 9 +++++---- 3 files changed, 16 insertions(+), 5 deletions(-) create mode 100644 src/main/java/org/zalando/nakadi/exceptions/runtime/CursorUnavailableException.java diff --git a/src/main/java/org/zalando/nakadi/controller/CursorsController.java b/src/main/java/org/zalando/nakadi/controller/CursorsController.java index 4e81f28dba..7cf20b09f6 100644 --- a/src/main/java/org/zalando/nakadi/controller/CursorsController.java +++ b/src/main/java/org/zalando/nakadi/controller/CursorsController.java @@ -22,6 +22,7 @@ import org.zalando.nakadi.exceptions.NoSuchEventTypeException; import org.zalando.nakadi.exceptions.ServiceUnavailableException; import org.zalando.nakadi.exceptions.UnableProcessException; +import org.zalando.nakadi.exceptions.runtime.CursorUnavailableException; import org.zalando.nakadi.exceptions.runtime.FeatureNotAvailableException; import org.zalando.nakadi.exceptions.runtime.RequestInProgressException; import org.zalando.nakadi.problem.ValidationProblem; @@ -169,7 +170,9 @@ public ResponseEntity handleInvalidStreamId(final InvalidStreamIdExcept return Responses.create(MoreStatus.UNPROCESSABLE_ENTITY, ex.getMessage(), request); } - @ExceptionHandler(UnableProcessException.class) + @ExceptionHandler({ + UnableProcessException.class, + CursorUnavailableException.class}) public ResponseEntity handleUnableProcessException(final UnableProcessException ex, final NativeWebRequest request) { LOG.debug(ex.getMessage(), ex); diff --git a/src/main/java/org/zalando/nakadi/exceptions/runtime/CursorUnavailableException.java b/src/main/java/org/zalando/nakadi/exceptions/runtime/CursorUnavailableException.java new file mode 100644 index 0000000000..71eec5f76a --- /dev/null +++ b/src/main/java/org/zalando/nakadi/exceptions/runtime/CursorUnavailableException.java @@ -0,0 +1,7 @@ +package org.zalando.nakadi.exceptions.runtime; + +public class CursorUnavailableException extends MyNakadiRuntimeException1 { + public CursorUnavailableException(final String message, final Exception e) { + super(message, e); + } +} diff --git a/src/main/java/org/zalando/nakadi/service/CursorsService.java b/src/main/java/org/zalando/nakadi/service/CursorsService.java index 1d1ab1e888..ee2bd4d3e6 100644 --- a/src/main/java/org/zalando/nakadi/service/CursorsService.java +++ b/src/main/java/org/zalando/nakadi/service/CursorsService.java @@ -19,6 +19,7 @@ import org.zalando.nakadi.exceptions.ServiceUnavailableException; import org.zalando.nakadi.exceptions.Try; import org.zalando.nakadi.exceptions.UnableProcessException; +import org.zalando.nakadi.exceptions.runtime.CursorUnavailableException; import org.zalando.nakadi.exceptions.runtime.OperationTimeoutException; import org.zalando.nakadi.exceptions.runtime.ZookeeperException; import org.zalando.nakadi.repository.EventTypeRepository; @@ -145,7 +146,7 @@ public List getSubscriptionCursors(final String } public void resetCursors(final String subscriptionId, final List cursors, final Client client) - throws ServiceUnavailableException, NoSuchSubscriptionException, + throws ServiceUnavailableException, NoSuchSubscriptionException, CursorUnavailableException, UnableProcessException, IllegalScopeException, OperationTimeoutException, ZookeeperException, InternalNakadiException, NoSuchEventTypeException { if (cursors.isEmpty()) { @@ -166,7 +167,7 @@ public void resetCursors(final String subscriptionId, final List c } private void validateSubscriptionCommitCursors(final Subscription subscription, final List cursors) - throws ServiceUnavailableException, NoSuchSubscriptionException, UnableProcessException { + throws ServiceUnavailableException, UnableProcessException { validateCursorsBelongToSubscription(subscription, cursors); cursors.forEach(cursor -> { @@ -179,7 +180,7 @@ private void validateSubscriptionCommitCursors(final Subscription subscription, } private void validateSubscriptionResetCursors(final Subscription subscription, final List cursors) - throws ServiceUnavailableException, NoSuchSubscriptionException, UnableProcessException { + throws ServiceUnavailableException, CursorUnavailableException { validateCursorsBelongToSubscription(subscription, cursors); final Map> cursorsByRepo = cursors.stream() @@ -191,7 +192,7 @@ private void validateSubscriptionResetCursors(final Subscription subscription, f try { topicRepository.validateReadCursors(cursorsForRepo); } catch (final InvalidCursorException e) { - throw new UnableProcessException(e.getMessage(), e); + throw new CursorUnavailableException(e.getMessage(), e); } } } From 9683c3c0ff6cba01d265553359ce649c67ac8945 Mon Sep 17 00:00:00 2001 From: Vyacheslav Stepanov Date: Fri, 4 Aug 2017 15:47:25 +0200 Subject: [PATCH 4/5] ARUHA-961: fixed exception handler; --- .../java/org/zalando/nakadi/controller/CursorsController.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/java/org/zalando/nakadi/controller/CursorsController.java b/src/main/java/org/zalando/nakadi/controller/CursorsController.java index 7cf20b09f6..251776df08 100644 --- a/src/main/java/org/zalando/nakadi/controller/CursorsController.java +++ b/src/main/java/org/zalando/nakadi/controller/CursorsController.java @@ -173,7 +173,7 @@ public ResponseEntity handleInvalidStreamId(final InvalidStreamIdExcept @ExceptionHandler({ UnableProcessException.class, CursorUnavailableException.class}) - public ResponseEntity handleUnableProcessException(final UnableProcessException ex, + public ResponseEntity handleUnableProcessException(final RuntimeException ex, final NativeWebRequest request) { LOG.debug(ex.getMessage(), ex); return Responses.create(MoreStatus.UNPROCESSABLE_ENTITY, ex.getMessage(), request); From 5587e997b48a6eed1e967b4da2f5aa24a67472e4 Mon Sep 17 00:00:00 2001 From: Vyacheslav Stepanov Date: Fri, 4 Aug 2017 16:01:05 +0200 Subject: [PATCH 5/5] ARUHA-961: merge with master; --- .../zalando/nakadi/webservice/hila/HilaAT.java | 15 --------------- 1 file changed, 15 deletions(-) diff --git a/src/acceptance-test/java/org/zalando/nakadi/webservice/hila/HilaAT.java b/src/acceptance-test/java/org/zalando/nakadi/webservice/hila/HilaAT.java index 7432ac7262..ae37f08f89 100644 --- a/src/acceptance-test/java/org/zalando/nakadi/webservice/hila/HilaAT.java +++ b/src/acceptance-test/java/org/zalando/nakadi/webservice/hila/HilaAT.java @@ -4,17 +4,7 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; -import com.google.common.collect.Sets; import org.apache.http.HttpStatus; -import static org.apache.http.HttpStatus.SC_CONFLICT; -import static org.apache.http.HttpStatus.SC_NO_CONTENT; -import static org.apache.http.HttpStatus.SC_OK; -import static org.hamcrest.CoreMatchers.equalTo; -import static org.hamcrest.CoreMatchers.is; -import static org.hamcrest.MatcherAssert.assertThat; -import static org.hamcrest.Matchers.empty; -import static org.hamcrest.Matchers.hasSize; -import static org.hamcrest.Matchers.not; import org.hamcrest.core.StringContains; import org.junit.Assert; import org.junit.Before; @@ -24,13 +14,10 @@ import org.zalando.nakadi.domain.ItemsWrapper; import org.zalando.nakadi.domain.Subscription; import org.zalando.nakadi.domain.SubscriptionBase; -import static org.zalando.nakadi.domain.SubscriptionBase.InitialPosition.BEGIN; -import static org.zalando.nakadi.domain.SubscriptionBase.InitialPosition.END; import org.zalando.nakadi.domain.SubscriptionEventTypeStats; import org.zalando.nakadi.service.BlacklistService; import org.zalando.nakadi.utils.JsonTestHelper; import org.zalando.nakadi.utils.RandomSubscriptionBuilder; -import static org.zalando.nakadi.utils.TestUtils.waitFor; import org.zalando.nakadi.view.Cursor; import org.zalando.nakadi.view.SubscriptionCursor; import org.zalando.nakadi.view.SubscriptionCursorWithoutToken; @@ -67,13 +54,11 @@ import static org.zalando.nakadi.utils.TestUtils.waitFor; import static org.zalando.nakadi.webservice.hila.StreamBatch.MatcherIgnoringToken.equalToBatchIgnoringToken; import static org.zalando.nakadi.webservice.hila.StreamBatch.singleEventBatch; -import org.zalando.nakadi.webservice.utils.NakadiTestUtils; import static org.zalando.nakadi.webservice.utils.NakadiTestUtils.commitCursors; import static org.zalando.nakadi.webservice.utils.NakadiTestUtils.createEventType; import static org.zalando.nakadi.webservice.utils.NakadiTestUtils.createSubscription; import static org.zalando.nakadi.webservice.utils.NakadiTestUtils.publishEvent; import static org.zalando.nakadi.webservice.utils.NakadiTestUtils.publishEvents; -import org.zalando.nakadi.webservice.utils.TestStreamingClient; import static org.zalando.nakadi.webservice.utils.TestStreamingClient.SESSION_ID_UNKNOWN; public class HilaAT extends BaseAT {