diff --git a/acceptance-test/src/acceptance-test/java/org/zalando/nakadi/webservice/BinaryEndToEndAT.java b/acceptance-test/src/acceptance-test/java/org/zalando/nakadi/webservice/BinaryEndToEndAT.java
index b8a4c8391a..47a1b906f1 100644
--- a/acceptance-test/src/acceptance-test/java/org/zalando/nakadi/webservice/BinaryEndToEndAT.java
+++ b/acceptance-test/src/acceptance-test/java/org/zalando/nakadi/webservice/BinaryEndToEndAT.java
@@ -8,6 +8,7 @@
 import org.apache.avro.io.DecoderFactory;
 import org.apache.avro.io.EncoderFactory;
 import org.apache.http.HttpStatus;
+import org.json.JSONObject;
 import org.junit.Assert;
 import org.junit.Test;
 import org.springframework.core.io.DefaultResourceLoader;
@@ -33,7 +34,6 @@
 import java.nio.ByteBuffer;
 import java.time.Instant;
 import java.util.List;
-import java.util.Map;
 
 import static com.jayway.restassured.RestAssured.given;
 import static org.zalando.nakadi.domain.SubscriptionBase.InitialPosition.BEGIN;
@@ -119,10 +119,10 @@ public void shouldPublishAvroAndConsumeJsonAndAvro() throws IOException {
         final TestStreamingClient client1 = TestStreamingClient.create(subscription1.getId()).start();
         TestUtils.waitFor(() -> Assert.assertEquals(1, client1.getJsonBatches().size()));
-        final Map jsonEvent = client1.getJsonBatches().get(0).getEvents().get(0);
+        final JSONObject jsonEvent = client1.getJsonBatches().get(0).getEvents().get(0);
         Assert.assertEquals("bar", jsonEvent.get("foo"));
-        final Map metadata = (Map) jsonEvent.get("metadata");
+        final JSONObject metadata = jsonEvent.getJSONObject("metadata");
         Assert.assertEquals("CE8C9EBC-3F19-4B9D-A453-08AD2EDA6028", metadata.get("eid"));
         Assert.assertEquals("2.0.0", metadata.get("version"));
         Assert.assertEquals(testETName, metadata.get("event_type"));
diff --git a/acceptance-test/src/acceptance-test/java/org/zalando/nakadi/webservice/BinaryEventPublisherAT.java b/acceptance-test/src/acceptance-test/java/org/zalando/nakadi/webservice/BinaryEventPublisherAT.java
index db7eb7e82e..8cfcf951ed 100644
--- a/acceptance-test/src/acceptance-test/java/org/zalando/nakadi/webservice/BinaryEventPublisherAT.java
+++ b/acceptance-test/src/acceptance-test/java/org/zalando/nakadi/webservice/BinaryEventPublisherAT.java
@@ -3,6 +3,7 @@
 import org.apache.http.HttpStatus;
 import org.hamcrest.MatcherAssert;
 import org.hamcrest.Matchers;
+import org.json.JSONObject;
 import org.junit.Assert;
 import org.junit.Test;
 import org.zalando.nakadi.domain.Subscription;
@@ -12,7 +13,6 @@
 import java.io.IOException;
 import java.util.List;
-import java.util.Map;
 
 import static com.jayway.restassured.RestAssured.given;
@@ -43,7 +43,7 @@ public void testNakadiAccessLogInAvro() throws Exception {
         // So the test is only looking for a valid event.
         Assert.assertEquals(
                 NAKADI_ACCESS_LOG,
-                ((Map) event.get("metadata")).get("event_type"));
+                event.getJSONObject("metadata").get("event_type"));
         Assert.assertNotNull(event.get("method"));
         Assert.assertNotNull(event.get("path"));
         Assert.assertNotNull(event.get("query"));
@@ -56,7 +56,7 @@
         Assert.assertNotNull(event.get("content_encoding"));
         Assert.assertNotNull(event.get("request_length"));
         Assert.assertNotNull(event.get("response_length"));
-        Assert.assertNull(event.get("random_key"));
+        Assert.assertFalse(event.has("random_key"));
     }
 
@@ -75,7 +75,7 @@ public void testNakadiSubscriptionLogInAvro() throws Exception {
         // So the test is only looking for a valid event.
         Assert.assertEquals(
                 NAKADI_SUBSCRIPTION_LOG,
-                ((Map) event.get("metadata")).get("event_type"));
+                event.getJSONObject("metadata").get("event_type"));
         Assert.assertEquals("created", event.get("status"));
         Assert.assertNotNull(event.get("subscription_id"));
     }
@@ -96,7 +96,7 @@ public void testNakadiEventTypeLogInAvro() throws Exception {
         // So the test is only looking for a valid event.
         Assert.assertEquals(
                 NAKADI_EVENT_TYPE_LOG,
-                ((Map) event.get("metadata")).get("event_type"));
+                event.getJSONObject("metadata").get("event_type"));
         Assert.assertNotNull(event.get("event_type"));
         Assert.assertNotNull(event.get("status"));
         Assert.assertNotNull(event.get("category"));
@@ -120,7 +120,7 @@ public void testNakadiBatchPublishedInAvro() throws Exception {
         // So the test is only looking for a valid event.
         Assert.assertEquals(
                 NAKADI_BATCH_PUBLISHED,
-                ((Map) event.get("metadata")).get("event_type"));
+                event.getJSONObject("metadata").get("event_type"));
         Assert.assertNotNull(event.get("event_type"));
         Assert.assertNotNull(event.get("app"));
         Assert.assertNotNull(event.get("app_hashed"));
@@ -145,7 +145,7 @@ public void testNakadiDataStreamedInAvro() throws Exception {
         final var event = events.get(0);
         // All acceptance tests are run against same instance, so the exact event that is consumed is unpredictable.
         // So the test is only looking for a valid event.
-        final var metadata = (Map) event.get("metadata");
+        final var metadata = event.getJSONObject("metadata");
         Assert.assertEquals(NAKADI_DATA_STREAMED, metadata.get("event_type"));
         Assert.assertNotNull(metadata.get("occurred_at"));
         Assert.assertNotNull(metadata.get("received_at"));
@@ -162,7 +162,7 @@
         Assert.assertNotNull(event.get("batches_streamed"));
     }
 
-    private List<Map> consumeEvent(final TestStreamingClient client) {
+    private List<JSONObject> consumeEvent(final TestStreamingClient client) {
         TestUtils.waitFor(() -> MatcherAssert.assertThat(
                 client.getJsonBatches().size(), Matchers.greaterThanOrEqualTo(1)), 10000);
         return client.getJsonBatches().get(0).getEvents();
diff --git a/acceptance-test/src/acceptance-test/java/org/zalando/nakadi/webservice/UserJourneyAT.java b/acceptance-test/src/acceptance-test/java/org/zalando/nakadi/webservice/UserJourneyAT.java
index af9f4a1082..5b46afe7d2 100644
--- a/acceptance-test/src/acceptance-test/java/org/zalando/nakadi/webservice/UserJourneyAT.java
+++ b/acceptance-test/src/acceptance-test/java/org/zalando/nakadi/webservice/UserJourneyAT.java
@@ -4,7 +4,6 @@
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.base.Charsets;
 import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
 import com.google.common.io.Resources;
 import com.jayway.restassured.RestAssured;
@@ -12,6 +11,7 @@
 import com.jayway.restassured.response.Header;
 import com.jayway.restassured.specification.RequestSpecification;
 import org.echocat.jomon.runtime.concurrent.RetryForSpecifiedTimeStrategy;
+import org.json.JSONArray;
 import org.json.JSONObject;
 import org.junit.Before;
 import org.junit.Test;
@@ -29,7 +29,6 @@
 import java.io.IOException;
 import java.util.List;
-import java.util.Map;
 import java.util.stream.Collectors;
 
 import static java.util.stream.LongStream.rangeClosed;
@@ -51,7 +50,6 @@
 import static org.zalando.nakadi.utils.TestUtils.randomUUID;
 import static org.zalando.nakadi.utils.TestUtils.randomValidEventTypeName;
 import static org.zalando.nakadi.utils.TestUtils.waitFor;
-import static org.zalando.nakadi.webservice.hila.StreamBatch.MatcherIgnoringToken.equalToBatchIgnoringToken;
 import static org.zalando.nakadi.webservice.utils.NakadiTestUtils.commitCursors;
 import static org.zalando.nakadi.webservice.utils.NakadiTestUtils.createSubscription;
@@ -351,11 +349,11 @@ public void userJourneyHila() throws InterruptedException, IOException {
             final SubscriptionCursor cursor = new SubscriptionCursor("0", TestUtils.toTimelineOffset(i), eventTypeName, "");
             final StreamBatch expectedBatch = new StreamBatch(cursor,
-                    ImmutableList.of(ImmutableMap.of("foo", "bar" + i)),
+                    new JSONArray().put(new JSONObject().put("foo", "bar" + i)),
                     i == 0 ? new StreamMetadata("Stream started") : null);
             final StreamBatch batch = batches.get(i);
-            assertThat(batch, equalToBatchIgnoringToken(expectedBatch));
+            assertThat(batch, StreamBatch.equalToBatchIgnoringToken(expectedBatch));
         }
 
         // as we didn't commit, there should be still 4 unconsumed events
@@ -450,8 +448,9 @@ public void userJourneyAvroTransition() throws InterruptedException, IOException
         // validate the events metadata
         for (final StreamBatch batch : batches) {
-            final Map metadata = (Map) batch.getEvents().get(0).get("metadata");
-            assertThat(metadata.get("version"), equalTo(validatedWithJsonSchemaVersion));
+            assertThat(
+                    batch.getEvents().get(0).getJSONObject("metadata").getString("version"),
+                    equalTo(validatedWithJsonSchemaVersion));
         }
 
         // delete subscription
diff --git a/acceptance-test/src/acceptance-test/java/org/zalando/nakadi/webservice/hila/HilaAT.java b/acceptance-test/src/acceptance-test/java/org/zalando/nakadi/webservice/hila/HilaAT.java
index 0afcb6d299..8d4649e7ef 100644
--- a/acceptance-test/src/acceptance-test/java/org/zalando/nakadi/webservice/hila/HilaAT.java
+++ b/acceptance-test/src/acceptance-test/java/org/zalando/nakadi/webservice/hila/HilaAT.java
@@ -1,21 +1,23 @@
 package org.zalando.nakadi.webservice.hila;
 
+import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.core.type.TypeReference;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
+import com.jayway.restassured.response.Response;
 import org.apache.http.HttpStatus;
+import org.json.JSONObject;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
-import org.zalando.nakadi.annotations.validation.DeadLetterAnnotationValidator;
 import org.zalando.nakadi.config.JsonConfig;
 import org.zalando.nakadi.domain.EventType;
 import org.zalando.nakadi.domain.ItemsWrapper;
 import org.zalando.nakadi.domain.Subscription;
 import org.zalando.nakadi.domain.SubscriptionBase;
 import org.zalando.nakadi.domain.SubscriptionEventTypeStats;
+import org.zalando.nakadi.domain.UnprocessableEventPolicy;
 import org.zalando.nakadi.service.BlacklistService;
 import org.zalando.nakadi.util.ThreadUtils;
 import org.zalando.nakadi.utils.JsonTestHelper;
@@ -35,6 +37,7 @@
 import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.Collectors;
 
 import static com.jayway.restassured.RestAssured.given;
@@ -47,17 +50,20 @@
 import static org.hamcrest.Matchers.hasSize;
 import static org.hamcrest.Matchers.is;
 import static org.hamcrest.Matchers.not;
+import static org.zalando.nakadi.annotations.validation.DeadLetterAnnotationValidator.SUBSCRIPTION_MAX_EVENT_SEND_COUNT;
+import static org.zalando.nakadi.annotations.validation.DeadLetterAnnotationValidator.
+        SUBSCRIPTION_UNPROCESSABLE_EVENT_POLICY;
 import static org.zalando.nakadi.domain.SubscriptionBase.InitialPosition.BEGIN;
 import static org.zalando.nakadi.domain.SubscriptionBase.InitialPosition.END;
 import static org.zalando.nakadi.domain.SubscriptionEventTypeStats.Partition.AssignmentType.AUTO;
 import static org.zalando.nakadi.domain.SubscriptionEventTypeStats.Partition.AssignmentType.DIRECT;
 import static org.zalando.nakadi.utils.TestUtils.waitFor;
-import static org.zalando.nakadi.webservice.hila.StreamBatch.MatcherIgnoringToken.equalToBatchIgnoringToken;
-import static org.zalando.nakadi.webservice.hila.StreamBatch.singleEventBatch;
 import static org.zalando.nakadi.webservice.utils.NakadiTestUtils.commitCursors;
 import static org.zalando.nakadi.webservice.utils.NakadiTestUtils.createEventType;
 import static org.zalando.nakadi.webservice.utils.NakadiTestUtils.createSubscription;
+import static org.zalando.nakadi.webservice.utils.NakadiTestUtils.createSubscriptionForEventType;
 import static org.zalando.nakadi.webservice.utils.NakadiTestUtils.getNumberOfAssignedStreams;
+import static org.zalando.nakadi.webservice.utils.NakadiTestUtils.publishBusinessEventWithUserDefinedPartition;
 import static org.zalando.nakadi.webservice.utils.NakadiTestUtils.publishEvent;
 import static org.zalando.nakadi.webservice.utils.NakadiTestUtils.publishEvents;
 import static org.zalando.nakadi.webservice.utils.TestStreamingClient.SESSION_ID_UNKNOWN;
@@ -102,7 +108,7 @@ public void whenStreamTimeoutReachedPossibleToCommit() throws Exception {
 
     @Test(timeout = 30000)
     public void whenEventTypeRepartitionedTheNewSubscriptionShouldHaveUpdatedPartition() throws Exception {
         final EventType eventType = NakadiTestUtils.createBusinessEventTypeWithPartitions(1);
-        NakadiTestUtils.publishBusinessEventWithUserDefinedPartition(
+        publishBusinessEventWithUserDefinedPartition(
                 eventType.getName(), 1, x -> "{\"foo\":\"bar\"}", p -> "0");
         NakadiTestUtils.repartitionEventType(eventType, 2);
         final Subscription subscription = createSubscription(
@@ -113,7 +119,7 @@
         final TestStreamingClient clientAfterRepartitioning = TestStreamingClient
                 .create(URL, subscription.getId(), "")
                 .start();
-        NakadiTestUtils.publishBusinessEventWithUserDefinedPartition(
+        publishBusinessEventWithUserDefinedPartition(
                 eventType.getName(), 1, x -> "{\"foo\":\"bar" + x + "\"}", p -> "1");
         waitFor(() -> assertThat(clientAfterRepartitioning.getJsonBatches(), hasSize(2)));
         Assert.assertTrue(clientAfterRepartitioning.getJsonBatches().stream()
@@ -148,11 +154,16 @@ public void whenOffsetIsCommittedNextSessionStartsFromNextEventAfterCommitted()
                 .create(URL, subscription.getId(), "stream_limit=2")
                 .start();
         waitFor(() -> assertThat(client.getJsonBatches(), hasSize(2)));
-        assertThat(client.getJsonBatches().get(0), equalToBatchIgnoringToken(singleEventBatch("0",
-                "001-0001-000000000000000000", eventType.getName(), ImmutableMap.of("foo", "bar0"),
-                "Stream started")));
-        assertThat(client.getJsonBatches().get(1), equalToBatchIgnoringToken(singleEventBatch("0",
-                "001-0001-000000000000000001", eventType.getName(), ImmutableMap.of("foo", "bar1"))));
+        assertThat(
+                client.getJsonBatches().get(0),
+                StreamBatch.equalToBatchIgnoringToken(
+                        StreamBatch.singleEventBatch("0", "001-0001-000000000000000000", eventType.getName(),
+                                new JSONObject().put("foo", "bar0"), "Stream started")));
+        assertThat(
+                client.getJsonBatches().get(1),
+                StreamBatch.equalToBatchIgnoringToken(
+                        StreamBatch.singleEventBatch("0", "001-0001-000000000000000001", eventType.getName(),
+                                new JSONObject().put("foo", "bar1"))));
 
         // commit offset that will also trigger session closing as we reached stream_limit and committed
         commitCursors(subscription.getId(), ImmutableList.of(client.getJsonBatches().get(1).getCursor()),
                 client.getSessionId());
@@ -164,14 +175,18 @@
         waitFor(() -> assertThat(client.getJsonBatches(), hasSize(2)));
 
         // check that we have read the next two events with correct offsets
-        assertThat(client.getJsonBatches().get(0), equalToBatchIgnoringToken(singleEventBatch("0",
-                "001-0001-000000000000000002", eventType.getName(),
-                ImmutableMap.of("foo", "bar2"), "Stream started")));
-        assertThat(client.getJsonBatches().get(1), equalToBatchIgnoringToken(singleEventBatch("0",
-                "001-0001-000000000000000003", eventType.getName(), ImmutableMap.of("foo", "bar3"))));
+        assertThat(
+                client.getJsonBatches().get(0),
+                StreamBatch.equalToBatchIgnoringToken(
+                        StreamBatch.singleEventBatch("0", "001-0001-000000000000000002", eventType.getName(),
+                                new JSONObject().put("foo", "bar2"), "Stream started")));
+        assertThat(
+                client.getJsonBatches().get(1),
+                StreamBatch.equalToBatchIgnoringToken(
+                        StreamBatch.singleEventBatch("0", "001-0001-000000000000000003", eventType.getName(),
+                                new JSONObject().put("foo", "bar3"))));
     }
-
     @Test(timeout = 5000)
     public void whenNoEventsThenFirstOffsetIsBEGIN() {
         final TestStreamingClient client = TestStreamingClient
@@ -248,8 +263,11 @@ public void whenCommitTimeoutReachedSessionIsClosed() {
         waitFor(() -> assertThat(client.getJsonBatches(), hasSize(2)), 10000);
         waitFor(() -> assertThat(client.isRunning(), is(false)), 10000);
-        assertThat(client.getJsonBatches().get(1), equalToBatchIgnoringToken(singleEventBatch("0",
-                "001-0001-000000000000000000", eventType.getName(), ImmutableMap.of(), "Commit timeout reached")));
+        assertThat(
+                client.getJsonBatches().get(1),
+                StreamBatch.equalToBatchIgnoringToken(
+                        StreamBatch.emptyBatch("0", "001-0001-000000000000000000", eventType.getName(),
+                                "Commit timeout reached")));
     }
 
     @Test(timeout = 15000)
@@ -494,20 +512,15 @@ public void whenResetCursorsThenStreamFromResetCursorOffset() throws Exception {
                 .start();
         waitFor(() -> assertThat(client1.getJsonBatches(), hasSize(10)));
-        int statusCode = commitCursors(
+        final int statusCode = commitCursors(
                 subscription.getId(),
                 Collections.singletonList(client1.getJsonBatches().get(9).getCursor()),
                 client1.getSessionId());
         Assert.assertEquals(HttpStatus.SC_NO_CONTENT, statusCode);
 
-        final List<SubscriptionCursor> resetCursors =
+        final List<SubscriptionCursor> cursorsToReset =
                 Collections.singletonList(client1.getJsonBatches().get(4).getCursor());
-        statusCode = given()
-                .body(MAPPER.writeValueAsString(new ItemsWrapper<>(resetCursors)))
-                .contentType(JSON)
-                .patch("/subscriptions/{id}/cursors", subscription.getId())
-                .getStatusCode();
-        Assert.assertEquals(HttpStatus.SC_NO_CONTENT, statusCode);
+        resetCursorsWithToken(subscription.getId(), cursorsToReset);
 
         Assert.assertFalse(client1.isRunning());
         Assert.assertTrue(client1.getJsonBatches().stream()
                 .anyMatch(streamBatch -> streamBatch.getMetadata() != null
@@ -531,12 +544,7 @@ public void whenPatchThenCursorsAreInitializedToDefault() throws Exception {
                 .withEventType(et.getName())
                 .withStartFrom(END)
                 .buildSubscriptionBase());
-        given()
-                .body(MAPPER.writeValueAsString(new ItemsWrapper<>(Collections.emptyList())))
-                .contentType(JSON)
-                .patch("/subscriptions/{id}/cursors", s.getId())
-                .then()
-                .statusCode(HttpStatus.SC_NO_CONTENT);
+        resetCursors(s.getId(), List.of());
 
         final ItemsWrapper<SubscriptionCursorWithoutToken> subscriptionCursors = MAPPER.readValue(
                 given().get("/subscriptions/{id}/cursors", s.getId()).getBody().asString(),
@@ -572,15 +580,10 @@ public void whenPatchThenCursorsAreInitializedAndPatched() throws Exception {
                 .withEventType(et.getName())
                 .withStartFrom(END)
                 .buildSubscriptionBase());
-        given()
-                .body(MAPPER.writeValueAsString(new ItemsWrapper<>(Collections.singletonList(
-                        new SubscriptionCursorWithoutToken(
-                                et.getName(), begin.getPartitionId(), begin.getOldestAvailableOffset())
-                ))))
-                .contentType(JSON)
-                .patch("/subscriptions/{id}/cursors", s.getId())
-                .then()
-                .statusCode(HttpStatus.SC_NO_CONTENT);
+        resetCursors(s.getId(), Collections.singletonList(
+                new SubscriptionCursorWithoutToken(
+                        et.getName(), begin.getPartitionId(), begin.getOldestAvailableOffset())
+        ));
 
         final ItemsWrapper<SubscriptionCursorWithoutToken> subscriptionCursors = MAPPER.readValue(
                 given().get("/subscriptions/{id}/cursors", s.getId()).getBody().asString(),
@@ -602,13 +605,13 @@
     @Test(timeout = 15000)
     public void whenCommitFailsThreeTimesAndSingleBatchEventFailsThreeTimesThenEventSkipped() throws IOException {
-        final Subscription subscription = createAutoDLQSubscription();
-        final TestStreamingClient client = TestStreamingClient
-                .create(URL, subscription.getId(), "batch_limit=3&commit_timeout=1")
-                .start();
+        final Subscription subscription = createAutoDLQSubscription(eventType, UnprocessableEventPolicy.SKIP_EVENT, 3);
+        final TestStreamingClient client = TestStreamingClient
+                .create(URL, subscription.getId(), "batch_limit=3&commit_timeout=1");
 
         publishEvents(eventType.getName(), 50, i -> "{\"foo\":\"bar\"}");
 
+        client.start();
         waitFor(() -> Assert.assertFalse(client.isRunning()));
         Assert.assertTrue(isCommitTimeoutReached(client));
@@ -622,6 +625,7 @@
         client.start();
         waitFor(() -> Assert.assertFalse(client.isRunning()));
 
+        // the second batch is sent by Nakadi when the stream is closed
         Assert.assertEquals(2, client.getJsonBatches().size());
         Assert.assertEquals(1, client.getJsonBatches().get(0).getEvents().size());
@@ -637,13 +641,48 @@
         client.start();
         waitFor(() -> Assert.assertFalse(client.isRunning()));
-        Assert.assertEquals(3, client.getJsonBatches().get(0).getEvents().size());
-        Assert.assertEquals("001-0001-000000000000000003", client.getJsonBatches().get(0).getCursor().getOffset());
+
+        // check that we skipped over offset 0
+        Assert.assertEquals("001-0001-000000000000000001", client.getJsonBatches().get(0).getCursor().getOffset());
     }
 
     @Test(timeout = 15000)
-    public void whenIsLookingForDeadLetterAndCommitComesThenContinueLooking() throws IOException {
-        final Subscription subscription = createAutoDLQSubscription();
+    public void whenCursorsAreResetTheDLQStateIsResetAsWell() throws IOException {
+        final Subscription subscription = createAutoDLQSubscription(eventType, UnprocessableEventPolicy.SKIP_EVENT, 3);
+        final TestStreamingClient client = TestStreamingClient
+                .create(URL, subscription.getId(), "batch_limit=3&commit_timeout=1")
+                .start();
+
+        publishEvents(eventType.getName(), 50, i -> "{\"foo\":\"bar\"}");
+
+        waitFor(() -> Assert.assertFalse(client.isRunning()));
+        Assert.assertTrue(isCommitTimeoutReached(client));
+
+        final List<SubscriptionCursor> cursorSnapshot = getSubscriptionCursors(subscription.getId());
+
+        client.start();
+        waitFor(() -> Assert.assertFalse(client.isRunning()));
+        Assert.assertTrue(isCommitTimeoutReached(client));
+
+        client.start();
+        waitFor(() -> Assert.assertFalse(client.isRunning()));
+        Assert.assertTrue(isCommitTimeoutReached(client));
+
+        client.start();
+        waitFor(() -> Assert.assertFalse(client.isRunning()));
+        Assert.assertEquals(2, client.getJsonBatches().size());
+        Assert.assertEquals(1, client.getJsonBatches().get(0).getEvents().size());
+
+        resetCursorsWithToken(subscription.getId(), cursorSnapshot);
+
+        client.start();
+        waitFor(() -> Assert.assertFalse(client.isRunning()));
+        Assert.assertEquals(3, client.getJsonBatches().get(0).getEvents().size());
+    }
+
+    @Test(timeout = 30_000)
+    public void whenIsLookingForDeadLetterAndCommitComesThenContinueLooking() throws IOException, InterruptedException {
+        final Subscription subscription = createAutoDLQSubscription(eventType, UnprocessableEventPolicy.SKIP_EVENT, 3);
         final TestStreamingClient client = TestStreamingClient
                 .create(URL, subscription.getId(), "batch_limit=10&commit_timeout=1")
                 .start();
@@ -699,17 +738,36 @@ public void whenIsLookingForDeadLetterAndCommitComesThenContinueLooking() throws
         Assert.assertEquals(1, client.getJsonBatches().get(0).getEvents().size());
         Assert.assertEquals("001-0001-000000000000000001", client.getJsonBatches().get(0).getCursor().getOffset());
 
+        // now a single event should be skipped, but we continue getting events one by one
+        // until the end of the failing batch
+        final TestStreamingClient client2 = TestStreamingClient
+                .create(URL, subscription.getId(), "batch_limit=10&commit_timeout=1&stream_limit=20");
+
+        client2.startWithAutocommit(batches -> {
+            // 8×1 + 1×10 + 1×2 + 1×"stream closed"
+            Assert.assertEquals(11, batches.size());
+
+            final List<StreamBatch> singleEventBatches = batches.subList(0, 8);
+            for (int i = 0; i < singleEventBatches.size(); ++i) {
+                final StreamBatch batch = singleEventBatches.get(i);
+                Assert.assertEquals(1, batch.getEvents().size());
+            }
+
+            final StreamBatch theLast = batches.get(8);
+            Assert.assertEquals(10, theLast.getEvents().size());
+        });
+        waitFor(() -> Assert.assertFalse(client2.isRunning()), 15_000);
+
+        // continue with normal batch size after skipping over the failing batch
         client.start();
         waitFor(() -> Assert.assertFalse(client.isRunning()));
         Assert.assertTrue(isCommitTimeoutReached(client));
 
         Assert.assertEquals(10, client.getJsonBatches().get(0).getEvents().size());
-        Assert.assertEquals("001-0001-000000000000000011", client.getJsonBatches().get(0).getCursor().getOffset());
     }
 
     @Test(timeout = 20_000)
     public void whenIsLookingForDeadLetterAndSendAllEventsOneByOneThenBackToNormalBatchSize()
             throws InterruptedException, IOException {
-        final Subscription subscription = createAutoDLQSubscription();
+        final Subscription subscription = createAutoDLQSubscription(eventType, UnprocessableEventPolicy.SKIP_EVENT, 3);
         final TestStreamingClient client = TestStreamingClient
                 .create(URL, subscription.getId(), "batch_limit=10&commit_timeout=1&stream_limit=20")
                 .start();
@@ -734,6 +792,7 @@
         Assert.assertEquals(10, client.getJsonBatches().get(0).getEvents().size());
         Assert.assertEquals("001-0001-000000000000000009", client.getJsonBatches().get(0).getCursor().getOffset());
 
+        // receive a single event in a batch and commit it so that Nakadi sends the next batch with a single event
         client.startWithAutocommit(batches -> {
             // 12 because the last one is stream limit reached debug info
             Assert.assertEquals(12, batches.size());
@@ -748,18 +807,152 @@
         waitFor(() -> Assert.assertFalse(client.isRunning()), 15_000);
     }
 
+    @Test(timeout = 20_000)
+    public void shouldSkipPoisonPillAndDeadLetterFoundInTheQueueLater() throws IOException, InterruptedException {
+
+        final EventType eventType = NakadiTestUtils.createBusinessEventTypeWithPartitions(4);
+
+        publishBusinessEventWithUserDefinedPartition(eventType.getName(),
+                50, i -> String.format("bar%d", i), i -> String.valueOf(i % 4));
+
+        final String poisonPillValue = "bar10";
+
+        final Subscription subscription =
+                createAutoDLQSubscription(eventType, UnprocessableEventPolicy.DEAD_LETTER_QUEUE, 3);
+
+        // start looking for events in the DLQ store event type already (reading from END)
+        final Subscription dlqStoreEventTypeSub = createSubscriptionForEventType("nakadi.dead.letter.queue");
+        final TestStreamingClient dlqStoreClient = TestStreamingClient.create(URL,
+                dlqStoreEventTypeSub.getId(), "batch_limit=1&stream_timeout=15");
+        dlqStoreClient.startWithAutocommit(batches ->
+                Assert.assertTrue("failed event should be found in the dead letter queue",
+                        batches.stream()
+                                .flatMap(b -> b.getEvents().stream())
+                                .anyMatch(e ->
+                                        subscription.getId().equals(e.get("subscription_id")) &&
+                                        poisonPillValue.equals(e.getJSONObject("event").getString("foo")))));
+
+        final AtomicReference<SubscriptionCursor> cursorWithPoisonPill = new AtomicReference<>();
+        while (true) {
+            final TestStreamingClient client = TestStreamingClient.create(
+                    URL, subscription.getId(), "batch_limit=3&commit_timeout=1&stream_timeout=2");
+            client.start(streamBatch -> {
+                if (streamBatch.getEvents().stream()
+                        .anyMatch(event -> poisonPillValue.equals(event.getString("foo")))) {
+                    cursorWithPoisonPill.set(streamBatch.getCursor());
+                    throw new RuntimeException("poison pill found");
+                } else {
+                    try {
+                        NakadiTestUtils.commitCursors(
+                                subscription.getId(), ImmutableList.of(streamBatch.getCursor()), client.getSessionId());
+                    } catch (JsonProcessingException e) {
+                        throw new RuntimeException(e);
+                    }
+                }
+            });
+
+            waitFor(() -> Assert.assertFalse(client.isRunning()));
+
+            if (client.getJsonBatches().stream()
+                    .filter(streamBatch -> cursorWithPoisonPill.get().getPartition()
+                            .equals(streamBatch.getCursor().getPartition()))
+                    .anyMatch(streamBatch -> streamBatch.getCursor().getOffset()
+                            .compareTo(cursorWithPoisonPill.get().getOffset()) > 0)) {
+                break;
+            }
+        }
+
+        waitFor(() -> Assert.assertFalse(dlqStoreClient.isRunning()));
+    }
+
+    @Test(timeout = 35_000)
+    public void testDlqModeOnlySendsSingleEventAndNotMore() throws IOException {
+        publishEvents(eventType.getName(), 50, i -> "{\"foo\":\"bar" + i + "\"}");
+        final int maxEventSendCount = 2;
+        final Subscription subscription = createAutoDLQSubscription(eventType, UnprocessableEventPolicy.SKIP_EVENT,
+                maxEventSendCount);
+        final TestStreamingClient client = TestStreamingClient
+                .create(URL, subscription.getId(), "batch_limit=3&commit_timeout=5&batch_flush_timeout=2");
+
+        client.start();
+        waitFor(() -> Assert.assertFalse(client.isRunning()));
+        Assert.assertTrue(isCommitTimeoutReached(client));
+
+        client.start();
+        waitFor(() -> Assert.assertFalse(client.isRunning()));
+        Assert.assertTrue(isCommitTimeoutReached(client));
+
+        // DLQ mode
+        client.start();
+        waitFor(() -> Assert.assertFalse(client.isRunning()));
+        Assert.assertEquals(4, client.getJsonBatches().size()); // event batch, keepalive × 2, commit timeout
+        Assert.assertEquals(1, client.getJsonBatches().get(0).getEvents().size());
+        Assert.assertThat(client.getJsonBatches().get(1).getEvents(), empty());
+        Assert.assertThat(client.getJsonBatches().get(2).getEvents(), empty());
+        Assert.assertTrue(isCommitTimeoutReached(client));
+
+        client.start();
+        waitFor(() -> Assert.assertFalse(client.isRunning()));
+        Assert.assertEquals(4, client.getJsonBatches().size()); // event batch, keepalive × 2, commit timeout
+        Assert.assertEquals(1, client.getJsonBatches().get(0).getEvents().size());
+        Assert.assertThat(client.getJsonBatches().get(1).getEvents(), empty());
+        Assert.assertThat(client.getJsonBatches().get(2).getEvents(), empty());
+        Assert.assertTrue(isCommitTimeoutReached(client));
+
+        client.start();
+        waitFor(() -> Assert.assertFalse(client.isRunning()));
+        // check that we skipped over offset 0
+        Assert.assertEquals("001-0001-000000000000000001", client.getJsonBatches().get(0).getCursor().getOffset());
+    }
+
     private static boolean isCommitTimeoutReached(final TestStreamingClient client) {
         return client.getJsonBatches().stream()
                 .filter(batch -> batch.getMetadata() != null)
                 .anyMatch(batch -> batch.getMetadata().getDebug().equals("Commit timeout reached"));
     }
 
-    private Subscription createAutoDLQSubscription() throws IOException {
+    private Subscription createAutoDLQSubscription(final EventType eventType,
+                                                   final UnprocessableEventPolicy unprocessableEventPolicy,
+                                                   final int maxEventSendCount) throws IOException {
+
         final SubscriptionBase subscription = RandomSubscriptionBuilder.builder()
                 .withEventType(eventType.getName())
                 .withStartFrom(BEGIN)
                 .buildSubscriptionBase();
-        subscription.setAnnotations(Map.of(DeadLetterAnnotationValidator.SUBSCRIPTION_MAX_EVENT_SEND_COUNT, "3"));
+        subscription.setAnnotations(Map.of(
+                SUBSCRIPTION_MAX_EVENT_SEND_COUNT, Integer.toString(maxEventSendCount),
+                SUBSCRIPTION_UNPROCESSABLE_EVENT_POLICY, unprocessableEventPolicy.toString()));
         return createSubscription(subscription);
     }
+
+    List<SubscriptionCursor> getSubscriptionCursors(final String sid) throws IOException {
+        final Response response = given()
+                .get("/subscriptions/{id}/cursors", sid);
+        Assert.assertEquals(HttpStatus.SC_OK, response.getStatusCode());
+        return MAPPER.readValue(
+                response.asInputStream(),
+                new TypeReference<ItemsWrapper<SubscriptionCursor>>() {})
+                .getItems();
+    }
+
+    void resetCursorsWithToken(
+            final String sid,
+            final List<SubscriptionCursor> cursors) throws JsonProcessingException {
+        final List<SubscriptionCursorWithoutToken> cursorsWithoutToken = cursors
+                .stream()
+                .map(c -> new SubscriptionCursorWithoutToken(c.getEventType(), c.getPartition(), c.getOffset()))
+                .collect(Collectors.toList());
+        resetCursors(sid, cursorsWithoutToken);
+    }
+
+    void resetCursors(
+            final String sid,
+            final List<SubscriptionCursorWithoutToken> cursors) throws JsonProcessingException {
+        final Response response = given()
+                .body(MAPPER.writeValueAsString(new ItemsWrapper<>(cursors)))
+                .contentType(JSON)
+                .patch("/subscriptions/{id}/cursors", sid);
+        Assert.assertEquals(HttpStatus.SC_NO_CONTENT, response.getStatusCode());
+    }
+
 }
diff --git a/acceptance-test/src/acceptance-test/java/org/zalando/nakadi/webservice/hila/StreamBatch.java b/acceptance-test/src/acceptance-test/java/org/zalando/nakadi/webservice/hila/StreamBatch.java
index 3a9ff59081..874b6ac686 100644
--- a/acceptance-test/src/acceptance-test/java/org/zalando/nakadi/webservice/hila/StreamBatch.java
+++ b/acceptance-test/src/acceptance-test/java/org/zalando/nakadi/webservice/hila/StreamBatch.java
@@ -1,21 +1,30 @@
 package org.zalando.nakadi.webservice.hila;
 
 import com.fasterxml.jackson.annotation.JsonProperty;
-import com.google.common.collect.ImmutableList;
 import org.apache.commons.lang3.StringUtils;
-import org.hamcrest.BaseMatcher;
-import org.hamcrest.Description;
+import org.hamcrest.Matcher;
+import org.json.JSONArray;
+import org.json.JSONObject;
 import org.zalando.nakadi.domain.StreamMetadata;
 import org.zalando.nakadi.repository.kafka.KafkaTestHelper;
 import org.zalando.nakadi.view.SubscriptionCursor;
 
 import javax.annotation.Nullable;
 import javax.annotation.concurrent.Immutable;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
 
-import static java.util.Collections.unmodifiableList;
+import static org.hamcrest.Matchers.allOf;
+import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.Matchers.empty;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.hasProperty;
+import static org.hamcrest.Matchers.isA;
+import static uk.co.datumedge.hamcrest.json.SameJSONAs.sameJSONObjectAs;
 
 @Immutable
 public class StreamBatch {
@@ -23,14 +32,25 @@ public class StreamBatch {
     private static final String DUMMY_TOKEN = "dummy-token";
 
     private final SubscriptionCursor cursor;
-    private final List<Map> events;
+    private final List<JSONObject> events;
     private final StreamMetadata metadata;
 
+    // used for reading from string with object mapper
     public StreamBatch(@JsonProperty("cursor") final SubscriptionCursor cursor,
-                       @Nullable @JsonProperty("events") final List<Map> events,
+                       @Nullable @JsonProperty("events") final List<Map<String, Object>> events,
                        @Nullable @JsonProperty("info") final StreamMetadata metadata) {
         this.cursor = cursor;
-        this.events = Optional.ofNullable(events).orElse(ImmutableList.of());
+        this.events = Optional.ofNullable(events)
+                .map(evs -> evs.stream().map(JSONObject::new).collect(Collectors.toUnmodifiableList()))
+                .orElse(Collections.emptyList());
+        this.metadata = metadata;
+    }
+
+    public StreamBatch(final SubscriptionCursor cursor, final JSONArray eventsArray, final StreamMetadata metadata) {
+        this.cursor = cursor;
+        this.events = IntStream.range(0, eventsArray.length())
+                .mapToObj(i -> eventsArray.getJSONObject(i))
+                .collect(Collectors.toUnmodifiableList());
         this.metadata = metadata;
     }
 
@@ -38,8 +58,8 @@ public SubscriptionCursor getCursor() {
         return cursor;
     }
 
-    public List<Map> getEvents() {
-        return unmodifiableList(events);
+    public List<JSONObject> getEvents() {
+        return events;
     }
 
     @Override
@@ -69,23 +89,44 @@ public StreamMetadata getMetadata() {
         return metadata;
     }
 
+    public static StreamBatch emptyBatch(final String partition, final String offset, final String eventType,
+                                         final String debugInfo) {
+
+        final String paddedOffset = StringUtils.leftPad(offset, KafkaTestHelper.CURSOR_OFFSET_LENGTH, '0');
+        return new StreamBatch(
+                new SubscriptionCursor(partition, paddedOffset, eventType, DUMMY_TOKEN),
+                new JSONArray(),
+                new StreamMetadata(debugInfo));
+    }
+
     public static StreamBatch singleEventBatch(final String partition, final String offset, final String eventType,
-                                               final Map event, final String metadata) {
+                                               final JSONObject event) {
+
         final String paddedOffset = StringUtils.leftPad(offset, KafkaTestHelper.CURSOR_OFFSET_LENGTH, '0');
-        if (event.isEmpty()) {
-            return new StreamBatch(new SubscriptionCursor(partition, paddedOffset, eventType, DUMMY_TOKEN),
-                    ImmutableList.of(), new StreamMetadata(metadata));
-        } else {
-            return new StreamBatch(new SubscriptionCursor(partition, paddedOffset, eventType, DUMMY_TOKEN),
-                    ImmutableList.of(event), new StreamMetadata(metadata));
-        }
+        return new StreamBatch(
+                new SubscriptionCursor(partition, paddedOffset, eventType, DUMMY_TOKEN),
+                new JSONArray().put(event),
+                null);
     }
 
     public static StreamBatch singleEventBatch(final String partition, final String offset, final String eventType,
-                                               final Map event) {
+                                               final JSONObject event, final String debugInfo) {
+
+        final String paddedOffset = StringUtils.leftPad(offset, KafkaTestHelper.CURSOR_OFFSET_LENGTH, '0');
+        return new StreamBatch(
+                new SubscriptionCursor(partition, paddedOffset, eventType, DUMMY_TOKEN),
+                new JSONArray().put(event),
+                new StreamMetadata(debugInfo));
+    }
+
+    private static StreamBatch singleEventBatch(final String partition, final String offset, final String eventType,
+                                                final JSONObject event, final StreamMetadata metadata) {
+
         final String paddedOffset = StringUtils.leftPad(offset, KafkaTestHelper.CURSOR_OFFSET_LENGTH, '0');
-        return new StreamBatch(new SubscriptionCursor(partition, paddedOffset, eventType, DUMMY_TOKEN),
-                ImmutableList.of(event), null);
+        return new StreamBatch(
+                new SubscriptionCursor(partition, paddedOffset, eventType, DUMMY_TOKEN),
+                new JSONArray().put(event),
+                metadata);
     }
 
     @Override
@@ -97,38 +138,20 @@ public String toString() {
                 '}';
     }
 
-    public static class MatcherIgnoringToken extends BaseMatcher {
-
-        private final StreamBatch batch;
-
-        public MatcherIgnoringToken(final StreamBatch batch) {
-            this.batch = batch;
-        }
-
-        @Override
-        public boolean matches(final Object item) {
-            if (!(item instanceof StreamBatch)) {
-                return false;
-            }
-            final StreamBatch batchTocheck = (StreamBatch) item;
-            final SubscriptionCursor cursor = batch.getCursor();
-            final SubscriptionCursor cursorToCheck = batchTocheck.getCursor();
-
-            return batch.getEvents().equals(batchTocheck.getEvents()) &&
-                    cursor.getPartition().equals(cursorToCheck.getPartition()) &&
-                    cursor.getOffset().equals(cursorToCheck.getOffset()) &&
-                    cursor.getEventType().equals(cursorToCheck.getEventType()) &&
-                    Optional.ofNullable(batch.getMetadata())
-                            .map(b -> b.equals(batchTocheck.getMetadata()))
-                            .orElse(batchTocheck.getMetadata() == null);
-        }
-
-        @Override
-        public void describeTo(final Description description) {
-        }
-
-        public static MatcherIgnoringToken equalToBatchIgnoringToken(final StreamBatch batch) {
-            return new MatcherIgnoringToken(batch);
-        }
+    public static Matcher<Object> equalToBatchIgnoringToken(final StreamBatch batch) {
+        final SubscriptionCursor cursor = batch.getCursor();
+        final List<JSONObject> events = batch.getEvents();
+        return allOf(
+                isA(StreamBatch.class),
+                hasProperty("cursor", allOf(
+                        hasProperty("partition", equalTo(cursor.getPartition())),
+                        hasProperty("offset", equalTo(cursor.getOffset())),
+                        hasProperty("eventType", equalTo(cursor.getEventType())))),
+                hasProperty("events", events.isEmpty()
+                        ? empty()
+                        : contains(events.stream()
+                                .map(e -> sameJSONObjectAs(e))
+                                .collect(Collectors.toList()))),
+                hasProperty("metadata", equalTo(batch.getMetadata())));
     }
 }
diff --git a/acceptance-test/src/acceptance-test/java/org/zalando/nakadi/webservice/utils/TestStreamingClient.java b/acceptance-test/src/acceptance-test/java/org/zalando/nakadi/webservice/utils/TestStreamingClient.java
index 629655817d..a5af9f5094 100644
--- a/acceptance-test/src/acceptance-test/java/org/zalando/nakadi/webservice/utils/TestStreamingClient.java
+++ b/acceptance-test/src/acceptance-test/java/org/zalando/nakadi/webservice/utils/TestStreamingClient.java
@@ -106,7 +106,15 @@ private TestStreamingClient startInternal(final boolean wait,
 
     public TestStreamingClient start() {
         try {
-            return startInternal(false, new JsonConsumer());
+            return startInternal(false, new JsonConsumer(ignore -> {}));
+        } catch (final InterruptedException ignore) {
+            throw new RuntimeException(ignore);
+        }
+    }
+
+    public TestStreamingClient start(final Consumer<StreamBatch> onBatch) {
+        try {
+            return startInternal(false, new JsonConsumer(onBatch));
         } catch (final InterruptedException ignore) {
             throw new RuntimeException(ignore);
         }
     }
@@ -123,7 +131,7 @@ public TestStreamingClient startBinary() {
     public TestStreamingClient startWithAutocommit(final Consumer<List<StreamBatch>> batchesListener)
             throws InterruptedException {
         this.batchesListener = batchesListener;
-        final TestStreamingClient client = startInternal(true, new JsonConsumer());
+        final TestStreamingClient client = startInternal(true, new JsonConsumer(ignore -> {}));
         final Thread autocommitThread = new Thread(() -> {
             int oldIdx = 0;
             while (client.isRunning()) {
@@ -132,11 +140,12 @@ public TestStreamingClient startWithAutocommit(final Consumer<List<StreamBatch>>
                 if (batch.getEvents() != null && !batch.getEvents().isEmpty()) {
                     try {
                         final SubscriptionCursor cursor = batch.getCursor();
+                        LOG.debug("Committing: {}", cursor);
                         final int responseCode = NakadiTestUtils.commitCursors(
                                 client.subscriptionId,
                                 Collections.singletonList(batch.getCursor()),
                                 client.getSessionId());
-                        LOG.info("Committing " + responseCode + ": " + cursor);
+                        LOG.info("Commit response code: {}", responseCode);
                     } catch (JsonProcessingException e) {
                         throw new RuntimeException(e);
                     }
@@ -156,11 +165,14 @@ public TestStreamingClient startWithAutocommit(final Consumer<List<StreamBatch>>
     }
 
     public boolean close() {
+        LOG.debug("Closing...");
         if (running) {
+            LOG.debug("Set not running!");
             running = false;
             connection.disconnect();
             return true;
         } else {
+            LOG.debug("Already closed!");
             return false;
         }
     }
@@ -252,6 +264,13 @@ public void run() {
 
     private class JsonConsumer extends ConsumerThread {
 
+        private final Consumer<StreamBatch> onBatch;
+
+        JsonConsumer(final Consumer<StreamBatch> onBatch) {
+            this.onBatch = onBatch;
+        }
+
         @Override
         void addHeaders() {
         }
@@ -262,14 +281,18 @@ void readBatches(final InputStream inputStream) throws IOException {
             final BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream, Charsets.UTF_8));
             while (running) {
                 try {
+                    LOG.debug("Reading next line...");
                     final String line = reader.readLine();
                     if (line == null) {
+                        LOG.debug("Got null line, stopping.");
                        return;
                    }
+                    LOG.trace("Got line: {}", line);
                    final StreamBatch streamBatch = MAPPER.readValue(line, StreamBatch.class);
                    synchronized (jsonBatches) {
                        jsonBatches.add(streamBatch);
                    }
+                    onBatch.accept(streamBatch);
                } catch (final SocketTimeoutException ste) {
                    LOG.info("No data in 10 ms, retrying read data");
                }
diff --git a/api-consumption/src/main/java/org/zalando/nakadi/service/subscription/StreamingContext.java b/api-consumption/src/main/java/org/zalando/nakadi/service/subscription/StreamingContext.java
index 8d1f6f762c..42bc82b653 100644
--- a/api-consumption/src/main/java/org/zalando/nakadi/service/subscription/StreamingContext.java
+++ b/api-consumption/src/main/java/org/zalando/nakadi/service/subscription/StreamingContext.java
@@ -13,6 +13,7 @@
 import org.zalando.nakadi.domain.HeaderTag;
 import org.zalando.nakadi.domain.NakadiCursor;
 import org.zalando.nakadi.domain.Subscription;
+import org.zalando.nakadi.domain.UnprocessableEventPolicy;
 import org.zalando.nakadi.exceptions.runtime.AccessDeniedException;
 import org.zalando.nakadi.exceptions.runtime.NakadiRuntimeException;
 import org.zalando.nakadi.exceptions.runtime.RebalanceConflictException;
@@ -26,6 +27,7 @@
 import org.zalando.nakadi.service.EventStreamWriter;
 import org.zalando.nakadi.service.EventTypeChangeListener;
 import org.zalando.nakadi.service.FeatureToggleService;
+import org.zalando.nakadi.service.publishing.EventPublisher;
 import org.zalando.nakadi.service.subscription.autocommit.AutocommitSupport;
 import org.zalando.nakadi.service.subscription.model.Partition;
 import org.zalando.nakadi.service.subscription.model.Session;
@@ -36,6 +38,7 @@
 import org.zalando.nakadi.service.subscription.zk.ZkSubscription;
 import org.zalando.nakadi.service.subscription.zk.ZkSubscriptionClient;
 import org.zalando.nakadi.service.timeline.TimelineService;
+import org.zalando.nakadi.util.UUIDGenerator;
 
 import java.io.Closeable;
 import java.io.IOException;
@@ -90,6 +93,10 @@ public class StreamingContext implements SubscriptionStreamer {
     private final EventTypeCache eventTypeCache;
     private static final Logger LOG = LoggerFactory.getLogger(StreamingContext.class);
     private final Integer maxEventSendCount;
+    private final UnprocessableEventPolicy unprocessableEventPolicy;
+    private final String deadLetterQueueEventTypeName;
+    private final EventPublisher eventPublisher;
+    private final UUIDGenerator uuidGenerator;
 
     private StreamingContext(final Builder builder) {
         this.out = builder.out;
@@ -117,11 +124,19 @@ private StreamingContext(final Builder builder) {
         this.kafkaRecordDeserializer = builder.kafkaRecordDeserializer;
         this.eventTypeCache = builder.eventTypeCache;
         this.featureToggleService = builder.featureToggleService;
+        this.deadLetterQueueEventTypeName = builder.deadLetterQueueEventTypeName;
+        this.eventPublisher = builder.eventPublisher;
+        this.uuidGenerator = builder.uuidGenerator;
 
         this.maxEventSendCount = Optional.ofNullable(getSubscription().getAnnotations())
                 .map(ans -> ans.get(DeadLetterAnnotationValidator.SUBSCRIPTION_MAX_EVENT_SEND_COUNT))
                 .map(Integer::valueOf)
                 .orElse(null);
+
+        this.unprocessableEventPolicy = Optional.ofNullable(getSubscription().getAnnotations())
+                .map(ans -> ans.get(DeadLetterAnnotationValidator.SUBSCRIPTION_UNPROCESSABLE_EVENT_POLICY))
+                .map(UnprocessableEventPolicy::valueOf)
+                .orElse(null);
     }
 
     public ConsumptionKpiCollector getKpiCollector() {
@@ -400,6 +415,22 @@ public Integer getMaxEventSendCount() {
         return this.maxEventSendCount;
     }
 
+    public UnprocessableEventPolicy getUnprocessableEventPolicy() {
+        return this.unprocessableEventPolicy;
+    }
+
+    public String getDeadLetterQueueEventTypeName() {
+        return this.deadLetterQueueEventTypeName;
+    }
+
+    public EventPublisher getEventPublisher() {
+        return this.eventPublisher;
+    }
+
+    public UUIDGenerator getUuidGenerator() {
+        return this.uuidGenerator;
+    }
+
     public static final class Builder {
         private SubscriptionOutput out;
         private StreamParameters parameters;
@@ -425,6 +456,9 @@ public static final class Builder {
         private KafkaRecordDeserializer kafkaRecordDeserializer;
         private EventTypeCache eventTypeCache;
         private FeatureToggleService featureToggleService;
+        private String deadLetterQueueEventTypeName;
+        private EventPublisher eventPublisher;
+        private UUIDGenerator uuidGenerator;
 
         public Builder setEventTypeCache(final EventTypeCache eventTypeCache) {
             this.eventTypeCache = eventTypeCache;
@@ -546,11 +580,23 @@ public Builder setFeatureToggleService(final FeatureToggleService featureToggleService) {
             return this;
         }
 
-        public StreamingContext build() {
-            return new StreamingContext(this);
+        public Builder setDeadLetterQueueEventTypeName(final String deadLetterQueueEventTypeName) {
+            this.deadLetterQueueEventTypeName = deadLetterQueueEventTypeName;
+            return this;
         }
 
+        public Builder setEventPublisher(final EventPublisher eventPublisher) {
+            this.eventPublisher = eventPublisher;
+            return this;
+        }
-    }
 
+        public Builder setUuidGenerator(final UUIDGenerator uuidGenerator) {
+            this.uuidGenerator = uuidGenerator;
+            return this;
+        }
+
+        public StreamingContext build() {
+            return new StreamingContext(this);
+        }
+    }
 }
diff --git a/api-consumption/src/main/java/org/zalando/nakadi/service/subscription/SubscriptionStreamerFactory.java b/api-consumption/src/main/java/org/zalando/nakadi/service/subscription/SubscriptionStreamerFactory.java
index e39efdce5b..3060b18563 100644
--- a/api-consumption/src/main/java/org/zalando/nakadi/service/subscription/SubscriptionStreamerFactory.java
+++ b/api-consumption/src/main/java/org/zalando/nakadi/service/subscription/SubscriptionStreamerFactory.java
@@ -21,10 +21,12 @@
 import org.zalando.nakadi.service.EventTypeChangeListener;
 import org.zalando.nakadi.service.FeatureToggleService;
 import org.zalando.nakadi.service.NakadiCursorComparator;
+import org.zalando.nakadi.service.publishing.EventPublisher;
 import org.zalando.nakadi.service.subscription.model.Session;
 import org.zalando.nakadi.service.subscription.zk.SubscriptionClientFactory;
 import org.zalando.nakadi.service.subscription.zk.ZkSubscriptionClient;
 import org.zalando.nakadi.service.timeline.TimelineService;
+import org.zalando.nakadi.util.UUIDGenerator;
 
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
@@ -50,6 +52,9 @@ public class SubscriptionStreamerFactory {
     private final ConsumptionKpiCollectorFactory consumptionKpiCollectorFactory;
     private final KafkaRecordDeserializer kafkaRecordDeserializer;
     private final FeatureToggleService featureToggleService;
+    private final EventPublisher eventPublisher;
+    private final UUIDGenerator uuidGenerator;
+    private final String deadLetterQueueEventTypeName;
 
     @Autowired
     public SubscriptionStreamerFactory(
@@ -68,8 +73,10 @@ public SubscriptionStreamerFactory(
             @Value("${nakadi.subscription.maxStreamMemoryBytes}") final long streamMemoryLimitBytes,
             final ConsumptionKpiCollectorFactory consumptionKpiCollectorFactory,
             final KafkaRecordDeserializer kafkaRecordDeserializer,
-            final FeatureToggleService featureToggleService
-    ) {
+            final FeatureToggleService featureToggleService,
+            final EventPublisher eventPublisher,
+            @Value("${nakadi.dlq.storeEventTypeName}") final String deadLetterQueueEventTypeName,
+            final UUIDGenerator uuidGenerator) {
         this.timelineService = timelineService;
         this.cursorTokenService = cursorTokenService;
         this.objectMapper = objectMapper;
@@ -86,6 +93,9 @@ public SubscriptionStreamerFactory(
         this.consumptionKpiCollectorFactory = consumptionKpiCollectorFactory;
         this.kafkaRecordDeserializer = kafkaRecordDeserializer;
         this.featureToggleService = featureToggleService;
+        this.eventPublisher = eventPublisher;
+        this.deadLetterQueueEventTypeName = deadLetterQueueEventTypeName;
+        this.uuidGenerator = uuidGenerator;
     }
 
     public SubscriptionStreamer build(
@@ -98,7 +108,6 @@ public SubscriptionStreamer build(
         final ZkSubscriptionClient zkClient = zkClientFactory.createClient(
                 subscription, streamParameters.commitTimeoutMillis);
 
-        // Create streaming context
         return new StreamingContext.Builder()
                 .setOut(output)
                 .setStreamMemoryLimitBytes(streamMemoryLimitBytes)
@@ -125,7 +134,9 @@ public SubscriptionStreamer build(
                 .setKafkaRecordDeserializer(kafkaRecordDeserializer)
                 .setEventTypeCache(eventTypeCache)
                 .setFeatureToggleService(featureToggleService)
+                .setEventPublisher(eventPublisher)
+                .setDeadLetterQueueEventTypeName(deadLetterQueueEventTypeName)
+                .setUuidGenerator(uuidGenerator)
                 .build();
     }
-
 }
diff --git a/api-consumption/src/main/java/org/zalando/nakadi/service/subscription/state/ClosingState.java b/api-consumption/src/main/java/org/zalando/nakadi/service/subscription/state/ClosingState.java
index 61d84d8249..e6777b9cb4 100644
--- a/api-consumption/src/main/java/org/zalando/nakadi/service/subscription/state/ClosingState.java
+++ b/api-consumption/src/main/java/org/zalando/nakadi/service/subscription/state/ClosingState.java
@@ -40,6 +40,12 @@ class ClosingState extends State {
 
     @Override
     public void onExit() {
+        try {
+            updateFailedCommitsCount();
+        } catch (final RuntimeException re) {
+            LOG.error("Failed to update failed commits count", re);
+        }
+
         try {
             getAutocommit().autocommit();
             freePartitions(new HashSet<>(listeners.keySet()));
@@ -57,6 +63,17 @@ public void onExit() {
         }
     }
 
+    private void updateFailedCommitsCount() {
+        if (getContext().getMaxEventSendCount() == null) {
+            return;
+        }
+
+        getZk().updateTopology(topology -> Arrays.stream(topology.getPartitions())
+                .filter(p -> uncommittedOffsets.containsKey(new EventTypePartition(p.getEventType(), p.getPartition())))
+                .map(Partition::toIncFailedCommits)
+                .toArray(Partition[]::new));
+    }
+
     @Override
     public void onEnter() {
         final long timeToWaitMillis = getParameters().commitTimeoutMillis -
@@ -64,8 +81,6 @@ public void onEnter() {
         uncommittedOffsets = uncommittedOffsetsSupplier.get();
         if (!uncommittedOffsets.isEmpty() && timeToWaitMillis > 0) {
             scheduleTask(() -> {
-                // commit timeout will be reached for the partitions, lets update topology with number of failed commits
-                updateFailedCommitsCount();
                 switchState(new CleanupState());
             }, timeToWaitMillis, TimeUnit.MILLISECONDS);
 
@@ -76,27 +91,11 @@ public void onEnter() {
                 return;
             }
             reactOnTopologyChange();
-        } else if (!uncommittedOffsets.isEmpty()) {
-            // commit timeout reached for these partitions, lets update topology with number of failed commits
-            updateFailedCommitsCount();
-            switchState(new CleanupState());
         } else {
             switchState(new CleanupState());
         }
     }
 
-    private void updateFailedCommitsCount() {
-        if (getContext().getMaxEventSendCount() == null) {
-            return;
-        }
-
-        getZk().updateTopology(topology -> Arrays.stream(topology.getPartitions())
-                .filter(p -> uncommittedOffsets.containsKey(new EventTypePartition(p.getEventType(), p.getPartition())))
-                .map(Partition::toIncFailedCommits)
-                .toArray(Partition[]::new)
-        );
-    }
-
     private void onTopologyChanged() {
         if (topologyListener == null) {
             throw new IllegalStateException(
diff --git a/api-consumption/src/main/java/org/zalando/nakadi/service/subscription/state/StreamingState.java b/api-consumption/src/main/java/org/zalando/nakadi/service/subscription/state/StreamingState.java
index 0c223b307c..6a983b2d56 100644
--- a/api-consumption/src/main/java/org/zalando/nakadi/service/subscription/state/StreamingState.java
+++ b/api-consumption/src/main/java/org/zalando/nakadi/service/subscription/state/StreamingState.java
@@ -3,13 +3,19 @@
 import com.codahale.metrics.Meter;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
+import org.json.JSONArray;
+import org.json.JSONObject;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.zalando.nakadi.domain.ConsumedEvent;
+import org.zalando.nakadi.domain.EventPublishingStatus;
+import org.zalando.nakadi.domain.EventPublishResult;
 import org.zalando.nakadi.domain.EventTypePartition;
 import org.zalando.nakadi.domain.NakadiCursor;
 import org.zalando.nakadi.domain.PartitionStatistics;
 import org.zalando.nakadi.domain.Timeline;
+import org.zalando.nakadi.domain.UnprocessableEventPolicy;
+import org.zalando.nakadi.exceptions.runtime.DeadLetterQueueStoreException;
 import org.zalando.nakadi.exceptions.runtime.InternalNakadiException;
 import org.zalando.nakadi.exceptions.runtime.InvalidCursorException;
 import org.zalando.nakadi.exceptions.runtime.NakadiRuntimeException;
@@ -26,6 +32,8 @@
 
 import java.io.Closeable;
 import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.time.Instant;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.Comparator;
@@ -42,7 +50,6 @@
 
 import static java.util.stream.Collectors.groupingBy;
 
-
 class StreamingState extends State {
 
     private static final Logger LOG = LoggerFactory.getLogger(StreamingState.class);
@@ -52,6 +59,7 @@ class StreamingState extends State {
     // correctly, and p0 is not receiving any updates - reassignment won't complete.
     private final Map<EventTypePartition, Long> releasingPartitions = new HashMap<>();
     private Map<EventTypePartition, Partition> failedCommitPartitions = new HashMap<>();
+    private String failedCommitsDebugStringToFlush;
     private ZkSubscription topologyChangeSubscription;
     private HighLevelConsumer eventConsumer;
     private boolean pollPaused;
@@ -234,11 +242,6 @@ private void rememberEvent(final ConsumedEvent event) {
     }
 
     private long getMessagesAllowedToSend() {
-        if (failedCommitPartitions.values().stream()
-                .anyMatch(Partition::isLookingForDeadLetter)) {
-            return 1;
-        }
-
         final long unconfirmed = offsets.values().stream().mapToLong(PartitionData::getUnconfirmed).sum();
         final long limit = getParameters().maxUncommittedMessages - unconfirmed;
         return getParameters().getMessagesAllowedToSend(limit, this.sentEvents);
@@ -263,8 +266,8 @@ private void streamToOutput() {
 
     private void streamToOutput(final boolean streamTimeoutReached) {
         final long currentTimeMillis = System.currentTimeMillis();
-        int messagesAllowedToSend = (int) getMessagesAllowedToSend();
         final boolean wasCommitted = isEverythingCommitted();
+        int messagesAllowedToSend = (int) getMessagesAllowedToSend();
         boolean sentSomething = false;
 
         for (final Map.Entry<EventTypePartition, PartitionData> e : offsets.entrySet()) {
@@ -272,25 +275,29 @@
             final EventTypePartition etp = e.getKey();
             final PartitionData partitionData = e.getValue();
             Partition partition = failedCommitPartitions.get(etp);
 
+            int messagesAllowedForPartition =
+                    getMessagesAllowedForPartition(partition, partitionData, messagesAllowedToSend);
+
+            // loop sends all the events from partition, until max uncommitted reached or no more events
             while (true) {
-                if (partition != null && partition.isLookingForDeadLetter()) {
+                if (inDlqMode(partition)) {
                     final NakadiCursor lastDeadLetterCursor = getContext().getCursorConverter().convert(
                             partition.getEventType(),
                             new Cursor(partition.getPartition(), partition.getLastDeadLetterOffset()));
                     if (getComparator().compare(partitionData.getCommitOffset(), lastDeadLetterCursor) >= 0) {
                         getZk().updateTopology(topology -> Arrays.stream(topology.getPartitions())
                                 .filter(p -> p.getPartition().equals(etp.getPartition()))
-                                .map(p -> p.toLastDeadLetterOffset(null))
+                                .map(p -> p.toCleanDeadLetterState())
                                 .toArray(Partition[]::new));
                         failedCommitPartitions.remove(etp);
-                        messagesAllowedToSend = (int) getMessagesAllowedToSend(); // fixme think
                         partition = null;
+                        messagesAllowedForPartition = messagesAllowedToSend;
                     }
                 }
 
                 final List<ConsumedEvent> toSend = partitionData.takeEventsToStream(
                         currentTimeMillis,
-                        Math.min(getBatchLimitEvents(), messagesAllowedToSend),
+                        Math.min(getBatchLimitEvents(), messagesAllowedForPartition),
                         getParameters().batchTimeoutMillis,
                         streamTimeoutReached);
@@ -299,38 +306,41 @@
                 }
 
                 if (!toSend.isEmpty() &&
-                        partition != null &&
-                        partition.isLookingForDeadLetter() &&
+                        inDlqMode(partition) &&
                         partition.getFailedCommitsCount() >= getContext().getMaxEventSendCount()) {
+
                     final ConsumedEvent failedEvent = toSend.remove(0);
-                    // todo: skip or to dlq
+                    LOG.warn("Skipping event {} from partition {} due to failed commits count {}",
+                            failedEvent.getPosition(), etp, partition.getFailedCommitsCount());
+
+                    if (getContext().getUnprocessableEventPolicy() == UnprocessableEventPolicy.DEAD_LETTER_QUEUE) {
+                        sendToDeadLetterQueue(failedEvent, partition.getFailedCommitsCount());
+                    }
+
                     getAutocommit().addSkippedEvent(failedEvent.getPosition());
                     this.addTask(() -> getAutocommit().autocommit());
 
-                    // reset looking dead letter flag in zookeeper
+                    // reset failed commits, but keep looking until last dead letter offset
                     getZk().updateTopology(topology -> Arrays.stream(topology.getPartitions())
                             .filter(p -> p.getPartition().equals(etp.getPartition()))
-                            .map(p -> p.toLastDeadLetterOffset(null))
+                            .map(p -> p.toZeroFailedCommits())
                             .toArray(Partition[]::new));
 
-                    // clean local copy of failed commits just in case that update from zookeeper is later or lost
-                    failedCommitPartitions.remove(etp);
-
                     // we are sure the batch is empty
                     break;
                 }
 
                 sentSomething |= !toSend.isEmpty();
-                flushData(etp, toSend, getDebugMessage(e));
+                flushData(etp, toSend, makeDebugMessage(partitionData));
                 if (toSend.isEmpty()) {
                     break;
                 }
+
                 messagesAllowedToSend -= toSend.size();
-            }
+                messagesAllowedForPartition -= toSend.size();
+            }
         }
 
         long memoryConsumed = offsets.values().stream().mapToLong(PartitionData::getBytesInMemory).sum();
@@ -370,37 +380,94 @@
         }
     }
 
+    private static int getMessagesAllowedForPartition(final Partition partition,
+                                                      final PartitionData partitionData,
+                                                      final int messagesAllowedToSend) {
+        if (inDlqMode(partition)) {
+            return partitionData.isCommitted() ? 1 : 0;
+        }
+        return messagesAllowedToSend;
+    }
+
+    private static boolean inDlqMode(final Partition partition) {
+        return partition != null && partition.isLookingForDeadLetter();
+    }
+
+    private void sendToDeadLetterQueue(final ConsumedEvent event, final int failedCommitsCount) {
+        // TODO: how do we handle AVRO here?
+
+        final String failedEventString = new String(event.getEvent(), StandardCharsets.UTF_8);
+        final JSONObject failedEvent = new JSONObject(failedEventString);
+
+        final JSONObject errorMessage =
+                new JSONObject()
+                        .put("message", "Nakadi Auto DLQ: skipped due to failed commits count: " + failedCommitsCount);
+
+        final SubscriptionCursorWithoutToken cursor =
+                getContext().getCursorConverter().convertToNoToken(event.getPosition());
+
+        final JSONObject deadLetter =
+                new JSONObject()
+                        .put("subscription_id", getContext().getSubscription().getId())
+                        .put("event_type", cursor.getEventType())
+                        .put("partition", cursor.getPartition())
+                        .put("offset", cursor.getOffset())
+                        .put("error", errorMessage)
+                        .put("event", failedEvent)
+                        .put("metadata",
+                                new JSONObject()
+                                        .put("eid", getContext().getUuidGenerator().randomUUID().toString())
+                                        .put("occurred_at", Instant.now().toString()));
+
+        final JSONArray deadLetterBatch =
+                new JSONArray().put(deadLetter);
+
+        final EventPublishResult result;
+        try {
+            result = getContext().getEventPublisher().publishInternal(
+                    deadLetterBatch.toString(), getContext().getDeadLetterQueueEventTypeName(), null);
+        } catch (final Exception e) {
+            throw new DeadLetterQueueStoreException("Failed to store an event to the dead letter queue", e);
+        }
+
+        if (result.getStatus() != EventPublishingStatus.SUBMITTED) {
+            throw new DeadLetterQueueStoreException("Failed to store an event to the dead letter queue: "
+                    + result.getResponses().get(0).getDetail());
+        }
+    }
+
     private int getBatchLimitEvents() {
         return getParameters().batchLimitEvents;
     }
 
-    private Optional<String> getDebugMessage(final Map.Entry<EventTypePartition, PartitionData> entry) {
-        final PartitionData partitionData = entry.getValue();
-        final long skippedEventsCount = partitionData.getSkippedEventsCount();
-        if (skippedEventsCount > 0) {
-            partitionData.resetSkippedEventsCount();
-            return Optional.of(String.format(
-                    "Skipped events due to retention time passed, count: %d", skippedEventsCount));
-        }
+    private Optional<String> makeDebugMessage(final PartitionData partitionData) {
+        final StringBuilder sb = new StringBuilder();
 
         if (batchesSent == 0) {
-            final StringBuilder sb = new StringBuilder();
             sb.append("Stream started");
 
-            final String failedCommitsPartitions = Arrays.stream(getZk().getTopology().getPartitions())
-                    .filter(p -> p.getFailedCommitsCount() > 0 || p.isLookingForDeadLetter())
-                    .map(Partition::toFailedCommitString)
-                    .collect(Collectors.joining(", "));
+            final long skippedEventsCount = partitionData.getSkippedEventsCount();
+            if (skippedEventsCount > 0) {
+                partitionData.resetSkippedEventsCount();
+                sb.append("; skipped events due to retention time passed, count: ")
+                        .append(skippedEventsCount);
+            }
+        }
 
-            if (failedCommitsPartitions != null && !failedCommitsPartitions.isEmpty()) {
-                sb.append(". Failed commits: ").append(failedCommitsPartitions);
-                LOG.info("Failed commits: {}", failedCommitsPartitions);
+        if (failedCommitsDebugStringToFlush != null && !failedCommitsDebugStringToFlush.isEmpty()) {
+            if (sb.length() != 0) {
+                sb.append("; ");
             }
+            sb.append("Auto DLQ failed commits tracking: ")
+                    .append(failedCommitsDebugStringToFlush);
 
-            return Optional.of(sb.toString());
+            // reset after sending it once
+            failedCommitsDebugStringToFlush = null;
         }
 
-        return Optional.empty();
+        return sb.length() == 0
+                ? Optional.empty()
+                : Optional.of(sb.toString());
     }
 
     private void flushData(final EventTypePartition pk, final List<ConsumedEvent> data,
@@ -514,11 +581,16 @@ void reactOnTopologyChange() {
         trackIdleness(topology);
 
         if (getContext().getMaxEventSendCount() != null) {
-            failedCommitPartitions = Arrays.stream(topology.getPartitions())
+            failedCommitPartitions = Arrays.stream(assignedPartitions)
                     .filter(p -> p.getFailedCommitsCount() > 0 || p.isLookingForDeadLetter())
                    .collect(Collectors.toMap(
                            p -> new EventTypePartition(p.getEventType(), p.getPartition()),
                            p -> p));
+
+            failedCommitsDebugStringToFlush = failedCommitPartitions.values().stream()
+                    .map(Partition::toFailedCommitsTrackingString)
+                    .collect(Collectors.joining(", "));
+            LOG.debug("Failed commits tracking: {}", failedCommitsDebugStringToFlush);
         }
     }
@@ -730,18 +802,23 @@ private void addToStreaming(final Partition partition,
         offsets.put(partition.getKey(), pd);
         getAutocommit().addPartition(cursor);
 
-        if (getContext().getMaxEventSendCount() != null) {
+        // check failed commits and indicate that we should switch into looking-for-dead-letter mode, unless we already did
+        if (getContext().getMaxEventSendCount() != null &&
+                partition.getFailedCommitsCount() >= getContext().getMaxEventSendCount() &&
+                !partition.isLookingForDeadLetter()) {
+
             final NakadiCursor lastDeadLetterCursor = getContext().getCursorOperationsService()
                     .shiftCursor(cursor, getBatchLimitEvents());
+
             final String lastDeadLetterOffset = getContext().getCursorConverter()
                     .convert(lastDeadLetterCursor).getOffset();
 
-            // check failed commits and indicate that streaming should switch in looking for dead letters mode
-            if (partition.getFailedCommitsCount() >= getContext().getMaxEventSendCount()) {
-                final Partition lookingDeadLetter = partition.toLastDeadLetterOffset(lastDeadLetterOffset);
-                failedCommitPartitions.put(partition.getKey(), lookingDeadLetter);
-                getZk().updateTopology(topology -> new Partition[]{lookingDeadLetter});
-            }
+            final Partition lookingDeadLetter = partition
+                    .toLastDeadLetterOffset(lastDeadLetterOffset)
+                    .toZeroFailedCommits();
+            failedCommitPartitions.put(partition.getKey(), lookingDeadLetter);
+
+            getZk().updateTopology(topology -> new Partition[]{lookingDeadLetter});
         }
     }
@@ -838,5 +915,4 @@ private void trackIdleness(final
ZkSubscriptionClient.Topology topology) { } } } - } diff --git a/api-cursors/src/main/java/org/zalando/nakadi/service/CursorsService.java b/api-cursors/src/main/java/org/zalando/nakadi/service/CursorsService.java index 7c26cee100..cdb9b29805 100644 --- a/api-cursors/src/main/java/org/zalando/nakadi/service/CursorsService.java +++ b/api-cursors/src/main/java/org/zalando/nakadi/service/CursorsService.java @@ -32,10 +32,12 @@ import org.zalando.nakadi.view.SubscriptionCursorWithoutToken; import java.io.IOException; +import java.util.Arrays; import java.util.List; import java.util.Map; import java.util.Optional; import java.util.concurrent.TimeUnit; +import java.util.function.Predicate; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -85,7 +87,11 @@ public List commitCursors(final String streamId, final String subscript final Subscription subscription = subscriptionCache.getSubscription(subscriptionId); authorizationValidator.authorizeSubscriptionView(subscription); authorizationValidator.authorizeSubscriptionCommit(subscription); - validateSubscriptionCommitCursors(subscription, cursors); + // Note: not checking directly here if cursors belong to the subscription. + // Such check happens indirectly (in validateStreamId) against subscription's topology. + // The subscription's topology might contain additional implicit event types (like DLQ event type), + // for which we allow committing (but not resetting) offsets. + checkCursorsStorageAvailability(cursors); try (ZkSubscriptionClient zkClient = zkSubscriptionFactory.createClient(subscription)) { validateStreamId(cursors, streamId, zkClient, subscriptionId); return zkClient.commitOffsets( @@ -219,8 +225,20 @@ public void resetCursors(final String subscriptionId, final List newCursors = cursors.stream() .map(cursorConverter::convertToNoToken) .collect(Collectors.toList()); + final Predicate isUpdatedPartition = + p -> cursors.stream().anyMatch(c -> p.getKey().equals(c.getEventTypePartition())); - zkClient.closeSubscriptionStreams(() -> zkClient.forceCommitOffsets(newCursors), timeout); + zkClient.closeSubscriptionStreams(() -> { + zkClient.forceCommitOffsets(newCursors); + // reset the DLQ state of any partition that got updated + zkClient.updateTopology(topology -> + Arrays + .stream(topology.getPartitions()) + .filter(isUpdatedPartition) + .map(p -> p.toCleanDeadLetterState()) + .toArray(Partition[]::new) + ); + }, timeout); auditLogPublisher.publish( Optional.of(new ItemsWrapper<>(oldCursors)), @@ -238,11 +256,8 @@ public void resetCursors(final String subscriptionId, } } - private void validateSubscriptionCommitCursors(final Subscription subscription, - final List cursors) + private void checkCursorsStorageAvailability(final List cursors) throws UnableProcessException { - validateCursorsBelongToSubscription(subscription, cursors); - cursors.forEach(cursor -> { try { cursor.checkStorageAvailability(); diff --git a/app/src/main/java/org/zalando/nakadi/service/NakadiDeadLetterQueueInitialization.java b/app/src/main/java/org/zalando/nakadi/service/NakadiDeadLetterQueueInitialization.java new file mode 100644 index 0000000000..f0a0b67dad --- /dev/null +++ b/app/src/main/java/org/zalando/nakadi/service/NakadiDeadLetterQueueInitialization.java @@ -0,0 +1,74 @@ +package org.zalando.nakadi.service; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.context.properties.ConfigurationProperties; +import 
org.springframework.context.event.ContextRefreshedEvent; +import org.springframework.context.event.EventListener; +import org.springframework.stereotype.Component; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; + +@Component +@ConfigurationProperties("nakadi.dlq") +public class NakadiDeadLetterQueueInitialization { + private static final Logger LOG = LoggerFactory.getLogger(NakadiDeadLetterQueueInitialization.class); + + private final SystemEventTypeInitializer systemEventTypeInitializer; + + private String storeEventTypeName; + private String owningApplication; + private String authDataType; + private String authValue; + + @Autowired + public NakadiDeadLetterQueueInitialization(final SystemEventTypeInitializer systemEventTypeInitializer) { + this.systemEventTypeInitializer = systemEventTypeInitializer; + } + + @EventListener + public void onApplicationEvent(final ContextRefreshedEvent event) throws IOException { + final Map replacements = new HashMap<>(); + replacements.put("store_event_type_name_placeholder", storeEventTypeName); + replacements.put("owning_application_placeholder", owningApplication); + replacements.put("auth_data_type_placeholder", authDataType); + replacements.put("auth_value_placeholder", authValue); + + systemEventTypeInitializer.createEventTypesFromResource("dead_letter_queue_event_types.json", replacements); + } + + public String getStoreEventTypeName() { + return storeEventTypeName; + } + + public void setStoreEventTypeName(final String storeEventTypeName) { + this.storeEventTypeName = storeEventTypeName; + } + + public String getOwningApplication() { + return owningApplication; + } + + public void setOwningApplication(final String owningApplication) { + this.owningApplication = owningApplication; + } + + public String getAuthDataType() { + return authDataType; + } + + public void setAuthDataType(final String authDataType) { + this.authDataType = authDataType; + } + + public String getAuthValue() { + return authValue; + } + + public void setAuthValue(final String authValue) { + this.authValue = authValue; + } +} diff --git a/app/src/main/java/org/zalando/nakadi/service/job/DlqRedriveEventTypeAttachmentJob.java b/app/src/main/java/org/zalando/nakadi/service/job/DlqRedriveEventTypeAttachmentJob.java new file mode 100644 index 0000000000..427bd713a2 --- /dev/null +++ b/app/src/main/java/org/zalando/nakadi/service/job/DlqRedriveEventTypeAttachmentJob.java @@ -0,0 +1,190 @@ +package org.zalando.nakadi.service.job; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.scheduling.annotation.Scheduled; +import org.springframework.stereotype.Service; +import org.zalando.nakadi.cache.EventTypeCache; +import org.zalando.nakadi.config.NakadiSettings; +import org.zalando.nakadi.domain.ConsumedEvent; +import org.zalando.nakadi.domain.EventTypePartition; +import org.zalando.nakadi.domain.Feature; +import org.zalando.nakadi.domain.HeaderTag; +import org.zalando.nakadi.domain.NakadiCursor; +import org.zalando.nakadi.domain.Subscription; +import org.zalando.nakadi.repository.db.SubscriptionDbRepository; +import org.zalando.nakadi.service.CursorConverter; +import org.zalando.nakadi.service.FeatureToggleService; +import org.zalando.nakadi.service.subscription.model.Partition; +import org.zalando.nakadi.service.timeline.HighLevelConsumer; +import 
org.zalando.nakadi.service.timeline.TimelineService; +import org.zalando.nakadi.view.Cursor; +import org.zalando.nakadi.service.subscription.zk.SubscriptionClientFactory; +import org.zalando.nakadi.service.subscription.zk.ZkSubscriptionClient; + +import java.io.IOException; +import java.util.Arrays; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +@Service +public class DlqRedriveEventTypeAttachmentJob { + + private static final String JOB_NAME = "dlq-redrive-event-type-attachment"; + + private static final Logger LOG = LoggerFactory.getLogger(DlqRedriveEventTypeAttachmentJob.class); + + private final FeatureToggleService featureToggleService; + private final EventTypeCache eventTypeCache; + private final TimelineService timelineService; + private final SubscriptionDbRepository subscriptionRepository; + private final SubscriptionClientFactory subscriptionClientFactory; + private final CursorConverter cursorConverter; + private final NakadiSettings nakadiSettings; + private final String dlqRedriveEventTypeName; + private final ExclusiveJobWrapper jobWrapper; + + @Autowired + public DlqRedriveEventTypeAttachmentJob( + final FeatureToggleService featureToggleService, + final EventTypeCache eventTypeCache, + final TimelineService timelineService, + final SubscriptionDbRepository subscriptionRepository, + final SubscriptionClientFactory subscriptionClientFactory, + final CursorConverter cursorConverter, + final NakadiSettings nakadiSettings, + @Value("${nakadi.dlq.redriveEventTypeName}") final String dlqRedriveEventTypeName, + @Value("${nakadi.jobs.dlqRedriveEventTypeAttach.runPeriodMs}") final int periodMs, + final JobWrapperFactory jobWrapperFactory) { + this.featureToggleService = featureToggleService; + this.eventTypeCache = eventTypeCache; + this.timelineService = timelineService; + this.subscriptionRepository = subscriptionRepository; + this.subscriptionClientFactory = subscriptionClientFactory; + this.cursorConverter = cursorConverter; + this.nakadiSettings = nakadiSettings; + this.dlqRedriveEventTypeName = dlqRedriveEventTypeName; + this.jobWrapper = jobWrapperFactory.createExclusiveJobWrapper(JOB_NAME, periodMs); + } + + @Scheduled( + fixedDelayString = "${nakadi.jobs.checkRunMs}", + initialDelayString = "${random.int(${nakadi.jobs.checkRunMs})}") + public void attachDlqRedriveEventType() { + if (!featureToggleService.isFeatureEnabled(Feature.DLQ_REDRIVE_EVENT_TYPE_ATTACHMENT_JOB)) { + LOG.warn("Feature toggle is disabled for DLQ redrive event type attachment job: skipping the run."); + return; + } + try { + jobWrapper.runJobLocked(() -> { + try { + attachDlqRedriveEventTypeLocked(); + } catch (final IOException ioe) { + throw new RuntimeException(ioe); + } + }); + } catch (final Exception e) { + LOG.error("Failed to run the job to attach DLQ redrive event type to corresponding subscriptions", e); + } + } + + private void attachDlqRedriveEventTypeLocked() throws IOException { + + final Set<String> subscriptionIds = new HashSet<>(); + + final List<String> orderedPartitions = eventTypeCache.getOrderedPartitions(dlqRedriveEventTypeName); + + // it's inefficient to re-consume from the beginning, but we expect this event type to be empty most of the time anyway + final List<NakadiCursor> beginCursors = orderedPartitions.stream() + .map(p -> new Cursor(p, Cursor.BEFORE_OLDEST_OFFSET)) + .map(c -> cursorConverter.convert(dlqRedriveEventTypeName, c)) + .collect(Collectors.toList()); + + try
(HighLevelConsumer consumer = timelineService.createEventConsumer(JOB_NAME + "-job", beginCursors)) { + while (true) { + final List<ConsumedEvent> events = consumer.readEvents(); + if (events.isEmpty()) { + // consumed till the current end + break; + } + events.forEach(e -> { + final Map<HeaderTag, String> tags = e.getConsumerTags(); + if (tags != null) { + final String sid = tags.get(HeaderTag.CONSUMER_SUBSCRIPTION_ID); + if (sid != null) { + subscriptionIds.add(sid); + } + } + }); + } + } + + final List<Partition> unassignedDlqPartitions = orderedPartitions.stream() + .map(p -> new Partition(dlqRedriveEventTypeName, p, null, null, Partition.State.UNASSIGNED)) + .collect(Collectors.toList()); + + // for every subscription, update the topology, if needed, so that the DLQ redrive event type is included + subscriptionIds.forEach(sid -> { + try { + addDlqPartitionsToSubscription( + subscriptionRepository.getSubscription(sid), + unassignedDlqPartitions); + } catch (final Exception e) { + // subscription is already gone, but events for redrive are still there; or failed to update + // topology, etc. + LOG.warn("Failed to add DLQ redrive event type to subscription: {}", sid, e); + } + }); + } + + private void addDlqPartitionsToSubscription( + final Subscription subscription, + final List<Partition> unassignedDlqPartitions) throws Exception { + + final ZkSubscriptionClient client = subscriptionClientFactory.createClient(subscription); + + // idempotent, does not overwrite existing offsets + client.createOffsetZNodes( + unassignedDlqPartitions.stream() + .map(p -> p.getPartition()) + .map(p -> new Cursor(p, Cursor.BEFORE_OLDEST_OFFSET)) + .map(c -> cursorConverter.convert(dlqRedriveEventTypeName, c)) + .map(nc -> cursorConverter.convertToNoToken(nc)) + .collect(Collectors.toList())); + + final boolean[] hasNewPartitions = {false}; + client.updateTopology(topology -> { + final var newPartitions = selectMissingPartitions(topology.getPartitions(), unassignedDlqPartitions); + if (!hasNewPartitions[0] && newPartitions.length != 0) { + hasNewPartitions[0] = true; + } + return newPartitions; + }); + + // shortcut to enable new partitions for streaming, otherwise they will stay unassigned. + // The better way would be to rebalance them.
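// The one-element boolean array above is the usual workaround for Java's rule that a lambda may
// capture only effectively-final locals; and since the version-checked topology update may apply
// the updater more than once, the flag is only ever raised, never cleared. A sketch of the same
// pattern with AtomicBoolean, which states the intent more explicitly (applyUpdater is an
// illustrative stand-in for updateTopology, not the real client API):
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.UnaryOperator;

final class CapturedFlagSketch {

    static void applyUpdater(final UnaryOperator<String[]> updater) {
        updater.apply(new String[]{"p0", "p1"}); // pretend this is the current topology
    }

    public static void main(final String[] args) {
        final AtomicBoolean hasNewPartitions = new AtomicBoolean(false);
        applyUpdater(partitions -> {
            if (partitions.length != 0) {
                hasNewPartitions.set(true); // raise once; a retried updater keeps it raised
            }
            return partitions;
        });
        System.out.println(hasNewPartitions.get()); // true
    }
}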
+ if (hasNewPartitions[0]) { + client.closeSubscriptionStreams( + () -> LOG.info("Subscription `{}` streams were closed due to addition of " + + "Nakadi DLQ Event Type", subscription.getId()), + TimeUnit.SECONDS.toMillis(nakadiSettings.getMaxCommitTimeout())); + } + } + + static Partition[] selectMissingPartitions( + final Partition[] initialPartitions, + final List toAddPartitions) { + + final Map assignment = + Arrays.stream(initialPartitions).collect(Collectors.toMap(k -> k.getKey(), v -> v)); + + return toAddPartitions.stream().filter(p -> !assignment.containsKey(p.getKey())) + .toArray(Partition[]::new); + } +} diff --git a/app/src/main/resources/application.yml b/app/src/main/resources/application.yml index 292913dbb2..58429b0547 100644 --- a/app/src/main/resources/application.yml +++ b/app/src/main/resources/application.yml @@ -104,11 +104,12 @@ nakadi: maxPartitions: 100 maxStreamMemoryBytes: 50000000 # ~50 MB jobs: - checkRunMs: 600000 # 10 min + checkRunMs: 30000 timelineCleanup: runPeriodMs: 3600000 # 1 hour deletionDelayMs: 2000 # 2 seconds, to be on the safe side - consumerNodesCleanup.runPeriodMs: 21600000 # 6 hours + dlqRedriveEventTypeAttach: + runPeriodMs: 30000 # same as checkRunMs http.pool.connection: max.total: 20 max.per.route: 10 @@ -139,6 +140,12 @@ nakadi: nakadiDataStreamed: "nakadi.data.streamed" owning_application: "stups_nakadi" hasher.salt: "salt" + dlq: + redriveEventTypeName: "nakadi.dead.letter.redrive" + storeEventTypeName: "nakadi.dead.letter.queue" + owningApplication: "stups_nakadi" + authDataType: "*" + authValue: "*" twintip: mapping: /api diff --git a/app/src/main/resources/dead_letter_queue_event_types.json b/app/src/main/resources/dead_letter_queue_event_types.json new file mode 100644 index 0000000000..452516e1fb --- /dev/null +++ b/app/src/main/resources/dead_letter_queue_event_types.json @@ -0,0 +1,49 @@ +[ + { + "name": "store_event_type_name_placeholder", + "owning_application": "owning_application_placeholder", + "category": "business", + "enrichment_strategies": [ + "metadata_enrichment" + ], + "partition_strategy": "random", + "cleanup_policy": "delete", + "ordering_key_fields": [], + "ordering_instance_ids": [], + "schema": { + "type": "json_schema", + "schema": "{\"type\": \"object\", \"additionalProperties\": false, \"properties\": {\"subscription_id\": {\"type\": \"string\", \"format\": \"uuid\"}, \"event_type\": {\"type\": \"string\"}, \"partition\": {\"type\": \"string\"}, \"offset\": {\"type\": \"string\"}, \"error\": {\"type\": \"object\", \"properties\": {\"message\": {\"type\": \"string\"}}, \"required\": [\"message\"]}, \"event\": {\"type\": \"object\", \"additionalProperties\": true, \"properties\": {\"metadata\": {\"type\": \"object\", \"additionalProperties\": true, \"properties\": {\"event_type\": {\"type\": \"string\"}, \"eid\": {\"type\": \"string\", \"format\": \"uuid\"}, \"occurred_at\": {\"type\": \"string\", \"format\": \"date-time\"}}, \"required\": [\"event_type\", \"eid\", \"occurred_at\"]}}, \"required\": [\"metadata\"]}}, \"required\": [\"subscription_id\", \"event_type\", \"partition\", \"offset\", \"error\", \"event\"]}" + }, + "default_statistic": { + "messages_per_minute": 1, + "message_size": 1000, + "read_parallelism": 1, + "write_parallelism": 1 + }, + "options": { + "retention_time": 86400000 + }, + "compatibility_mode": "forward", + "audience": "component-internal", + "authorization": { + "admins": [ + { + "data_type": "auth_data_type_placeholder", + "value": "auth_value_placeholder" + } + ], + "readers": 
[ + { + "data_type": "auth_data_type_placeholder", + "value": "auth_value_placeholder" + } + ], + "writers": [ + { + "data_type": "auth_data_type_placeholder", + "value": "auth_value_placeholder" + } + ] + } + } +] diff --git a/app/src/test/java/org/zalando/nakadi/service/job/DlqRedriveEventTypeAttachmentJobTest.java b/app/src/test/java/org/zalando/nakadi/service/job/DlqRedriveEventTypeAttachmentJobTest.java new file mode 100644 index 0000000000..3e8d7717d3 --- /dev/null +++ b/app/src/test/java/org/zalando/nakadi/service/job/DlqRedriveEventTypeAttachmentJobTest.java @@ -0,0 +1,39 @@ +package org.zalando.nakadi.service.job; + +import org.junit.Assert; +import org.junit.Test; +import org.zalando.nakadi.service.subscription.model.Partition; + +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +public class DlqRedriveEventTypeAttachmentJobTest { + @Test + public void whenAddingDlqToTopologyThenOnlyMissingPartitionsAreAdded() { + final Partition[] existingPartitions = new Partition[] { + new Partition("abc", "0", null, null, Partition.State.UNASSIGNED), + new Partition("abc", "1", "session-1", "session-2", Partition.State.ASSIGNED), + new Partition("dlq-redrive", "0", "session-1", "session-2", Partition.State.ASSIGNED) + }; + final List unassignedDlqPartitions = List.of( + new Partition("dlq-redrive", "0", null, null, Partition.State.UNASSIGNED), + new Partition("dlq-redrive", "1", null, null, Partition.State.UNASSIGNED), + new Partition("dlq-redrive", "2", null, null, Partition.State.UNASSIGNED)); + + final Partition[] newPartitions = + DlqRedriveEventTypeAttachmentJob.selectMissingPartitions( + existingPartitions, + unassignedDlqPartitions); + + final Set newPartitionsSet = new HashSet<>(); + Collections.addAll(newPartitionsSet, newPartitions); + + Assert.assertEquals( + Set.of( + new Partition("dlq-redrive", "1", null, null, Partition.State.UNASSIGNED), + new Partition("dlq-redrive", "2", null, null, Partition.State.UNASSIGNED)), + newPartitionsSet); + } +} diff --git a/core-common/src/main/java/org/zalando/nakadi/annotations/validation/DeadLetterAnnotationValidator.java b/core-common/src/main/java/org/zalando/nakadi/annotations/validation/DeadLetterAnnotationValidator.java index 9d151feb6a..52e754116b 100644 --- a/core-common/src/main/java/org/zalando/nakadi/annotations/validation/DeadLetterAnnotationValidator.java +++ b/core-common/src/main/java/org/zalando/nakadi/annotations/validation/DeadLetterAnnotationValidator.java @@ -1,5 +1,7 @@ package org.zalando.nakadi.annotations.validation; +import org.zalando.nakadi.domain.UnprocessableEventPolicy; + import javax.validation.ConstraintValidator; import javax.validation.ConstraintValidatorContext; import java.util.Map; @@ -10,20 +12,34 @@ public class DeadLetterAnnotationValidator implements public static final String SUBSCRIPTION_MAX_EVENT_SEND_COUNT = "nakadi.zalando.org/subscription-max-event-send-count"; + public static final String SUBSCRIPTION_UNPROCESSABLE_EVENT_POLICY = + "nakadi.zalando.org/subscription-unprocessable-event-policy"; + @Override public boolean isValid(final Map annotations, final ConstraintValidatorContext context) { if (annotations == null) { return true; } + if (!isValidMaxEventSendCount(annotations, context)) { + return false; + } + if (!isValidUnprocessableEventPolicy(annotations, context)) { + return false; + } + return true; + } - final String failedCommitCount = annotations.get(SUBSCRIPTION_MAX_EVENT_SEND_COUNT); - if (failedCommitCount == null) { + private 
boolean isValidMaxEventSendCount( + final Map<String, String> annotations, final ConstraintValidatorContext context) { + + final String maxEventSendCount = annotations.get(SUBSCRIPTION_MAX_EVENT_SEND_COUNT); + if (maxEventSendCount == null) { return true; } final Integer commits; try { - commits = Integer.valueOf(failedCommitCount); + commits = Integer.valueOf(maxEventSendCount); } catch (final NumberFormatException e) { context.disableDefaultConstraintViolation(); context.buildConstraintViolationWithTemplate(SUBSCRIPTION_MAX_EVENT_SEND_COUNT + " must be an integer") @@ -39,7 +55,50 @@ public boolean isValid(final Map annotations, final ConstraintVa return false; } + final String unprocessableEventPolicy = annotations.get(SUBSCRIPTION_UNPROCESSABLE_EVENT_POLICY); + if (unprocessableEventPolicy == null) { + context.disableDefaultConstraintViolation(); + context + .buildConstraintViolationWithTemplate( + SUBSCRIPTION_UNPROCESSABLE_EVENT_POLICY + " must also be set when setting " + + SUBSCRIPTION_MAX_EVENT_SEND_COUNT) + .addConstraintViolation(); + return false; + } + return true; } + private boolean isValidUnprocessableEventPolicy( + final Map<String, String> annotations, final ConstraintValidatorContext context) { + + final String unprocessableEventPolicy = annotations.get(SUBSCRIPTION_UNPROCESSABLE_EVENT_POLICY); + if (unprocessableEventPolicy == null) { + return true; + } + + try { + UnprocessableEventPolicy.valueOf(unprocessableEventPolicy); + } catch (final IllegalArgumentException e) { + context.disableDefaultConstraintViolation(); + context + .buildConstraintViolationWithTemplate( + SUBSCRIPTION_UNPROCESSABLE_EVENT_POLICY + " must be one of: SKIP_EVENT, DEAD_LETTER_QUEUE") + .addConstraintViolation(); + return false; + } + + final String maxEventSendCount = annotations.get(SUBSCRIPTION_MAX_EVENT_SEND_COUNT); + if (maxEventSendCount == null) { + context.disableDefaultConstraintViolation(); + context + .buildConstraintViolationWithTemplate( + SUBSCRIPTION_MAX_EVENT_SEND_COUNT + " must also be set when setting " + + SUBSCRIPTION_UNPROCESSABLE_EVENT_POLICY) + .addConstraintViolation(); + return false; + } + + return true; + } } diff --git a/core-common/src/main/java/org/zalando/nakadi/domain/Feature.java b/core-common/src/main/java/org/zalando/nakadi/domain/Feature.java index c80c52b6a0..80e6e2930f 100644 --- a/core-common/src/main/java/org/zalando/nakadi/domain/Feature.java +++ b/core-common/src/main/java/org/zalando/nakadi/domain/Feature.java @@ -19,8 +19,9 @@ public enum Feature { TOKEN_SUBSCRIPTIONS_ITERATION("token_subscription_iteration"), RETURN_BODY_ON_CREATE_UPDATE_EVENT_TYPE("return_body_on_create_update_event_type"), VALIDATE_SUBSCRIPTION_OWNING_APPLICATION("validate_subscription_owning_app"), + VALIDATE_EVENT_TYPE_OWNING_APPLICATION("validate_event_type_owning_app"), SKIP_MISPLACED_EVENTS("skip_misplaced_events"), - VALIDATE_EVENT_TYPE_OWNING_APPLICATION("validate_event_type_owning_app"); + DLQ_REDRIVE_EVENT_TYPE_ATTACHMENT_JOB("dlq_redrive_event_type_attachment_job"); private final String id; diff --git a/core-common/src/main/java/org/zalando/nakadi/domain/UnprocessableEventPolicy.java b/core-common/src/main/java/org/zalando/nakadi/domain/UnprocessableEventPolicy.java new file mode 100644 index 0000000000..8f07b5d68c --- /dev/null +++ b/core-common/src/main/java/org/zalando/nakadi/domain/UnprocessableEventPolicy.java @@ -0,0 +1,6 @@ +package org.zalando.nakadi.domain; + +public enum UnprocessableEventPolicy { + SKIP_EVENT, + DEAD_LETTER_QUEUE +} diff --git
a/core-common/src/main/java/org/zalando/nakadi/exceptions/runtime/DeadLetterQueueStoreException.java b/core-common/src/main/java/org/zalando/nakadi/exceptions/runtime/DeadLetterQueueStoreException.java new file mode 100644 index 0000000000..905dee2e6c --- /dev/null +++ b/core-common/src/main/java/org/zalando/nakadi/exceptions/runtime/DeadLetterQueueStoreException.java @@ -0,0 +1,12 @@ +package org.zalando.nakadi.exceptions.runtime; + +public class DeadLetterQueueStoreException extends RuntimeException { + + public DeadLetterQueueStoreException(final String msg) { + super(msg); + } + + public DeadLetterQueueStoreException(final String msg, final Throwable cause) { + super(msg, cause); + } +} diff --git a/core-common/src/main/java/org/zalando/nakadi/service/subscription/model/Partition.java b/core-common/src/main/java/org/zalando/nakadi/service/subscription/model/Partition.java index e454f1d176..c140419ffa 100644 --- a/core-common/src/main/java/org/zalando/nakadi/service/subscription/model/Partition.java +++ b/core-common/src/main/java/org/zalando/nakadi/service/subscription/model/Partition.java @@ -87,12 +87,12 @@ public Partition toZeroFailedCommits() { } public Partition toLastDeadLetterOffset(final String lastDeadLetterOffset) { - if ((lastDeadLetterOffset != null && this.lastDeadLetterOffset == null) || - (lastDeadLetterOffset == null && this.lastDeadLetterOffset != null)) { - return new Partition(eventType, partition, session, nextSession, state, 0, lastDeadLetterOffset); - } + return new Partition(eventType, partition, session, nextSession, state, failedCommitsCount, + lastDeadLetterOffset); + } - return this; + public Partition toCleanDeadLetterState() { + return new Partition(eventType, partition, session, nextSession, state, 0, null); } /** @@ -177,8 +177,9 @@ public String toString() { session + "->" + nextSession + ":" + failedCommitsCount + ":" + lastDeadLetterOffset; } - public String toFailedCommitString() { - return eventType + ":" + partition + ":" + failedCommitsCount + ":" + lastDeadLetterOffset; + public String toFailedCommitsTrackingString() { + return String.format("{%s:%s failedCommitsCount=%d lastDeadLetterOffset=%s}", + eventType, partition, failedCommitsCount, lastDeadLetterOffset); } @Override diff --git a/core-common/src/main/java/org/zalando/nakadi/service/subscription/zk/AbstractZkSubscriptionClient.java b/core-common/src/main/java/org/zalando/nakadi/service/subscription/zk/AbstractZkSubscriptionClient.java index f75ba4e533..2a67f6de68 100644 --- a/core-common/src/main/java/org/zalando/nakadi/service/subscription/zk/AbstractZkSubscriptionClient.java +++ b/core-common/src/main/java/org/zalando/nakadi/service/subscription/zk/AbstractZkSubscriptionClient.java @@ -129,7 +129,8 @@ private void createSessionsZNode() throws Exception { } } - private void createOffsetZNodes(final Collection cursors) throws Exception { + @Override + public void createOffsetZNodes(final Collection cursors) throws Exception { LOG.info("Creating offsets"); for (final SubscriptionCursorWithoutToken cursor : cursors) { try { @@ -370,6 +371,7 @@ public final Optional getZkSubscriptionNode() listSessions())); } + @Override public void forceCommitOffsets(final List cursors) throws NakadiRuntimeException { try { for (final SubscriptionCursorWithoutToken cursor : cursors) { diff --git a/core-common/src/main/java/org/zalando/nakadi/service/subscription/zk/ZkSubscriptionClient.java b/core-common/src/main/java/org/zalando/nakadi/service/subscription/zk/ZkSubscriptionClient.java index 
3375bf147f..ba80503dab 100644 --- a/core-common/src/main/java/org/zalando/nakadi/service/subscription/zk/ZkSubscriptionClient.java +++ b/core-common/src/main/java/org/zalando/nakadi/service/subscription/zk/ZkSubscriptionClient.java @@ -1,10 +1,8 @@ package org.zalando.nakadi.service.subscription.zk; import com.fasterxml.jackson.annotation.JsonProperty; -import java.util.function.Function; import org.apache.commons.codec.binary.Hex; import org.zalando.nakadi.domain.EventTypePartition; -import org.zalando.nakadi.exceptions.runtime.NakadiBaseException; import org.zalando.nakadi.exceptions.runtime.NakadiRuntimeException; import org.zalando.nakadi.exceptions.runtime.OperationTimeoutException; import org.zalando.nakadi.exceptions.runtime.ServiceTemporarilyUnavailableException; @@ -17,12 +15,15 @@ import java.io.Closeable; import java.security.MessageDigest; import java.security.NoSuchAlgorithmException; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.List; import java.util.Map; import java.util.Objects; import java.util.Optional; +import java.util.function.Function; +import java.util.stream.IntStream; public interface ZkSubscriptionClient extends Closeable { @@ -32,7 +33,7 @@ public interface ZkSubscriptionClient extends Closeable { * @return true if subscription was created. False if subscription already present. To operate on this value * additional field 'state' is used /nakadi/subscriptions/{subscriptionId}/state. Just after creation it has value * CREATED. After {{@link #fillEmptySubscription}} call it will have value INITIALIZED. So true - * will be returned in case of state is equal to CREATED. + * will be returned when the state is equal to INITIALIZED. */ boolean isSubscriptionCreatedAndInitialized() throws NakadiRuntimeException; @@ -49,6 +50,8 @@ public interface ZkSubscriptionClient extends Closeable { */ void fillEmptySubscription(Collection<SubscriptionCursorWithoutToken> cursors); + void createOffsetZNodes(Collection<SubscriptionCursorWithoutToken> cursors) throws Exception; + /** * Updates topologies partitions by reading topology first and * then writing the change back with usage of zookeeper node version. @@ -175,8 +178,18 @@ void repartitionTopology(String eventTypeName, int newPartitionsCount, String of /** * Close subscription streams and perform provided action when streams are closed. * + * Specifically, the steps taken are: + * + * 1. Creates a /subscriptions/{SID}/cursor_reset znode, signaling to all the + * consumers that they should terminate + * 2. Waits for the session count on this subscription to go down to zero + * 3. Executes the action + * 4. Deletes the /subscriptions/{SID}/cursor_reset znode, making the subscription available + * to the consumers again + * * @param action perform action once streams are closed - * @param timeout wait until give up resetting + * @param timeout maximum amount of time to wait for the session count to go down to 0. + * If exceeded, an OperationTimeoutException is thrown.
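+ *
+ * A rough sketch of the sequence, with illustrative helper names (these helpers are
+ * not part of the real client API):
+ *
+ *   createZnode("/subscriptions/" + sid + "/cursor_reset");
+ *   while (countSessions(sid) > 0) {
+ *       if (deadlineReached(timeout)) { throw new OperationTimeoutException(...); }
+ *       sleep(pollIntervalMs);
+ *   }
+ *   action.run();
+ *   deleteZnode("/subscriptions/" + sid + "/cursor_reset");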
* @throws OperationTimeoutException * @throws ZookeeperException */ @@ -202,21 +215,18 @@ public Partition[] getPartitions() { } public Topology withUpdatedPartitions(final Partition[] partitions) { - final Partition[] resultPartitions = Arrays.copyOf(this.partitions, this.partitions.length); + final var resultPartitions = new ArrayList<>(Arrays.asList(this.partitions)); for (final Partition newValue : partitions) { - int selectedIdx = -1; - for (int idx = 0; idx < resultPartitions.length; ++idx) { - if (resultPartitions[idx].getKey().equals(newValue.getKey())) { - selectedIdx = idx; - } - } - if (selectedIdx < 0) { - throw new NakadiBaseException( - "Failed to find partition " + newValue.getKey() + " in " + this); + final var selected = IntStream.range(0, resultPartitions.size()) + .filter(idx -> resultPartitions.get(idx).getKey().equals(newValue.getKey())) + .findFirst(); + if (selected.isPresent()) { + resultPartitions.set(selected.getAsInt(), newValue); + } else { + resultPartitions.add(newValue); } - resultPartitions[selectedIdx] = newValue; } - return new Topology(resultPartitions, Optional.ofNullable(version).orElse(0) + 1); + return new Topology(resultPartitions.toArray(new Partition[0]), Optional.ofNullable(version).orElse(0) + 1); } @Override diff --git a/core-common/src/test/java/org/zalando/nakadi/annotations/validation/DeadLetterAnnotationValidatorTest.java b/core-common/src/test/java/org/zalando/nakadi/annotations/validation/DeadLetterAnnotationValidatorTest.java index 6e7fd9896a..62b6555ce3 100644 --- a/core-common/src/test/java/org/zalando/nakadi/annotations/validation/DeadLetterAnnotationValidatorTest.java +++ b/core-common/src/test/java/org/zalando/nakadi/annotations/validation/DeadLetterAnnotationValidatorTest.java @@ -1,14 +1,145 @@ package org.zalando.nakadi.annotations.validation; +import org.junit.Before; import org.junit.Assert; import org.junit.Test; +import javax.validation.ConstraintViolation; +import javax.validation.Valid; +import javax.validation.Validation; +import javax.validation.Validator; +import java.util.Collections; +import java.util.Map; + public class DeadLetterAnnotationValidatorTest { + public static class TestClass { + @Valid + @DeadLetterValidAnnotations + private final Map< + @Valid @AnnotationKey String, + @Valid @AnnotationValue String> annotations; + + public TestClass(final Map annotations) { + this.annotations = annotations; + } + } + + private Validator validator; + + @Before + public void prepareValidator() { + validator = Validation.buildDefaultValidatorFactory().getValidator(); + } + + @Test + public void whenEmptyAnnotationsThenOK() { + Assert.assertTrue("No annotations is OK", + validator.validate(new TestClass(null)).isEmpty()); + + Assert.assertTrue("Empty annotations is OK", + validator.validate(new TestClass(Collections.emptyMap())).isEmpty()); + } + + @Test + public void whenValidMaxEventSendCountThenOK() { + final Map annotations = Map.of( + DeadLetterAnnotationValidator.SUBSCRIPTION_MAX_EVENT_SEND_COUNT, "3", + DeadLetterAnnotationValidator.SUBSCRIPTION_UNPROCESSABLE_EVENT_POLICY, "SKIP_EVENT"); + + Assert.assertTrue("Valid max event send count is OK", + validator.validate(new TestClass(annotations)).isEmpty()); + } + + @Test + public void whenInvalidMaxEventSendCountThenFail() { + final Map annotations = Map.of( + DeadLetterAnnotationValidator.SUBSCRIPTION_MAX_EVENT_SEND_COUNT, "not-a-number", + DeadLetterAnnotationValidator.SUBSCRIPTION_UNPROCESSABLE_EVENT_POLICY, "SKIP_EVENT"); + + Assert.assertTrue("Invalid max event send 
count is rejected", + validator.validate(new TestClass(annotations)).stream().anyMatch( + r -> r.getMessage().contains( + DeadLetterAnnotationValidator.SUBSCRIPTION_MAX_EVENT_SEND_COUNT))); + } + + @Test + public void whenTooLowMaxEventSendCountThenFail() { + final Map annotations = Map.of( + DeadLetterAnnotationValidator.SUBSCRIPTION_MAX_EVENT_SEND_COUNT, "1", + DeadLetterAnnotationValidator.SUBSCRIPTION_UNPROCESSABLE_EVENT_POLICY, "SKIP_EVENT"); + + Assert.assertTrue("Too low max event send count is rejected", + validator.validate(new TestClass(annotations)).stream().anyMatch( + r -> r.getMessage().contains( + DeadLetterAnnotationValidator.SUBSCRIPTION_MAX_EVENT_SEND_COUNT))); + } + + @Test + public void whenTooHighMaxEventSendCountThenFail() { + final Map annotations = Map.of( + DeadLetterAnnotationValidator.SUBSCRIPTION_MAX_EVENT_SEND_COUNT, "11", + DeadLetterAnnotationValidator.SUBSCRIPTION_UNPROCESSABLE_EVENT_POLICY, "SKIP_EVENT"); + + Assert.assertTrue("Too high max event send count is rejected", + validator.validate(new TestClass(annotations)).stream().anyMatch( + r -> r.getMessage().contains( + DeadLetterAnnotationValidator.SUBSCRIPTION_MAX_EVENT_SEND_COUNT))); + } @Test - public void testIsValid() { - final DeadLetterAnnotationValidator deadLetterAnnotationValidator = new DeadLetterAnnotationValidator(); - Assert.assertTrue(deadLetterAnnotationValidator.isValid(null, null)); + public void whenExplicitlySetMaxEventSendCountSetAndNoPolicyThenFail() { + final Map annotations = Map.of( + DeadLetterAnnotationValidator.SUBSCRIPTION_MAX_EVENT_SEND_COUNT, "3"); + + Assert.assertTrue("Unprocessable event policy must be set when setting max event send count", + validator.validate(new TestClass(annotations)).stream() + .map(ConstraintViolation::getMessage) + .anyMatch(m -> + m.contains(DeadLetterAnnotationValidator.SUBSCRIPTION_MAX_EVENT_SEND_COUNT) && + m.contains(DeadLetterAnnotationValidator.SUBSCRIPTION_UNPROCESSABLE_EVENT_POLICY))); } -} \ No newline at end of file + @Test + public void whenSkipEventPolicyThenOK() { + final Map annotations = Map.of( + DeadLetterAnnotationValidator.SUBSCRIPTION_UNPROCESSABLE_EVENT_POLICY, "SKIP_EVENT", + DeadLetterAnnotationValidator.SUBSCRIPTION_MAX_EVENT_SEND_COUNT, "3"); + + Assert.assertTrue("Skip event policy is OK", + validator.validate(new TestClass(annotations)).isEmpty()); + } + + @Test + public void whenDeadLetterQueuePolicyThenOK() { + final Map annotations = Map.of( + DeadLetterAnnotationValidator.SUBSCRIPTION_UNPROCESSABLE_EVENT_POLICY, "DEAD_LETTER_QUEUE", + DeadLetterAnnotationValidator.SUBSCRIPTION_MAX_EVENT_SEND_COUNT, "3"); + + Assert.assertTrue("Dead letter queue policy is OK", + validator.validate(new TestClass(annotations)).isEmpty()); + } + + @Test + public void whenExplicitlySetPolicyAndNoMaxEventSendCountSetThenFail() { + final Map annotations = Map.of( + DeadLetterAnnotationValidator.SUBSCRIPTION_UNPROCESSABLE_EVENT_POLICY, "DEAD_LETTER_QUEUE"); + + Assert.assertTrue("Max event send count must be set when setting unprocessable event policy", + validator.validate(new TestClass(annotations)).stream() + .map(ConstraintViolation::getMessage) + .anyMatch(m -> + m.contains(DeadLetterAnnotationValidator.SUBSCRIPTION_UNPROCESSABLE_EVENT_POLICY) && + m.contains(DeadLetterAnnotationValidator.SUBSCRIPTION_MAX_EVENT_SEND_COUNT))); + } + + @Test + public void whenUnknownPolicyThenFail() { + final Map annotations = + Map.of(DeadLetterAnnotationValidator.SUBSCRIPTION_UNPROCESSABLE_EVENT_POLICY, "unknown"); + + Assert.assertTrue("Unknown 
unprocessable event policy is rejected", + validator.validate(new TestClass(annotations)).stream().anyMatch( + r -> r.getMessage().contains( + DeadLetterAnnotationValidator.SUBSCRIPTION_UNPROCESSABLE_EVENT_POLICY))); + } +} diff --git a/core-common/src/test/java/org/zalando/nakadi/service/subscription/zk/ZkSubscriptionClientTest.java b/core-common/src/test/java/org/zalando/nakadi/service/subscription/zk/ZkSubscriptionClientTest.java index 5d152b03ad..d4d9601797 100644 --- a/core-common/src/test/java/org/zalando/nakadi/service/subscription/zk/ZkSubscriptionClientTest.java +++ b/core-common/src/test/java/org/zalando/nakadi/service/subscription/zk/ZkSubscriptionClientTest.java @@ -11,6 +11,55 @@ public class ZkSubscriptionClientTest { + @Test + public void testUpdatePartitionsOverridesExistingPartitions() { + final var topology = new ZkSubscriptionClient.Topology( + new Partition[] { + new Partition("event-type-1", "0", null, null, Partition.State.ASSIGNED), + new Partition("event-type-1", "1", null, null, Partition.State.ASSIGNED), + }, + 0); + + final var updatedTopology = topology.withUpdatedPartitions( + new Partition[] { + new Partition("event-type-1", "0", null, null, Partition.State.UNASSIGNED), + }); + + Assert.assertNotEquals(topology, updatedTopology); + Assert.assertArrayEquals( + new Partition[] { + new Partition("event-type-1", "0", null, null, Partition.State.UNASSIGNED), + new Partition("event-type-1", "1", null, null, Partition.State.ASSIGNED), + }, + updatedTopology.getPartitions() + ); + } + + @Test + public void testUpdatePartitionsExtendsPartitionsList() { + final var topology = new ZkSubscriptionClient.Topology( + new Partition[] { + new Partition("event-type-1", "0", null, null, Partition.State.ASSIGNED), + new Partition("event-type-1", "1", null, null, Partition.State.ASSIGNED), + }, + 0); + + final var updatedTopology = topology.withUpdatedPartitions( + new Partition[] { + new Partition("event-type-2", "0", null, null, Partition.State.UNASSIGNED), + }); + + Assert.assertNotEquals(topology, updatedTopology); + Assert.assertArrayEquals( + new Partition[] { + new Partition("event-type-1", "0", null, null, Partition.State.ASSIGNED), + new Partition("event-type-1", "1", null, null, Partition.State.ASSIGNED), + new Partition("event-type-2", "0", null, null, Partition.State.UNASSIGNED), + }, + updatedTopology.getPartitions() + ); + } + @Test public void testHashCalculationOrder() { Assert.assertEquals( diff --git a/core-services/src/main/java/org/zalando/nakadi/service/RepartitioningService.java b/core-services/src/main/java/org/zalando/nakadi/service/RepartitioningService.java index 23e73b1b5d..4bbdac6cf9 100644 --- a/core-services/src/main/java/org/zalando/nakadi/service/RepartitioningService.java +++ b/core-services/src/main/java/org/zalando/nakadi/service/RepartitioningService.java @@ -4,6 +4,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Service; import org.zalando.nakadi.cache.EventTypeCache; import org.zalando.nakadi.config.NakadiSettings; @@ -48,6 +49,7 @@ public class RepartitioningService { private final NakadiSettings nakadiSettings; private final CursorConverter cursorConverter; private final TimelineSync timelineSync; + private final String dlqRedriveEventTypeName; @Autowired public RepartitioningService( @@ -58,7 +60,8 @@ public RepartitioningService( final SubscriptionClientFactory 
subscriptionClientFactory, final NakadiSettings nakadiSettings, final CursorConverter cursorConverter, - final TimelineSync timelineSync) { + final TimelineSync timelineSync, + @Value("${nakadi.dlq.redriveEventTypeName}") final String dlqRedriveEventTypeName) { this.eventTypeRepository = eventTypeRepository; this.eventTypeCache = eventTypeCache; this.timelineService = timelineService; @@ -67,10 +70,18 @@ public RepartitioningService( this.nakadiSettings = nakadiSettings; this.cursorConverter = cursorConverter; this.timelineSync = timelineSync; + this.dlqRedriveEventTypeName = dlqRedriveEventTypeName; } public void repartition(final String eventTypeName, final int partitions) throws InternalNakadiException, NakadiRuntimeException { + // Workaround: don't support repartitioning the DLQ event type, since the event type is consumed by + // subscriptions implicitly (not listed in subscriptions' configuration). Current re-partitioning implementation + // won't work properly with that event type. + if (eventTypeName.equals(dlqRedriveEventTypeName)) { + throw new InvalidEventTypeException( + String.format("Repartitioning %s event type is not supported", eventTypeName)); + } if (partitions > nakadiSettings.getMaxTopicPartitionCount()) { throw new InvalidEventTypeException("Number of partitions should not be more than " + nakadiSettings.getMaxTopicPartitionCount()); diff --git a/core-services/src/main/java/org/zalando/nakadi/service/SubscriptionValidationService.java b/core-services/src/main/java/org/zalando/nakadi/service/SubscriptionValidationService.java index b790df4468..05bc7fe31f 100644 --- a/core-services/src/main/java/org/zalando/nakadi/service/SubscriptionValidationService.java +++ b/core-services/src/main/java/org/zalando/nakadi/service/SubscriptionValidationService.java @@ -128,6 +128,9 @@ public void validatePartitionsToStream(final Subscription subscription, final Li throw new InvalidStreamParametersException("Duplicated partition specified"); } // check that partitions belong to subscription + // Note: currently, this check validates the partitions (passed by the client) against only explicit event types + // of the subscription. If the client passes partitions for implicit event types (like DLQ event type), this + // check would fail. final List allPartitions = getAllPartitions(subscription.getEventTypes()); final List wrongPartitions = partitions.stream() .filter(p -> !allPartitions.contains(p)) @@ -145,6 +148,8 @@ private void validateInitialCursors(final SubscriptionBase subscription, final List allPartitions) throws InvalidInitialCursorsException, RepositoryProblemException { + // Note: this check only verifies explicitly listed event types for a subscription. + // Implicit event types (like DLQ) are not checked here on creation of the subscription. 
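// A compact, runnable illustration of the completeness rule enforced just below: every partition
// of the subscription's explicit event types must be owned by one of the supplied initial
// cursors, otherwise an InvalidInitialCursorsException is raised. Plain strings stand in for
// EventTypePartition and cursor ownership here:
import java.util.List;
import java.util.Set;

final class InitialCursorsCheckSketch {

    public static void main(final String[] args) {
        final List<String> allPartitions = List.of("et1:0", "et1:1", "et2:0");
        final Set<String> coveredByInitialCursors = Set.of("et1:0", "et2:0"); // et1:1 missing

        final boolean cursorsMissing = allPartitions.stream()
                .anyMatch(p -> !coveredByInitialCursors.contains(p));

        System.out.println(cursorsMissing); // true -> the subscription would be rejected
    }
}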
final boolean cursorsMissing = allPartitions.stream() .anyMatch(p -> !subscription.getInitialCursors().stream().anyMatch(p::ownsCursor)); if (cursorsMissing) { diff --git a/core-services/src/main/java/org/zalando/nakadi/service/publishing/EventPublisher.java b/core-services/src/main/java/org/zalando/nakadi/service/publishing/EventPublisher.java index 5be8b4464b..a6ee6a7fd9 100644 --- a/core-services/src/main/java/org/zalando/nakadi/service/publishing/EventPublisher.java +++ b/core-services/src/main/java/org/zalando/nakadi/service/publishing/EventPublisher.java @@ -114,6 +114,21 @@ public EventPublishResult publish(final String events, return processInternal(events, eventTypeName, consumerTags, true, false); } + // no auth checks + public EventPublishResult publishInternal(final String events, + final String eventTypeName, + final Map consumerTags) + throws NoSuchEventTypeException, + InternalNakadiException, + EnrichmentException, + EventTypeTimeoutException, + AccessDeniedException, + PublishEventOwnershipException, + ServiceTemporarilyUnavailableException, + PartitioningException { + return processInternal(events, eventTypeName, consumerTags, false, false); + } + public EventPublishResult delete(final String events, final String eventTypeName) throws NoSuchEventTypeException, InternalNakadiException,
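// A hedged sketch of how the consumption side (StreamingState.sendToDeadLetterQueue earlier in
// this diff) uses publishInternal: the poisoned event is wrapped into the DLQ envelope and
// published as a single-element batch, bypassing per-event-type write authorization. The
// envelope mirrors dead_letter_queue_event_types.json; DlqEnvelopeSketch and
// dlqStoreEventTypeName are illustrative names, and the publisher wiring is assumed.
import org.json.JSONArray;
import org.json.JSONObject;

import java.time.Instant;
import java.util.UUID;

final class DlqEnvelopeSketch {

    static String buildDeadLetterBatch(final String subscriptionId,
                                       final String eventType,
                                       final String partition,
                                       final String offset,
                                       final JSONObject failedEvent,
                                       final int failedCommitsCount) {
        final JSONObject deadLetter = new JSONObject()
                .put("subscription_id", subscriptionId)
                .put("event_type", eventType)
                .put("partition", partition)
                .put("offset", offset)
                .put("error", new JSONObject().put("message",
                        "Nakadi Auto DLQ: skipped due to failed commits count: " + failedCommitsCount))
                .put("event", failedEvent)
                .put("metadata", new JSONObject()
                        .put("eid", UUID.randomUUID().toString()) // the real code takes this from the context's UuidGenerator
                        .put("occurred_at", Instant.now().toString()));
        // a single-element batch: sendToDeadLetterQueue publishes one dead letter at a time
        return new JSONArray().put(deadLetter).toString();
    }

    public static void main(final String[] args) {
        final JSONObject failedEvent = new JSONObject()
                .put("foo", "bar")
                .put("metadata", new JSONObject()
                        .put("eid", "ce8c9ebc-3f19-4b9d-a453-08ad2eda6028")
                        .put("event_type", "example.event-type")
                        .put("occurred_at", "2024-01-01T00:00:00Z"));
        final String batch = buildDeadLetterBatch(
                "subscription-uuid", "example.event-type", "0", "000001", failedEvent, 3);
        System.out.println(batch);
        // then: eventPublisher.publishInternal(batch, dlqStoreEventTypeName, null);
    }
}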