From 9dabfa286a129f829c80dcdc8a1059c6d3e0cb94 Mon Sep 17 00:00:00 2001 From: Lionel Montrieux Date: Thu, 15 Dec 2016 15:05:51 +0100 Subject: [PATCH 01/37] ARUHA-473 Reject batches with at least one event that's too large --- .../nakadi/domain/EventPublishingStep.java | 1 + .../EventSizeValidationException.java | 21 +++++++++++++++++ .../nakadi/service/EventPublisher.java | 23 +++++++++++++++++++ 3 files changed, 45 insertions(+) create mode 100644 src/main/java/org/zalando/nakadi/exceptions/EventSizeValidationException.java diff --git a/src/main/java/org/zalando/nakadi/domain/EventPublishingStep.java b/src/main/java/org/zalando/nakadi/domain/EventPublishingStep.java index 522c41bc8b..13069982fc 100644 --- a/src/main/java/org/zalando/nakadi/domain/EventPublishingStep.java +++ b/src/main/java/org/zalando/nakadi/domain/EventPublishingStep.java @@ -4,6 +4,7 @@ public enum EventPublishingStep { NONE, VALIDATING, ENRICHING, + VALIDATING_SIZE, PARTITIONING, PUBLISHING, } diff --git a/src/main/java/org/zalando/nakadi/exceptions/EventSizeValidationException.java b/src/main/java/org/zalando/nakadi/exceptions/EventSizeValidationException.java new file mode 100644 index 0000000000..fd9ee955a8 --- /dev/null +++ b/src/main/java/org/zalando/nakadi/exceptions/EventSizeValidationException.java @@ -0,0 +1,21 @@ +package org.zalando.nakadi.exceptions; + +import org.zalando.nakadi.validation.ValidationError; +import org.zalando.problem.MoreStatus; + +import javax.ws.rs.core.Response; + +public class EventSizeValidationException extends NakadiException { + public EventSizeValidationException(final String message) { + super(message); + } + + public EventSizeValidationException (final ValidationError validationError) { + super(validationError.getMessage()); + } + + @Override + protected Response.StatusType getStatus() { + return MoreStatus.UNPROCESSABLE_ENTITY; + } +} diff --git a/src/main/java/org/zalando/nakadi/service/EventPublisher.java b/src/main/java/org/zalando/nakadi/service/EventPublisher.java index 4f27ac18e3..cc21ffc16f 100644 --- a/src/main/java/org/zalando/nakadi/service/EventPublisher.java +++ b/src/main/java/org/zalando/nakadi/service/EventPublisher.java @@ -16,6 +16,7 @@ import org.zalando.nakadi.enrichment.Enrichment; import org.zalando.nakadi.exceptions.EnrichmentException; import org.zalando.nakadi.exceptions.EventPublishingException; +import org.zalando.nakadi.exceptions.EventSizeValidationException; import org.zalando.nakadi.exceptions.EventValidationException; import org.zalando.nakadi.exceptions.InternalNakadiException; import org.zalando.nakadi.exceptions.NoSuchEventTypeException; @@ -37,6 +38,8 @@ public class EventPublisher { private static final Logger LOG = LoggerFactory.getLogger(EventPublisher.class); + private static final long MAX_EVENT_SIZE_BYTES = 1000000; + private final TopicRepository topicRepository; private final EventTypeCache eventTypeCache; private final PartitionResolver partitionResolver; @@ -64,6 +67,7 @@ public EventPublishResult publish(final JSONArray events, final String eventType validate(batch, eventType); partition(batch, eventType); enrich(batch, eventType); + validateSize(batch); submit(batch, eventType); return ok(batch); @@ -76,6 +80,9 @@ public EventPublishResult publish(final JSONArray events, final String eventType } catch (final EnrichmentException e) { LOG.debug("Event enrichment error: {}", e.getMessage()); return aborted(EventPublishingStep.ENRICHING, batch); + } catch (final EventSizeValidationException e) { + LOG.debug("Event size validation error: ){", e.getMessage()); + return aborted(EventPublishingStep.VALIDATING_SIZE, batch); } catch (final EventPublishingException e) { LOG.error("error publishing event", e); return failed(batch); @@ -145,6 +152,22 @@ private void validateSchema(final JSONObject event, final EventType eventType) t } } + private void validateSize(final List batch) throws EventSizeValidationException { + for (final BatchItem item: batch) { + item.setStep(EventPublishingStep.VALIDATING_SIZE); + try { + validateEventSize(item); + } catch (final EventSizeValidationException e) { + item.updateStatusAndDetail(EventPublishingStatus.ABORTED, e.getMessage()); + } + } + } + + private void validateEventSize(final BatchItem item) throws EventSizeValidationException { + if (item.getEvent().toString().getBytes().length > MAX_EVENT_SIZE_BYTES) + throw new EventSizeValidationException("Event too large"); + } + private EventPublishResult failed(final List batch) { return new EventPublishResult(EventPublishingStatus.FAILED, EventPublishingStep.PUBLISHING, responses(batch)); } From cc50d0cc1a7f436181b73d7190d11e5e787d80ac Mon Sep 17 00:00:00 2001 From: Lionel Montrieux Date: Thu, 15 Dec 2016 15:31:25 +0100 Subject: [PATCH 02/37] ARUHA-473 Move maximum event size to nakadi settings --- .../nakadi/repository/kafka/KafkaRepositoryAT.java | 4 +++- .../java/org/zalando/nakadi/config/NakadiSettings.java | 9 ++++++++- .../org/zalando/nakadi/service/EventPublisher.java | 9 ++++++--- src/main/resources/application.yml | 1 + .../nakadi/controller/EventTypeControllerTest.java | 3 ++- .../org/zalando/nakadi/service/EventPublisherTest.java | 10 +++++++++- 6 files changed, 29 insertions(+), 7 deletions(-) diff --git a/src/acceptance-test/java/org/zalando/nakadi/repository/kafka/KafkaRepositoryAT.java b/src/acceptance-test/java/org/zalando/nakadi/repository/kafka/KafkaRepositoryAT.java index 56a9985240..8acb8fd389 100644 --- a/src/acceptance-test/java/org/zalando/nakadi/repository/kafka/KafkaRepositoryAT.java +++ b/src/acceptance-test/java/org/zalando/nakadi/repository/kafka/KafkaRepositoryAT.java @@ -54,6 +54,7 @@ public class KafkaRepositoryAT extends BaseAT { private static final int KAFKA_REQUEST_TIMEOUT = 30000; private static final int KAFKA_BATCH_SIZE = 1048576; private static final long KAFKA_LINGER_MS = 0; + private static final long NAKADI_EVENT_MAX_BYTES = 1000000L; private NakadiSettings nakadiSettings; private KafkaSettings kafkaSettings; @@ -73,7 +74,8 @@ public void setup() { DEFAULT_TOPIC_ROTATION, DEFAULT_COMMIT_TIMEOUT, NAKADI_POLL_TIMEOUT, - NAKADI_SEND_TIMEOUT); + NAKADI_SEND_TIMEOUT, + NAKADI_EVENT_MAX_BYTES); kafkaSettings = new KafkaSettings(KAFKA_REQUEST_TIMEOUT, KAFKA_BATCH_SIZE, KAFKA_LINGER_MS); zookeeperSettings = new ZookeeperSettings(ZK_SESSION_TIMEOUT, ZK_CONNECTION_TIMEOUT); kafkaHelper = new KafkaTestHelper(KAFKA_URL); diff --git a/src/main/java/org/zalando/nakadi/config/NakadiSettings.java b/src/main/java/org/zalando/nakadi/config/NakadiSettings.java index 0970159cde..116e0f1a55 100644 --- a/src/main/java/org/zalando/nakadi/config/NakadiSettings.java +++ b/src/main/java/org/zalando/nakadi/config/NakadiSettings.java @@ -15,6 +15,7 @@ public class NakadiSettings { private final long defaultCommitTimeoutSeconds; private final long kafkaPollTimeoutMs; private final long kafkaSendTimeoutMs; + private final long eventMaxBytes; @Autowired public NakadiSettings(@Value("${nakadi.topic.max.partitionNum}") final int maxTopicPartitionCount, @@ -24,7 +25,8 @@ public NakadiSettings(@Value("${nakadi.topic.max.partitionNum}") final int maxTo @Value("${nakadi.topic.default.rotationMs}") final long defaultTopicRotationMs, @Value("${nakadi.stream.default.commitTimeout}") final long defaultCommitTimeoutSeconds, @Value("${nakadi.kafka.poll.timeoutMs}") final long kafkaPollTimeoutMs, - @Value("${nakadi.kafka.send.timeoutMs}") final long kafkaSendTimeoutMs) { + @Value("${nakadi.kafka.send.timeoutMs}") final long kafkaSendTimeoutMs, + @Value("${nakadi.event.max.bytes}") final long eventMaxBytes) { this.maxTopicPartitionCount = maxTopicPartitionCount; this.defaultTopicPartitionCount = defaultTopicPartitionCount; this.defaultTopicReplicaFactor = defaultTopicReplicaFactor; @@ -33,6 +35,7 @@ public NakadiSettings(@Value("${nakadi.topic.max.partitionNum}") final int maxTo this.defaultCommitTimeoutSeconds = defaultCommitTimeoutSeconds; this.kafkaPollTimeoutMs = kafkaPollTimeoutMs; this.kafkaSendTimeoutMs = kafkaSendTimeoutMs; + this.eventMaxBytes = eventMaxBytes; } public int getDefaultTopicPartitionCount() { @@ -67,4 +70,8 @@ public long getKafkaSendTimeoutMs() { return kafkaSendTimeoutMs; } + public long getEventMaxBytes() { + return eventMaxBytes; + } + } diff --git a/src/main/java/org/zalando/nakadi/service/EventPublisher.java b/src/main/java/org/zalando/nakadi/service/EventPublisher.java index cc21ffc16f..65287f47c1 100644 --- a/src/main/java/org/zalando/nakadi/service/EventPublisher.java +++ b/src/main/java/org/zalando/nakadi/service/EventPublisher.java @@ -6,6 +6,7 @@ import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; +import org.zalando.nakadi.config.NakadiSettings; import org.zalando.nakadi.domain.BatchFactory; import org.zalando.nakadi.domain.BatchItem; import org.zalando.nakadi.domain.BatchItemResponse; @@ -38,7 +39,7 @@ public class EventPublisher { private static final Logger LOG = LoggerFactory.getLogger(EventPublisher.class); - private static final long MAX_EVENT_SIZE_BYTES = 1000000; + private final NakadiSettings nakadiSettings; private final TopicRepository topicRepository; private final EventTypeCache eventTypeCache; @@ -49,11 +50,13 @@ public class EventPublisher { public EventPublisher(final TopicRepository topicRepository, final EventTypeCache eventTypeCache, final PartitionResolver partitionResolver, - final Enrichment enrichment) { + final Enrichment enrichment, + final NakadiSettings nakadiSettings) { this.topicRepository = topicRepository; this.eventTypeCache = eventTypeCache; this.partitionResolver = partitionResolver; this.enrichment = enrichment; + this.nakadiSettings = nakadiSettings; } public EventPublishResult publish(final JSONArray events, final String eventTypeName, final Client client) @@ -164,7 +167,7 @@ private void validateSize(final List batch) throws EventSizeValidatio } private void validateEventSize(final BatchItem item) throws EventSizeValidationException { - if (item.getEvent().toString().getBytes().length > MAX_EVENT_SIZE_BYTES) + if (item.getEvent().toString().getBytes().length > nakadiSettings.getEventMaxBytes()) throw new EventSizeValidationException("Event too large"); } diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index 5eb815be8d..9e913f04bf 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -78,6 +78,7 @@ nakadi: auth: plugin: factory: org.zalando.nakadi.plugin.auth.DefaultApplicationServiceFactory + event.max.bytes: 1000000 --- spring: diff --git a/src/test/java/org/zalando/nakadi/controller/EventTypeControllerTest.java b/src/test/java/org/zalando/nakadi/controller/EventTypeControllerTest.java index e317188304..0ec9d9b6ac 100644 --- a/src/test/java/org/zalando/nakadi/controller/EventTypeControllerTest.java +++ b/src/test/java/org/zalando/nakadi/controller/EventTypeControllerTest.java @@ -88,6 +88,7 @@ public class EventTypeControllerTest { private static final long TOPIC_RETENTION_TIME_MS = 150; private static final int NAKADI_SEND_TIMEOUT = 10000; private static final int NAKADI_POLL_TIMEOUT = 10000; + private static final long NAKADI_EVENT_MAX_BYTES = 1000000; private final EventTypeRepository eventTypeRepository = mock(EventTypeRepository.class); private final TopicRepository topicRepository = mock(TopicRepository.class); private final PartitionResolver partitionResolver = mock(PartitionResolver.class); @@ -111,7 +112,7 @@ public void init() throws Exception { final EventTypeOptionsValidator eventTypeOptionsValidator = new EventTypeOptionsValidator(TOPIC_RETENTION_MIN_MS, TOPIC_RETENTION_MAX_MS); final NakadiSettings nakadiSettings = new NakadiSettings(0, 0, 0, TOPIC_RETENTION_TIME_MS, 0, 60, - NAKADI_POLL_TIMEOUT, NAKADI_SEND_TIMEOUT); + NAKADI_POLL_TIMEOUT, NAKADI_SEND_TIMEOUT, NAKADI_EVENT_MAX_BYTES); final EventTypeController controller = new EventTypeController(eventTypeService, featureToggleService, eventTypeOptionsValidator, applicationService, nakadiSettings); diff --git a/src/test/java/org/zalando/nakadi/service/EventPublisherTest.java b/src/test/java/org/zalando/nakadi/service/EventPublisherTest.java index 10358f398a..4c12080d76 100644 --- a/src/test/java/org/zalando/nakadi/service/EventPublisherTest.java +++ b/src/test/java/org/zalando/nakadi/service/EventPublisherTest.java @@ -5,6 +5,7 @@ import org.junit.Assert; import org.junit.Test; import org.mockito.Mockito; +import org.zalando.nakadi.config.NakadiSettings; import org.zalando.nakadi.domain.BatchItemResponse; import org.zalando.nakadi.domain.EventPublishResult; import org.zalando.nakadi.domain.EventPublishingStatus; @@ -52,11 +53,18 @@ public class EventPublisherTest { public static final String CLIENT_ID = "clientId"; private static final Client FULL_ACCESS_CLIENT = new FullAccessClient(CLIENT_ID); + private static final int NAKADI_SEND_TIMEOUT = 10000; + private static final int NAKADI_POLL_TIMEOUT = 10000; + private static final long NAKADI_EVENT_MAX_BYTES = 1000000; + private static final long TOPIC_RETENTION_TIME_MS = 150; + private final TopicRepository topicRepository = mock(TopicRepository.class); private final EventTypeCache cache = mock(EventTypeCache.class); private final PartitionResolver partitionResolver = mock(PartitionResolver.class); private final Enrichment enrichment = mock(Enrichment.class); - private final EventPublisher publisher = new EventPublisher(topicRepository, cache, partitionResolver, enrichment); + private final NakadiSettings nakadiSettings = new NakadiSettings(0, 0, 0, + TOPIC_RETENTION_TIME_MS, 0, 60, NAKADI_POLL_TIMEOUT, NAKADI_SEND_TIMEOUT, NAKADI_EVENT_MAX_BYTES); + private final EventPublisher publisher = new EventPublisher(topicRepository, cache, partitionResolver, enrichment, nakadiSettings); @Test public void whenPublishIsSuccessfulThenResultIsSubmitted() throws Exception { From 761d455a9a4e5945c5e1c93daaa6413c0325ece5 Mon Sep 17 00:00:00 2001 From: Lionel Montrieux Date: Thu, 15 Dec 2016 18:33:47 +0100 Subject: [PATCH 03/37] ARUHA-473 Unit tests for max event size --- .../nakadi/service/EventPublisher.java | 1 + .../nakadi/service/EventPublisherTest.java | 186 +++++++++++++++++- .../org/zalando/nakadi/utils/TestUtils.java | 22 ++- 3 files changed, 205 insertions(+), 4 deletions(-) diff --git a/src/main/java/org/zalando/nakadi/service/EventPublisher.java b/src/main/java/org/zalando/nakadi/service/EventPublisher.java index 65287f47c1..65b91eca4c 100644 --- a/src/main/java/org/zalando/nakadi/service/EventPublisher.java +++ b/src/main/java/org/zalando/nakadi/service/EventPublisher.java @@ -162,6 +162,7 @@ private void validateSize(final List batch) throws EventSizeValidatio validateEventSize(item); } catch (final EventSizeValidationException e) { item.updateStatusAndDetail(EventPublishingStatus.ABORTED, e.getMessage()); + throw e; } } } diff --git a/src/test/java/org/zalando/nakadi/service/EventPublisherTest.java b/src/test/java/org/zalando/nakadi/service/EventPublisherTest.java index 4c12080d76..da771c24b8 100644 --- a/src/test/java/org/zalando/nakadi/service/EventPublisherTest.java +++ b/src/test/java/org/zalando/nakadi/service/EventPublisherTest.java @@ -46,6 +46,8 @@ import static org.zalando.nakadi.utils.TestUtils.buildDefaultEventType; import static org.zalando.nakadi.utils.TestUtils.createBatch; import static org.zalando.nakadi.utils.TestUtils.randomString; +import static org.zalando.nakadi.utils.TestUtils.randomStringOfLength; +import static org.zalando.nakadi.utils.TestUtils.randomValidStringOfLength; public class EventPublisherTest { @@ -55,7 +57,7 @@ public class EventPublisherTest { private static final int NAKADI_SEND_TIMEOUT = 10000; private static final int NAKADI_POLL_TIMEOUT = 10000; - private static final long NAKADI_EVENT_MAX_BYTES = 1000000; + private static final int NAKADI_EVENT_MAX_BYTES = 900; private static final long TOPIC_RETENTION_TIME_MS = 150; private final TopicRepository topicRepository = mock(TopicRepository.class); @@ -64,7 +66,8 @@ public class EventPublisherTest { private final Enrichment enrichment = mock(Enrichment.class); private final NakadiSettings nakadiSettings = new NakadiSettings(0, 0, 0, TOPIC_RETENTION_TIME_MS, 0, 60, NAKADI_POLL_TIMEOUT, NAKADI_SEND_TIMEOUT, NAKADI_EVENT_MAX_BYTES); - private final EventPublisher publisher = new EventPublisher(topicRepository, cache, partitionResolver, enrichment, nakadiSettings); + private final EventPublisher publisher = new EventPublisher(topicRepository, cache, partitionResolver, + enrichment, nakadiSettings); @Test public void whenPublishIsSuccessfulThenResultIsSubmitted() throws Exception { @@ -135,6 +138,147 @@ public void whenValidationFailsThenSubsequentItemsAreAborted() throws Exception verify(cache, times(1)).getValidator(any()); } + @Test + public void whenEventIsTooLargeThenResultIsAborted() throws Exception { + final EventType eventType = buildDefaultEventType(); + final JSONArray batch = buildLargeBatch(1); + + mockSuccessfulValidation(eventType); + + final EventPublishResult result = publisher.publish(batch, eventType.getName(), FULL_ACCESS_CLIENT); + + assertThat(result.getStatus(), equalTo(EventPublishingStatus.ABORTED)); + verify(enrichment, times(1)).enrich(any(), any()); + verify(partitionResolver, times(1)).resolvePartition(any(), any()); + verify(topicRepository, times(0)).syncPostBatch(any(), any()); + } + + @Test + public void whenEventIsTooLargeThenAllItemsAreAborted() throws Exception { + final EventType eventType = buildDefaultEventType(); + final JSONArray batch = buildDefaultBatch(1); + final JSONObject largeEvent = new JSONObject(); + largeEvent.put("foo", randomStringOfLength(10000)); + batch.put(largeEvent); + final JSONObject smallEvent = new JSONObject(); + smallEvent.put("foo", randomString()); + batch.put(smallEvent); + + mockSuccessfulValidation(eventType); + + final EventPublishResult result = publisher.publish(batch, eventType.getName(), FULL_ACCESS_CLIENT); + + assertThat(result.getStatus(), equalTo(EventPublishingStatus.ABORTED)); + + final BatchItemResponse firstResponse = result.getResponses().get(0); + assertThat(firstResponse.getPublishingStatus(), equalTo(EventPublishingStatus.ABORTED)); + assertThat(firstResponse.getStep(), equalTo(EventPublishingStep.VALIDATING_SIZE)); + assertThat(firstResponse.getDetail(), is(isEmptyString())); + + final BatchItemResponse secondResponse = result.getResponses().get(1); + assertThat(secondResponse.getPublishingStatus(), equalTo(EventPublishingStatus.ABORTED)); + assertThat(secondResponse.getStep(), equalTo(EventPublishingStep.VALIDATING_SIZE)); + assertThat(secondResponse.getDetail(), equalTo("Event too large")); + + final BatchItemResponse thirdResponse = result.getResponses().get(2); + assertThat(thirdResponse.getPublishingStatus(), equalTo(EventPublishingStatus.ABORTED)); + assertThat(thirdResponse.getStep(), equalTo(EventPublishingStep.ENRICHING)); + assertThat(thirdResponse.getDetail(), is(isEmptyString())); + } + + @Test + public void whenEnrichmentMakesEventTooLargeThenAllItemsAreAborted() throws Exception { + final EventType eventType = buildDefaultEventType(); + final JSONArray batch = buildDefaultBatch(1); + final JSONObject largeEvent = new JSONObject(); + largeEvent.put("foo", randomStringOfLength(880)); + batch.put(largeEvent); + final JSONObject smallEvent = new JSONObject(); + smallEvent.put("foo", randomString()); + batch.put(smallEvent); + + mockSuccessfulValidation(eventType); + + final EventPublishResult result = publisher.publish(batch, eventType.getName(), FULL_ACCESS_CLIENT); + + assertThat(result.getStatus(), equalTo(EventPublishingStatus.ABORTED)); + + final BatchItemResponse firstResponse = result.getResponses().get(0); + assertThat(firstResponse.getPublishingStatus(), equalTo(EventPublishingStatus.ABORTED)); + assertThat(firstResponse.getStep(), equalTo(EventPublishingStep.VALIDATING_SIZE)); + assertThat(firstResponse.getDetail(), is(isEmptyString())); + + final BatchItemResponse secondResponse = result.getResponses().get(1); + assertThat(secondResponse.getPublishingStatus(), equalTo(EventPublishingStatus.ABORTED)); + assertThat(secondResponse.getStep(), equalTo(EventPublishingStep.VALIDATING_SIZE)); + assertThat(secondResponse.getDetail(), equalTo("Event too large")); + + final BatchItemResponse thirdResponse = result.getResponses().get(2); + assertThat(thirdResponse.getPublishingStatus(), equalTo(EventPublishingStatus.ABORTED)); + assertThat(thirdResponse.getStep(), equalTo(EventPublishingStep.ENRICHING)); + assertThat(thirdResponse.getDetail(), is(isEmptyString())); + } + + @Test + public void whenEventIsExactlyMaxSizeThenResultIsSuccess() throws Exception { + final EventType eventType = buildDefaultEventType(); + final JSONArray batch = buildMaxSizeBatch(1); + + mockSuccessfulValidation(eventType); + + final EventPublishResult result = publisher.publish(batch, eventType.getName(), FULL_ACCESS_CLIENT); + + assertThat(result.getStatus(), equalTo(EventPublishingStatus.SUBMITTED)); + verify(enrichment, times(1)).enrich(any(), any()); + verify(partitionResolver, times(1)).resolvePartition(any(), any()); + verify(topicRepository, times(1)).syncPostBatch(any(), any()); + } + + @Test + public void whenEventIsOneByteOverMaxSizeThenResultIsAborted() throws Exception { + final EventType eventType = buildDefaultEventType(); + final JSONArray batch = buildOneByteTooLargeBatch(1); + + mockSuccessfulValidation(eventType); + + final EventPublishResult result = publisher.publish(batch, eventType.getName(), FULL_ACCESS_CLIENT); + + assertThat(result.getStatus(), equalTo(EventPublishingStatus.ABORTED)); + verify(enrichment, times(1)).enrich(any(), any()); + verify(partitionResolver, times(1)).resolvePartition(any(), any()); + verify(topicRepository, times(0)).syncPostBatch(any(), any()); + } + + @Test + public void whenEventIsOneByteOverMaxSizeWithMultiByteCharsThenResultIsAborted() throws Exception { + final EventType eventType = buildDefaultEventType(); + final JSONArray batch = buildOneByteTooLargeBatchMultiByte(1); + + mockSuccessfulValidation(eventType); + + final EventPublishResult result = publisher.publish(batch, eventType.getName(), FULL_ACCESS_CLIENT); + + assertThat(result.getStatus(), equalTo(EventPublishingStatus.ABORTED)); + verify(enrichment, times(1)).enrich(any(), any()); + verify(partitionResolver, times(1)).resolvePartition(any(), any()); + verify(topicRepository, times(0)).syncPostBatch(any(), any()); + } + + @Test + public void whenEventIsExactlyMaxSizeWithMultiByteCharsThenResultIsSuccess() throws Exception { + final EventType eventType = buildDefaultEventType(); + final JSONArray batch = buildMaxSizeBatchMultiByte(1); + + mockSuccessfulValidation(eventType); + + final EventPublishResult result = publisher.publish(batch, eventType.getName(), FULL_ACCESS_CLIENT); + + assertThat(result.getStatus(), equalTo(EventPublishingStatus.SUBMITTED)); + verify(enrichment, times(1)).enrich(any(), any()); + verify(partitionResolver, times(1)).resolvePartition(any(), any()); + verify(topicRepository, times(1)).syncPostBatch(any(), any()); + } + @Test public void whenPartitionFailsThenResultIsAborted() throws Exception { final EventType eventType = buildDefaultEventType(); @@ -332,11 +476,47 @@ private void mockSuccessfulValidation(final EventType eventType) throws Exceptio } private JSONArray buildDefaultBatch(final int numberOfEvents) { + return buildBatch(numberOfEvents, 50); + } + + private JSONArray buildLargeBatch(final int numberOfEvents) { + return buildBatch(numberOfEvents, NAKADI_EVENT_MAX_BYTES + 100); + } + + private JSONArray buildMaxSizeBatch(final int numberOfEvents) { + return buildBatch(numberOfEvents, NAKADI_EVENT_MAX_BYTES); + } + + private JSONArray buildOneByteTooLargeBatch(final int numberOfEvents) { + return buildBatch(numberOfEvents, NAKADI_EVENT_MAX_BYTES + 1); + } + + private JSONArray buildMaxSizeBatchMultiByte(final int numberOfEvents) { + return buildBatchMultiByte(numberOfEvents, NAKADI_EVENT_MAX_BYTES); + } + + private JSONArray buildOneByteTooLargeBatchMultiByte(final int numberOfEvents) { + return buildBatchMultiByte(numberOfEvents, NAKADI_EVENT_MAX_BYTES + 1); + } + + private JSONArray buildBatchMultiByte(final int numberOfEvents, final int length) { final List events = new ArrayList<>(); + final int valueLength = length - 16; // each character 2 lines below is 3 bytes + for (int i = 0; i < numberOfEvents; i++) { + final JSONObject event = new JSONObject(); + event.put("foo", randomValidStringOfLength(valueLength) + "温泉"); + events.add(event); + } + return new JSONArray(events); + } + + private JSONArray buildBatch(final int numberOfEvents, final int length) { + final List events = new ArrayList<>(); + final int valueLength = length - 10; // the brackets, key, and quotation marks take 10 characters for (int i = 0; i < numberOfEvents; i++) { final JSONObject event = new JSONObject(); - event.put("foo", randomString()); + event.put("foo", randomValidStringOfLength(valueLength)); events.add(event); } diff --git a/src/test/java/org/zalando/nakadi/utils/TestUtils.java b/src/test/java/org/zalando/nakadi/utils/TestUtils.java index 9380452d7c..d08533e1f6 100644 --- a/src/test/java/org/zalando/nakadi/utils/TestUtils.java +++ b/src/test/java/org/zalando/nakadi/utils/TestUtils.java @@ -54,7 +54,7 @@ public static String randomTextString() { } public static String randomString() { - final int length = RANDOM.nextInt(500); + final int length = RANDOM.nextInt(100); String s = ""; @@ -79,6 +79,26 @@ public static String randomString(final String validChars) { } + public static String randomStringOfLength(final int length) { + final StringBuilder sb = new StringBuilder(); + + for (int i = 0; i < length; i++) { + sb.append((char) RANDOM.nextInt(128)); + } + + return sb.toString(); + } + + public static String randomValidStringOfLength(final int length) { + final StringBuilder sb = new StringBuilder(); + + for (int i = 0; i < length; i++) { + sb.append(VALID_EVENT_TYPE_NAME_CHARS.charAt(RANDOM.nextInt(VALID_EVENT_TYPE_NAME_CHARS.length()))); + } + + return sb.toString(); + } + public static String randomValidEventTypeName() { return String.format("%s.%s", randomString(VALID_EVENT_TYPE_NAME_CHARS), randomString(VALID_EVENT_TYPE_NAME_CHARS)); From 146d6a0a4f2deca9c5bfa191589e91780e3ae2bf Mon Sep 17 00:00:00 2001 From: Lionel Montrieux Date: Fri, 16 Dec 2016 10:52:48 +0100 Subject: [PATCH 04/37] ARUHA-473 Acceptance test for max event size --- .../nakadi/webservice/EventPublishingAT.java | 37 +++++++++++++++++++ 1 file changed, 37 insertions(+) create mode 100644 src/acceptance-test/java/org/zalando/nakadi/webservice/EventPublishingAT.java diff --git a/src/acceptance-test/java/org/zalando/nakadi/webservice/EventPublishingAT.java b/src/acceptance-test/java/org/zalando/nakadi/webservice/EventPublishingAT.java new file mode 100644 index 0000000000..c60116b2ca --- /dev/null +++ b/src/acceptance-test/java/org/zalando/nakadi/webservice/EventPublishingAT.java @@ -0,0 +1,37 @@ +package org.zalando.nakadi.webservice; + +import com.jayway.restassured.http.ContentType; +import com.jayway.restassured.response.Response; +import org.apache.http.HttpStatus; +import org.junit.Test; +import org.zalando.nakadi.domain.EventType; +import org.zalando.nakadi.webservice.utils.NakadiTestUtils; + +import java.text.MessageFormat; + +import static com.jayway.restassured.RestAssured.given; + +public class EventPublishingAT extends BaseAT { + + @Test + public void whenPublishingEventTooLargeThen422() throws Exception { + final EventType eventType = NakadiTestUtils.createEventType(); + + publishLargeEvent(eventType) + .then() + .statusCode(HttpStatus.SC_UNPROCESSABLE_ENTITY); + } + + private Response publishLargeEvent(final EventType eventType) { + StringBuilder sb = new StringBuilder(); + sb.append("[{\"blah\":\""); + for (int i = 0; i < 1000010; i++) { + sb.append("a"); + } + sb.append("\"}]"); + return given() + .body(sb.toString()) + .contentType(ContentType.JSON) + .post(MessageFormat.format("/event-types/{0}/events", eventType.getName())); + } +} From 8849782ee25c1a07ccb3c827a283141f87f02d11 Mon Sep 17 00:00:00 2001 From: Lionel Montrieux Date: Fri, 16 Dec 2016 11:13:25 +0100 Subject: [PATCH 05/37] ARUHA-473 Fix checkstyle --- .../java/org/zalando/nakadi/webservice/EventPublishingAT.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/acceptance-test/java/org/zalando/nakadi/webservice/EventPublishingAT.java b/src/acceptance-test/java/org/zalando/nakadi/webservice/EventPublishingAT.java index c60116b2ca..b63abef394 100644 --- a/src/acceptance-test/java/org/zalando/nakadi/webservice/EventPublishingAT.java +++ b/src/acceptance-test/java/org/zalando/nakadi/webservice/EventPublishingAT.java @@ -23,7 +23,7 @@ public void whenPublishingEventTooLargeThen422() throws Exception { } private Response publishLargeEvent(final EventType eventType) { - StringBuilder sb = new StringBuilder(); + final StringBuilder sb = new StringBuilder(); sb.append("[{\"blah\":\""); for (int i = 0; i < 1000010; i++) { sb.append("a"); From a06139221c7c0cd45bc7c4798b8b1ae7bd083551 Mon Sep 17 00:00:00 2001 From: Lionel Montrieux Date: Fri, 16 Dec 2016 11:54:37 +0100 Subject: [PATCH 06/37] ARUHA-473 Move size validation after validation, remove VALIDATING_SIZE step --- .../nakadi/domain/EventPublishingStep.java | 1 - .../nakadi/service/EventPublisher.java | 10 ++++---- .../nakadi/service/EventPublisherTest.java | 24 +++++++++---------- 3 files changed, 17 insertions(+), 18 deletions(-) diff --git a/src/main/java/org/zalando/nakadi/domain/EventPublishingStep.java b/src/main/java/org/zalando/nakadi/domain/EventPublishingStep.java index 13069982fc..522c41bc8b 100644 --- a/src/main/java/org/zalando/nakadi/domain/EventPublishingStep.java +++ b/src/main/java/org/zalando/nakadi/domain/EventPublishingStep.java @@ -4,7 +4,6 @@ public enum EventPublishingStep { NONE, VALIDATING, ENRICHING, - VALIDATING_SIZE, PARTITIONING, PUBLISHING, } diff --git a/src/main/java/org/zalando/nakadi/service/EventPublisher.java b/src/main/java/org/zalando/nakadi/service/EventPublisher.java index 65b91eca4c..642f7cc65e 100644 --- a/src/main/java/org/zalando/nakadi/service/EventPublisher.java +++ b/src/main/java/org/zalando/nakadi/service/EventPublisher.java @@ -68,24 +68,24 @@ public EventPublishResult publish(final JSONArray events, final String eventType try { validate(batch, eventType); + validateSize(batch); partition(batch, eventType); enrich(batch, eventType); - validateSize(batch); submit(batch, eventType); return ok(batch); } catch (final EventValidationException e) { LOG.debug("Event validation error: {}", e.getMessage()); return aborted(EventPublishingStep.VALIDATING, batch); + } catch (final EventSizeValidationException e) { + LOG.debug("Event size validation error: ){", e.getMessage()); + return aborted(EventPublishingStep.VALIDATING, batch); } catch (final PartitioningException e) { LOG.debug("Event partition error: {}", e.getMessage()); return aborted(EventPublishingStep.PARTITIONING, batch); } catch (final EnrichmentException e) { LOG.debug("Event enrichment error: {}", e.getMessage()); return aborted(EventPublishingStep.ENRICHING, batch); - } catch (final EventSizeValidationException e) { - LOG.debug("Event size validation error: ){", e.getMessage()); - return aborted(EventPublishingStep.VALIDATING_SIZE, batch); } catch (final EventPublishingException e) { LOG.error("error publishing event", e); return failed(batch); @@ -157,7 +157,7 @@ private void validateSchema(final JSONObject event, final EventType eventType) t private void validateSize(final List batch) throws EventSizeValidationException { for (final BatchItem item: batch) { - item.setStep(EventPublishingStep.VALIDATING_SIZE); + item.setStep(EventPublishingStep.VALIDATING); try { validateEventSize(item); } catch (final EventSizeValidationException e) { diff --git a/src/test/java/org/zalando/nakadi/service/EventPublisherTest.java b/src/test/java/org/zalando/nakadi/service/EventPublisherTest.java index da771c24b8..231330d466 100644 --- a/src/test/java/org/zalando/nakadi/service/EventPublisherTest.java +++ b/src/test/java/org/zalando/nakadi/service/EventPublisherTest.java @@ -148,8 +148,8 @@ public void whenEventIsTooLargeThenResultIsAborted() throws Exception { final EventPublishResult result = publisher.publish(batch, eventType.getName(), FULL_ACCESS_CLIENT); assertThat(result.getStatus(), equalTo(EventPublishingStatus.ABORTED)); - verify(enrichment, times(1)).enrich(any(), any()); - verify(partitionResolver, times(1)).resolvePartition(any(), any()); + verify(enrichment, times(0)).enrich(any(), any()); + verify(partitionResolver, times(0)).resolvePartition(any(), any()); verify(topicRepository, times(0)).syncPostBatch(any(), any()); } @@ -172,17 +172,17 @@ public void whenEventIsTooLargeThenAllItemsAreAborted() throws Exception { final BatchItemResponse firstResponse = result.getResponses().get(0); assertThat(firstResponse.getPublishingStatus(), equalTo(EventPublishingStatus.ABORTED)); - assertThat(firstResponse.getStep(), equalTo(EventPublishingStep.VALIDATING_SIZE)); + assertThat(firstResponse.getStep(), equalTo(EventPublishingStep.VALIDATING)); assertThat(firstResponse.getDetail(), is(isEmptyString())); final BatchItemResponse secondResponse = result.getResponses().get(1); assertThat(secondResponse.getPublishingStatus(), equalTo(EventPublishingStatus.ABORTED)); - assertThat(secondResponse.getStep(), equalTo(EventPublishingStep.VALIDATING_SIZE)); + assertThat(secondResponse.getStep(), equalTo(EventPublishingStep.VALIDATING)); assertThat(secondResponse.getDetail(), equalTo("Event too large")); final BatchItemResponse thirdResponse = result.getResponses().get(2); assertThat(thirdResponse.getPublishingStatus(), equalTo(EventPublishingStatus.ABORTED)); - assertThat(thirdResponse.getStep(), equalTo(EventPublishingStep.ENRICHING)); + assertThat(thirdResponse.getStep(), equalTo(EventPublishingStep.VALIDATING)); assertThat(thirdResponse.getDetail(), is(isEmptyString())); } @@ -205,17 +205,17 @@ public void whenEnrichmentMakesEventTooLargeThenAllItemsAreAborted() throws Exce final BatchItemResponse firstResponse = result.getResponses().get(0); assertThat(firstResponse.getPublishingStatus(), equalTo(EventPublishingStatus.ABORTED)); - assertThat(firstResponse.getStep(), equalTo(EventPublishingStep.VALIDATING_SIZE)); + assertThat(firstResponse.getStep(), equalTo(EventPublishingStep.VALIDATING)); assertThat(firstResponse.getDetail(), is(isEmptyString())); final BatchItemResponse secondResponse = result.getResponses().get(1); assertThat(secondResponse.getPublishingStatus(), equalTo(EventPublishingStatus.ABORTED)); - assertThat(secondResponse.getStep(), equalTo(EventPublishingStep.VALIDATING_SIZE)); + assertThat(secondResponse.getStep(), equalTo(EventPublishingStep.VALIDATING)); assertThat(secondResponse.getDetail(), equalTo("Event too large")); final BatchItemResponse thirdResponse = result.getResponses().get(2); assertThat(thirdResponse.getPublishingStatus(), equalTo(EventPublishingStatus.ABORTED)); - assertThat(thirdResponse.getStep(), equalTo(EventPublishingStep.ENRICHING)); + assertThat(thirdResponse.getStep(), equalTo(EventPublishingStep.VALIDATING)); assertThat(thirdResponse.getDetail(), is(isEmptyString())); } @@ -244,8 +244,8 @@ public void whenEventIsOneByteOverMaxSizeThenResultIsAborted() throws Exception final EventPublishResult result = publisher.publish(batch, eventType.getName(), FULL_ACCESS_CLIENT); assertThat(result.getStatus(), equalTo(EventPublishingStatus.ABORTED)); - verify(enrichment, times(1)).enrich(any(), any()); - verify(partitionResolver, times(1)).resolvePartition(any(), any()); + verify(enrichment, times(0)).enrich(any(), any()); + verify(partitionResolver, times(0)).resolvePartition(any(), any()); verify(topicRepository, times(0)).syncPostBatch(any(), any()); } @@ -259,8 +259,8 @@ public void whenEventIsOneByteOverMaxSizeWithMultiByteCharsThenResultIsAborted() final EventPublishResult result = publisher.publish(batch, eventType.getName(), FULL_ACCESS_CLIENT); assertThat(result.getStatus(), equalTo(EventPublishingStatus.ABORTED)); - verify(enrichment, times(1)).enrich(any(), any()); - verify(partitionResolver, times(1)).resolvePartition(any(), any()); + verify(enrichment, times(0)).enrich(any(), any()); + verify(partitionResolver, times(0)).resolvePartition(any(), any()); verify(topicRepository, times(0)).syncPostBatch(any(), any()); } From 0a791da5ab7ee644797d845445df532d21516361 Mon Sep 17 00:00:00 2001 From: Lionel Montrieux Date: Fri, 16 Dec 2016 12:06:18 +0100 Subject: [PATCH 07/37] ARUHA-473 Move size validation inside validation --- .../nakadi/service/EventPublisher.java | 19 +++++-------------- 1 file changed, 5 insertions(+), 14 deletions(-) diff --git a/src/main/java/org/zalando/nakadi/service/EventPublisher.java b/src/main/java/org/zalando/nakadi/service/EventPublisher.java index 642f7cc65e..dde3d49a2d 100644 --- a/src/main/java/org/zalando/nakadi/service/EventPublisher.java +++ b/src/main/java/org/zalando/nakadi/service/EventPublisher.java @@ -68,7 +68,6 @@ public EventPublishResult publish(final JSONArray events, final String eventType try { validate(batch, eventType); - validateSize(batch); partition(batch, eventType); enrich(batch, eventType); submit(batch, eventType); @@ -124,14 +123,18 @@ private void partition(final List batch, final EventType eventType) t } private void validate(final List batch, final EventType eventType) throws EventValidationException, - InternalNakadiException { + EventSizeValidationException, InternalNakadiException { for (final BatchItem item : batch) { item.setStep(EventPublishingStep.VALIDATING); try { validateSchema(item.getEvent(), eventType); + validateEventSize(item); } catch (final EventValidationException e) { item.updateStatusAndDetail(EventPublishingStatus.FAILED, e.getMessage()); throw e; + } catch (final EventSizeValidationException e) { + item.updateStatusAndDetail(EventPublishingStatus.ABORTED, e.getMessage()); + throw e; } } } @@ -155,18 +158,6 @@ private void validateSchema(final JSONObject event, final EventType eventType) t } } - private void validateSize(final List batch) throws EventSizeValidationException { - for (final BatchItem item: batch) { - item.setStep(EventPublishingStep.VALIDATING); - try { - validateEventSize(item); - } catch (final EventSizeValidationException e) { - item.updateStatusAndDetail(EventPublishingStatus.ABORTED, e.getMessage()); - throw e; - } - } - } - private void validateEventSize(final BatchItem item) throws EventSizeValidationException { if (item.getEvent().toString().getBytes().length > nakadiSettings.getEventMaxBytes()) throw new EventSizeValidationException("Event too large"); From df5b675d645fc83bfdba29751d29b8f85e8160e1 Mon Sep 17 00:00:00 2001 From: Lionel Montrieux Date: Fri, 16 Dec 2016 13:03:17 +0100 Subject: [PATCH 08/37] ARUHA-473 Fix bug in test wrt event publishing step --- .../java/org/zalando/nakadi/service/EventPublisherTest.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/test/java/org/zalando/nakadi/service/EventPublisherTest.java b/src/test/java/org/zalando/nakadi/service/EventPublisherTest.java index 231330d466..3fe784570a 100644 --- a/src/test/java/org/zalando/nakadi/service/EventPublisherTest.java +++ b/src/test/java/org/zalando/nakadi/service/EventPublisherTest.java @@ -182,7 +182,7 @@ public void whenEventIsTooLargeThenAllItemsAreAborted() throws Exception { final BatchItemResponse thirdResponse = result.getResponses().get(2); assertThat(thirdResponse.getPublishingStatus(), equalTo(EventPublishingStatus.ABORTED)); - assertThat(thirdResponse.getStep(), equalTo(EventPublishingStep.VALIDATING)); + assertThat(thirdResponse.getStep(), equalTo(EventPublishingStep.NONE)); assertThat(thirdResponse.getDetail(), is(isEmptyString())); } @@ -215,7 +215,7 @@ public void whenEnrichmentMakesEventTooLargeThenAllItemsAreAborted() throws Exce final BatchItemResponse thirdResponse = result.getResponses().get(2); assertThat(thirdResponse.getPublishingStatus(), equalTo(EventPublishingStatus.ABORTED)); - assertThat(thirdResponse.getStep(), equalTo(EventPublishingStep.VALIDATING)); + assertThat(thirdResponse.getStep(), equalTo(EventPublishingStep.NONE)); assertThat(thirdResponse.getDetail(), is(isEmptyString())); } From e261cb8c3df45b965b1161e9cf9eebb7bf4aee9a Mon Sep 17 00:00:00 2001 From: Lionel Montrieux Date: Fri, 16 Dec 2016 13:10:13 +0100 Subject: [PATCH 09/37] ARUHA-473 Refactor string building --- .../org/zalando/nakadi/webservice/EventPublishingAT.java | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/src/acceptance-test/java/org/zalando/nakadi/webservice/EventPublishingAT.java b/src/acceptance-test/java/org/zalando/nakadi/webservice/EventPublishingAT.java index b63abef394..1ab67fbaf2 100644 --- a/src/acceptance-test/java/org/zalando/nakadi/webservice/EventPublishingAT.java +++ b/src/acceptance-test/java/org/zalando/nakadi/webservice/EventPublishingAT.java @@ -2,6 +2,7 @@ import com.jayway.restassured.http.ContentType; import com.jayway.restassured.response.Response; +import org.apache.commons.lang3.StringUtils; import org.apache.http.HttpStatus; import org.junit.Test; import org.zalando.nakadi.domain.EventType; @@ -25,9 +26,7 @@ public void whenPublishingEventTooLargeThen422() throws Exception { private Response publishLargeEvent(final EventType eventType) { final StringBuilder sb = new StringBuilder(); sb.append("[{\"blah\":\""); - for (int i = 0; i < 1000010; i++) { - sb.append("a"); - } + sb.append(StringUtils.repeat("a", 1000010)); sb.append("\"}]"); return given() .body(sb.toString()) From 217be7f35e120416ce012be6f811b66863a46355 Mon Sep 17 00:00:00 2001 From: Lionel Montrieux Date: Fri, 16 Dec 2016 13:11:02 +0100 Subject: [PATCH 10/37] ARUHA-473 Fix log string --- src/main/java/org/zalando/nakadi/service/EventPublisher.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/java/org/zalando/nakadi/service/EventPublisher.java b/src/main/java/org/zalando/nakadi/service/EventPublisher.java index dde3d49a2d..4946c194ce 100644 --- a/src/main/java/org/zalando/nakadi/service/EventPublisher.java +++ b/src/main/java/org/zalando/nakadi/service/EventPublisher.java @@ -77,7 +77,7 @@ public EventPublishResult publish(final JSONArray events, final String eventType LOG.debug("Event validation error: {}", e.getMessage()); return aborted(EventPublishingStep.VALIDATING, batch); } catch (final EventSizeValidationException e) { - LOG.debug("Event size validation error: ){", e.getMessage()); + LOG.debug("Event size validation error: {}", e.getMessage()); return aborted(EventPublishingStep.VALIDATING, batch); } catch (final PartitioningException e) { LOG.debug("Event partition error: {}", e.getMessage()); From ddd7a4fffd4b7e0b099a1c6a1ba82862bbd19002 Mon Sep 17 00:00:00 2001 From: Lionel Montrieux Date: Fri, 16 Dec 2016 13:20:58 +0100 Subject: [PATCH 11/37] ARUHA-473 Set capacity for StringBuilder --- .../java/org/zalando/nakadi/webservice/EventPublishingAT.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/acceptance-test/java/org/zalando/nakadi/webservice/EventPublishingAT.java b/src/acceptance-test/java/org/zalando/nakadi/webservice/EventPublishingAT.java index 1ab67fbaf2..26893e686b 100644 --- a/src/acceptance-test/java/org/zalando/nakadi/webservice/EventPublishingAT.java +++ b/src/acceptance-test/java/org/zalando/nakadi/webservice/EventPublishingAT.java @@ -24,7 +24,7 @@ public void whenPublishingEventTooLargeThen422() throws Exception { } private Response publishLargeEvent(final EventType eventType) { - final StringBuilder sb = new StringBuilder(); + final StringBuilder sb = new StringBuilder(1000023); sb.append("[{\"blah\":\""); sb.append(StringUtils.repeat("a", 1000010)); sb.append("\"}]"); From 957938b9bb4fa56ad95daeff1ea4d9c5d847e56f Mon Sep 17 00:00:00 2001 From: Lionel Montrieux Date: Fri, 16 Dec 2016 13:44:35 +0100 Subject: [PATCH 12/37] ARUHA-473 If an event is too large, that event is marked FAILED. Other events in the batch are marked ABORTED --- src/main/java/org/zalando/nakadi/service/EventPublisher.java | 5 +---- .../java/org/zalando/nakadi/service/EventPublisherTest.java | 4 ++-- 2 files changed, 3 insertions(+), 6 deletions(-) diff --git a/src/main/java/org/zalando/nakadi/service/EventPublisher.java b/src/main/java/org/zalando/nakadi/service/EventPublisher.java index 4946c194ce..f8dd8a1ce5 100644 --- a/src/main/java/org/zalando/nakadi/service/EventPublisher.java +++ b/src/main/java/org/zalando/nakadi/service/EventPublisher.java @@ -129,12 +129,9 @@ private void validate(final List batch, final EventType eventType) th try { validateSchema(item.getEvent(), eventType); validateEventSize(item); - } catch (final EventValidationException e) { + } catch (final EventValidationException|EventSizeValidationException e) { item.updateStatusAndDetail(EventPublishingStatus.FAILED, e.getMessage()); throw e; - } catch (final EventSizeValidationException e) { - item.updateStatusAndDetail(EventPublishingStatus.ABORTED, e.getMessage()); - throw e; } } } diff --git a/src/test/java/org/zalando/nakadi/service/EventPublisherTest.java b/src/test/java/org/zalando/nakadi/service/EventPublisherTest.java index 3fe784570a..d8a418e229 100644 --- a/src/test/java/org/zalando/nakadi/service/EventPublisherTest.java +++ b/src/test/java/org/zalando/nakadi/service/EventPublisherTest.java @@ -176,7 +176,7 @@ public void whenEventIsTooLargeThenAllItemsAreAborted() throws Exception { assertThat(firstResponse.getDetail(), is(isEmptyString())); final BatchItemResponse secondResponse = result.getResponses().get(1); - assertThat(secondResponse.getPublishingStatus(), equalTo(EventPublishingStatus.ABORTED)); + assertThat(secondResponse.getPublishingStatus(), equalTo(EventPublishingStatus.FAILED)); assertThat(secondResponse.getStep(), equalTo(EventPublishingStep.VALIDATING)); assertThat(secondResponse.getDetail(), equalTo("Event too large")); @@ -209,7 +209,7 @@ public void whenEnrichmentMakesEventTooLargeThenAllItemsAreAborted() throws Exce assertThat(firstResponse.getDetail(), is(isEmptyString())); final BatchItemResponse secondResponse = result.getResponses().get(1); - assertThat(secondResponse.getPublishingStatus(), equalTo(EventPublishingStatus.ABORTED)); + assertThat(secondResponse.getPublishingStatus(), equalTo(EventPublishingStatus.FAILED)); assertThat(secondResponse.getStep(), equalTo(EventPublishingStep.VALIDATING)); assertThat(secondResponse.getDetail(), equalTo("Event too large")); From 49c184652e46d3748b1a8067d55297ec07c48fec Mon Sep 17 00:00:00 2001 From: Lionel Montrieux Date: Fri, 16 Dec 2016 13:51:31 +0100 Subject: [PATCH 13/37] ARUHA-473 Remove redundant Exception --- .../EventSizeValidationException.java | 21 ------------------- .../nakadi/service/EventPublisher.java | 12 ++++------- 2 files changed, 4 insertions(+), 29 deletions(-) delete mode 100644 src/main/java/org/zalando/nakadi/exceptions/EventSizeValidationException.java diff --git a/src/main/java/org/zalando/nakadi/exceptions/EventSizeValidationException.java b/src/main/java/org/zalando/nakadi/exceptions/EventSizeValidationException.java deleted file mode 100644 index fd9ee955a8..0000000000 --- a/src/main/java/org/zalando/nakadi/exceptions/EventSizeValidationException.java +++ /dev/null @@ -1,21 +0,0 @@ -package org.zalando.nakadi.exceptions; - -import org.zalando.nakadi.validation.ValidationError; -import org.zalando.problem.MoreStatus; - -import javax.ws.rs.core.Response; - -public class EventSizeValidationException extends NakadiException { - public EventSizeValidationException(final String message) { - super(message); - } - - public EventSizeValidationException (final ValidationError validationError) { - super(validationError.getMessage()); - } - - @Override - protected Response.StatusType getStatus() { - return MoreStatus.UNPROCESSABLE_ENTITY; - } -} diff --git a/src/main/java/org/zalando/nakadi/service/EventPublisher.java b/src/main/java/org/zalando/nakadi/service/EventPublisher.java index f8dd8a1ce5..010103b849 100644 --- a/src/main/java/org/zalando/nakadi/service/EventPublisher.java +++ b/src/main/java/org/zalando/nakadi/service/EventPublisher.java @@ -17,7 +17,6 @@ import org.zalando.nakadi.enrichment.Enrichment; import org.zalando.nakadi.exceptions.EnrichmentException; import org.zalando.nakadi.exceptions.EventPublishingException; -import org.zalando.nakadi.exceptions.EventSizeValidationException; import org.zalando.nakadi.exceptions.EventValidationException; import org.zalando.nakadi.exceptions.InternalNakadiException; import org.zalando.nakadi.exceptions.NoSuchEventTypeException; @@ -76,9 +75,6 @@ public EventPublishResult publish(final JSONArray events, final String eventType } catch (final EventValidationException e) { LOG.debug("Event validation error: {}", e.getMessage()); return aborted(EventPublishingStep.VALIDATING, batch); - } catch (final EventSizeValidationException e) { - LOG.debug("Event size validation error: {}", e.getMessage()); - return aborted(EventPublishingStep.VALIDATING, batch); } catch (final PartitioningException e) { LOG.debug("Event partition error: {}", e.getMessage()); return aborted(EventPublishingStep.PARTITIONING, batch); @@ -123,13 +119,13 @@ private void partition(final List batch, final EventType eventType) t } private void validate(final List batch, final EventType eventType) throws EventValidationException, - EventSizeValidationException, InternalNakadiException { + InternalNakadiException { for (final BatchItem item : batch) { item.setStep(EventPublishingStep.VALIDATING); try { validateSchema(item.getEvent(), eventType); validateEventSize(item); - } catch (final EventValidationException|EventSizeValidationException e) { + } catch (final EventValidationException e) { item.updateStatusAndDetail(EventPublishingStatus.FAILED, e.getMessage()); throw e; } @@ -155,9 +151,9 @@ private void validateSchema(final JSONObject event, final EventType eventType) t } } - private void validateEventSize(final BatchItem item) throws EventSizeValidationException { + private void validateEventSize(final BatchItem item) throws EventValidationException { if (item.getEvent().toString().getBytes().length > nakadiSettings.getEventMaxBytes()) - throw new EventSizeValidationException("Event too large"); + throw new EventValidationException("Event too large"); } private EventPublishResult failed(final List batch) { From cc15d1a388a1e4ed3c906e761bbc3f83ba92fc49 Mon Sep 17 00:00:00 2001 From: Lionel Montrieux Date: Tue, 20 Dec 2016 14:11:45 +0100 Subject: [PATCH 14/37] ARUHA-473 Explicit charset in getBytes() --- src/main/java/org/zalando/nakadi/service/EventPublisher.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/main/java/org/zalando/nakadi/service/EventPublisher.java b/src/main/java/org/zalando/nakadi/service/EventPublisher.java index 010103b849..7585b5392e 100644 --- a/src/main/java/org/zalando/nakadi/service/EventPublisher.java +++ b/src/main/java/org/zalando/nakadi/service/EventPublisher.java @@ -28,6 +28,7 @@ import org.zalando.nakadi.validation.EventTypeValidator; import org.zalando.nakadi.validation.ValidationError; +import java.nio.charset.StandardCharsets; import java.util.List; import java.util.Optional; import java.util.concurrent.ExecutionException; @@ -152,7 +153,7 @@ private void validateSchema(final JSONObject event, final EventType eventType) t } private void validateEventSize(final BatchItem item) throws EventValidationException { - if (item.getEvent().toString().getBytes().length > nakadiSettings.getEventMaxBytes()) + if (item.getEvent().toString().getBytes(StandardCharsets.UTF_8).length > nakadiSettings.getEventMaxBytes()) throw new EventValidationException("Event too large"); } From 3f9bf87c9841500d67d15cebe38493b04e9bcf7a Mon Sep 17 00:00:00 2001 From: Lionel Montrieux Date: Fri, 23 Dec 2016 10:48:10 +0100 Subject: [PATCH 15/37] ARUHA-473 Refactor event size computation --- src/main/java/org/zalando/nakadi/domain/BatchItem.java | 5 +++++ src/main/java/org/zalando/nakadi/service/EventPublisher.java | 3 +-- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/src/main/java/org/zalando/nakadi/domain/BatchItem.java b/src/main/java/org/zalando/nakadi/domain/BatchItem.java index f194b69ec3..7193907d9a 100644 --- a/src/main/java/org/zalando/nakadi/domain/BatchItem.java +++ b/src/main/java/org/zalando/nakadi/domain/BatchItem.java @@ -3,6 +3,7 @@ import org.json.JSONObject; import javax.annotation.Nullable; +import java.nio.charset.StandardCharsets; import java.util.Optional; public class BatchItem { @@ -58,4 +59,8 @@ public void updateStatusAndDetail(final EventPublishingStatus publishingStatus, response.setPublishingStatus(publishingStatus); response.setDetail(detail); } + + public int getEventSize() { + return event.toString().getBytes(StandardCharsets.UTF_8).length; + } } \ No newline at end of file diff --git a/src/main/java/org/zalando/nakadi/service/EventPublisher.java b/src/main/java/org/zalando/nakadi/service/EventPublisher.java index 7585b5392e..948f44c2c5 100644 --- a/src/main/java/org/zalando/nakadi/service/EventPublisher.java +++ b/src/main/java/org/zalando/nakadi/service/EventPublisher.java @@ -28,7 +28,6 @@ import org.zalando.nakadi.validation.EventTypeValidator; import org.zalando.nakadi.validation.ValidationError; -import java.nio.charset.StandardCharsets; import java.util.List; import java.util.Optional; import java.util.concurrent.ExecutionException; @@ -153,7 +152,7 @@ private void validateSchema(final JSONObject event, final EventType eventType) t } private void validateEventSize(final BatchItem item) throws EventValidationException { - if (item.getEvent().toString().getBytes(StandardCharsets.UTF_8).length > nakadiSettings.getEventMaxBytes()) + if (item.getEventSize() > nakadiSettings.getEventMaxBytes()) throw new EventValidationException("Event too large"); } From 6f7a45087c0c7b6cd1a4bcc772ca4f1911300abe Mon Sep 17 00:00:00 2001 From: Lionel Montrieux Date: Tue, 27 Dec 2016 18:24:43 +0100 Subject: [PATCH 16/37] ARUHA-473 Explicitely set Kafka consumers' fetch.message.max.bytes property --- .../nakadi/repository/kafka/KafkaRepositoryAT.java | 4 +++- .../nakadi/repository/kafka/KafkaLocationManager.java | 4 +++- .../zalando/nakadi/repository/kafka/KafkaSettings.java | 9 ++++++++- src/main/resources/application.yml | 1 + 4 files changed, 15 insertions(+), 3 deletions(-) diff --git a/src/acceptance-test/java/org/zalando/nakadi/repository/kafka/KafkaRepositoryAT.java b/src/acceptance-test/java/org/zalando/nakadi/repository/kafka/KafkaRepositoryAT.java index 8acb8fd389..7590e90aa7 100644 --- a/src/acceptance-test/java/org/zalando/nakadi/repository/kafka/KafkaRepositoryAT.java +++ b/src/acceptance-test/java/org/zalando/nakadi/repository/kafka/KafkaRepositoryAT.java @@ -55,6 +55,7 @@ public class KafkaRepositoryAT extends BaseAT { private static final int KAFKA_BATCH_SIZE = 1048576; private static final long KAFKA_LINGER_MS = 0; private static final long NAKADI_EVENT_MAX_BYTES = 1000000L; + private static final int KAFKA_FETCH_MESSAGE_MAX_BYTES = 2000000; private NakadiSettings nakadiSettings; private KafkaSettings kafkaSettings; @@ -76,7 +77,8 @@ public void setup() { NAKADI_POLL_TIMEOUT, NAKADI_SEND_TIMEOUT, NAKADI_EVENT_MAX_BYTES); - kafkaSettings = new KafkaSettings(KAFKA_REQUEST_TIMEOUT, KAFKA_BATCH_SIZE, KAFKA_LINGER_MS); + kafkaSettings = new KafkaSettings(KAFKA_REQUEST_TIMEOUT, KAFKA_BATCH_SIZE, KAFKA_LINGER_MS, + KAFKA_FETCH_MESSAGE_MAX_BYTES); zookeeperSettings = new ZookeeperSettings(ZK_SESSION_TIMEOUT, ZK_CONNECTION_TIMEOUT); kafkaHelper = new KafkaTestHelper(KAFKA_URL); kafkaTopicRepository = createKafkaTopicRepository(); diff --git a/src/main/java/org/zalando/nakadi/repository/kafka/KafkaLocationManager.java b/src/main/java/org/zalando/nakadi/repository/kafka/KafkaLocationManager.java index 90ecf8f9e4..cdd1db3355 100644 --- a/src/main/java/org/zalando/nakadi/repository/kafka/KafkaLocationManager.java +++ b/src/main/java/org/zalando/nakadi/repository/kafka/KafkaLocationManager.java @@ -99,7 +99,9 @@ private void updateBrokers() { } public Properties getKafkaConsumerProperties() { - return (Properties) kafkaProperties.clone(); + final Properties consumerProps = (Properties) kafkaProperties.clone(); + consumerProps.put("fetch.message.max.bytes", kafkaSettings.getFetchMessageMaxBytes()); + return consumerProps; } public Properties getKafkaProducerProperties() { diff --git a/src/main/java/org/zalando/nakadi/repository/kafka/KafkaSettings.java b/src/main/java/org/zalando/nakadi/repository/kafka/KafkaSettings.java index ed27c69f96..8f23fd0eef 100644 --- a/src/main/java/org/zalando/nakadi/repository/kafka/KafkaSettings.java +++ b/src/main/java/org/zalando/nakadi/repository/kafka/KafkaSettings.java @@ -16,14 +16,17 @@ public class KafkaSettings { // /kafka/clients/producer/ProducerConfig.java#L232 private final int batchSize; private final long lingerMs; + private final int fetchMessageMaxBytes; @Autowired public KafkaSettings(@Value("${nakadi.kafka.request.timeout.ms}") final int requestTimeoutMs, @Value("${nakadi.kafka.batch.size}") final int batchSize, - @Value("${nakadi.kafka.linger.ms}") final long lingerMs) { + @Value("${nakadi.kafka.linger.ms}") final long lingerMs, + @Value("${nakadi.kafka.fetch.message.max.bytes}") final int fetchMessageMaxBytes) { this.requestTimeoutMs = requestTimeoutMs; this.batchSize = batchSize; this.lingerMs = lingerMs; + this.fetchMessageMaxBytes = fetchMessageMaxBytes; } public int getRequestTimeoutMs() { @@ -37,4 +40,8 @@ public int getBatchSize() { public long getLingerMs() { return lingerMs; } + + public int getFetchMessageMaxBytes() { + return fetchMessageMaxBytes; + } } diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index 9e913f04bf..f88219d016 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -59,6 +59,7 @@ nakadi: send.timeoutMs: 5000 batch.size: 5242880 linger.ms: 0 + fetch.message.max.bytes: 2000000 zookeeper: kafkaNamespace: brokers: 127.0.0.1:2181 From f3158365fe0e670e8479eb5581103ae62e0b4439 Mon Sep 17 00:00:00 2001 From: Lionel Montrieux Date: Wed, 28 Dec 2016 10:39:34 +0100 Subject: [PATCH 17/37] ARUHA-473 Add curly braces --- src/main/java/org/zalando/nakadi/service/EventPublisher.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/main/java/org/zalando/nakadi/service/EventPublisher.java b/src/main/java/org/zalando/nakadi/service/EventPublisher.java index 948f44c2c5..a8455217c6 100644 --- a/src/main/java/org/zalando/nakadi/service/EventPublisher.java +++ b/src/main/java/org/zalando/nakadi/service/EventPublisher.java @@ -152,8 +152,9 @@ private void validateSchema(final JSONObject event, final EventType eventType) t } private void validateEventSize(final BatchItem item) throws EventValidationException { - if (item.getEventSize() > nakadiSettings.getEventMaxBytes()) + if (item.getEventSize() > nakadiSettings.getEventMaxBytes()) { throw new EventValidationException("Event too large"); + } } private EventPublishResult failed(final List batch) { From 8871dea4fdae03c06ddedfc99e9eaaf5c270e083 Mon Sep 17 00:00:00 2001 From: Lionel Montrieux Date: Wed, 28 Dec 2016 10:48:18 +0100 Subject: [PATCH 18/37] ARUHA-473 Explicit event size limit in swagger file --- api/nakadi-event-bus-api.yaml | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/api/nakadi-event-bus-api.yaml b/api/nakadi-event-bus-api.yaml index 506f495bba..b314c25c1b 100644 --- a/api/nakadi-event-bus-api.yaml +++ b/api/nakadi-event-bus-api.yaml @@ -316,7 +316,9 @@ paths: incoming Events. Validation rules are evaluated in the order they are defined and the Event is **rejected** in the first case of failure. If the offending validation rule provides information about the violation it will be included in the `BatchItemResponse`. If the - `EventType` defines schema validation it will be performed at this moment. + `EventType` defines schema validation it will be performed at this moment. The size of each + Event will also be validated. The maximum size per Event is 1,000,000 bytes, before + enrichment. 1. Once the validation succeeded, the content of the Event is updated according to the enrichment rules in the order the rules are defined in the `EventType`. No preexisting From f5e6367c266336d33074c3b3f62114d79489a54c Mon Sep 17 00:00:00 2001 From: Lionel Montrieux Date: Fri, 30 Dec 2016 11:30:02 +0100 Subject: [PATCH 19/37] ARUHA-473 BatchItem constructor for String event, adding event size --- src/main/java/org/zalando/nakadi/domain/BatchItem.java | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/src/main/java/org/zalando/nakadi/domain/BatchItem.java b/src/main/java/org/zalando/nakadi/domain/BatchItem.java index 7193907d9a..0444f25811 100644 --- a/src/main/java/org/zalando/nakadi/domain/BatchItem.java +++ b/src/main/java/org/zalando/nakadi/domain/BatchItem.java @@ -11,6 +11,7 @@ public class BatchItem { private final JSONObject event; private String partition; private String brokerId; + private int eventSize; public BatchItem(final JSONObject event) { this.response = new BatchItemResponse(); @@ -21,6 +22,11 @@ public BatchItem(final JSONObject event) { .ifPresent(this.response::setEid); } + public BatchItem(final String event) { + this(new JSONObject(event)); + eventSize = event.getBytes(StandardCharsets.UTF_8).length; + } + public JSONObject getEvent() { return this.event; } @@ -61,6 +67,6 @@ public void updateStatusAndDetail(final EventPublishingStatus publishingStatus, } public int getEventSize() { - return event.toString().getBytes(StandardCharsets.UTF_8).length; + return eventSize; } } \ No newline at end of file From d5e6f3d31f6417aeb958158bc6a7c4b1aa33c70c Mon Sep 17 00:00:00 2001 From: Lionel Montrieux Date: Fri, 30 Dec 2016 11:37:27 +0100 Subject: [PATCH 20/37] ARUHA-473 Test BatchItem size with multi-byte characters --- .../org/zalando/nakadi/domain/BatchItemTest.java | 14 ++++++++++++++ 1 file changed, 14 insertions(+) create mode 100644 src/test/java/org/zalando/nakadi/domain/BatchItemTest.java diff --git a/src/test/java/org/zalando/nakadi/domain/BatchItemTest.java b/src/test/java/org/zalando/nakadi/domain/BatchItemTest.java new file mode 100644 index 0000000000..cccf8d8cd1 --- /dev/null +++ b/src/test/java/org/zalando/nakadi/domain/BatchItemTest.java @@ -0,0 +1,14 @@ +package org.zalando.nakadi.domain; + +import org.junit.Test; + +import static org.junit.Assert.assertEquals; + +public class BatchItemTest { + + @Test + public void testBatchItemSizeWithMultByteChar() { + BatchItem item = new BatchItem("{ \"name\": \"香港\"} "); + assertEquals(20, item.getEventSize()); + } +} From 12253ed898c9f69e2e5b509ea1002443485b6b69 Mon Sep 17 00:00:00 2001 From: Lionel Montrieux Date: Fri, 30 Dec 2016 13:59:57 +0100 Subject: [PATCH 21/37] ARUHA-473 Fix test style --- src/test/java/org/zalando/nakadi/domain/BatchItemTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/test/java/org/zalando/nakadi/domain/BatchItemTest.java b/src/test/java/org/zalando/nakadi/domain/BatchItemTest.java index cccf8d8cd1..1a39c5fc8b 100644 --- a/src/test/java/org/zalando/nakadi/domain/BatchItemTest.java +++ b/src/test/java/org/zalando/nakadi/domain/BatchItemTest.java @@ -8,7 +8,7 @@ public class BatchItemTest { @Test public void testBatchItemSizeWithMultByteChar() { - BatchItem item = new BatchItem("{ \"name\": \"香港\"} "); + final BatchItem item = new BatchItem("{ \"name\": \"香港\"} "); assertEquals(20, item.getEventSize()); } } From 30da84aaa864d95ed8618ec1d6d9aa9ccadbf4e9 Mon Sep 17 00:00:00 2001 From: Lionel Montrieux Date: Fri, 30 Dec 2016 14:00:33 +0100 Subject: [PATCH 22/37] ARUHA-473 BatchFactory method to process batches as Strings --- .../zalando/nakadi/domain/BatchFactory.java | 29 ++++++++++ .../nakadi/domain/BatchFactoryTest.java | 53 +++++++++++++++++++ 2 files changed, 82 insertions(+) create mode 100644 src/test/java/org/zalando/nakadi/domain/BatchFactoryTest.java diff --git a/src/main/java/org/zalando/nakadi/domain/BatchFactory.java b/src/main/java/org/zalando/nakadi/domain/BatchFactory.java index 4a64faf5d1..816521c063 100644 --- a/src/main/java/org/zalando/nakadi/domain/BatchFactory.java +++ b/src/main/java/org/zalando/nakadi/domain/BatchFactory.java @@ -16,4 +16,33 @@ public static List from(final JSONArray events) { return batch; } + public static List from(final String events) { + final List batch = new ArrayList<>(); + StringBuilder sb = new StringBuilder(); + int brackets = 0; + for (int i = 1; i < events.length() - 1; i++) { + if (events.charAt(i) == '{') { + brackets++; + } + if (events.charAt(i) == '}') { + brackets--; + } + if (!((brackets == 0) && (events.charAt(i) == ','))) { + sb.append(events.charAt(i)); + } + if (brackets == 0) { + if (sb.length() > 0) { + batch.add(new BatchItem(sb.toString())); + } + sb = new StringBuilder(); + } + } + + if (sb.length() != 0) { + batch.add(new BatchItem(sb.toString())); + } + + return batch; + } + } diff --git a/src/test/java/org/zalando/nakadi/domain/BatchFactoryTest.java b/src/test/java/org/zalando/nakadi/domain/BatchFactoryTest.java new file mode 100644 index 0000000000..8b620cea6c --- /dev/null +++ b/src/test/java/org/zalando/nakadi/domain/BatchFactoryTest.java @@ -0,0 +1,53 @@ +package org.zalando.nakadi.domain; + +import org.json.JSONException; +import org.junit.Test; + +import java.util.List; + +import static junit.framework.TestCase.fail; +import static org.junit.Assert.assertEquals; + +public class BatchFactoryTest { + + @Test + public void testOneEvent() { + final String events = "[{\"name\":\"MyEvent\"}]"; + final List batch = BatchFactory.from(events); + assertEquals(1, batch.size()); + assertEquals(18, batch.get(0).getEventSize()); + assertEquals("{\"name\":\"MyEvent\"}", batch.get(0).getEvent().toString()); + } + + @Test + public void testMultipleEvents() { + final String events = "[{\"name\":\"MyEvent\"},{\"name\":\"MyOtherEvent\"}]"; + final List batch = BatchFactory.from(events); + assertEquals(2, batch.size()); + assertEquals(18, batch.get(0).getEventSize()); + assertEquals(23, batch.get(1).getEventSize()); + assertEquals("{\"name\":\"MyEvent\"}", batch.get(0).getEvent().toString()); + assertEquals("{\"name\":\"MyOtherEvent\"}", batch.get(1).getEvent().toString()); + } + + @Test + public void testNestedArrays() { + final String events = "[{\"name\":\"MyEvent\", \"array\":[{\"developer\": \"Ricardo\"}," + + "{\"developer\": \"Sergii\"},{\"field\":[\"hello\",\"world\"]}]},{\"name\":\"MyOtherEvent\"}]"; + final List batch = BatchFactory.from(events); + assertEquals(2, batch.size()); + assertEquals("{\"array\":[{\"developer\":\"Ricardo\"},{\"developer\":\"Sergii\"}," + + "{\"field\":[\"hello\",\"world\"]}],\"name\":\"MyEvent\"}", + batch.get(0).getEvent().toString()); + assertEquals("{\"name\":\"MyOtherEvent\"}", batch.get(1).getEvent().toString()); + } + + @Test + public void testMalformedJSON() { + final String events = "[{\"hello\":\"world\",]"; + try { + BatchFactory.from(events); + fail(); + } catch (JSONException e) {} + } +} From 7287c409637664de446dec743adc70f2be214926 Mon Sep 17 00:00:00 2001 From: Lionel Montrieux Date: Tue, 3 Jan 2017 14:52:18 +0100 Subject: [PATCH 23/37] ARUHA-473 Use String when publishing events to improve event size checking. The max event size is computer based on the events as sent by producers. The creation of JSONObjects is pushed down to the BatchItem factory (instead of EventPublishingController). --- .../repository/kafka/KafkaRepositoryAT.java | 3 +- .../controller/EventPublishingController.java | 4 +- .../zalando/nakadi/domain/BatchFactory.java | 55 ++++++--- .../org/zalando/nakadi/domain/BatchItem.java | 12 +- .../nakadi/service/EventPublisher.java | 3 +- .../EventPublishingControllerTest.java | 9 +- .../nakadi/domain/BatchFactoryTest.java | 43 +++++++ .../nakadi/enrichment/EnrichmentTest.java | 4 +- .../MetadataEnrichmentStrategyTest.java | 25 ++-- .../kafka/KafkaTopicRepositoryTest.java | 11 +- .../nakadi/service/EventPublisherTest.java | 113 ++++++++++++------ .../org/zalando/nakadi/utils/TestUtils.java | 6 +- 12 files changed, 197 insertions(+), 91 deletions(-) diff --git a/src/acceptance-test/java/org/zalando/nakadi/repository/kafka/KafkaRepositoryAT.java b/src/acceptance-test/java/org/zalando/nakadi/repository/kafka/KafkaRepositoryAT.java index 7590e90aa7..c993351e65 100644 --- a/src/acceptance-test/java/org/zalando/nakadi/repository/kafka/KafkaRepositoryAT.java +++ b/src/acceptance-test/java/org/zalando/nakadi/repository/kafka/KafkaRepositoryAT.java @@ -8,7 +8,6 @@ import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.PartitionInfo; import org.echocat.jomon.runtime.concurrent.RetryForSpecifiedTimeStrategy; -import org.json.JSONObject; import org.junit.Assert; import org.junit.Before; import org.junit.Test; @@ -137,7 +136,7 @@ public void whenDeleteTopicThenTopicIsDeleted() throws Exception { @Test(timeout = 10000) public void whenBulkSendSuccessfullyThenUpdateBatchItemStatus() throws Exception { final List items = new ArrayList<>(); - final JSONObject event = new JSONObject(); + final String event = "{}"; final String topicId = TestUtils.randomValidEventTypeName(); kafkaHelper.createTopic(topicId, ZOOKEEPER_URL); diff --git a/src/main/java/org/zalando/nakadi/controller/EventPublishingController.java b/src/main/java/org/zalando/nakadi/controller/EventPublishingController.java index d3133ee2f5..06aa164366 100644 --- a/src/main/java/org/zalando/nakadi/controller/EventPublishingController.java +++ b/src/main/java/org/zalando/nakadi/controller/EventPublishingController.java @@ -84,7 +84,9 @@ private ResponseEntity postEventInternal(final String eventTypeName, final int eventCount = eventsAsJsonObjects.length(); eventTypeMetrics.reportSizing(eventCount, eventsAsString.length()); - return response(publisher.publish(eventsAsJsonObjects, eventTypeName, client)); + final EventPublishResult result = publisher.publish(eventsAsString, eventTypeName, client); + + return response(result); } catch (final JSONException e) { LOG.debug("Problem parsing event", e); return processJSONException(e, nativeWebRequest); diff --git a/src/main/java/org/zalando/nakadi/domain/BatchFactory.java b/src/main/java/org/zalando/nakadi/domain/BatchFactory.java index 816521c063..eb9e6586e4 100644 --- a/src/main/java/org/zalando/nakadi/domain/BatchFactory.java +++ b/src/main/java/org/zalando/nakadi/domain/BatchFactory.java @@ -1,40 +1,57 @@ package org.zalando.nakadi.domain; -import org.json.JSONArray; +import org.json.JSONException; import java.util.ArrayList; import java.util.List; public class BatchFactory { - public static List from(final JSONArray events) { - final List batch = new ArrayList<>(events.length()); - for (int i = 0; i < events.length(); i++) { - batch.add(new BatchItem(events.getJSONObject(i))); - } - - return batch; - } - public static List from(final String events) { final List batch = new ArrayList<>(); StringBuilder sb = new StringBuilder(); int brackets = 0; + boolean insideQuote = false; + boolean escaped = false; + if ((!events.startsWith("[")) || (!events.endsWith("]"))) { + throw new JSONException("Array must be surrounded with square brackets"); + } for (int i = 1; i < events.length() - 1; i++) { - if (events.charAt(i) == '{') { - brackets++; + if (!escaped && events.charAt(i) == '"') { + if (insideQuote) + insideQuote = false; + else + insideQuote = true; } - if (events.charAt(i) == '}') { - brackets--; + if (escaped) { + sb.append(events.charAt(i)); + escaped = false; } - if (!((brackets == 0) && (events.charAt(i) == ','))) { + else if (!escaped && events.charAt(i) == '\\') { sb.append(events.charAt(i)); + escaped = true; } - if (brackets == 0) { - if (sb.length() > 0) { - batch.add(new BatchItem(sb.toString())); + else if (insideQuote) { + sb.append(events.charAt(i)); + } else { + if (events.charAt(i) == '{') { + brackets++; + } + if (events.charAt(i) == '}') { + brackets--; + } + if (!((brackets == 0) && (events.charAt(i) == ','))) { + sb.append(events.charAt(i)); + } + if (brackets == 0 && (events.charAt(i) != ' ') + && (events.charAt(i) != '\t' + && (events.charAt(i) != '\n') + && (events.charAt(i) != '\r'))) { + if (sb.length() > 0) { + batch.add(new BatchItem(sb.toString())); + } + sb = new StringBuilder(); } - sb = new StringBuilder(); } } diff --git a/src/main/java/org/zalando/nakadi/domain/BatchItem.java b/src/main/java/org/zalando/nakadi/domain/BatchItem.java index 0444f25811..2b55c02840 100644 --- a/src/main/java/org/zalando/nakadi/domain/BatchItem.java +++ b/src/main/java/org/zalando/nakadi/domain/BatchItem.java @@ -13,20 +13,16 @@ public class BatchItem { private String brokerId; private int eventSize; - public BatchItem(final JSONObject event) { + public BatchItem(final String event) { + this.event = new JSONObject(event); + this.eventSize = event.getBytes(StandardCharsets.UTF_8).length; this.response = new BatchItemResponse(); - this.event = event; - Optional.ofNullable(event.optJSONObject("metadata")) + Optional.ofNullable(this.event.optJSONObject("metadata")) .map(e -> e.optString("eid", null)) .ifPresent(this.response::setEid); } - public BatchItem(final String event) { - this(new JSONObject(event)); - eventSize = event.getBytes(StandardCharsets.UTF_8).length; - } - public JSONObject getEvent() { return this.event; } diff --git a/src/main/java/org/zalando/nakadi/service/EventPublisher.java b/src/main/java/org/zalando/nakadi/service/EventPublisher.java index a8455217c6..1c92600fc1 100644 --- a/src/main/java/org/zalando/nakadi/service/EventPublisher.java +++ b/src/main/java/org/zalando/nakadi/service/EventPublisher.java @@ -1,6 +1,5 @@ package org.zalando.nakadi.service; -import org.json.JSONArray; import org.json.JSONObject; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -58,7 +57,7 @@ public EventPublisher(final TopicRepository topicRepository, this.nakadiSettings = nakadiSettings; } - public EventPublishResult publish(final JSONArray events, final String eventTypeName, final Client client) + public EventPublishResult publish(final String events, final String eventTypeName, final Client client) throws NoSuchEventTypeException, InternalNakadiException { final EventType eventType = eventTypeCache.getEventType(eventTypeName); final List batch = BatchFactory.from(events); diff --git a/src/test/java/org/zalando/nakadi/controller/EventPublishingControllerTest.java b/src/test/java/org/zalando/nakadi/controller/EventPublishingControllerTest.java index 87415e618f..2ff8601369 100644 --- a/src/test/java/org/zalando/nakadi/controller/EventPublishingControllerTest.java +++ b/src/test/java/org/zalando/nakadi/controller/EventPublishingControllerTest.java @@ -2,7 +2,6 @@ import com.codahale.metrics.MetricRegistry; import com.fasterxml.jackson.databind.ObjectMapper; -import org.json.JSONArray; import org.junit.Before; import org.junit.Test; import org.mockito.Mockito; @@ -91,7 +90,7 @@ public void whenResultIsSubmittedThen200() throws Exception { Mockito .doReturn(result) .when(publisher) - .publish(any(JSONArray.class), eq(TOPIC), any(Client.class)); + .publish(any(String.class), eq(TOPIC), any(Client.class)); postBatch(TOPIC, EVENT_BATCH) .andExpect(status().isOk()) @@ -114,7 +113,7 @@ public void whenResultIsAbortedThen422() throws Exception { Mockito .doReturn(result) .when(publisher) - .publish(any(JSONArray.class), eq(TOPIC), any(Client.class)); + .publish(any(String.class), eq(TOPIC), any(Client.class)); postBatch(TOPIC, EVENT_BATCH) .andExpect(status().isUnprocessableEntity()) @@ -128,7 +127,7 @@ public void whenResultIsAbortedThen207() throws Exception { Mockito .doReturn(result) .when(publisher) - .publish(any(JSONArray.class), eq(TOPIC), any(Client.class)); + .publish(any(String.class), eq(TOPIC), any(Client.class)); postBatch(TOPIC, EVENT_BATCH) .andExpect(status().isMultiStatus()) @@ -140,7 +139,7 @@ public void whenEventTypeNotFoundThen404() throws Exception { Mockito .doThrow(NoSuchEventTypeException.class) .when(publisher) - .publish(any(JSONArray.class), eq(TOPIC), any(Client.class)); + .publish(any(String.class), eq(TOPIC), any(Client.class)); postBatch(TOPIC, EVENT_BATCH) .andExpect(content().contentType("application/problem+json")) diff --git a/src/test/java/org/zalando/nakadi/domain/BatchFactoryTest.java b/src/test/java/org/zalando/nakadi/domain/BatchFactoryTest.java index 8b620cea6c..5b233b829f 100644 --- a/src/test/java/org/zalando/nakadi/domain/BatchFactoryTest.java +++ b/src/test/java/org/zalando/nakadi/domain/BatchFactoryTest.java @@ -50,4 +50,47 @@ public void testMalformedJSON() { fail(); } catch (JSONException e) {} } + + @Test + public void testEscapedQuotation() { + final String events = "[{\"hello\":\"wor\\\"ld\"}]"; + final List batch = BatchFactory.from(events); + assertEquals(1, batch.size()); + assertEquals("{\"hello\":\"wor\\\"ld\"}", batch.get(0).getEvent().toString()); + } + + @Test + public void testEscapedBrackets() { + final String events = "[{\"hello\":\"wor\\\\}ld\"}]"; + final List batch = BatchFactory.from(events); + assertEquals(1, batch.size()); + assertEquals("{\"hello\":\"wor\\\\}ld\"}", batch.get(0).getEvent().toString()); + } + + @Test + public void testEmptyEvent() { + final String events = "[{\"name\":\"MyEvent\"},,,,{\"name\":\"MyOtherEvent\"}]"; + final List batch = BatchFactory.from(events); + assertEquals(2, batch.size()); + assertEquals("{\"name\":\"MyEvent\"}", batch.get(0).getEvent().toString()); + assertEquals("{\"name\":\"MyOtherEvent\"}", batch.get(1).getEvent().toString()); + } + + @Test + public void testSpacesBetweenEvents() { + final String events = "[{\"name\":\"MyEvent\"}, {\"name\":\"MyOtherEvent\"}]"; + final List batch = BatchFactory.from(events); + assertEquals(2, batch.size()); + assertEquals("{\"name\":\"MyEvent\"}", batch.get(0).getEvent().toString()); + assertEquals("{\"name\":\"MyOtherEvent\"}", batch.get(1).getEvent().toString()); + } + + @Test + public void testGarbageBetweenEvents() { + final String events = "[{\"name\":\"MyEvent\"},atb#{\"name\":\"MyOtherEvent\"}]"; + try { + BatchFactory.from(events); + fail(); + } catch (JSONException e) {} + } } diff --git a/src/test/java/org/zalando/nakadi/enrichment/EnrichmentTest.java b/src/test/java/org/zalando/nakadi/enrichment/EnrichmentTest.java index e827a704ff..d03d687a2a 100644 --- a/src/test/java/org/zalando/nakadi/enrichment/EnrichmentTest.java +++ b/src/test/java/org/zalando/nakadi/enrichment/EnrichmentTest.java @@ -11,7 +11,7 @@ import static org.zalando.nakadi.domain.EventCategory.BUSINESS; import static org.zalando.nakadi.domain.EventCategory.DATA; import static org.zalando.nakadi.utils.TestUtils.buildDefaultEventType; -import static org.zalando.nakadi.utils.TestUtils.createBatch; +import static org.zalando.nakadi.utils.TestUtils.createBatchItem; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; @@ -59,7 +59,7 @@ public void enrichAppliesStrategies() throws Exception { final EventType eventType = buildDefaultEventType(); eventType.getEnrichmentStrategies().add(EnrichmentStrategyDescriptor.METADATA_ENRICHMENT); final JSONObject event = new JSONObject(); - final BatchItem batchItem = createBatch(event); + final BatchItem batchItem = createBatchItem(event); final EnrichmentStrategy strategy = mock(EnrichmentStrategy.class); Mockito diff --git a/src/test/java/org/zalando/nakadi/enrichment/MetadataEnrichmentStrategyTest.java b/src/test/java/org/zalando/nakadi/enrichment/MetadataEnrichmentStrategyTest.java index b39e6c1a91..f9fee233f4 100644 --- a/src/test/java/org/zalando/nakadi/enrichment/MetadataEnrichmentStrategyTest.java +++ b/src/test/java/org/zalando/nakadi/enrichment/MetadataEnrichmentStrategyTest.java @@ -7,10 +7,11 @@ import org.joda.time.DateTimeUtils; import org.json.JSONObject; import org.junit.Test; +import org.zalando.nakadi.utils.TestUtils; import static org.zalando.nakadi.utils.TestUtils.buildBusinessEvent; import static org.zalando.nakadi.utils.TestUtils.buildDefaultEventType; -import static org.zalando.nakadi.utils.TestUtils.createBatch; +import static org.zalando.nakadi.utils.TestUtils.createBatchItem; import static org.zalando.nakadi.utils.TestUtils.randomString; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.Matchers.isEmptyString; @@ -23,17 +24,19 @@ public class MetadataEnrichmentStrategyTest { public void setReceivedAtWithSystemTimeInUTC() throws Exception { final EventType eventType = buildDefaultEventType(); final JSONObject event = buildBusinessEvent(); + final BatchItem batch = TestUtils.createBatchItem(event); assertThat(event.getJSONObject("metadata").optString("received_at"), isEmptyString()); try { DateTimeUtils.setCurrentMillisFixed(0); - strategy.enrich(createBatch(event), eventType); + strategy.enrich(batch, eventType); } finally { DateTimeUtils.setCurrentMillisSystem(); } - assertThat(event.getJSONObject("metadata").getString("received_at"), equalTo("1970-01-01T00:00:00.000Z")); + assertThat(batch.getEvent().getJSONObject("metadata").getString("received_at"), + equalTo("1970-01-01T00:00:00.000Z")); } @Test(expected = EnrichmentException.class) @@ -43,33 +46,35 @@ public void throwsExceptionIfPathNotPresent() throws Exception { event.remove("metadata"); - strategy.enrich(createBatch(event), eventType); + strategy.enrich(TestUtils.createBatchItem(event), eventType); } @Test public void setEventTypeName() throws Exception { final EventType eventType = buildDefaultEventType(); final JSONObject event = buildBusinessEvent(); + final BatchItem batch = TestUtils.createBatchItem(event); assertThat(event.getJSONObject("metadata").optString("event_type"), isEmptyString()); - strategy.enrich(createBatch(event), eventType); + strategy.enrich(batch, eventType); - assertThat(event.getJSONObject("metadata").getString("event_type"), equalTo(eventType.getName())); + assertThat(batch.getEvent().getJSONObject("metadata").getString("event_type"), equalTo(eventType.getName())); } @Test public void setFlowId() throws Exception { final EventType eventType = buildDefaultEventType(); final JSONObject event = buildBusinessEvent(); + final BatchItem batch = createBatchItem(event); assertThat(event.getJSONObject("metadata").optString("flow_id"), isEmptyString()); final String flowId = randomString(); FlowIdUtils.push(flowId); - strategy.enrich(createBatch(event), eventType); + strategy.enrich(batch, eventType); - assertThat(event.getJSONObject("metadata").getString("flow_id"), equalTo(flowId)); + assertThat(batch.getEvent().getJSONObject("metadata").getString("flow_id"), equalTo(flowId)); } @Test @@ -77,11 +82,11 @@ public void setPartition() throws Exception { final EventType eventType = buildDefaultEventType(); final JSONObject event = buildBusinessEvent(); final String partition = randomString(); - final BatchItem batch = createBatch(event); + final BatchItem batch = createBatchItem(event); batch.setPartition(partition); strategy.enrich(batch, eventType); - assertThat(event.getJSONObject("metadata").getString("partition"), equalTo(partition)); + assertThat(batch.getEvent().getJSONObject("metadata").getString("partition"), equalTo(partition)); } } \ No newline at end of file diff --git a/src/test/java/org/zalando/nakadi/repository/kafka/KafkaTopicRepositoryTest.java b/src/test/java/org/zalando/nakadi/repository/kafka/KafkaTopicRepositoryTest.java index 67c9a6cbf4..eb7d0e3f74 100644 --- a/src/test/java/org/zalando/nakadi/repository/kafka/KafkaTopicRepositoryTest.java +++ b/src/test/java/org/zalando/nakadi/repository/kafka/KafkaTopicRepositoryTest.java @@ -13,7 +13,6 @@ import org.apache.kafka.common.Node; import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.errors.TimeoutException; -import org.json.JSONObject; import org.junit.Assert; import org.junit.Test; import org.mockito.ArgumentCaptor; @@ -216,7 +215,7 @@ public void invalidateInvalidCursors() throws NakadiException { @Test public void whenPostEventTimesOutThenUpdateItemStatus() throws Exception { - final BatchItem item = new BatchItem(new JSONObject()); + final BatchItem item = new BatchItem("{}"); item.setPartition("1"); final List batch = new ArrayList<>(); batch.add(item); @@ -240,7 +239,7 @@ public void whenPostEventTimesOutThenUpdateItemStatus() throws Exception { @Test public void whenPostEventOverflowsBufferThenUpdateItemStatus() throws Exception { - final BatchItem item = new BatchItem(new JSONObject()); + final BatchItem item = new BatchItem("{}"); item.setPartition("1"); final List batch = new ArrayList<>(); batch.add(item); @@ -265,9 +264,9 @@ public void whenPostEventOverflowsBufferThenUpdateItemStatus() throws Exception @Test public void whenKafkaPublishCallbackWithExceptionThenEventPublishingException() throws Exception { - final BatchItem firstItem = new BatchItem(new JSONObject()); + final BatchItem firstItem = new BatchItem("{}"); firstItem.setPartition("1"); - final BatchItem secondItem = new BatchItem(new JSONObject()); + final BatchItem secondItem = new BatchItem("{}"); secondItem.setPartition("2"); final List batch = ImmutableList.of(firstItem, secondItem); @@ -391,7 +390,7 @@ public void whenKafkaPublishTimeoutThenCircuitIsOpened() throws Exception { final List batches = new LinkedList<>(); for (int i = 0; i < 1000; i++) { try { - final BatchItem batchItem = new BatchItem(new JSONObject()); + final BatchItem batchItem = new BatchItem("{}"); batchItem.setPartition("1"); batches.add(batchItem); kafkaTopicRepository.syncPostBatch(EXPECTED_PRODUCER_RECORD.topic(), ImmutableList.of(batchItem)); diff --git a/src/test/java/org/zalando/nakadi/service/EventPublisherTest.java b/src/test/java/org/zalando/nakadi/service/EventPublisherTest.java index d8a418e229..5f6d720029 100644 --- a/src/test/java/org/zalando/nakadi/service/EventPublisherTest.java +++ b/src/test/java/org/zalando/nakadi/service/EventPublisherTest.java @@ -6,6 +6,7 @@ import org.junit.Test; import org.mockito.Mockito; import org.zalando.nakadi.config.NakadiSettings; +import org.zalando.nakadi.domain.BatchItem; import org.zalando.nakadi.domain.BatchItemResponse; import org.zalando.nakadi.domain.EventPublishResult; import org.zalando.nakadi.domain.EventPublishingStatus; @@ -44,7 +45,7 @@ import static org.mockito.Mockito.verify; import static org.zalando.nakadi.utils.TestUtils.buildBusinessEvent; import static org.zalando.nakadi.utils.TestUtils.buildDefaultEventType; -import static org.zalando.nakadi.utils.TestUtils.createBatch; +import static org.zalando.nakadi.utils.TestUtils.createBatchItem; import static org.zalando.nakadi.utils.TestUtils.randomString; import static org.zalando.nakadi.utils.TestUtils.randomStringOfLength; import static org.zalando.nakadi.utils.TestUtils.randomValidStringOfLength; @@ -75,9 +76,9 @@ public void whenPublishIsSuccessfulThenResultIsSubmitted() throws Exception { final JSONArray batch = buildDefaultBatch(1); final JSONObject event = batch.getJSONObject(0); - mockSuccessfulValidation(eventType, event); + mockSuccessfulValidation(eventType); - final EventPublishResult result = publisher.publish(batch, eventType.getName(), FULL_ACCESS_CLIENT); + final EventPublishResult result = publisher.publish(batch.toString(), eventType.getName(), FULL_ACCESS_CLIENT); assertThat(result.getStatus(), equalTo(EventPublishingStatus.SUBMITTED)); verify(topicRepository, times(1)).syncPostBatch(eq(eventType.getTopic()), any()); @@ -89,9 +90,9 @@ public void whenEventHasEidThenSetItInTheResponse() throws Exception { final JSONObject event = buildBusinessEvent(); final JSONArray batch = new JSONArray(Arrays.asList(event)); - mockSuccessfulValidation(eventType, event); + mockSuccessfulValidation(eventType); - final EventPublishResult result = publisher.publish(batch, eventType.getName(), FULL_ACCESS_CLIENT); + final EventPublishResult result = publisher.publish(batch.toString(), eventType.getName(), FULL_ACCESS_CLIENT); assertThat(result.getResponses().get(0).getEid(), equalTo(event.getJSONObject("metadata").optString("eid"))); verify(topicRepository, times(1)).syncPostBatch(eq(eventType.getTopic()), any()); @@ -103,12 +104,12 @@ public void whenValidationFailsThenResultIsAborted() throws Exception { final JSONArray batch = buildDefaultBatch(1); final JSONObject event = batch.getJSONObject(0); - mockFaultValidation(eventType, event, "error"); + mockFaultValidation(eventType, "error"); - final EventPublishResult result = publisher.publish(batch, eventType.getName(), FULL_ACCESS_CLIENT); + final EventPublishResult result = publisher.publish(batch.toString(), eventType.getName(), FULL_ACCESS_CLIENT); assertThat(result.getStatus(), equalTo(EventPublishingStatus.ABORTED)); - verify(enrichment, times(0)).enrich(createBatch(event), eventType); + verify(enrichment, times(0)).enrich(createBatchItem(event), eventType); verify(partitionResolver, times(0)).resolvePartition(eventType, event); verify(topicRepository, times(0)).syncPostBatch(any(), any()); } @@ -119,9 +120,9 @@ public void whenValidationFailsThenSubsequentItemsAreAborted() throws Exception final JSONArray batch = buildDefaultBatch(2); final JSONObject event = batch.getJSONObject(0); - mockFaultValidation(eventType, event, "error"); + mockFaultValidation(eventType, "error"); - final EventPublishResult result = publisher.publish(batch, eventType.getName(), FULL_ACCESS_CLIENT); + final EventPublishResult result = publisher.publish(batch.toString(), eventType.getName(), FULL_ACCESS_CLIENT); assertThat(result.getStatus(), equalTo(EventPublishingStatus.ABORTED)); @@ -145,7 +146,7 @@ public void whenEventIsTooLargeThenResultIsAborted() throws Exception { mockSuccessfulValidation(eventType); - final EventPublishResult result = publisher.publish(batch, eventType.getName(), FULL_ACCESS_CLIENT); + final EventPublishResult result = publisher.publish(batch.toString(), eventType.getName(), FULL_ACCESS_CLIENT); assertThat(result.getStatus(), equalTo(EventPublishingStatus.ABORTED)); verify(enrichment, times(0)).enrich(any(), any()); @@ -166,7 +167,7 @@ public void whenEventIsTooLargeThenAllItemsAreAborted() throws Exception { mockSuccessfulValidation(eventType); - final EventPublishResult result = publisher.publish(batch, eventType.getName(), FULL_ACCESS_CLIENT); + final EventPublishResult result = publisher.publish(batch.toString(), eventType.getName(), FULL_ACCESS_CLIENT); assertThat(result.getStatus(), equalTo(EventPublishingStatus.ABORTED)); @@ -199,7 +200,7 @@ public void whenEnrichmentMakesEventTooLargeThenAllItemsAreAborted() throws Exce mockSuccessfulValidation(eventType); - final EventPublishResult result = publisher.publish(batch, eventType.getName(), FULL_ACCESS_CLIENT); + final EventPublishResult result = publisher.publish(batch.toString(), eventType.getName(), FULL_ACCESS_CLIENT); assertThat(result.getStatus(), equalTo(EventPublishingStatus.ABORTED)); @@ -226,7 +227,7 @@ public void whenEventIsExactlyMaxSizeThenResultIsSuccess() throws Exception { mockSuccessfulValidation(eventType); - final EventPublishResult result = publisher.publish(batch, eventType.getName(), FULL_ACCESS_CLIENT); + final EventPublishResult result = publisher.publish(batch.toString(), eventType.getName(), FULL_ACCESS_CLIENT); assertThat(result.getStatus(), equalTo(EventPublishingStatus.SUBMITTED)); verify(enrichment, times(1)).enrich(any(), any()); @@ -241,7 +242,7 @@ public void whenEventIsOneByteOverMaxSizeThenResultIsAborted() throws Exception mockSuccessfulValidation(eventType); - final EventPublishResult result = publisher.publish(batch, eventType.getName(), FULL_ACCESS_CLIENT); + final EventPublishResult result = publisher.publish(batch.toString(), eventType.getName(), FULL_ACCESS_CLIENT); assertThat(result.getStatus(), equalTo(EventPublishingStatus.ABORTED)); verify(enrichment, times(0)).enrich(any(), any()); @@ -256,7 +257,7 @@ public void whenEventIsOneByteOverMaxSizeWithMultiByteCharsThenResultIsAborted() mockSuccessfulValidation(eventType); - final EventPublishResult result = publisher.publish(batch, eventType.getName(), FULL_ACCESS_CLIENT); + final EventPublishResult result = publisher.publish(batch.toString(), eventType.getName(), FULL_ACCESS_CLIENT); assertThat(result.getStatus(), equalTo(EventPublishingStatus.ABORTED)); verify(enrichment, times(0)).enrich(any(), any()); @@ -271,7 +272,7 @@ public void whenEventIsExactlyMaxSizeWithMultiByteCharsThenResultIsSuccess() thr mockSuccessfulValidation(eventType); - final EventPublishResult result = publisher.publish(batch, eventType.getName(), FULL_ACCESS_CLIENT); + final EventPublishResult result = publisher.publish(batch.toString(), eventType.getName(), FULL_ACCESS_CLIENT); assertThat(result.getStatus(), equalTo(EventPublishingStatus.SUBMITTED)); verify(enrichment, times(1)).enrich(any(), any()); @@ -282,13 +283,15 @@ public void whenEventIsExactlyMaxSizeWithMultiByteCharsThenResultIsSuccess() thr @Test public void whenPartitionFailsThenResultIsAborted() throws Exception { final EventType eventType = buildDefaultEventType(); - final JSONArray batch = buildDefaultBatch(1); - final JSONObject event = batch.getJSONObject(0); + final List batch = new ArrayList<>(); + batch.add(createBatchItem(buildDefaultBatch(1).getJSONObject(0))); + final JSONObject event = batch.get(0).getEvent(); - mockSuccessfulValidation(eventType, event); - mockFaultPartition(eventType, event); + mockSuccessfulValidation(eventType); + mockFaultPartition(); - final EventPublishResult result = publisher.publish(batch, eventType.getName(), FULL_ACCESS_CLIENT); + final EventPublishResult result = publisher.publish(createStringFromBatchItems(batch), + eventType.getName(), FULL_ACCESS_CLIENT); assertThat(result.getStatus(), equalTo(EventPublishingStatus.ABORTED)); } @@ -296,13 +299,16 @@ public void whenPartitionFailsThenResultIsAborted() throws Exception { @Test public void whenPartitionFailsThenSubsequentItemsAreAborted() throws Exception { final EventType eventType = buildDefaultEventType(); - final JSONArray batch = buildDefaultBatch(2); - final JSONObject event = batch.getJSONObject(0); + final JSONArray array = buildDefaultBatch(2); + final List batch = new ArrayList<>(); + batch.add(createBatchItem(array.getJSONObject(0))); + batch.add(createBatchItem(array.getJSONObject(1))); mockSuccessfulValidation(eventType); - mockFaultPartition(eventType, event); + mockFaultPartition(); - final EventPublishResult result = publisher.publish(batch, eventType.getName(), FULL_ACCESS_CLIENT); + final EventPublishResult result = publisher.publish(createStringFromBatchItems(batch), + eventType.getName(), FULL_ACCESS_CLIENT); assertThat(result.getStatus(), equalTo(EventPublishingStatus.ABORTED)); @@ -328,7 +334,7 @@ public void whenPublishingFailsThenResultIsFailed() throws Exception { mockSuccessfulValidation(eventType); mockFailedPublishing(); - final EventPublishResult result = publisher.publish(batch, eventType.getName(), FULL_ACCESS_CLIENT); + final EventPublishResult result = publisher.publish(batch.toString(), eventType.getName(), FULL_ACCESS_CLIENT); assertThat(result.getStatus(), equalTo(EventPublishingStatus.FAILED)); verify(topicRepository, times(1)).syncPostBatch(any(), any()); @@ -340,14 +346,14 @@ public void whenEnrichmentFailsThenResultIsAborted() throws Exception { final JSONArray batch = buildDefaultBatch(1); final JSONObject event = batch.getJSONObject(0); - mockSuccessfulValidation(eventType, event); + mockSuccessfulValidation(eventType); mockFaultEnrichment(); - final EventPublishResult result = publisher.publish(batch, eventType.getName(), FULL_ACCESS_CLIENT); + final EventPublishResult result = publisher.publish(batch.toString(), eventType.getName(), FULL_ACCESS_CLIENT); assertThat(result.getStatus(), equalTo(EventPublishingStatus.ABORTED)); verify(cache, times(1)).getValidator(eventType.getName()); - verify(partitionResolver, times(1)).resolvePartition(eventType, event); + verify(partitionResolver, times(1)).resolvePartition(any(), any()); verify(enrichment, times(1)).enrich(any(), any()); verify(topicRepository, times(0)).syncPostBatch(any(), any()); } @@ -360,7 +366,7 @@ public void whenEnrichmentFailsThenSubsequentItemsAreAborted() throws Exception mockSuccessfulValidation(eventType); mockFaultEnrichment(); - final EventPublishResult result = publisher.publish(batch, eventType.getName(), FULL_ACCESS_CLIENT); + final EventPublishResult result = publisher.publish(batch.toString(), eventType.getName(), FULL_ACCESS_CLIENT); assertThat(result.getStatus(), equalTo(EventPublishingStatus.ABORTED)); @@ -382,7 +388,7 @@ public void testScopeWrite() throws Exception { final EventType eventType = EventTypeTestBuilder.builder().writeScopes(SCOPE_WRITE).build(); Mockito.when(cache.getEventType(eventType.getName())).thenReturn(eventType); mockSuccessfulValidation(eventType); - final EventPublishResult result = publisher.publish(buildDefaultBatch(0), eventType.getName(), + final EventPublishResult result = publisher.publish(buildDefaultBatch(0).toString(), eventType.getName(), new NakadiClient(CLIENT_ID, SCOPE_WRITE)); Assert.assertEquals(result.getStatus(), EventPublishingStatus.SUBMITTED); @@ -392,7 +398,7 @@ public void testScopeWrite() throws Exception { public void testNoScopeWrite() throws Exception { final EventType eventType = EventTypeTestBuilder.builder().writeScopes(SCOPE_WRITE).build(); Mockito.when(cache.getEventType(eventType.getName())).thenReturn(eventType); - publisher.publish(buildDefaultBatch(0), eventType.getName(), + publisher.publish(buildDefaultBatch(0).toString(), eventType.getName(), new NakadiClient(CLIENT_ID, Collections.emptySet())); } @@ -403,11 +409,18 @@ private void mockFailedPublishing() throws Exception { .syncPostBatch(any(), any()); } - private void mockFaultPartition(final EventType eventType, final JSONObject event) throws PartitioningException { + private void mockFaultPartition(final EventType eventType, final BatchItem item) throws PartitioningException { Mockito .doThrow(new PartitioningException("partition error")) .when(partitionResolver) - .resolvePartition(eventType, event); + .resolvePartition(eventType, item.getEvent()); + } + + private void mockFaultPartition() throws PartitioningException { + Mockito + .doThrow(new PartitioningException("partition error")) + .when(partitionResolver) + .resolvePartition(any(), any()); } private void mockFaultEnrichment() throws EnrichmentException { @@ -437,6 +450,25 @@ private void mockFaultValidation(final EventType eventType, final JSONObject eve .validate(event); } + private void mockFaultValidation(final EventType eventType, final String error) throws Exception { + final EventTypeValidator faultyValidator = mock(EventTypeValidator.class); + + Mockito + .doReturn(eventType) + .when(cache) + .getEventType(eventType.getName()); + + Mockito + .doReturn(faultyValidator) + .when(cache) + .getValidator(eventType.getName()); + + Mockito + .doReturn(Optional.of(new ValidationError(error))) + .when(faultyValidator) + .validate(any()); + } + private void mockSuccessfulValidation(final EventType eventType, final JSONObject event) throws Exception { final EventTypeValidator truthyValidator = mock(EventTypeValidator.class); @@ -522,4 +554,15 @@ private JSONArray buildBatch(final int numberOfEvents, final int length) { return new JSONArray(events); } + + private String createStringFromBatchItems(final List batch) { + final StringBuilder sb = new StringBuilder(); + sb.append("["); + for (BatchItem item:batch) { + sb.append(item.getEvent().toString()); + sb.append(","); + } + sb.setCharAt(sb.length() - 1, ']'); + return sb.toString(); + } } diff --git a/src/test/java/org/zalando/nakadi/utils/TestUtils.java b/src/test/java/org/zalando/nakadi/utils/TestUtils.java index d08533e1f6..cb33cb43cb 100644 --- a/src/test/java/org/zalando/nakadi/utils/TestUtils.java +++ b/src/test/java/org/zalando/nakadi/utils/TestUtils.java @@ -177,7 +177,11 @@ public static void waitFor(final Runnable runnable, final int timeoutMs, final i .withWaitBetweenEachTry(intervalMs)); } - public static BatchItem createBatch(final JSONObject event) { + public static BatchItem createBatchItem(final JSONObject event) { + return new BatchItem(event.toString()); + } + + public static BatchItem createBatchItem(final String event) { return new BatchItem(event); } From 0fbaa46c50774f8cf2227b4ec76f386a3c095cc2 Mon Sep 17 00:00:00 2001 From: Lionel Montrieux Date: Tue, 3 Jan 2017 17:42:12 +0100 Subject: [PATCH 24/37] ARUHA-473 Pass batch as String to publisher and fix checkstyle violations --- .../nakadi/controller/EventPublishingController.java | 2 +- .../java/org/zalando/nakadi/domain/BatchFactory.java | 11 +++++------ 2 files changed, 6 insertions(+), 7 deletions(-) diff --git a/src/main/java/org/zalando/nakadi/controller/EventPublishingController.java b/src/main/java/org/zalando/nakadi/controller/EventPublishingController.java index 541ecc7fa3..ed95c8b704 100644 --- a/src/main/java/org/zalando/nakadi/controller/EventPublishingController.java +++ b/src/main/java/org/zalando/nakadi/controller/EventPublishingController.java @@ -85,7 +85,7 @@ private ResponseEntity postEventInternal(final String eventTypeName, final JSONArray eventsAsJsonObjects = new JSONArray(eventsAsString); final int eventCount = eventsAsJsonObjects.length(); - final EventPublishResult result = publisher.publish(eventsAsJsonObjects, eventTypeName, client); + final EventPublishResult result = publisher.publish(eventsAsString, eventTypeName, client); reportMetrics(eventTypeMetrics, result, eventsAsString, eventCount); final ResponseEntity response = response(result); diff --git a/src/main/java/org/zalando/nakadi/domain/BatchFactory.java b/src/main/java/org/zalando/nakadi/domain/BatchFactory.java index eb9e6586e4..bc1d961439 100644 --- a/src/main/java/org/zalando/nakadi/domain/BatchFactory.java +++ b/src/main/java/org/zalando/nakadi/domain/BatchFactory.java @@ -18,20 +18,19 @@ public static List from(final String events) { } for (int i = 1; i < events.length() - 1; i++) { if (!escaped && events.charAt(i) == '"') { - if (insideQuote) + if (insideQuote) { insideQuote = false; - else + } else { insideQuote = true; + } } if (escaped) { sb.append(events.charAt(i)); escaped = false; - } - else if (!escaped && events.charAt(i) == '\\') { + } else if (!escaped && events.charAt(i) == '\\') { sb.append(events.charAt(i)); escaped = true; - } - else if (insideQuote) { + } else if (insideQuote) { sb.append(events.charAt(i)); } else { if (events.charAt(i) == '{') { From 20aed22fb4e9c9599e4006fdbe7ffc6d25c596b8 Mon Sep 17 00:00:00 2001 From: Lionel Montrieux Date: Wed, 4 Jan 2017 17:59:58 +0100 Subject: [PATCH 25/37] ARUHA-473 Don't parse input to JSON in EventPublishingController --- .../controller/EventPublishingController.java | 6 +-- .../EventPublishingControllerTest.java | 39 +++++++++++++++---- 2 files changed, 33 insertions(+), 12 deletions(-) diff --git a/src/main/java/org/zalando/nakadi/controller/EventPublishingController.java b/src/main/java/org/zalando/nakadi/controller/EventPublishingController.java index ed95c8b704..f60c4ff06c 100644 --- a/src/main/java/org/zalando/nakadi/controller/EventPublishingController.java +++ b/src/main/java/org/zalando/nakadi/controller/EventPublishingController.java @@ -1,6 +1,5 @@ package org.zalando.nakadi.controller; -import org.json.JSONArray; import org.json.JSONException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -82,11 +81,8 @@ private ResponseEntity postEventInternal(final String eventTypeName, final Client client) { final long startingNanos = System.nanoTime(); try { - final JSONArray eventsAsJsonObjects = new JSONArray(eventsAsString); - - final int eventCount = eventsAsJsonObjects.length(); final EventPublishResult result = publisher.publish(eventsAsString, eventTypeName, client); - reportMetrics(eventTypeMetrics, result, eventsAsString, eventCount); + reportMetrics(eventTypeMetrics, result, eventsAsString, result.getResponses().size()); final ResponseEntity response = response(result); return response; diff --git a/src/test/java/org/zalando/nakadi/controller/EventPublishingControllerTest.java b/src/test/java/org/zalando/nakadi/controller/EventPublishingControllerTest.java index 2ff8601369..a059432c99 100644 --- a/src/test/java/org/zalando/nakadi/controller/EventPublishingControllerTest.java +++ b/src/test/java/org/zalando/nakadi/controller/EventPublishingControllerTest.java @@ -2,6 +2,7 @@ import com.codahale.metrics.MetricRegistry; import com.fasterxml.jackson.databind.ObjectMapper; +import org.json.JSONException; import org.junit.Before; import org.junit.Test; import org.mockito.Mockito; @@ -14,6 +15,8 @@ import org.zalando.nakadi.config.SecuritySettings; import org.zalando.nakadi.domain.BatchItemResponse; import org.zalando.nakadi.domain.EventPublishResult; +import org.zalando.nakadi.domain.EventPublishingStatus; +import org.zalando.nakadi.domain.EventPublishingStep; import org.zalando.nakadi.exceptions.InternalNakadiException; import org.zalando.nakadi.exceptions.NoSuchEventTypeException; import org.zalando.nakadi.metrics.EventTypeMetricRegistry; @@ -41,6 +44,7 @@ import static org.zalando.nakadi.domain.EventPublishingStatus.ABORTED; import static org.zalando.nakadi.domain.EventPublishingStatus.FAILED; import static org.zalando.nakadi.domain.EventPublishingStatus.SUBMITTED; +import static org.zalando.nakadi.domain.EventPublishingStep.NONE; import static org.zalando.nakadi.domain.EventPublishingStep.PARTITIONING; import static org.zalando.nakadi.domain.EventPublishingStep.PUBLISHING; import static org.zalando.nakadi.domain.EventPublishingStep.VALIDATING; @@ -85,7 +89,7 @@ public void setUp() throws Exception { @Test public void whenResultIsSubmittedThen200() throws Exception { - final EventPublishResult result = new EventPublishResult(SUBMITTED, null, null); + final EventPublishResult result = new EventPublishResult(SUBMITTED, null, submittedResponses(1)); Mockito .doReturn(result) @@ -99,11 +103,12 @@ public void whenResultIsSubmittedThen200() throws Exception { @Test public void whenInvalidPostBodyThen400() throws Exception { - final String expectedPayload = "{\"type\":\"http://httpstatus.es/400\"," + - "\"title\":\"Bad Request\",\"status\":400," + - "\"detail\":\"A JSONArray text must start with '[' at 1 [character 2 line 1]\"}"; - postBatch(TOPIC, "invalid json array").andExpect(status().isBadRequest()) - .andExpect(content().string(expectedPayload)); + + Mockito.doThrow(new JSONException("Error")) + .when(publisher) + .publish(any(String.class), eq(TOPIC), any(Client.class)); + + postBatch(TOPIC, "invalid json array").andExpect(status().isBadRequest()); } @Test @@ -148,7 +153,7 @@ public void whenEventTypeNotFoundThen404() throws Exception { @Test public void publishedEventsAreReportedPerEventType() throws Exception { - final EventPublishResult success = new EventPublishResult(SUBMITTED, null, null); + final EventPublishResult success = new EventPublishResult(SUBMITTED, null, submittedResponses(3)); Mockito .doReturn(success) .doReturn(success) @@ -177,6 +182,26 @@ private List responses() { return responses; } + private List submittedResponses(final int number) { + return responses(number, SUBMITTED, PUBLISHING); + } + + private List abortedResponses(final int number) { + return responses(number, ABORTED, NONE); + } + + private List responses(final int number, final EventPublishingStatus status, + final EventPublishingStep step) { + final List responses = new ArrayList<>(); + for (int i = 0; i < number; i++) { + final BatchItemResponse response = new BatchItemResponse(); + response.setPublishingStatus(status); + response.setStep(step); + responses.add(response); + } + return responses; + } + private ResultActions postBatch(final String eventType, final String batch) throws Exception { final String url = "/event-types/" + eventType + "/events"; final MockHttpServletRequestBuilder requestBuilder = post(url) From b9c36f89d2afabe292c43f0a284b6b4f9c4eb7a7 Mon Sep 17 00:00:00 2001 From: Lionel Montrieux Date: Thu, 5 Jan 2017 12:10:25 +0100 Subject: [PATCH 26/37] ARUHA-473 Remove acceptance test --- .../nakadi/webservice/EventPublishingAT.java | 36 ------------------- 1 file changed, 36 deletions(-) delete mode 100644 src/acceptance-test/java/org/zalando/nakadi/webservice/EventPublishingAT.java diff --git a/src/acceptance-test/java/org/zalando/nakadi/webservice/EventPublishingAT.java b/src/acceptance-test/java/org/zalando/nakadi/webservice/EventPublishingAT.java deleted file mode 100644 index 26893e686b..0000000000 --- a/src/acceptance-test/java/org/zalando/nakadi/webservice/EventPublishingAT.java +++ /dev/null @@ -1,36 +0,0 @@ -package org.zalando.nakadi.webservice; - -import com.jayway.restassured.http.ContentType; -import com.jayway.restassured.response.Response; -import org.apache.commons.lang3.StringUtils; -import org.apache.http.HttpStatus; -import org.junit.Test; -import org.zalando.nakadi.domain.EventType; -import org.zalando.nakadi.webservice.utils.NakadiTestUtils; - -import java.text.MessageFormat; - -import static com.jayway.restassured.RestAssured.given; - -public class EventPublishingAT extends BaseAT { - - @Test - public void whenPublishingEventTooLargeThen422() throws Exception { - final EventType eventType = NakadiTestUtils.createEventType(); - - publishLargeEvent(eventType) - .then() - .statusCode(HttpStatus.SC_UNPROCESSABLE_ENTITY); - } - - private Response publishLargeEvent(final EventType eventType) { - final StringBuilder sb = new StringBuilder(1000023); - sb.append("[{\"blah\":\""); - sb.append(StringUtils.repeat("a", 1000010)); - sb.append("\"}]"); - return given() - .body(sb.toString()) - .contentType(ContentType.JSON) - .post(MessageFormat.format("/event-types/{0}/events", eventType.getName())); - } -} From 33bbc035116ae84fe7adce5cfae71552875cf825 Mon Sep 17 00:00:00 2001 From: Lionel Montrieux Date: Thu, 5 Jan 2017 14:23:12 +0100 Subject: [PATCH 27/37] ARUHA-473 Remove unused private method --- .../nakadi/controller/EventPublishingControllerTest.java | 5 ----- 1 file changed, 5 deletions(-) diff --git a/src/test/java/org/zalando/nakadi/controller/EventPublishingControllerTest.java b/src/test/java/org/zalando/nakadi/controller/EventPublishingControllerTest.java index a059432c99..7bbfcc5fd0 100644 --- a/src/test/java/org/zalando/nakadi/controller/EventPublishingControllerTest.java +++ b/src/test/java/org/zalando/nakadi/controller/EventPublishingControllerTest.java @@ -44,7 +44,6 @@ import static org.zalando.nakadi.domain.EventPublishingStatus.ABORTED; import static org.zalando.nakadi.domain.EventPublishingStatus.FAILED; import static org.zalando.nakadi.domain.EventPublishingStatus.SUBMITTED; -import static org.zalando.nakadi.domain.EventPublishingStep.NONE; import static org.zalando.nakadi.domain.EventPublishingStep.PARTITIONING; import static org.zalando.nakadi.domain.EventPublishingStep.PUBLISHING; import static org.zalando.nakadi.domain.EventPublishingStep.VALIDATING; @@ -186,10 +185,6 @@ private List submittedResponses(final int number) { return responses(number, SUBMITTED, PUBLISHING); } - private List abortedResponses(final int number) { - return responses(number, ABORTED, NONE); - } - private List responses(final int number, final EventPublishingStatus status, final EventPublishingStep step) { final List responses = new ArrayList<>(); From f6fcfe9d7510775f6d301f1ca8d1490460f9d9f9 Mon Sep 17 00:00:00 2001 From: Lionel Montrieux Date: Thu, 5 Jan 2017 17:42:29 +0100 Subject: [PATCH 28/37] ARUHA-473 Error message for events too large includes the size of the event and the max size allowed --- src/main/java/org/zalando/nakadi/service/EventPublisher.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/main/java/org/zalando/nakadi/service/EventPublisher.java b/src/main/java/org/zalando/nakadi/service/EventPublisher.java index 1c92600fc1..12a07ca8c0 100644 --- a/src/main/java/org/zalando/nakadi/service/EventPublisher.java +++ b/src/main/java/org/zalando/nakadi/service/EventPublisher.java @@ -152,7 +152,8 @@ private void validateSchema(final JSONObject event, final EventType eventType) t private void validateEventSize(final BatchItem item) throws EventValidationException { if (item.getEventSize() > nakadiSettings.getEventMaxBytes()) { - throw new EventValidationException("Event too large"); + throw new EventValidationException("Event too large: " + item.getEventSize() + + " bytes, max size is " + nakadiSettings.getEventMaxBytes() + " bytes"); } } From f90e477abae714bc09928085b3ca8f1b0bb76ef5 Mon Sep 17 00:00:00 2001 From: Lionel Montrieux Date: Thu, 5 Jan 2017 17:54:52 +0100 Subject: [PATCH 29/37] ARUHA-473 Fix tests' expected error messages for events too large --- .../java/org/zalando/nakadi/service/EventPublisherTest.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/test/java/org/zalando/nakadi/service/EventPublisherTest.java b/src/test/java/org/zalando/nakadi/service/EventPublisherTest.java index 5f6d720029..56c0c4d027 100644 --- a/src/test/java/org/zalando/nakadi/service/EventPublisherTest.java +++ b/src/test/java/org/zalando/nakadi/service/EventPublisherTest.java @@ -37,6 +37,7 @@ import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.isEmptyString; +import static org.hamcrest.Matchers.startsWith; import static org.hamcrest.core.IsEqual.equalTo; import static org.mockito.Matchers.any; import static org.mockito.Matchers.eq; @@ -179,7 +180,7 @@ public void whenEventIsTooLargeThenAllItemsAreAborted() throws Exception { final BatchItemResponse secondResponse = result.getResponses().get(1); assertThat(secondResponse.getPublishingStatus(), equalTo(EventPublishingStatus.FAILED)); assertThat(secondResponse.getStep(), equalTo(EventPublishingStep.VALIDATING)); - assertThat(secondResponse.getDetail(), equalTo("Event too large")); + assertThat(secondResponse.getDetail(), startsWith("Event too large")); final BatchItemResponse thirdResponse = result.getResponses().get(2); assertThat(thirdResponse.getPublishingStatus(), equalTo(EventPublishingStatus.ABORTED)); @@ -212,7 +213,7 @@ public void whenEnrichmentMakesEventTooLargeThenAllItemsAreAborted() throws Exce final BatchItemResponse secondResponse = result.getResponses().get(1); assertThat(secondResponse.getPublishingStatus(), equalTo(EventPublishingStatus.FAILED)); assertThat(secondResponse.getStep(), equalTo(EventPublishingStep.VALIDATING)); - assertThat(secondResponse.getDetail(), equalTo("Event too large")); + assertThat(secondResponse.getDetail(), startsWith("Event too large")); final BatchItemResponse thirdResponse = result.getResponses().get(2); assertThat(thirdResponse.getPublishingStatus(), equalTo(EventPublishingStatus.ABORTED)); From bf90d6e28503f0850537a1b705fe85ddedbb71ed Mon Sep 17 00:00:00 2001 From: Lionel Montrieux Date: Thu, 5 Jan 2017 18:47:56 +0100 Subject: [PATCH 30/37] ARUHA-473 Fix test --- .../nakadi/enrichment/MetadataEnrichmentStrategyTest.java | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/src/test/java/org/zalando/nakadi/enrichment/MetadataEnrichmentStrategyTest.java b/src/test/java/org/zalando/nakadi/enrichment/MetadataEnrichmentStrategyTest.java index 1107506ecb..768b9547c2 100644 --- a/src/test/java/org/zalando/nakadi/enrichment/MetadataEnrichmentStrategyTest.java +++ b/src/test/java/org/zalando/nakadi/enrichment/MetadataEnrichmentStrategyTest.java @@ -66,12 +66,13 @@ public void setEventTypeName() throws Exception { public void setEventTypeSchemaVersion() throws Exception { final EventType eventType = buildDefaultEventType(); final JSONObject event = buildBusinessEvent(); + final BatchItem batchItem = createBatchItem(event); - assertThat(event.getJSONObject("metadata").optString("version"), isEmptyString()); + assertThat(batchItem.getEvent().getJSONObject("metadata").optString("version"), isEmptyString()); - strategy.enrich(createBatch(event), eventType); + strategy.enrich(batchItem, eventType); - assertThat(event.getJSONObject("metadata").getString("version"), equalTo("1.0.0")); + assertThat(batchItem.getEvent().getJSONObject("metadata").getString("version"), equalTo("1.0.0")); } @Test From 0330145b3c2d38afa0cfdc6a6b452179ed2474ee Mon Sep 17 00:00:00 2001 From: Lionel Montrieux Date: Fri, 6 Jan 2017 10:43:32 +0100 Subject: [PATCH 31/37] ARUHA-473 Change max event size in swagger file --- api/nakadi-event-bus-api.yaml | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/api/nakadi-event-bus-api.yaml b/api/nakadi-event-bus-api.yaml index 03f5b56029..ee912832ae 100644 --- a/api/nakadi-event-bus-api.yaml +++ b/api/nakadi-event-bus-api.yaml @@ -317,8 +317,9 @@ paths: is **rejected** in the first case of failure. If the offending validation rule provides information about the violation it will be included in the `BatchItemResponse`. If the `EventType` defines schema validation it will be performed at this moment. The size of each - Event will also be validated. The maximum size per Event is 1,000,000 bytes, before - enrichment. + Event will also be validated. The maximum size per Event is 999,000 bytes. We use the batch + input to measure the size of events, so unnecessary spaces, tabs, and carriage returns will + count towards the event size. 1. Once the validation succeeded, the content of the Event is updated according to the enrichment rules in the order the rules are defined in the `EventType`. No preexisting From 4cc0bcd170e67696cdb79d7fb2c3607661e1d030 Mon Sep 17 00:00:00 2001 From: Lionel Montrieux Date: Fri, 6 Jan 2017 13:14:43 +0100 Subject: [PATCH 32/37] ARUHA-473 Fix bug when a batch is an array with only spaces. --- .../org/zalando/nakadi/domain/BatchFactory.java | 7 +++++-- .../zalando/nakadi/domain/BatchFactoryTest.java | 14 ++++++++++++++ 2 files changed, 19 insertions(+), 2 deletions(-) diff --git a/src/main/java/org/zalando/nakadi/domain/BatchFactory.java b/src/main/java/org/zalando/nakadi/domain/BatchFactory.java index bc1d961439..b8caaeb27f 100644 --- a/src/main/java/org/zalando/nakadi/domain/BatchFactory.java +++ b/src/main/java/org/zalando/nakadi/domain/BatchFactory.java @@ -39,7 +39,11 @@ public static List from(final String events) { if (events.charAt(i) == '}') { brackets--; } - if (!((brackets == 0) && (events.charAt(i) == ','))) { + if (!((brackets == 0) && ((events.charAt(i) == ',') + || (events.charAt(i) == ' ') + || (events.charAt(i) == '\t') + || (events.charAt(i) == '\n') + || (events.charAt(i) == '\r')))) { sb.append(events.charAt(i)); } if (brackets == 0 && (events.charAt(i) != ' ') @@ -60,5 +64,4 @@ public static List from(final String events) { return batch; } - } diff --git a/src/test/java/org/zalando/nakadi/domain/BatchFactoryTest.java b/src/test/java/org/zalando/nakadi/domain/BatchFactoryTest.java index 5b233b829f..8b23e3ac45 100644 --- a/src/test/java/org/zalando/nakadi/domain/BatchFactoryTest.java +++ b/src/test/java/org/zalando/nakadi/domain/BatchFactoryTest.java @@ -19,6 +19,20 @@ public void testOneEvent() { assertEquals("{\"name\":\"MyEvent\"}", batch.get(0).getEvent().toString()); } + @Test + public void testNoEvent() { + final String events = "[]"; + final List batch = BatchFactory.from(events); + assertEquals(0, batch.size()); + } + + @Test + public void testNoEventAndSpace() { + final String events = "[ ]"; + final List batch = BatchFactory.from(events); + assertEquals(0, batch.size()); + } + @Test public void testMultipleEvents() { final String events = "[{\"name\":\"MyEvent\"},{\"name\":\"MyOtherEvent\"}]"; From 30db7f54f88e051f59cff048d589fe889ba14db9 Mon Sep 17 00:00:00 2001 From: Lionel Montrieux Date: Fri, 6 Jan 2017 13:29:58 +0100 Subject: [PATCH 33/37] ARUHA-473 Set max event size to 999,000 bytes by default --- src/main/resources/application.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index f88219d016..b1da81c300 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -79,7 +79,7 @@ nakadi: auth: plugin: factory: org.zalando.nakadi.plugin.auth.DefaultApplicationServiceFactory - event.max.bytes: 1000000 + event.max.bytes: 999000 --- spring: From 781db1e261df03a71727b48d6344df7e1f4621de Mon Sep 17 00:00:00 2001 From: Lionel Montrieux Date: Tue, 10 Jan 2017 15:19:15 +0100 Subject: [PATCH 34/37] ARUHA-473 Accept valid batches surrounded with tabs, carriage returns, or spaces --- .../zalando/nakadi/domain/BatchFactory.java | 31 +++++++++++++++++-- .../nakadi/domain/BatchFactoryTest.java | 7 +++++ 2 files changed, 35 insertions(+), 3 deletions(-) diff --git a/src/main/java/org/zalando/nakadi/domain/BatchFactory.java b/src/main/java/org/zalando/nakadi/domain/BatchFactory.java index b8caaeb27f..7cac8d482a 100644 --- a/src/main/java/org/zalando/nakadi/domain/BatchFactory.java +++ b/src/main/java/org/zalando/nakadi/domain/BatchFactory.java @@ -13,10 +13,35 @@ public static List from(final String events) { int brackets = 0; boolean insideQuote = false; boolean escaped = false; - if ((!events.startsWith("[")) || (!events.endsWith("]"))) { - throw new JSONException("Array must be surrounded with square brackets"); + int start = 0; + final int length = events.length(); + int end = length - 1; + + while ((events.charAt(start) == ' ' + || events.charAt(start) == '\t' + || events.charAt(start) == '\n' + || events.charAt(start) == '\r') + && start < end) { + start++; + } + while ((events.charAt(end) == ' ' + || events.charAt(end) == '\t' + || events.charAt(end) == '\n' + || events.charAt(end) == '\r') + && end > start) { + end--; + } + if (!(events.charAt(start) == '[')) { + throw new JSONException(String.format("Unexpected character %s in position %d, expected '['", + events.charAt(start), start)); } - for (int i = 1; i < events.length() - 1; i++) { + start++; + if (!(events.charAt(end) == ']')) { + throw new JSONException(String.format("Unexpected character %s in position %d, expected ']'", + events.charAt(end), end)); + } + + for (int i = start; i < end; i++) { if (!escaped && events.charAt(i) == '"') { if (insideQuote) { insideQuote = false; diff --git a/src/test/java/org/zalando/nakadi/domain/BatchFactoryTest.java b/src/test/java/org/zalando/nakadi/domain/BatchFactoryTest.java index 8b23e3ac45..010ab63f6b 100644 --- a/src/test/java/org/zalando/nakadi/domain/BatchFactoryTest.java +++ b/src/test/java/org/zalando/nakadi/domain/BatchFactoryTest.java @@ -99,6 +99,13 @@ public void testSpacesBetweenEvents() { assertEquals("{\"name\":\"MyOtherEvent\"}", batch.get(1).getEvent().toString()); } + @Test + public void testEmptyCharactersAroundArray() { + final String events = "\t [{\"name\":\"MyEvent\"},{\"name\":\"MyOtherEvent\"}]\n\n"; + final List batch = BatchFactory.from(events); + assertEquals(2, batch.size()); + } + @Test public void testGarbageBetweenEvents() { final String events = "[{\"name\":\"MyEvent\"},atb#{\"name\":\"MyOtherEvent\"}]"; From ffd729a245964c34656fc43e8e3044465f93e7ff Mon Sep 17 00:00:00 2001 From: Lionel Montrieux Date: Tue, 10 Jan 2017 15:52:28 +0100 Subject: [PATCH 35/37] ARUHA-473 Refactoring --- .../zalando/nakadi/domain/BatchFactory.java | 26 ++++++------------- 1 file changed, 8 insertions(+), 18 deletions(-) diff --git a/src/main/java/org/zalando/nakadi/domain/BatchFactory.java b/src/main/java/org/zalando/nakadi/domain/BatchFactory.java index 7cac8d482a..30e3993d95 100644 --- a/src/main/java/org/zalando/nakadi/domain/BatchFactory.java +++ b/src/main/java/org/zalando/nakadi/domain/BatchFactory.java @@ -17,18 +17,10 @@ public static List from(final String events) { final int length = events.length(); int end = length - 1; - while ((events.charAt(start) == ' ' - || events.charAt(start) == '\t' - || events.charAt(start) == '\n' - || events.charAt(start) == '\r') - && start < end) { + while (isEmptyCharacter(events.charAt(start)) && start < end) { start++; } - while ((events.charAt(end) == ' ' - || events.charAt(end) == '\t' - || events.charAt(end) == '\n' - || events.charAt(end) == '\r') - && end > start) { + while (isEmptyCharacter(events.charAt(end)) && end > start) { end--; } if (!(events.charAt(start) == '[')) { @@ -65,16 +57,10 @@ public static List from(final String events) { brackets--; } if (!((brackets == 0) && ((events.charAt(i) == ',') - || (events.charAt(i) == ' ') - || (events.charAt(i) == '\t') - || (events.charAt(i) == '\n') - || (events.charAt(i) == '\r')))) { + || isEmptyCharacter(events.charAt(i))))) { sb.append(events.charAt(i)); } - if (brackets == 0 && (events.charAt(i) != ' ') - && (events.charAt(i) != '\t' - && (events.charAt(i) != '\n') - && (events.charAt(i) != '\r'))) { + if (brackets == 0 && !isEmptyCharacter(events.charAt(i))) { if (sb.length() > 0) { batch.add(new BatchItem(sb.toString())); } @@ -89,4 +75,8 @@ public static List from(final String events) { return batch; } + + private static boolean isEmptyCharacter(final char c) { + return (c == ' ' || c == '\t' || c == '\n' || c == '\r'); + } } From 58cb450f264bf93116ed8007f224b943184a5084 Mon Sep 17 00:00:00 2001 From: Lionel Montrieux Date: Wed, 18 Jan 2017 14:55:42 +0100 Subject: [PATCH 36/37] ARUHA-473 Remove kafka max bytes property --- src/main/resources/application.yml | 1 - 1 file changed, 1 deletion(-) diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index b1da81c300..4016aa989f 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -59,7 +59,6 @@ nakadi: send.timeoutMs: 5000 batch.size: 5242880 linger.ms: 0 - fetch.message.max.bytes: 2000000 zookeeper: kafkaNamespace: brokers: 127.0.0.1:2181 From 3f2a2acf5396e9eaf25f3b357df2d7df3dde0ad6 Mon Sep 17 00:00:00 2001 From: Lionel Montrieux Date: Thu, 19 Jan 2017 11:17:28 +0100 Subject: [PATCH 37/37] ARUHA-473 Finish removing kafka max bytes property --- .../nakadi/repository/kafka/KafkaRepositoryAT.java | 4 +--- .../nakadi/repository/kafka/KafkaLocationManager.java | 4 +--- .../zalando/nakadi/repository/kafka/KafkaSettings.java | 9 +-------- 3 files changed, 3 insertions(+), 14 deletions(-) diff --git a/src/acceptance-test/java/org/zalando/nakadi/repository/kafka/KafkaRepositoryAT.java b/src/acceptance-test/java/org/zalando/nakadi/repository/kafka/KafkaRepositoryAT.java index c993351e65..2e1cced20c 100644 --- a/src/acceptance-test/java/org/zalando/nakadi/repository/kafka/KafkaRepositoryAT.java +++ b/src/acceptance-test/java/org/zalando/nakadi/repository/kafka/KafkaRepositoryAT.java @@ -54,7 +54,6 @@ public class KafkaRepositoryAT extends BaseAT { private static final int KAFKA_BATCH_SIZE = 1048576; private static final long KAFKA_LINGER_MS = 0; private static final long NAKADI_EVENT_MAX_BYTES = 1000000L; - private static final int KAFKA_FETCH_MESSAGE_MAX_BYTES = 2000000; private NakadiSettings nakadiSettings; private KafkaSettings kafkaSettings; @@ -76,8 +75,7 @@ public void setup() { NAKADI_POLL_TIMEOUT, NAKADI_SEND_TIMEOUT, NAKADI_EVENT_MAX_BYTES); - kafkaSettings = new KafkaSettings(KAFKA_REQUEST_TIMEOUT, KAFKA_BATCH_SIZE, KAFKA_LINGER_MS, - KAFKA_FETCH_MESSAGE_MAX_BYTES); + kafkaSettings = new KafkaSettings(KAFKA_REQUEST_TIMEOUT, KAFKA_BATCH_SIZE, KAFKA_LINGER_MS); zookeeperSettings = new ZookeeperSettings(ZK_SESSION_TIMEOUT, ZK_CONNECTION_TIMEOUT); kafkaHelper = new KafkaTestHelper(KAFKA_URL); kafkaTopicRepository = createKafkaTopicRepository(); diff --git a/src/main/java/org/zalando/nakadi/repository/kafka/KafkaLocationManager.java b/src/main/java/org/zalando/nakadi/repository/kafka/KafkaLocationManager.java index cdd1db3355..90ecf8f9e4 100644 --- a/src/main/java/org/zalando/nakadi/repository/kafka/KafkaLocationManager.java +++ b/src/main/java/org/zalando/nakadi/repository/kafka/KafkaLocationManager.java @@ -99,9 +99,7 @@ private void updateBrokers() { } public Properties getKafkaConsumerProperties() { - final Properties consumerProps = (Properties) kafkaProperties.clone(); - consumerProps.put("fetch.message.max.bytes", kafkaSettings.getFetchMessageMaxBytes()); - return consumerProps; + return (Properties) kafkaProperties.clone(); } public Properties getKafkaProducerProperties() { diff --git a/src/main/java/org/zalando/nakadi/repository/kafka/KafkaSettings.java b/src/main/java/org/zalando/nakadi/repository/kafka/KafkaSettings.java index 8f23fd0eef..ed27c69f96 100644 --- a/src/main/java/org/zalando/nakadi/repository/kafka/KafkaSettings.java +++ b/src/main/java/org/zalando/nakadi/repository/kafka/KafkaSettings.java @@ -16,17 +16,14 @@ public class KafkaSettings { // /kafka/clients/producer/ProducerConfig.java#L232 private final int batchSize; private final long lingerMs; - private final int fetchMessageMaxBytes; @Autowired public KafkaSettings(@Value("${nakadi.kafka.request.timeout.ms}") final int requestTimeoutMs, @Value("${nakadi.kafka.batch.size}") final int batchSize, - @Value("${nakadi.kafka.linger.ms}") final long lingerMs, - @Value("${nakadi.kafka.fetch.message.max.bytes}") final int fetchMessageMaxBytes) { + @Value("${nakadi.kafka.linger.ms}") final long lingerMs) { this.requestTimeoutMs = requestTimeoutMs; this.batchSize = batchSize; this.lingerMs = lingerMs; - this.fetchMessageMaxBytes = fetchMessageMaxBytes; } public int getRequestTimeoutMs() { @@ -40,8 +37,4 @@ public int getBatchSize() { public long getLingerMs() { return lingerMs; } - - public int getFetchMessageMaxBytes() { - return fetchMessageMaxBytes; - } }