This repository has been archived by the owner on Jun 7, 2024. It is now read-only.

[FEATURE] DLQ redrive and supporting changes (#1568)
* Add periodic exclusive job to attach DLQ redrive event type (#1558)

* Subscription cursor API: allow submitting cursors for implicit event types (the DLQ redrive event type).

* Allow adding new event types in a subscription's topology updates

* Workaround: disallow re-partitioning the DLQ redrive event type.

* Feed events one by one until the end of the problematic batch (#1566)

The current behaviour is that Auto DLQ goes back to normal mode as soon as it finds one
bad event.  There can be more bad events after that one, so the current behaviour makes
it take longer to identify all the bad events.  Feeding the remaining events one by one
until the end of the problematic batch avoids this (see the first sketch below).

* Add unprocessable event policy on consumption

This allows the user to choose between the current behaviour of skipping the
unprocessable events and sending those events to the dead letter queue (see the second
sketch below).

* More readable DLQ debug information (#1569)

* DLQ cursor reset (#1574)

* Fix: do not override the existing last dead letter offset

* Workaround: reset subscriptions' running sessions when attaching DLQ partitions, so that subscriptions consume from the DLQ partitions on demand.
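
To make the one-by-one feeding concrete, here is a minimal sketch: once an unprocessable event is detected inside a batch, the remaining events of that batch are emitted individually instead of reverting to normal batch mode. The names below (SingleEventFeeder, isUnprocessable) are illustrative placeholders, not Nakadi's actual classes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

// Illustrative only: split the remainder of a problematic batch into
// single-event batches so that every bad event is surfaced, not just the first one.
final class SingleEventFeeder {

    // Events before the first unprocessable one stay grouped; everything from
    // the first unprocessable event onwards is fed downstream one by one.
    static <E> List<List<E>> feed(final List<E> batch, final Predicate<E> isUnprocessable) {
        int firstBad = -1;
        for (int i = 0; i < batch.size(); i++) {
            if (isUnprocessable.test(batch.get(i))) {
                firstBad = i;
                break;
            }
        }
        final List<List<E>> result = new ArrayList<>();
        if (firstBad == -1) {
            result.add(batch);                        // no bad event: keep the batch as-is
            return result;
        }
        if (firstBad > 0) {
            result.add(batch.subList(0, firstBad));   // good prefix stays batched
        }
        for (int i = firstBad; i < batch.size(); i++) {
            result.add(List.of(batch.get(i)));        // feed the rest one by one
        }
        return result;
    }
}
```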
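
And a minimal sketch of the consumption-side policy, assuming a plain two-option choice; UnprocessableEventPolicy and DeadLetterSink are placeholder names, not Nakadi's real API.

```java
// Illustrative only: how a consumer-side policy could branch between skipping
// an unprocessable event and routing it to the dead letter queue.
final class UnprocessableEventHandling {

    enum UnprocessableEventPolicy { SKIP_EVENT, DEAD_LETTER_QUEUE }

    interface DeadLetterSink {
        void send(String event);   // e.g. publish to the DLQ redrive event type
    }

    static void onUnprocessable(final String event,
                                final UnprocessableEventPolicy policy,
                                final DeadLetterSink deadLetterSink) {
        switch (policy) {
            case SKIP_EVENT:
                // current behaviour: drop the event and let the cursor move past it
                break;
            case DEAD_LETTER_QUEUE:
                // new option: keep the event by sending it to the dead letter queue
                deadLetterSink.send(event);
                break;
        }
    }
}
```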

---------

Co-authored-by: Aleksey Pak <[email protected]>
Co-authored-by: Andrey <[email protected]>
Co-authored-by: ssaha <[email protected]>
Co-authored-by: Filippo Ghibellini <[email protected]>
Co-authored-by: BakaCoder <[email protected]>
Co-authored-by: ap <[email protected]>
7 people authored Dec 8, 2023
1 parent 2a77a31 commit 5dd37c6
Showing 28 changed files with 1,289 additions and 243 deletions.
@@ -8,6 +8,7 @@
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.EncoderFactory;
import org.apache.http.HttpStatus;
+import org.json.JSONObject;
import org.junit.Assert;
import org.junit.Test;
import org.springframework.core.io.DefaultResourceLoader;
@@ -33,7 +34,6 @@
import java.nio.ByteBuffer;
import java.time.Instant;
import java.util.List;
-import java.util.Map;

import static com.jayway.restassured.RestAssured.given;
import static org.zalando.nakadi.domain.SubscriptionBase.InitialPosition.BEGIN;
@@ -119,10 +119,10 @@ public void shouldPublishAvroAndConsumeJsonAndAvro() throws IOException {
final TestStreamingClient client1 = TestStreamingClient.create(subscription1.getId()).start();

TestUtils.waitFor(() -> Assert.assertEquals(1, client1.getJsonBatches().size()));
-final Map jsonEvent = client1.getJsonBatches().get(0).getEvents().get(0);
+final JSONObject jsonEvent = client1.getJsonBatches().get(0).getEvents().get(0);
Assert.assertEquals("bar", jsonEvent.get("foo"));

-final Map<String, Object> metadata = (Map<String, Object>) jsonEvent.get("metadata");
+final JSONObject metadata = jsonEvent.getJSONObject("metadata");
Assert.assertEquals("CE8C9EBC-3F19-4B9D-A453-08AD2EDA6028", metadata.get("eid"));
Assert.assertEquals("2.0.0", metadata.get("version"));
Assert.assertEquals(testETName, metadata.get("event_type"));
@@ -3,6 +3,7 @@
import org.apache.http.HttpStatus;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
+import org.json.JSONObject;
import org.junit.Assert;
import org.junit.Test;
import org.zalando.nakadi.domain.Subscription;
@@ -12,7 +13,6 @@

import java.io.IOException;
import java.util.List;
-import java.util.Map;

import static com.jayway.restassured.RestAssured.given;

@@ -43,7 +43,7 @@ public void testNakadiAccessLogInAvro() throws Exception {
// So the test is only looking for a valid event.
Assert.assertEquals(
NAKADI_ACCESS_LOG,
((Map) event.get("metadata")).get("event_type"));
event.getJSONObject("metadata").get("event_type"));
Assert.assertNotNull(event.get("method"));
Assert.assertNotNull(event.get("path"));
Assert.assertNotNull(event.get("query"));
@@ -56,7 +56,7 @@ public void testNakadiAccessLogInAvro() throws Exception {
Assert.assertNotNull(event.get("content_encoding"));
Assert.assertNotNull(event.get("request_length"));
Assert.assertNotNull(event.get("response_length"));
-Assert.assertNull(event.get("random_key"));
+Assert.assertFalse(event.has("random_key"));
}

@Test
@@ -75,7 +75,7 @@ public void testNakadiSubscriptionLogInAvro() throws Exception {
// So the test is only looking for a valid event.
Assert.assertEquals(
NAKADI_SUBSCRIPTION_LOG,
((Map) event.get("metadata")).get("event_type"));
event.getJSONObject("metadata").get("event_type"));
Assert.assertEquals("created", event.get("status"));
Assert.assertNotNull(event.get("subscription_id"));
}
@@ -96,7 +96,7 @@ public void testNakadiEventTypeLogInAvro() throws Exception {
// So the test is only looking for a valid event.
Assert.assertEquals(
NAKADI_EVENT_TYPE_LOG,
((Map) event.get("metadata")).get("event_type"));
event.getJSONObject("metadata").get("event_type"));
Assert.assertNotNull(event.get("event_type"));
Assert.assertNotNull(event.get("status"));
Assert.assertNotNull(event.get("category"));
@@ -120,7 +120,7 @@ public void testNakadiBatchPublishedInAvro() throws Exception {
// So the test is only looking for a valid event.
Assert.assertEquals(
NAKADI_BATCH_PUBLISHED,
((Map) event.get("metadata")).get("event_type"));
event.getJSONObject("metadata").get("event_type"));
Assert.assertNotNull(event.get("event_type"));
Assert.assertNotNull(event.get("app"));
Assert.assertNotNull(event.get("app_hashed"));
@@ -145,7 +145,7 @@ public void testNakadiDataStreamedInAvro() throws Exception {
final var event = events.get(0);
// All acceptance tests are run against same instance, so the exact event that is consumed is unpredictable.
// So the test is only looking for a valid event.
-final var metadata = (Map) event.get("metadata");
+final var metadata = event.getJSONObject("metadata");
Assert.assertEquals(NAKADI_DATA_STREAMED, metadata.get("event_type"));
Assert.assertNotNull(metadata.get("occurred_at"));
Assert.assertNotNull(metadata.get("received_at"));
@@ -162,7 +162,7 @@ public void testNakadiDataStreamedInAvro() throws Exception {
Assert.assertNotNull(event.get("batches_streamed"));
}

-private List<Map> consumeEvent(final TestStreamingClient client) {
+private List<JSONObject> consumeEvent(final TestStreamingClient client) {
TestUtils.waitFor(() -> MatcherAssert.assertThat(
client.getJsonBatches().size(), Matchers.greaterThanOrEqualTo(1)), 10000);
return client.getJsonBatches().get(0).getEvents();
@@ -4,14 +4,14 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Charsets;
import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.io.Resources;
import com.jayway.restassured.RestAssured;
import com.jayway.restassured.http.ContentType;
import com.jayway.restassured.response.Header;
import com.jayway.restassured.specification.RequestSpecification;
import org.echocat.jomon.runtime.concurrent.RetryForSpecifiedTimeStrategy;
+import org.json.JSONArray;
import org.json.JSONObject;
import org.junit.Before;
import org.junit.Test;
@@ -29,7 +29,6 @@

import java.io.IOException;
import java.util.List;
-import java.util.Map;
import java.util.stream.Collectors;

import static java.util.stream.LongStream.rangeClosed;
@@ -51,7 +50,6 @@
import static org.zalando.nakadi.utils.TestUtils.randomUUID;
import static org.zalando.nakadi.utils.TestUtils.randomValidEventTypeName;
import static org.zalando.nakadi.utils.TestUtils.waitFor;
-import static org.zalando.nakadi.webservice.hila.StreamBatch.MatcherIgnoringToken.equalToBatchIgnoringToken;
import static org.zalando.nakadi.webservice.utils.NakadiTestUtils.commitCursors;
import static org.zalando.nakadi.webservice.utils.NakadiTestUtils.createSubscription;

@@ -351,11 +349,11 @@ public void userJourneyHila() throws InterruptedException, IOException {
final SubscriptionCursor cursor = new SubscriptionCursor("0", TestUtils.toTimelineOffset(i),
eventTypeName, "");
final StreamBatch expectedBatch = new StreamBatch(cursor,
-ImmutableList.of(ImmutableMap.of("foo", "bar" + i)),
+new JSONArray().put(new JSONObject().put("foo", "bar" + i)),
i == 0 ? new StreamMetadata("Stream started") : null);

final StreamBatch batch = batches.get(i);
-assertThat(batch, equalToBatchIgnoringToken(expectedBatch));
+assertThat(batch, StreamBatch.equalToBatchIgnoringToken(expectedBatch));
}

// as we didn't commit, there should be still 4 unconsumed events
@@ -450,8 +448,9 @@ public void userJourneyAvroTransition() throws InterruptedException, IOException

// validate the events metadata
for (final StreamBatch batch : batches) {
-final Map<String, String> metadata = (Map<String, String>) batch.getEvents().get(0).get("metadata");
-assertThat(metadata.get("version"), equalTo(validatedWithJsonSchemaVersion));
+assertThat(
+        batch.getEvents().get(0).getJSONObject("metadata").getString("version"),
+        equalTo(validatedWithJsonSchemaVersion));
}

// delete subscription