Skip to content
This repository has been archived by the owner on Jun 7, 2024. It is now read-only.

Commit

Permalink
Add unprocessable event policy on consumption (#1561)
Browse files Browse the repository at this point in the history
This allows the user to choose between the current implementation of skipping
the unprocessable events, or sending those events to the dead letter queue.
  • Loading branch information
a1exsh authored Oct 19, 2023
1 parent 0d50ac1 commit 1f15548
Show file tree
Hide file tree
Showing 16 changed files with 637 additions and 118 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.EncoderFactory;
import org.apache.http.HttpStatus;
import org.json.JSONObject;
import org.junit.Assert;
import org.junit.Test;
import org.springframework.core.io.DefaultResourceLoader;
Expand All @@ -33,7 +34,6 @@
import java.nio.ByteBuffer;
import java.time.Instant;
import java.util.List;
import java.util.Map;

import static com.jayway.restassured.RestAssured.given;
import static org.zalando.nakadi.domain.SubscriptionBase.InitialPosition.BEGIN;
Expand Down Expand Up @@ -119,10 +119,10 @@ public void shouldPublishAvroAndConsumeJsonAndAvro() throws IOException {
final TestStreamingClient client1 = TestStreamingClient.create(subscription1.getId()).start();

TestUtils.waitFor(() -> Assert.assertEquals(1, client1.getJsonBatches().size()));
final Map jsonEvent = client1.getJsonBatches().get(0).getEvents().get(0);
final JSONObject jsonEvent = client1.getJsonBatches().get(0).getEvents().get(0);
Assert.assertEquals("bar", jsonEvent.get("foo"));

final Map<String, Object> metadata = (Map<String, Object>) jsonEvent.get("metadata");
final JSONObject metadata = jsonEvent.getJSONObject("metadata");
Assert.assertEquals("CE8C9EBC-3F19-4B9D-A453-08AD2EDA6028", metadata.get("eid"));
Assert.assertEquals("2.0.0", metadata.get("version"));
Assert.assertEquals(testETName, metadata.get("event_type"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import org.apache.http.HttpStatus;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.json.JSONObject;
import org.junit.Assert;
import org.junit.Test;
import org.zalando.nakadi.domain.Subscription;
Expand All @@ -12,7 +13,6 @@

import java.io.IOException;
import java.util.List;
import java.util.Map;

import static com.jayway.restassured.RestAssured.given;

Expand Down Expand Up @@ -43,7 +43,7 @@ public void testNakadiAccessLogInAvro() throws Exception {
// So the test is only looking for a valid event.
Assert.assertEquals(
NAKADI_ACCESS_LOG,
((Map) event.get("metadata")).get("event_type"));
event.getJSONObject("metadata").get("event_type"));
Assert.assertNotNull(event.get("method"));
Assert.assertNotNull(event.get("path"));
Assert.assertNotNull(event.get("query"));
Expand All @@ -56,7 +56,7 @@ public void testNakadiAccessLogInAvro() throws Exception {
Assert.assertNotNull(event.get("content_encoding"));
Assert.assertNotNull(event.get("request_length"));
Assert.assertNotNull(event.get("response_length"));
Assert.assertNull(event.get("random_key"));
Assert.assertFalse(event.has("random_key"));
}

@Test
Expand All @@ -75,7 +75,7 @@ public void testNakadiSubscriptionLogInAvro() throws Exception {
// So the test is only looking for a valid event.
Assert.assertEquals(
NAKADI_SUBSCRIPTION_LOG,
((Map) event.get("metadata")).get("event_type"));
event.getJSONObject("metadata").get("event_type"));
Assert.assertEquals("created", event.get("status"));
Assert.assertNotNull(event.get("subscription_id"));
}
Expand All @@ -96,7 +96,7 @@ public void testNakadiEventTypeLogInAvro() throws Exception {
// So the test is only looking for a valid event.
Assert.assertEquals(
NAKADI_EVENT_TYPE_LOG,
((Map) event.get("metadata")).get("event_type"));
event.getJSONObject("metadata").get("event_type"));
Assert.assertNotNull(event.get("event_type"));
Assert.assertNotNull(event.get("status"));
Assert.assertNotNull(event.get("category"));
Expand All @@ -120,7 +120,7 @@ public void testNakadiBatchPublishedInAvro() throws Exception {
// So the test is only looking for a valid event.
Assert.assertEquals(
NAKADI_BATCH_PUBLISHED,
((Map) event.get("metadata")).get("event_type"));
event.getJSONObject("metadata").get("event_type"));
Assert.assertNotNull(event.get("event_type"));
Assert.assertNotNull(event.get("app"));
Assert.assertNotNull(event.get("app_hashed"));
Expand All @@ -145,7 +145,7 @@ public void testNakadiDataStreamedInAvro() throws Exception {
final var event = events.get(0);
// All acceptance tests are run against same instance, so the exact event that is consumed is unpredictable.
// So the test is only looking for a valid event.
final var metadata = (Map) event.get("metadata");
final var metadata = event.getJSONObject("metadata");
Assert.assertEquals(NAKADI_DATA_STREAMED, metadata.get("event_type"));
Assert.assertNotNull(metadata.get("occurred_at"));
Assert.assertNotNull(metadata.get("received_at"));
Expand All @@ -162,7 +162,7 @@ public void testNakadiDataStreamedInAvro() throws Exception {
Assert.assertNotNull(event.get("batches_streamed"));
}

private List<Map> consumeEvent(final TestStreamingClient client) {
private List<JSONObject> consumeEvent(final TestStreamingClient client) {
TestUtils.waitFor(() -> MatcherAssert.assertThat(
client.getJsonBatches().size(), Matchers.greaterThanOrEqualTo(1)), 10000);
return client.getJsonBatches().get(0).getEvents();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,14 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Charsets;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.io.Resources;
import com.jayway.restassured.RestAssured;
import com.jayway.restassured.http.ContentType;
import com.jayway.restassured.response.Header;
import com.jayway.restassured.specification.RequestSpecification;
import org.echocat.jomon.runtime.concurrent.RetryForSpecifiedTimeStrategy;
import org.json.JSONArray;
import org.json.JSONObject;
import org.junit.Before;
import org.junit.Test;
Expand All @@ -29,7 +29,6 @@

import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

import static java.util.stream.LongStream.rangeClosed;
Expand All @@ -51,7 +50,6 @@
import static org.zalando.nakadi.utils.TestUtils.randomUUID;
import static org.zalando.nakadi.utils.TestUtils.randomValidEventTypeName;
import static org.zalando.nakadi.utils.TestUtils.waitFor;
import static org.zalando.nakadi.webservice.hila.StreamBatch.MatcherIgnoringToken.equalToBatchIgnoringToken;
import static org.zalando.nakadi.webservice.utils.NakadiTestUtils.commitCursors;
import static org.zalando.nakadi.webservice.utils.NakadiTestUtils.createSubscription;

Expand Down Expand Up @@ -351,11 +349,11 @@ public void userJourneyHila() throws InterruptedException, IOException {
final SubscriptionCursor cursor = new SubscriptionCursor("0", TestUtils.toTimelineOffset(i),
eventTypeName, "");
final StreamBatch expectedBatch = new StreamBatch(cursor,
ImmutableList.of(ImmutableMap.of("foo", "bar" + i)),
new JSONArray().put(new JSONObject().put("foo", "bar" + i)),
i == 0 ? new StreamMetadata("Stream started") : null);

final StreamBatch batch = batches.get(i);
assertThat(batch, equalToBatchIgnoringToken(expectedBatch));
assertThat(batch, StreamBatch.equalToBatchIgnoringToken(expectedBatch));
}

// as we didn't commit, there should be still 4 unconsumed events
Expand Down Expand Up @@ -450,8 +448,9 @@ public void userJourneyAvroTransition() throws InterruptedException, IOException

// validate the events metadata
for (final StreamBatch batch : batches) {
final Map<String, String> metadata = (Map<String, String>) batch.getEvents().get(0).get("metadata");
assertThat(metadata.get("version"), equalTo(validatedWithJsonSchemaVersion));
assertThat(
batch.getEvents().get(0).getJSONObject("metadata").getString("version"),
equalTo(validatedWithJsonSchemaVersion));
}

// delete subscription
Expand Down
Loading

0 comments on commit 1f15548

Please sign in to comment.