From 3995bab6aa5934f963db6dccedd6ec8ef908a834 Mon Sep 17 00:00:00 2001
From: pastor
Date: Tue, 15 Sep 2020 17:26:42 +0300
Subject: [PATCH 1/2] Send and receive Kafka headers

https://github.com/confluentinc/kafka-rest/issues/698
https://github.com/confluentinc/kafka-rest/issues/523
---
 .../kafkarest/NoSchemaRestProducer.java       |  16 ++-
 .../kafkarest/SchemaRestProducer.java         |  16 ++-
 .../kafkarest/entities/ConsumerRecord.java    |  10 +-
 .../kafkarest/entities/ForwardHeader.java     | 103 ++++++++++++++++++
 .../kafkarest/entities/ProduceRecord.java     |   9 +-
 .../entities/v2/BinaryConsumerRecord.java     |  23 +++-
 .../v2/BinaryPartitionProduceRequest.java     |  17 ++-
 .../v2/BinaryTopicProduceRequest.java         |  20 +++-
 .../entities/v2/JsonConsumerRecord.java       |  21 +++-
 .../v2/JsonPartitionProduceRequest.java       |  17 ++-
 .../entities/v2/JsonTopicProduceRequest.java  |  18 ++-
 .../entities/v2/SchemaConsumerRecord.java     |  20 +++-
 .../v2/SchemaPartitionProduceRequest.java     |  17 ++-
 .../v2/SchemaTopicProduceRequest.java         |  18 ++-
 .../kafkarest/tools/ProducerPerformance.java  |   2 +-
 .../v2/BinaryKafkaConsumerState.java          |   9 +-
 .../kafkarest/v2/JsonKafkaConsumerState.java  |  14 ++-
 .../v2/SchemaKafkaConsumerState.java          |   8 +-
 .../kafkarest/entities/ForwardHeaderTest.java |  48 ++++++++
 .../integration/AuthorizationErrorTest.java   |   8 +-
 .../integration/AvroProducerTest.java         |  16 +--
 .../integration/JsonProducerTest.java         |  59 +++++-----
 .../kafkarest/integration/ProducerTest.java   |  66 +++++------
 .../ProducerTopicAutoCreationTest.java        |  16 +--
 .../v2/AvroProduceConsumeTest.java            |   6 +-
 .../v2/JsonSchemaProduceConsumeTest.java      |   6 +-
 .../v2/ProtobufProduceConsumeTest.java        |   6 +-
 .../v2/TopicsResourceAvroProduceTest.java     |   4 +-
 .../v2/TopicsResourceBinaryProduceTest.java   |  22 ++--
 .../kafkarest/unit/AvroRestProducerTest.java  |   6 +-
 .../v2/KafkaConsumerManagerTest.java          |  23 ++--
 .../io/confluent/kafkarest/v2/LoadTest.java   |  19 ++--
 32 files changed, 499 insertions(+), 164 deletions(-)
 create mode 100644 kafka-rest/src/main/java/io/confluent/kafkarest/entities/ForwardHeader.java
 create mode 100644 kafka-rest/src/test/java/io/confluent/kafkarest/entities/ForwardHeaderTest.java

diff --git a/kafka-rest/src/main/java/io/confluent/kafkarest/NoSchemaRestProducer.java b/kafka-rest/src/main/java/io/confluent/kafkarest/NoSchemaRestProducer.java
index c898c2a854..1968ed4ec3 100644
--- a/kafka-rest/src/main/java/io/confluent/kafkarest/NoSchemaRestProducer.java
+++ b/kafka-rest/src/main/java/io/confluent/kafkarest/NoSchemaRestProducer.java
@@ -15,10 +15,15 @@
 package io.confluent.kafkarest;
 
+import io.confluent.kafkarest.entities.ForwardHeader;
 import io.confluent.kafkarest.entities.ProduceRecord;
 import java.util.Collection;
+import java.util.List;
+import java.util.stream.Collectors;
+
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.header.Header;
 
 /**
  * Wrapper producer for content types which have no associated schema (e.g. binary or JSON).
@@ -43,8 +48,17 @@ public void produce(
     if (recordPartition == null) {
       recordPartition = record.getPartition();
     }
+    List<Header> headers = null;
+    if (record.getHeaders() != null && record.getHeaders().size() > 0) {
+      headers = record
+          .getHeaders()
+          .stream()
+          .filter(m -> m.value != null && m.value.length > 0)
+          .map(ForwardHeader::toHeader)
+          .collect(Collectors.toList());
+    }
     producer.send(
-        new ProducerRecord<>(topic, recordPartition, record.getKey(), record.getValue()),
+        new ProducerRecord<>(topic, recordPartition, record.getKey(), record.getValue(), headers),
         task.createCallback()
     );
   }
diff --git a/kafka-rest/src/main/java/io/confluent/kafkarest/SchemaRestProducer.java b/kafka-rest/src/main/java/io/confluent/kafkarest/SchemaRestProducer.java
index 9842c83a9d..78e98d9a8d 100644
--- a/kafka-rest/src/main/java/io/confluent/kafkarest/SchemaRestProducer.java
+++ b/kafka-rest/src/main/java/io/confluent/kafkarest/SchemaRestProducer.java
@@ -22,17 +22,22 @@
 import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDe;
 import io.confluent.kafkarest.converters.ConversionException;
 import io.confluent.kafkarest.converters.SchemaConverter;
+import io.confluent.kafkarest.entities.ForwardHeader;
 import io.confluent.kafkarest.entities.ProduceRecord;
 import io.confluent.kafkarest.entities.ProduceRequest;
 import io.confluent.rest.exceptions.RestException;
 import java.io.IOException;
+import java.util.List;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
+
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.header.Header;
 
 public class SchemaRestProducer implements RestProducer {
@@ -131,7 +136,16 @@ public void produce(
         if (recordPartition == null) {
           recordPartition = record.getPartition();
         }
-        kafkaRecords.add(new ProducerRecord(topic, recordPartition, key, value));
+        List<Header> headers = null;
+        if (record.getHeaders() != null && record.getHeaders().size() > 0) {
+          headers = record
+              .getHeaders()
+              .stream()
+              .filter(m -> m.value != null && m.value.length > 0)
+              .map(ForwardHeader::toHeader)
+              .collect(Collectors.toList());
+        }
+        kafkaRecords.add(new ProducerRecord(topic, recordPartition, key, value, headers));
       }
     } catch (ConversionException e) {
       throw Errors.jsonConversionException(e);
diff --git a/kafka-rest/src/main/java/io/confluent/kafkarest/entities/ConsumerRecord.java b/kafka-rest/src/main/java/io/confluent/kafkarest/entities/ConsumerRecord.java
index 84d00eb59b..8719790925 100644
--- a/kafka-rest/src/main/java/io/confluent/kafkarest/entities/ConsumerRecord.java
+++ b/kafka-rest/src/main/java/io/confluent/kafkarest/entities/ConsumerRecord.java
@@ -16,7 +16,9 @@
 package io.confluent.kafkarest.entities;
 
 import com.google.auto.value.AutoValue;
+
 import javax.annotation.Nullable;
+import java.util.List;
 
 @AutoValue
 public abstract class ConsumerRecord<K, V> {
@@ -36,8 +38,12 @@ public abstract class ConsumerRecord<K, V> {
 
   public abstract long getOffset();
 
+  public abstract List<ForwardHeader> getHeaders();
+
   public static <K, V> ConsumerRecord<K, V> create(
-      String topic, @Nullable K key, @Nullable V value, int partition, long offset) {
-    return new AutoValue_ConsumerRecord<>(topic, key, value, partition, offset);
+      String topic,
+      @Nullable K key, @Nullable V value, int partition, long offset,
+      @Nullable List<ForwardHeader> headers) {
+    return new AutoValue_ConsumerRecord<>(topic, key, value, partition, offset, headers);
   }
 }
diff --git a/kafka-rest/src/main/java/io/confluent/kafkarest/entities/ForwardHeader.java b/kafka-rest/src/main/java/io/confluent/kafkarest/entities/ForwardHeader.java
new file mode 100644
index 0000000000..bb923c0048
--- /dev/null
+++ b/kafka-rest/src/main/java/io/confluent/kafkarest/entities/ForwardHeader.java
@@ -0,0 +1,103 @@
+/*
+ * Copyright 2018 Confluent Inc.
+ *
+ * Licensed under the Confluent Community License (the "License"); you may not use
+ * this file except in compliance with the License. You may obtain a copy of the
+ * License at
+ *
+ * http://www.confluent.io/confluent-community-license
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package io.confluent.kafkarest.entities;
+
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.core.JsonToken;
+import com.fasterxml.jackson.databind.DeserializationContext;
+import com.fasterxml.jackson.databind.SerializerProvider;
+import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+import com.fasterxml.jackson.databind.annotation.JsonSerialize;
+import com.fasterxml.jackson.databind.deser.std.StdDeserializer;
+import com.fasterxml.jackson.databind.ser.std.StdSerializer;
+import org.apache.kafka.common.header.Header;
+import org.apache.kafka.common.header.internals.RecordHeader;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.Objects;
+
+@JsonSerialize(using = ForwardHeader.HeaderSerializer.class)
+@JsonDeserialize(using = ForwardHeader.HeaderDeserializer.class)
+public final class ForwardHeader {
+
+  private static final String NULL_KEY_MESSAGE = "Null header keys are not permitted";
+
+  public final String key;
+  public final byte[] value;
+
+  public ForwardHeader(Header header) {
+    this(header.key(), header.value());
+  }
+
+  public ForwardHeader(String key, byte[] value) {
+    this.key = Objects.requireNonNull(key, NULL_KEY_MESSAGE);
+    this.value = value;
+  }
+
+  public ForwardHeader(String key, String value) {
+    this(key, Objects.requireNonNull(
+        value, "Null header string value").getBytes(StandardCharsets.UTF_8));
+  }
+
+  public Header toHeader() {
+    return new RecordHeader(key, value);
+  }
+
+  protected static final class HeaderDeserializer extends StdDeserializer<ForwardHeader> {
+
+    public HeaderDeserializer() {
+      super(ForwardHeader.class);
+    }
+
+    protected HeaderDeserializer(Class<?> vc) {
+      super(vc);
+    }
+
+    @Override
+    public ForwardHeader deserialize(JsonParser p, DeserializationContext ctx) throws IOException {
+      String key = p.nextFieldName();
+      String value = p.nextTextValue();
+      //noinspection StatementWithEmptyBody
+      while (p.nextToken() != JsonToken.END_OBJECT) {
+      }
+      if (value != null) {
+        return new ForwardHeader(key, value.getBytes(StandardCharsets.UTF_8));
+      }
+      return null;
+    }
+  }
+
+  protected static final class HeaderSerializer extends StdSerializer<ForwardHeader> {
+
+    public HeaderSerializer() {
+      super(ForwardHeader.class);
+    }
+
+    protected HeaderSerializer(Class<ForwardHeader> t) {
+      super(t);
+    }
+
+    @Override
+    public void serialize(ForwardHeader value, JsonGenerator gen, SerializerProvider provider)
+        throws IOException {
+      if (value != null && value.value != null) {
+        gen.writeStartObject();
+        gen.writeStringField(value.key, new String(value.value, StandardCharsets.UTF_8));
+        gen.writeEndObject();
+      }
+    }
+  }
+}
diff --git a/kafka-rest/src/main/java/io/confluent/kafkarest/entities/ProduceRecord.java b/kafka-rest/src/main/java/io/confluent/kafkarest/entities/ProduceRecord.java
index 6c6b38b9d8..01ec0413f0 100644
--- a/kafka-rest/src/main/java/io/confluent/kafkarest/entities/ProduceRecord.java
+++ b/kafka-rest/src/main/java/io/confluent/kafkarest/entities/ProduceRecord.java
@@ -17,6 +17,7 @@
 import com.google.auto.value.AutoValue;
 import javax.annotation.Nullable;
+import java.util.List;
 
 @AutoValue
 public abstract class ProduceRecord<K, V> {
@@ -33,8 +34,12 @@ public abstract class ProduceRecord<K, V> {
   @Nullable
   public abstract Integer getPartition();
 
+  @Nullable
+  public abstract List<ForwardHeader> getHeaders();
+
   public static <K, V> ProduceRecord<K, V> create(
-      @Nullable K key, @Nullable V value, @Nullable Integer partition) {
-    return new AutoValue_ProduceRecord<>(key, value, partition);
+
@Nullable K key, @Nullable V value, @Nullable Integer partition, + @Nullable List headers) { + return new AutoValue_ProduceRecord<>(key, value, partition, headers); } } \ No newline at end of file diff --git a/kafka-rest/src/main/java/io/confluent/kafkarest/entities/v2/BinaryConsumerRecord.java b/kafka-rest/src/main/java/io/confluent/kafkarest/entities/v2/BinaryConsumerRecord.java index eaabae54f0..5fa316749e 100644 --- a/kafka-rest/src/main/java/io/confluent/kafkarest/entities/v2/BinaryConsumerRecord.java +++ b/kafka-rest/src/main/java/io/confluent/kafkarest/entities/v2/BinaryConsumerRecord.java @@ -20,8 +20,11 @@ import com.google.protobuf.ByteString; import io.confluent.kafkarest.entities.ConsumerRecord; import io.confluent.kafkarest.entities.EntityUtils; -import java.util.Arrays; +import io.confluent.kafkarest.entities.ForwardHeader; + +import java.util.List; import java.util.Objects; +import java.util.Arrays; import java.util.StringJoiner; import javax.annotation.Nullable; import javax.validation.constraints.NotNull; @@ -39,6 +42,9 @@ public final class BinaryConsumerRecord { @Nullable private final byte[] value; + @Nullable + private final List headers; + @PositiveOrZero @Nullable private final Integer partition; @@ -53,12 +59,14 @@ private BinaryConsumerRecord( @JsonProperty("key") @Nullable byte[] key, @JsonProperty("value") @Nullable byte[] value, @JsonProperty("partition") @Nullable Integer partition, - @JsonProperty("offset") @Nullable Long offset) { + @JsonProperty("offset") @Nullable Long offset, + @JsonProperty("headers") @Nullable List headers) { this.topic = topic; this.key = key; this.value = value; this.partition = partition; this.offset = offset; + this.headers = headers; } @JsonProperty @@ -79,6 +87,12 @@ public String getValue() { return value != null ? EntityUtils.encodeBase64Binary(value) : null; } + @JsonProperty + @Nullable + public List getHeaders() { + return headers; + } + @JsonProperty @Nullable public Integer getPartition() { @@ -104,7 +118,8 @@ public static BinaryConsumerRecord fromConsumerRecord( record.getKey() != null ? record.getKey().toByteArray() : null, record.getValue() != null ? record.getValue().toByteArray() : null, record.getPartition(), - record.getOffset()); + record.getOffset(), + record.getHeaders()); } public ConsumerRecord toConsumerRecord() { @@ -122,7 +137,7 @@ public ConsumerRecord toConsumerRecord() { key != null ? ByteString.copyFrom(key) : null, value != null ? 
ByteString.copyFrom(value) : null, partition, - offset); + offset, headers); } @Override diff --git a/kafka-rest/src/main/java/io/confluent/kafkarest/entities/v2/BinaryPartitionProduceRequest.java b/kafka-rest/src/main/java/io/confluent/kafkarest/entities/v2/BinaryPartitionProduceRequest.java index 79020850bc..d5f98cf35f 100644 --- a/kafka-rest/src/main/java/io/confluent/kafkarest/entities/v2/BinaryPartitionProduceRequest.java +++ b/kafka-rest/src/main/java/io/confluent/kafkarest/entities/v2/BinaryPartitionProduceRequest.java @@ -18,9 +18,11 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; import io.confluent.kafkarest.entities.EntityUtils; +import io.confluent.kafkarest.entities.ForwardHeader; import io.confluent.kafkarest.entities.ProduceRecord; import io.confluent.kafkarest.entities.ProduceRequest; import io.confluent.rest.validation.ConstraintViolations; + import java.util.Arrays; import java.util.List; import java.util.Objects; @@ -70,7 +72,7 @@ public ProduceRequest toProduceRequest() { } return ProduceRequest.create( records.stream() - .map(record -> ProduceRecord.create(record.key, record.value, null)) + .map(record -> ProduceRecord.create(record.key, record.value, null, record.headers)) .collect(Collectors.toList()), /* keySchema= */ null, /* keySchemaId= */ null, @@ -111,10 +113,14 @@ public static final class BinaryPartitionProduceRecord { @Nullable private final byte[] value; + @Nullable + private final List headers; + @JsonCreator public BinaryPartitionProduceRecord( @JsonProperty("key") @Nullable String key, - @JsonProperty("value") @Nullable String value + @JsonProperty("value") @Nullable String value, + @JsonProperty("headers") @Nullable List headers ) { try { this.key = (key != null) ? EntityUtils.parseBase64Binary(key) : null; @@ -126,6 +132,7 @@ public BinaryPartitionProduceRecord( } catch (IllegalArgumentException e) { throw ConstraintViolations.simpleException("Record value contains invalid base64 encoding"); } + this.headers = headers; } @JsonProperty("key") @@ -140,6 +147,12 @@ public String getValue() { return (value == null ? 
null : EntityUtils.encodeBase64Binary(value)); } + @JsonProperty + @Nullable + public List getHeaders() { + return headers; + } + @Override public boolean equals(Object o) { if (this == o) { diff --git a/kafka-rest/src/main/java/io/confluent/kafkarest/entities/v2/BinaryTopicProduceRequest.java b/kafka-rest/src/main/java/io/confluent/kafkarest/entities/v2/BinaryTopicProduceRequest.java index 74fc88a664..bb8c985920 100644 --- a/kafka-rest/src/main/java/io/confluent/kafkarest/entities/v2/BinaryTopicProduceRequest.java +++ b/kafka-rest/src/main/java/io/confluent/kafkarest/entities/v2/BinaryTopicProduceRequest.java @@ -18,13 +18,15 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; import io.confluent.kafkarest.entities.EntityUtils; +import io.confluent.kafkarest.entities.ForwardHeader; import io.confluent.kafkarest.entities.ProduceRecord; import io.confluent.kafkarest.entities.ProduceRequest; import io.confluent.rest.validation.ConstraintViolations; -import java.util.Arrays; + import java.util.List; import java.util.Objects; import java.util.StringJoiner; +import java.util.Arrays; import java.util.stream.Collectors; import javax.annotation.Nullable; import javax.validation.constraints.NotEmpty; @@ -71,7 +73,8 @@ public ProduceRequest toProduceRequest() { } return ProduceRequest.create( records.stream() - .map(record -> ProduceRecord.create(record.key, record.value, record.partition)) + .map(record -> ProduceRecord.create( + record.key, record.value, record.partition, record.headers)) .collect(Collectors.toList()), /* keySchema= */ null, /* keySchemaId= */ null, @@ -116,11 +119,15 @@ public static final class BinaryTopicProduceRecord { @Nullable private final Integer partition; + @Nullable + private final List headers; + @JsonCreator public BinaryTopicProduceRecord( @JsonProperty("key") @Nullable String key, @JsonProperty("value") @Nullable String value, - @JsonProperty("partition") @Nullable Integer partition + @JsonProperty("partition") @Nullable Integer partition, + @JsonProperty("headers") @Nullable List headers ) { try { this.key = (key != null) ? 
EntityUtils.parseBase64Binary(key) : null; @@ -133,6 +140,7 @@ public BinaryTopicProduceRecord( throw ConstraintViolations.simpleException("Record value contains invalid base64 encoding"); } this.partition = partition; + this.headers = headers; } @JsonProperty("key") @@ -153,6 +161,12 @@ public Integer getPartition() { return partition; } + @JsonProperty + @Nullable + public List getHeaders() { + return headers; + } + @Override public boolean equals(Object o) { if (this == o) { diff --git a/kafka-rest/src/main/java/io/confluent/kafkarest/entities/v2/JsonConsumerRecord.java b/kafka-rest/src/main/java/io/confluent/kafkarest/entities/v2/JsonConsumerRecord.java index 3ab8ab3411..75a5c714d6 100644 --- a/kafka-rest/src/main/java/io/confluent/kafkarest/entities/v2/JsonConsumerRecord.java +++ b/kafka-rest/src/main/java/io/confluent/kafkarest/entities/v2/JsonConsumerRecord.java @@ -18,6 +18,9 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; import io.confluent.kafkarest.entities.ConsumerRecord; +import io.confluent.kafkarest.entities.ForwardHeader; + +import java.util.List; import java.util.Objects; import java.util.StringJoiner; import javax.annotation.Nullable; @@ -36,6 +39,9 @@ public final class JsonConsumerRecord { @Nullable private final Object value; + @Nullable + private final List headers; + @PositiveOrZero @Nullable private final Integer partition; @@ -50,12 +56,14 @@ private JsonConsumerRecord( @JsonProperty("key") @Nullable Object key, @JsonProperty("value") @Nullable Object value, @JsonProperty("partition") @Nullable Integer partition, - @JsonProperty("offset") @Nullable Long offset) { + @JsonProperty("offset") @Nullable Long offset, + @JsonProperty("headers") @Nullable List headers) { this.topic = topic; this.key = key; this.value = value; this.partition = partition; this.offset = offset; + this.headers = headers; } @JsonProperty @@ -82,6 +90,12 @@ public Integer getPartition() { return partition; } + @JsonProperty + @Nullable + public List getHeaders() { + return headers; + } + @JsonProperty @Nullable public Long getOffset() { @@ -100,7 +114,8 @@ public static JsonConsumerRecord fromConsumerRecord(ConsumerRecord toConsumerRecord() { @@ -113,7 +128,7 @@ public ConsumerRecord toConsumerRecord() { if (offset == null || offset < 0) { throw new IllegalStateException(); } - return ConsumerRecord.create(topic, key, value, partition, offset); + return ConsumerRecord.create(topic, key, value, partition, offset, headers); } @Override diff --git a/kafka-rest/src/main/java/io/confluent/kafkarest/entities/v2/JsonPartitionProduceRequest.java b/kafka-rest/src/main/java/io/confluent/kafkarest/entities/v2/JsonPartitionProduceRequest.java index 390760136c..0126f6c17f 100644 --- a/kafka-rest/src/main/java/io/confluent/kafkarest/entities/v2/JsonPartitionProduceRequest.java +++ b/kafka-rest/src/main/java/io/confluent/kafkarest/entities/v2/JsonPartitionProduceRequest.java @@ -17,8 +17,10 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; +import io.confluent.kafkarest.entities.ForwardHeader; import io.confluent.kafkarest.entities.ProduceRecord; import io.confluent.kafkarest.entities.ProduceRequest; + import java.util.List; import java.util.Objects; import java.util.StringJoiner; @@ -67,7 +69,7 @@ public ProduceRequest toProduceRequest() { } return ProduceRequest.create( records.stream() - .map(record -> ProduceRecord.create(record.key, record.value, null)) + .map(record -> 
ProduceRecord.create(record.key, record.value, null, record.headers)) .collect(Collectors.toList()), /* keySchema= */ null, /* keySchemaId= */ null, @@ -108,13 +110,18 @@ public static final class JsonPartitionProduceRecord { @Nullable private final Object value; + @Nullable + private final List headers; + @JsonCreator public JsonPartitionProduceRecord( @JsonProperty("key") @Nullable Object key, - @JsonProperty("value") @Nullable Object value + @JsonProperty("value") @Nullable Object value, + @JsonProperty("headers") @Nullable List headers ) { this.key = key; this.value = value; + this.headers = headers; } @JsonProperty("key") @@ -129,6 +136,12 @@ public Object getValue() { return value; } + @JsonProperty + @Nullable + public List getHeaders() { + return headers; + } + @Override public boolean equals(Object o) { if (this == o) { diff --git a/kafka-rest/src/main/java/io/confluent/kafkarest/entities/v2/JsonTopicProduceRequest.java b/kafka-rest/src/main/java/io/confluent/kafkarest/entities/v2/JsonTopicProduceRequest.java index ed2446a0dd..5e77ee9bcf 100644 --- a/kafka-rest/src/main/java/io/confluent/kafkarest/entities/v2/JsonTopicProduceRequest.java +++ b/kafka-rest/src/main/java/io/confluent/kafkarest/entities/v2/JsonTopicProduceRequest.java @@ -17,6 +17,7 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; +import io.confluent.kafkarest.entities.ForwardHeader; import io.confluent.kafkarest.entities.ProduceRecord; import io.confluent.kafkarest.entities.ProduceRequest; import java.util.List; @@ -68,7 +69,9 @@ public ProduceRequest toProduceRequest() { } return ProduceRequest.create( records.stream() - .map(record -> ProduceRecord.create(record.key, record.value, record.partition)) + .map(record -> + ProduceRecord.create( + record.key, record.value, record.partition, record.headers)) .collect(Collectors.toList()), /* keySchema= */ null, /* keySchemaId= */ null, @@ -108,6 +111,9 @@ public static final class JsonTopicProduceRecord { @Nullable private final Object value; + @Nullable + private final List headers; + @PositiveOrZero @Nullable private final Integer partition; @@ -116,11 +122,13 @@ public static final class JsonTopicProduceRecord { public JsonTopicProduceRecord( @JsonProperty("key") @Nullable Object key, @JsonProperty("value") @Nullable Object value, - @JsonProperty("partition") @Nullable Integer partition + @JsonProperty("partition") @Nullable Integer partition, + @JsonProperty("headers") @Nullable List headers ) { this.key = key; this.value = value; this.partition = partition; + this.headers = headers; } @JsonProperty("key") @@ -141,6 +149,12 @@ public Integer getPartition() { return partition; } + @JsonProperty + @Nullable + public List getHeaders() { + return headers; + } + @Override public boolean equals(Object o) { if (this == o) { diff --git a/kafka-rest/src/main/java/io/confluent/kafkarest/entities/v2/SchemaConsumerRecord.java b/kafka-rest/src/main/java/io/confluent/kafkarest/entities/v2/SchemaConsumerRecord.java index 320b9720fa..e7dd0bbb14 100644 --- a/kafka-rest/src/main/java/io/confluent/kafkarest/entities/v2/SchemaConsumerRecord.java +++ b/kafka-rest/src/main/java/io/confluent/kafkarest/entities/v2/SchemaConsumerRecord.java @@ -19,6 +19,9 @@ import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.databind.JsonNode; import io.confluent.kafkarest.entities.ConsumerRecord; +import io.confluent.kafkarest.entities.ForwardHeader; + +import java.util.List; import java.util.Objects; import 
java.util.StringJoiner; import javax.annotation.Nullable; @@ -37,6 +40,9 @@ public final class SchemaConsumerRecord { @Nullable private final JsonNode value; + @Nullable + private final List headers; + @PositiveOrZero @Nullable private final Integer partition; @@ -51,12 +57,14 @@ private SchemaConsumerRecord( @JsonProperty("key") @Nullable JsonNode key, @JsonProperty("value") @Nullable JsonNode value, @JsonProperty("partition") @Nullable Integer partition, - @JsonProperty("offset") @Nullable Long offset) { + @JsonProperty("offset") @Nullable Long offset, + @JsonProperty("headers") @Nullable List headers) { this.topic = topic; this.key = key; this.value = value; this.partition = partition; this.offset = offset; + this.headers = headers; } @JsonProperty @@ -77,6 +85,12 @@ public JsonNode getValue() { return value; } + @JsonProperty + @Nullable + public List getHeaders() { + return headers; + } + @JsonProperty @Nullable public Integer getPartition() { @@ -101,7 +115,7 @@ public static SchemaConsumerRecord fromConsumerRecord(ConsumerRecord toConsumerRecord() { @@ -114,7 +128,7 @@ public ConsumerRecord toConsumerRecord() { if (offset == null || offset < 0) { throw new IllegalStateException(); } - return ConsumerRecord.create(topic, key, value, partition, offset); + return ConsumerRecord.create(topic, key, value, partition, offset, headers); } @Override diff --git a/kafka-rest/src/main/java/io/confluent/kafkarest/entities/v2/SchemaPartitionProduceRequest.java b/kafka-rest/src/main/java/io/confluent/kafkarest/entities/v2/SchemaPartitionProduceRequest.java index 92558ad763..fb7322df6d 100644 --- a/kafka-rest/src/main/java/io/confluent/kafkarest/entities/v2/SchemaPartitionProduceRequest.java +++ b/kafka-rest/src/main/java/io/confluent/kafkarest/entities/v2/SchemaPartitionProduceRequest.java @@ -18,8 +18,10 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.databind.JsonNode; +import io.confluent.kafkarest.entities.ForwardHeader; import io.confluent.kafkarest.entities.ProduceRecord; import io.confluent.kafkarest.entities.ProduceRequest; + import java.util.List; import java.util.Objects; import java.util.StringJoiner; @@ -109,7 +111,7 @@ public ProduceRequest toProduceRequest() { } return ProduceRequest.create( records.stream() - .map(record -> ProduceRecord.create(record.key, record.value, null)) + .map(record -> ProduceRecord.create(record.key, record.value, null, record.headers)) .collect(Collectors.toList()), keySchema, keySchemaId, @@ -158,13 +160,18 @@ public static final class SchemaPartitionProduceRecord { @Nullable private final JsonNode value; + @Nullable + private final List headers; + @JsonCreator public SchemaPartitionProduceRecord( @JsonProperty("key") @Nullable JsonNode key, - @JsonProperty("value") @Nullable JsonNode value + @JsonProperty("value") @Nullable JsonNode value, + @JsonProperty("headers") @Nullable List headers ) { this.key = key; this.value = value; + this.headers = headers; } @JsonProperty("key") @@ -179,6 +186,12 @@ public JsonNode getValue() { return value; } + @JsonProperty + @Nullable + public List getHeaders() { + return headers; + } + @Override public boolean equals(Object o) { if (this == o) { diff --git a/kafka-rest/src/main/java/io/confluent/kafkarest/entities/v2/SchemaTopicProduceRequest.java b/kafka-rest/src/main/java/io/confluent/kafkarest/entities/v2/SchemaTopicProduceRequest.java index 3e88a62c4a..de677c207e 100644 --- 
a/kafka-rest/src/main/java/io/confluent/kafkarest/entities/v2/SchemaTopicProduceRequest.java +++ b/kafka-rest/src/main/java/io/confluent/kafkarest/entities/v2/SchemaTopicProduceRequest.java @@ -18,8 +18,10 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.databind.JsonNode; +import io.confluent.kafkarest.entities.ForwardHeader; import io.confluent.kafkarest.entities.ProduceRecord; import io.confluent.kafkarest.entities.ProduceRequest; + import java.util.List; import java.util.Objects; import java.util.StringJoiner; @@ -110,7 +112,8 @@ public ProduceRequest toProduceRequest() { } return ProduceRequest.create( records.stream() - .map(record -> ProduceRecord.create(record.key, record.value, record.partition)) + .map(record -> ProduceRecord.create( + record.key, record.value, record.partition, record.headers)) .collect(Collectors.toList()), keySchema, keySchemaId, @@ -163,16 +166,21 @@ public static final class SchemaTopicProduceRecord { @Nullable private final Integer partition; + @Nullable + private final List headers; + @JsonCreator public SchemaTopicProduceRecord( @JsonProperty("key") @Nullable JsonNode key, @JsonProperty("value") @Nullable JsonNode value, - @JsonProperty("partition") @Nullable Integer partition + @JsonProperty("partition") @Nullable Integer partition, + @JsonProperty("headers") @Nullable List headers ) { this.key = key; this.value = value; this.partition = partition; + this.headers = headers; } @JsonProperty("key") @@ -193,6 +201,12 @@ public Integer getPartition() { return partition; } + @JsonProperty + @Nullable + public List getHeaders() { + return headers; + } + @Override public boolean equals(Object o) { if (this == o) { diff --git a/kafka-rest/src/main/java/io/confluent/kafkarest/tools/ProducerPerformance.java b/kafka-rest/src/main/java/io/confluent/kafkarest/tools/ProducerPerformance.java index 3b35e5eb43..731484eae1 100644 --- a/kafka-rest/src/main/java/io/confluent/kafkarest/tools/ProducerPerformance.java +++ b/kafka-rest/src/main/java/io/confluent/kafkarest/tools/ProducerPerformance.java @@ -83,7 +83,7 @@ public ProducerPerformance( /* setup perf test */ targetUrl = baseUrl + "/topics/" + topic; BinaryTopicProduceRequest.BinaryTopicProduceRecord record = - new BinaryTopicProduceRequest.BinaryTopicProduceRecord(null, "payload", null); + new BinaryTopicProduceRequest.BinaryTopicProduceRecord(null, "payload", null, null); BinaryTopicProduceRequest.BinaryTopicProduceRecord[] records = new BinaryTopicProduceRequest.BinaryTopicProduceRecord[recordsPerIteration]; Arrays.fill(records, record); diff --git a/kafka-rest/src/main/java/io/confluent/kafkarest/v2/BinaryKafkaConsumerState.java b/kafka-rest/src/main/java/io/confluent/kafkarest/v2/BinaryKafkaConsumerState.java index 6dadba30a6..38d744dd7a 100644 --- a/kafka-rest/src/main/java/io/confluent/kafkarest/v2/BinaryKafkaConsumerState.java +++ b/kafka-rest/src/main/java/io/confluent/kafkarest/v2/BinaryKafkaConsumerState.java @@ -19,9 +19,13 @@ import io.confluent.kafkarest.ConsumerInstanceId; import io.confluent.kafkarest.ConsumerRecordAndSize; import io.confluent.kafkarest.KafkaRestConfig; +import io.confluent.kafkarest.entities.ForwardHeader; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerRecord; +import java.util.ArrayList; +import java.util.List; + /** * Binary implementation of KafkaConsumerState that does no decoding, returning the raw bytes @@ -41,14 +45,15 @@ public 
ConsumerRecordAndSize<ByteString, ByteString> createConsumerRecord(
       ConsumerRecord<byte[], byte[]> record) {
     long approxSize = (record.key() != null ? record.key().length : 0)
         + (record.value() != null ? record.value().length : 0);
-
+    List<ForwardHeader> headers = new ArrayList<>();
+    record.headers().forEach(header -> headers.add(new ForwardHeader(header)));
     return new ConsumerRecordAndSize<>(
         io.confluent.kafkarest.entities.ConsumerRecord.create(
             record.topic(),
             record.key() != null ? ByteString.copyFrom(record.key()) : null,
             record.value() != null ? ByteString.copyFrom(record.value()) : null,
             record.partition(),
-            record.offset()),
+            record.offset(), headers),
         approxSize);
   }
 }
diff --git a/kafka-rest/src/main/java/io/confluent/kafkarest/v2/JsonKafkaConsumerState.java b/kafka-rest/src/main/java/io/confluent/kafkarest/v2/JsonKafkaConsumerState.java
index bac80c2b77..791c5ab529 100644
--- a/kafka-rest/src/main/java/io/confluent/kafkarest/v2/JsonKafkaConsumerState.java
+++ b/kafka-rest/src/main/java/io/confluent/kafkarest/v2/JsonKafkaConsumerState.java
@@ -19,10 +19,14 @@
 import io.confluent.kafkarest.ConsumerInstanceId;
 import io.confluent.kafkarest.ConsumerRecordAndSize;
 import io.confluent.kafkarest.KafkaRestConfig;
+import io.confluent.kafkarest.entities.ForwardHeader;
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.common.errors.SerializationException;
 
+import java.util.ArrayList;
+import java.util.List;
+
 public class JsonKafkaConsumerState extends KafkaConsumerState {
 
   private static final ObjectMapper objectMapper = new ObjectMapper();
@@ -45,19 +49,23 @@ public ConsumerRecordAndSize<Object, Object> createConsumerRecord(
     // and just use the raw bytes, but that risks returning invalid data to the user
     // if their data is not actually JSON encoded.
 
-    if (record.key() != null) {
+    if (record.key() != null && record.key().length > 0) {
       approxSize += record.key().length;
       key = deserialize(record.key());
     }
 
-    if (record.value() != null) {
+    if (record.value() != null && record.value().length > 0) {
       approxSize += record.value().length;
       value = deserialize(record.value());
     }
 
+    List<ForwardHeader> headers = new ArrayList<>();
+    record.headers().forEach(header -> headers.add(new ForwardHeader(header)));
+
     return new ConsumerRecordAndSize<>(
         io.confluent.kafkarest.entities.ConsumerRecord.create(
-            record.topic(), key, value, record.partition(), record.offset()), approxSize);
+            record.topic(), key, value, record.partition(),
+            record.offset(), headers), approxSize);
   }
 
   private Object deserialize(byte[] data) {
diff --git a/kafka-rest/src/main/java/io/confluent/kafkarest/v2/SchemaKafkaConsumerState.java b/kafka-rest/src/main/java/io/confluent/kafkarest/v2/SchemaKafkaConsumerState.java
index 6018e016b5..dcb69b61f5 100644
--- a/kafka-rest/src/main/java/io/confluent/kafkarest/v2/SchemaKafkaConsumerState.java
+++ b/kafka-rest/src/main/java/io/confluent/kafkarest/v2/SchemaKafkaConsumerState.java
@@ -20,9 +20,13 @@
 import io.confluent.kafkarest.ConsumerRecordAndSize;
 import io.confluent.kafkarest.KafkaRestConfig;
 import io.confluent.kafkarest.converters.SchemaConverter;
+import io.confluent.kafkarest.entities.ForwardHeader;
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 
+import java.util.ArrayList;
+import java.util.List;
+
 /**
  * Schema-specific implementation of KafkaConsumerState, which decodes
  * into Objects or primitive types.
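// The header-copy step added in this patch is identical in all three
// KafkaConsumerState subclasses (binary, JSON, schema):
//
//     List<ForwardHeader> headers = new ArrayList<>();
//     record.headers().forEach(header -> headers.add(new ForwardHeader(header)));
//
// Kafka's ConsumerRecord.headers() returns an empty Headers collection rather
// than null, so fetched records always carry a non-null (possibly empty) list;
// the entity's getHeaders() is still marked @Nullable in PATCH 2/2 because
// REST-supplied consumer records may omit the field.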
@@ -45,13 +49,15 @@ public ConsumerRecordAndSize<JsonNode, JsonNode> createConsumerRecord(
       ConsumerRecord<Object, Object> record) {
     SchemaConverter.JsonNodeAndSize keyNode = schemaConverter.toJson(record.key());
     SchemaConverter.JsonNodeAndSize valueNode = schemaConverter.toJson(record.value());
+    List<ForwardHeader> headers = new ArrayList<>();
+    record.headers().forEach(header -> headers.add(new ForwardHeader(header)));
     return new ConsumerRecordAndSize<>(
         io.confluent.kafkarest.entities.ConsumerRecord.create(
             record.topic(),
             keyNode.getJson(),
             valueNode.getJson(),
             record.partition(),
-            record.offset()),
+            record.offset(), headers),
         keyNode.getSize() + valueNode.getSize());
   }
 }
diff --git a/kafka-rest/src/test/java/io/confluent/kafkarest/entities/ForwardHeaderTest.java b/kafka-rest/src/test/java/io/confluent/kafkarest/entities/ForwardHeaderTest.java
new file mode 100644
index 0000000000..49155b0271
--- /dev/null
+++ b/kafka-rest/src/test/java/io/confluent/kafkarest/entities/ForwardHeaderTest.java
@@ -0,0 +1,48 @@
+package io.confluent.kafkarest.entities;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+
+public final class ForwardHeaderTest {
+
+  public static final String KEYS_SINGLE = "[{\"key1\":\"value1\"}]";
+  public static final String KEYS_TWO = "[{\"key1\":\"value1\"},{\"key1\":\"value2\"}]";
+  ObjectMapper mapper;
+
+  @Before
+  public void setUp() {
+    mapper = new ObjectMapper();
+  }
+
+  @Test
+  public void serialize() throws JsonProcessingException {
+    List<ForwardHeader> headers = new ArrayList<>();
+    assertEquals("[]", mapper.writeValueAsString(headers));
+    headers.add(new ForwardHeader("key1", "value1"));
+    assertEquals(KEYS_SINGLE, mapper.writeValueAsString(headers));
+    headers.add(new ForwardHeader("key1", "value2"));
+    assertEquals(KEYS_TWO, mapper.writeValueAsString(headers));
+  }
+
+  @Test
+  public void deserialize() throws JsonProcessingException {
+    ForwardHeader[] headers = mapper.readValue(KEYS_SINGLE, ForwardHeader[].class);
+    assertEquals(1, headers.length);
+    assertEquals("key1", headers[0].key);
+    assertArrayEquals("value1".getBytes(), headers[0].value);
+    headers = mapper.readValue(KEYS_TWO, ForwardHeader[].class);
+    assertEquals(2, headers.length);
+    assertEquals("key1", headers[0].key);
+    assertArrayEquals("value1".getBytes(), headers[0].value);
+    assertEquals("key1", headers[1].key);
+    assertArrayEquals("value2".getBytes(), headers[1].value);
+  }
+}
\ No newline at end of file
diff --git a/kafka-rest/src/test/java/io/confluent/kafkarest/integration/AuthorizationErrorTest.java b/kafka-rest/src/test/java/io/confluent/kafkarest/integration/AuthorizationErrorTest.java
index c5805f4f88..cf3d29d71f 100644
--- a/kafka-rest/src/test/java/io/confluent/kafkarest/integration/AuthorizationErrorTest.java
+++ b/kafka-rest/src/test/java/io/confluent/kafkarest/integration/AuthorizationErrorTest.java
@@ -56,10 +56,10 @@ public class AuthorizationErrorTest
 
   // Produce to topic inputs & results
   private final List<BinaryTopicProduceRecord> topicRecords = Arrays.asList(
-      new BinaryTopicProduceRecord("key", "value", null),
-      new BinaryTopicProduceRecord("key", "value2", null),
-      new BinaryTopicProduceRecord("key", "value3", null),
-      new BinaryTopicProduceRecord("key", "value4", null)
+      new BinaryTopicProduceRecord("key", "value", null, null),
+      new BinaryTopicProduceRecord("key", "value2", null, null),
+
new BinaryTopicProduceRecord("key", "value3", null, null), + new BinaryTopicProduceRecord("key", "value4", null, null) ); private final List produceOffsets = Arrays.asList( diff --git a/kafka-rest/src/test/java/io/confluent/kafkarest/integration/AvroProducerTest.java b/kafka-rest/src/test/java/io/confluent/kafkarest/integration/AvroProducerTest.java index d1c981db3c..9395df09a1 100644 --- a/kafka-rest/src/test/java/io/confluent/kafkarest/integration/AvroProducerTest.java +++ b/kafka-rest/src/test/java/io/confluent/kafkarest/integration/AvroProducerTest.java @@ -76,10 +76,10 @@ public class AvroProducerTest extends ClusterTestHarness { // Produce to topic inputs & results protected final List topicRecordsWithPartitionsAndKeys = Arrays.asList( - new SchemaTopicProduceRecord(testKeys[0], testValues[0], 0), - new SchemaTopicProduceRecord(testKeys[1], testValues[1], 1), - new SchemaTopicProduceRecord(testKeys[2], testValues[2], 1), - new SchemaTopicProduceRecord(testKeys[3], testValues[3], 2) + new SchemaTopicProduceRecord(testKeys[0], testValues[0], 0, null), + new SchemaTopicProduceRecord(testKeys[1], testValues[1], 1, null), + new SchemaTopicProduceRecord(testKeys[2], testValues[2], 1, null), + new SchemaTopicProduceRecord(testKeys[3], testValues[3], 2, null) ); protected final List partitionOffsetsWithPartitionsAndKeys = Arrays.asList( new PartitionOffset(0, 0L, null, null), @@ -90,10 +90,10 @@ public class AvroProducerTest extends ClusterTestHarness { // Produce to partition inputs & results protected final List partitionRecordsOnlyValues = Arrays.asList( - new SchemaPartitionProduceRecord(null, testValues[0]), - new SchemaPartitionProduceRecord(null, testValues[1]), - new SchemaPartitionProduceRecord(null, testValues[2]), - new SchemaPartitionProduceRecord(null, testValues[3]) + new SchemaPartitionProduceRecord(null, testValues[0], null), + new SchemaPartitionProduceRecord(null, testValues[1], null), + new SchemaPartitionProduceRecord(null, testValues[2], null), + new SchemaPartitionProduceRecord(null, testValues[3], null) ); protected final List producePartitionOffsetOnlyValues = Arrays.asList( new PartitionOffset(0, 0L, null, null), diff --git a/kafka-rest/src/test/java/io/confluent/kafkarest/integration/JsonProducerTest.java b/kafka-rest/src/test/java/io/confluent/kafkarest/integration/JsonProducerTest.java index ee0b6076d1..bd25d0ed22 100644 --- a/kafka-rest/src/test/java/io/confluent/kafkarest/integration/JsonProducerTest.java +++ b/kafka-rest/src/test/java/io/confluent/kafkarest/integration/JsonProducerTest.java @@ -16,17 +16,15 @@ import io.confluent.kafka.serializers.KafkaJsonDeserializer; import io.confluent.kafkarest.Versions; +import io.confluent.kafkarest.entities.ForwardHeader; import io.confluent.kafkarest.entities.v2.JsonPartitionProduceRequest; import io.confluent.kafkarest.entities.v2.JsonPartitionProduceRequest.JsonPartitionProduceRecord; import io.confluent.kafkarest.entities.v2.JsonTopicProduceRequest; import io.confluent.kafkarest.entities.v2.JsonTopicProduceRequest.JsonTopicProduceRecord; import io.confluent.kafkarest.entities.v2.PartitionOffset; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Properties; + +import java.util.*; + import org.junit.Before; import org.junit.Test; import scala.collection.JavaConverters; @@ -35,6 +33,7 @@ public class JsonProducerTest extends AbstractProducerTest { private String topicName = "topic1"; + private static final List headers = 
Collections.emptyList(); @Before @Override @@ -72,39 +71,39 @@ private List exampleListValue() { } private final List topicRecordsWithKeys = Arrays.asList( - new JsonTopicProduceRecord("key", "value", 0), - new JsonTopicProduceRecord("key", null, 0), - new JsonTopicProduceRecord("key", 53.4, 0), - new JsonTopicProduceRecord("key", 45, 0), - new JsonTopicProduceRecord("key", exampleMapValue(), 0), - new JsonTopicProduceRecord("key", exampleListValue(), 0) + new JsonTopicProduceRecord("key", "value", 0, headers), + new JsonTopicProduceRecord("key", null, 0, headers), + new JsonTopicProduceRecord("key", 53.4, 0, headers), + new JsonTopicProduceRecord("key", 45, 0, headers), + new JsonTopicProduceRecord("key", exampleMapValue(), 0, headers), + new JsonTopicProduceRecord("key", exampleListValue(), 0, headers) ); private final List topicRecordsWithoutKeys = Arrays.asList( - new JsonTopicProduceRecord(null, "value", 0), - new JsonTopicProduceRecord(null, null, 0), - new JsonTopicProduceRecord(null, 53.4, 0), - new JsonTopicProduceRecord(null, 45, 0), - new JsonTopicProduceRecord(null, exampleMapValue(), 0), - new JsonTopicProduceRecord(null, exampleListValue(), 0) + new JsonTopicProduceRecord(null, "value", 0, headers), + new JsonTopicProduceRecord(null, null, 0, headers), + new JsonTopicProduceRecord(null, 53.4, 0, headers), + new JsonTopicProduceRecord(null, 45, 0, headers), + new JsonTopicProduceRecord(null, exampleMapValue(), 0, headers), + new JsonTopicProduceRecord(null, exampleListValue(), 0, headers) ); private final List partitionRecordsWithKeys = Arrays.asList( - new JsonPartitionProduceRecord("key", "value"), - new JsonPartitionProduceRecord("key", null), - new JsonPartitionProduceRecord("key", 53.4), - new JsonPartitionProduceRecord("key", 45), - new JsonPartitionProduceRecord("key", exampleMapValue()), - new JsonPartitionProduceRecord("key", exampleListValue()) + new JsonPartitionProduceRecord("key", "value", headers), + new JsonPartitionProduceRecord("key", null, headers), + new JsonPartitionProduceRecord("key", 53.4, headers), + new JsonPartitionProduceRecord("key", 45, headers), + new JsonPartitionProduceRecord("key", exampleMapValue(), headers), + new JsonPartitionProduceRecord("key", exampleListValue(), headers) ); private final List partitionRecordsWithoutKeys = Arrays.asList( - new JsonPartitionProduceRecord(null, "value"), - new JsonPartitionProduceRecord(null, null), - new JsonPartitionProduceRecord(null, 53.4), - new JsonPartitionProduceRecord(null, 45), - new JsonPartitionProduceRecord(null, exampleMapValue()), - new JsonPartitionProduceRecord(null, exampleListValue()) + new JsonPartitionProduceRecord(null, "value", headers), + new JsonPartitionProduceRecord(null, null, headers), + new JsonPartitionProduceRecord(null, 53.4, headers), + new JsonPartitionProduceRecord(null, 45, headers), + new JsonPartitionProduceRecord(null, exampleMapValue(), headers), + new JsonPartitionProduceRecord(null, exampleListValue(), headers) ); private final List produceOffsets = Arrays.asList( diff --git a/kafka-rest/src/test/java/io/confluent/kafkarest/integration/ProducerTest.java b/kafka-rest/src/test/java/io/confluent/kafkarest/integration/ProducerTest.java index 13aef61c47..570012e3e1 100644 --- a/kafka-rest/src/test/java/io/confluent/kafkarest/integration/ProducerTest.java +++ b/kafka-rest/src/test/java/io/confluent/kafkarest/integration/ProducerTest.java @@ -23,15 +23,14 @@ import io.confluent.kafkarest.RecordMetadataOrException; import io.confluent.kafkarest.Versions; import 
io.confluent.kafkarest.entities.EmbeddedFormat; +import io.confluent.kafkarest.entities.ForwardHeader; import io.confluent.kafkarest.entities.v2.BinaryPartitionProduceRequest; import io.confluent.kafkarest.entities.v2.BinaryPartitionProduceRequest.BinaryPartitionProduceRecord; import io.confluent.kafkarest.entities.v2.BinaryTopicProduceRequest; import io.confluent.kafkarest.entities.v2.BinaryTopicProduceRequest.BinaryTopicProduceRecord; import io.confluent.kafkarest.entities.v2.PartitionOffset; -import java.util.Arrays; -import java.util.Collections; -import java.util.List; -import java.util.Properties; + +import java.util.*; import javax.ws.rs.client.Entity; import javax.ws.rs.core.Response; import org.apache.kafka.clients.producer.ProducerConfig; @@ -45,57 +44,58 @@ public class ProducerTest extends AbstractProducerTest { private static final String topicName = "topic1"; + private static final List headers = Collections.emptyList(); // Produce to topic inputs & results private final List topicRecordsWithKeys = Arrays.asList( - new BinaryTopicProduceRecord("key", "value", null), - new BinaryTopicProduceRecord("key", "value2", null), - new BinaryTopicProduceRecord("key", "value3", null), - new BinaryTopicProduceRecord("key", "value4", null) + new BinaryTopicProduceRecord("key", "value", null, headers), + new BinaryTopicProduceRecord("key", "value2", null, headers), + new BinaryTopicProduceRecord("key", "value3", null, headers), + new BinaryTopicProduceRecord("key", "value4", null, headers) ); private final List topicRecordsWithPartitions = Arrays.asList( - new BinaryTopicProduceRecord(null, "value", 0), - new BinaryTopicProduceRecord(null, "value2", 1), - new BinaryTopicProduceRecord(null, "value3", 1), - new BinaryTopicProduceRecord(null, "value4", 2) + new BinaryTopicProduceRecord(null, "value", 0, headers), + new BinaryTopicProduceRecord(null, "value2", 1, headers), + new BinaryTopicProduceRecord(null, "value3", 1, headers), + new BinaryTopicProduceRecord(null, "value4", 2, headers) ); private final List topicRecordsWithPartitionsAndKeys = Arrays.asList( - new BinaryTopicProduceRecord("key", "value", 0), - new BinaryTopicProduceRecord("key2", "value2", 1), - new BinaryTopicProduceRecord("key3", "value3", 1), - new BinaryTopicProduceRecord("key4", "value4", 2) + new BinaryTopicProduceRecord("key", "value", 0, headers), + new BinaryTopicProduceRecord("key2", "value2", 1, headers), + new BinaryTopicProduceRecord("key3", "value3", 1, headers), + new BinaryTopicProduceRecord("key4", "value4", 2, headers) ); private final List topicRecordsWithNullValues = Arrays.asList( - new BinaryTopicProduceRecord("key", null, null), - new BinaryTopicProduceRecord("key", null, null), - new BinaryTopicProduceRecord("key", null, null), - new BinaryTopicProduceRecord("key", null, null) + new BinaryTopicProduceRecord("key", null, null, headers), + new BinaryTopicProduceRecord("key", null, null, headers), + new BinaryTopicProduceRecord("key", null, null, headers), + new BinaryTopicProduceRecord("key", null, null, headers) ); // Produce to partition inputs & results private final List partitionRecordsOnlyValues = Arrays.asList( - new BinaryPartitionProduceRecord(null, "value"), - new BinaryPartitionProduceRecord(null, "value2"), - new BinaryPartitionProduceRecord(null, "value3"), - new BinaryPartitionProduceRecord(null, "value4") + new BinaryPartitionProduceRecord(null, "value", headers), + new BinaryPartitionProduceRecord(null, "value2", headers), + new BinaryPartitionProduceRecord(null, "value3", headers), + 
new BinaryPartitionProduceRecord(null, "value4", headers) ); private final List partitionRecordsWithKeys = Arrays.asList( - new BinaryPartitionProduceRecord("key", "value"), - new BinaryPartitionProduceRecord("key", "value2"), - new BinaryPartitionProduceRecord("key", "value3"), - new BinaryPartitionProduceRecord("key", "value4") + new BinaryPartitionProduceRecord("key", "value", headers), + new BinaryPartitionProduceRecord("key", "value2", headers), + new BinaryPartitionProduceRecord("key", "value3", headers), + new BinaryPartitionProduceRecord("key", "value4", headers) ); private final List partitionRecordsWithNullValues = Arrays.asList( - new BinaryPartitionProduceRecord("key1", null), - new BinaryPartitionProduceRecord("key2", null), - new BinaryPartitionProduceRecord("key3", null), - new BinaryPartitionProduceRecord("key4", null) + new BinaryPartitionProduceRecord("key1", null, headers), + new BinaryPartitionProduceRecord("key2", null, headers), + new BinaryPartitionProduceRecord("key3", null, headers), + new BinaryPartitionProduceRecord("key4", null, headers) ); private final List produceOffsets = Arrays.asList( @@ -147,7 +147,7 @@ public void testProducerConfigOverrides() { BinaryPartitionProduceRequest request = BinaryPartitionProduceRequest.create( - Collections.singletonList(new BinaryPartitionProduceRecord(null, "data"))); + Collections.singletonList(new BinaryPartitionProduceRecord(null, "data", headers))); sawCallback = false; pool.produce( diff --git a/kafka-rest/src/test/java/io/confluent/kafkarest/integration/ProducerTopicAutoCreationTest.java b/kafka-rest/src/test/java/io/confluent/kafkarest/integration/ProducerTopicAutoCreationTest.java index dfe76e9fb1..32485ddbe5 100644 --- a/kafka-rest/src/test/java/io/confluent/kafkarest/integration/ProducerTopicAutoCreationTest.java +++ b/kafka-rest/src/test/java/io/confluent/kafkarest/integration/ProducerTopicAutoCreationTest.java @@ -15,13 +15,14 @@ package io.confluent.kafkarest.integration; +import io.confluent.kafkarest.entities.ForwardHeader; import io.confluent.kafkarest.entities.v2.BinaryPartitionProduceRequest; import io.confluent.kafkarest.entities.v2.BinaryTopicProduceRequest; import io.confluent.kafkarest.entities.v2.BinaryTopicProduceRequest.BinaryTopicProduceRecord; import io.confluent.kafkarest.entities.v2.PartitionOffset; -import java.util.Arrays; -import java.util.List; -import java.util.Properties; + +import java.util.*; + import org.apache.kafka.common.serialization.ByteArrayDeserializer; import org.junit.Test; @@ -29,12 +30,13 @@ public class ProducerTopicAutoCreationTest extends AbstractProducerTest { private static final String topicName = "nonexistant"; + private static final List headers = Collections.emptyList(); private final List topicRecords = Arrays.asList( - new BinaryTopicProduceRecord("key", "value", null), - new BinaryTopicProduceRecord("key", "value2", null), - new BinaryTopicProduceRecord("key", "value3", null), - new BinaryTopicProduceRecord("key", "value4", null) + new BinaryTopicProduceRecord("key", "value", null, headers), + new BinaryTopicProduceRecord("key", "value2", null, headers), + new BinaryTopicProduceRecord("key", "value3", null, headers), + new BinaryTopicProduceRecord("key", "value4", null, headers) ); private final List partitionOffsets = Arrays.asList( new PartitionOffset(0, 0L, null, null), diff --git a/kafka-rest/src/test/java/io/confluent/kafkarest/integration/v2/AvroProduceConsumeTest.java 
b/kafka-rest/src/test/java/io/confluent/kafkarest/integration/v2/AvroProduceConsumeTest.java index 3d0ad4fc5b..be87e08fa8 100644 --- a/kafka-rest/src/test/java/io/confluent/kafkarest/integration/v2/AvroProduceConsumeTest.java +++ b/kafka-rest/src/test/java/io/confluent/kafkarest/integration/v2/AvroProduceConsumeTest.java @@ -40,17 +40,17 @@ public final class AvroProduceConsumeTest extends SchemaProduceConsumeTest { new IntNode(1), AVRO_CONVERTER.toJson( new GenericRecordBuilder(VALUE_SCHEMA).set("value", 11).build()).getJson(), - /* partition= */ 0), + /* partition= */ 0, null), new SchemaTopicProduceRecord( new IntNode(2), AVRO_CONVERTER.toJson( new GenericRecordBuilder(VALUE_SCHEMA).set("value", 12).build()).getJson(), - /* partition= */ 0), + /* partition= */ 0, null), new SchemaTopicProduceRecord( new IntNode(3), AVRO_CONVERTER.toJson( new GenericRecordBuilder(VALUE_SCHEMA).set("value", 13).build()).getJson(), - /* partition= */ 0)); + /* partition= */ 0, null)); public AvroProduceConsumeTest() { super(/* numBrokers= */ 1, /* withSchemaRegistry= */ true); diff --git a/kafka-rest/src/test/java/io/confluent/kafkarest/integration/v2/JsonSchemaProduceConsumeTest.java b/kafka-rest/src/test/java/io/confluent/kafkarest/integration/v2/JsonSchemaProduceConsumeTest.java index 0734a3de51..10568f1d1a 100644 --- a/kafka-rest/src/test/java/io/confluent/kafkarest/integration/v2/JsonSchemaProduceConsumeTest.java +++ b/kafka-rest/src/test/java/io/confluent/kafkarest/integration/v2/JsonSchemaProduceConsumeTest.java @@ -29,15 +29,15 @@ public final class JsonSchemaProduceConsumeTest extends SchemaProduceConsumeTest new SchemaTopicProduceRecord( new IntNode(1), JSON_SCHEMA_CONVERTER.toJson(getMessage(11)).getJson(), - /* partition= */ 0), + /* partition= */ 0, null), new SchemaTopicProduceRecord( new IntNode(2), JSON_SCHEMA_CONVERTER.toJson(getMessage(12)).getJson(), - /* partition= */ 0), + /* partition= */ 0, null), new SchemaTopicProduceRecord( new IntNode(3), JSON_SCHEMA_CONVERTER.toJson(getMessage(13)).getJson(), - /* partition= */ 0)); + /* partition= */ 0, null)); private static JsonNode getMessage(int value) { ObjectMapper objectMapper = new ObjectMapper(); diff --git a/kafka-rest/src/test/java/io/confluent/kafkarest/integration/v2/ProtobufProduceConsumeTest.java b/kafka-rest/src/test/java/io/confluent/kafkarest/integration/v2/ProtobufProduceConsumeTest.java index d25f9a4272..ead8c316ea 100644 --- a/kafka-rest/src/test/java/io/confluent/kafkarest/integration/v2/ProtobufProduceConsumeTest.java +++ b/kafka-rest/src/test/java/io/confluent/kafkarest/integration/v2/ProtobufProduceConsumeTest.java @@ -28,15 +28,15 @@ public final class ProtobufProduceConsumeTest extends SchemaProduceConsumeTest { new SchemaTopicProduceRecord( PROOTBUF_CONVERTER.toJson(getMessage(KEY_SCHEMA, "key", 1)).getJson(), PROOTBUF_CONVERTER.toJson(getMessage(VALUE_SCHEMA, "value", 11)).getJson(), - /* partition= */ 0), + /* partition= */ 0, null), new SchemaTopicProduceRecord( PROOTBUF_CONVERTER.toJson(getMessage(KEY_SCHEMA, "key", 2)).getJson(), PROOTBUF_CONVERTER.toJson(getMessage(VALUE_SCHEMA, "value", 12)).getJson(), - /* partition= */ 0), + /* partition= */ 0, null), new SchemaTopicProduceRecord( PROOTBUF_CONVERTER.toJson(getMessage(KEY_SCHEMA, "key", 3)).getJson(), PROOTBUF_CONVERTER.toJson(getMessage(VALUE_SCHEMA, "value", 13)).getJson(), - /* partition= */ 0)); + /* partition= */ 0, null)); private static Message getMessage(ProtobufSchema schema, String fieldName, int value) { DynamicMessage.Builder messageBuilder = 
schema.newMessageBuilder(); diff --git a/kafka-rest/src/test/java/io/confluent/kafkarest/resources/v2/TopicsResourceAvroProduceTest.java b/kafka-rest/src/test/java/io/confluent/kafkarest/resources/v2/TopicsResourceAvroProduceTest.java index a0b95d84f8..dbcbe046fe 100644 --- a/kafka-rest/src/test/java/io/confluent/kafkarest/resources/v2/TopicsResourceAvroProduceTest.java +++ b/kafka-rest/src/test/java/io/confluent/kafkarest/resources/v2/TopicsResourceAvroProduceTest.java @@ -100,8 +100,8 @@ public TopicsResourceAvroProduceTest() throws RestConfigException { addResource(new ProduceToTopicAction(ctx)); produceRecordsWithPartitionsAndKeys = Arrays.asList( - new SchemaTopicProduceRecord(testKeys[0], testValues[0], 0), - new SchemaTopicProduceRecord(testKeys[1], testValues[1], 0) + new SchemaTopicProduceRecord(testKeys[0], testValues[0], 0, null), + new SchemaTopicProduceRecord(testKeys[1], testValues[1], 0, null) ); } diff --git a/kafka-rest/src/test/java/io/confluent/kafkarest/resources/v2/TopicsResourceBinaryProduceTest.java b/kafka-rest/src/test/java/io/confluent/kafkarest/resources/v2/TopicsResourceBinaryProduceTest.java index 9ae588fc53..623f38fcbe 100644 --- a/kafka-rest/src/test/java/io/confluent/kafkarest/resources/v2/TopicsResourceBinaryProduceTest.java +++ b/kafka-rest/src/test/java/io/confluent/kafkarest/resources/v2/TopicsResourceBinaryProduceTest.java @@ -89,24 +89,24 @@ public TopicsResourceBinaryProduceTest() throws RestConfigException { addResource(new ProduceToTopicAction(ctx)); produceRecordsOnlyValues = Arrays.asList( - new BinaryTopicProduceRecord(null, "value", null), - new BinaryTopicProduceRecord(null, "value2", null) + new BinaryTopicProduceRecord(null, "value", null, null), + new BinaryTopicProduceRecord(null, "value2", null, null) ); produceRecordsWithKeys = Arrays.asList( - new BinaryTopicProduceRecord("key", "value", null), - new BinaryTopicProduceRecord("key2", "value2", null) + new BinaryTopicProduceRecord("key", "value", null, null), + new BinaryTopicProduceRecord("key2", "value2", null, null) ); produceRecordsWithPartitions = Arrays.asList( - new BinaryTopicProduceRecord(null, "value", 0), - new BinaryTopicProduceRecord(null, "value2", 0) + new BinaryTopicProduceRecord(null, "value", 0, null), + new BinaryTopicProduceRecord(null, "value2", 0, null) ); produceRecordsWithPartitionsAndKeys = Arrays.asList( - new BinaryTopicProduceRecord("key", "value", 0), - new BinaryTopicProduceRecord("key2", "value2", 0) + new BinaryTopicProduceRecord("key", "value", 0, null), + new BinaryTopicProduceRecord("key2", "value2", 0, null) ); produceRecordsWithNullValues = Arrays.asList( - new BinaryTopicProduceRecord("key", null, null), - new BinaryTopicProduceRecord("key2", null, null) + new BinaryTopicProduceRecord("key", null, null, null), + new BinaryTopicProduceRecord("key2", null, null, null) ); TopicPartition tp0 = new TopicPartition(topicName, 0); @@ -121,7 +121,7 @@ public TopicsResourceBinaryProduceTest() throws RestConfigException { ); produceExceptionData = Arrays.asList( - new BinaryTopicProduceRecord(null, null, null) + new BinaryTopicProduceRecord(null, null, null, null) ); produceGenericExceptionResults = Arrays.asList( diff --git a/kafka-rest/src/test/java/io/confluent/kafkarest/unit/AvroRestProducerTest.java b/kafka-rest/src/test/java/io/confluent/kafkarest/unit/AvroRestProducerTest.java index 3a6932cbe9..11caeb3ea1 100644 --- a/kafka-rest/src/test/java/io/confluent/kafkarest/unit/AvroRestProducerTest.java +++ 
b/kafka-rest/src/test/java/io/confluent/kafkarest/unit/AvroRestProducerTest.java
@@ -67,7 +67,7 @@ public void testInvalidSchema() throws Exception {
     schemaHolder =
         ProduceRequest.create(
             Collections.singletonList(
-                ProduceRecord.create(mapper.readTree("{}"), mapper.readTree("{}"), null)),
+                ProduceRecord.create(mapper.readTree("{}"), mapper.readTree("{}"), null, null)),
             /* keySchema= */ null,
             /* keySchemaId= */ null,
             "invalidValueSchema",
@@ -84,7 +84,7 @@ public void testInvalidData() throws Exception {
     schemaHolder =
         ProduceRequest.create(
             Collections.singletonList(
-                ProduceRecord.create(null, mapper.readTree("\"string\""), null)),
+                ProduceRecord.create(null, mapper.readTree("\"string\""), null, null)),
             /* keySchema= */ null,
             /* keySchemaId= */ null,
             /* valueSchema= */ "\"int\"",
@@ -130,7 +130,7 @@ public void testRepeatedProducer() throws Exception {
     schemaHolder =
         ProduceRequest.create(
             Collections.singletonList(
-                ProduceRecord.create(null, mapper.readTree("{\"name\": \"bob\"}"), null)),
+                ProduceRecord.create(null, mapper.readTree("{\"name\": \"bob\"}"), null, null)),
             /* keySchema= */ null,
             /* keySchemaId= */ null,
             valueSchemaStr,
diff --git a/kafka-rest/src/test/java/io/confluent/kafkarest/v2/KafkaConsumerManagerTest.java b/kafka-rest/src/test/java/io/confluent/kafkarest/v2/KafkaConsumerManagerTest.java
index 6e68012e2a..e60a0723d2 100644
--- a/kafka-rest/src/test/java/io/confluent/kafkarest/v2/KafkaConsumerManagerTest.java
+++ b/kafka-rest/src/test/java/io/confluent/kafkarest/v2/KafkaConsumerManagerTest.java
@@ -26,17 +26,18 @@
 import io.confluent.kafkarest.ConsumerReadCallback;
 import io.confluent.kafkarest.KafkaRestConfig;
 import io.confluent.kafkarest.SystemTime;
 import io.confluent.kafkarest.entities.ConsumerInstanceConfig;
 import io.confluent.kafkarest.entities.ConsumerRecord;
 import io.confluent.kafkarest.entities.EmbeddedFormat;
+import io.confluent.kafkarest.entities.ForwardHeader;
 import io.confluent.kafkarest.entities.TopicPartitionOffset;
 import io.confluent.kafkarest.entities.v2.ConsumerOffsetCommitRequest;
 import io.confluent.kafkarest.entities.v2.ConsumerSubscriptionRecord;
 import java.time.Instant;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import java.util.Properties;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
@@ -73,6 +74,7 @@ public class KafkaConsumerManagerTest {
 
   private static final String groupName = "testgroup";
   private static final String topicName = "testtopic";
+  private static final List<ForwardHeader> headers = Collections.emptyList();
 
   // Setup holding vars for results from callback
   private boolean sawCallback = false;
@@ -526,9 +528,12 @@ private List<ConsumerRecord<ByteString, ByteString>> bootstrapConsumer(final Moc
   private List<ConsumerRecord<ByteString, ByteString>> bootstrapConsumer(final MockConsumer<byte[], byte[]> consumer, boolean toExpectCreate) {
     List<ConsumerRecord<ByteString, ByteString>> referenceRecords = Arrays.asList(
-        ConsumerRecord.create(topicName, ByteString.copyFromUtf8("k1"), ByteString.copyFromUtf8("v1"), 0, 0),
-        ConsumerRecord.create(topicName, ByteString.copyFromUtf8("k2"), ByteString.copyFromUtf8("v2"), 0, 1),
-        ConsumerRecord.create(topicName, ByteString.copyFromUtf8("k3"), ByteString.copyFromUtf8("v3"), 0, 2));
+        ConsumerRecord.create(topicName, ByteString.copyFromUtf8("k1"),
+            ByteString.copyFromUtf8("v1"), 0, 0, headers),
+        ConsumerRecord.create(topicName, ByteString.copyFromUtf8("k2"),
+            ByteString.copyFromUtf8("v2"), 0, 1, headers),
+        ConsumerRecord.create(topicName, ByteString.copyFromUtf8("k3"),
+            ByteString.copyFromUtf8("v3"), 0, 2, headers));
 
     if (toExpectCreate)
       expectCreate(consumer);
 
@@ -591,7 +596,8 @@ private ConsumerRecord<ByteString, ByteString> binaryConsumerRecord(int offset)
         ByteString.copyFromUtf8(String.format("k%d", offset)),
         ByteString.copyFromUtf8(String.format("v%d", offset)),
         0,
-        offset
+        offset,
+        headers
     );
   }
 
diff --git a/kafka-rest/src/test/java/io/confluent/kafkarest/v2/LoadTest.java b/kafka-rest/src/test/java/io/confluent/kafkarest/v2/LoadTest.java
index 16f8d8db7c..582df807e7 100644
--- a/kafka-rest/src/test/java/io/confluent/kafkarest/v2/LoadTest.java
+++ b/kafka-rest/src/test/java/io/confluent/kafkarest/v2/LoadTest.java
@@ -27,14 +27,15 @@
 import io.confluent.kafkarest.SystemTime;
 import io.confluent.kafkarest.entities.ConsumerRecord;
 import io.confluent.kafkarest.entities.EmbeddedFormat;
+import io.confluent.kafkarest.entities.ForwardHeader;
 import io.confluent.kafkarest.entities.v2.ConsumerSubscriptionRecord;
 import java.lang.reflect.Method;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import java.util.Properties;
 import java.util.Random;
 import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.ReentrantLock;
 import org.apache.kafka.clients.consumer.Consumer;
@@ -59,6 +60,8 @@ public class LoadTest {
 
   private static final String topicName = "testtopic";
 
+  private static final List<ForwardHeader> headers = Collections.emptyList();
+
   private Capture<Properties> capturedConsumerConfig;
 
   private long requestTimeoutMs = 1000;
@@ -146,11 +149,14 @@ void verifyRead() {
   private List<ConsumerRecord<ByteString, ByteString>> referenceRecords() {
     return Arrays.asList(
         ConsumerRecord.create(
-            topicName, ByteString.copyFromUtf8("k1"), ByteString.copyFromUtf8("v1"), 0, latestOffset - 3),
+            topicName, ByteString.copyFromUtf8("k1"), ByteString.copyFromUtf8("v1"), 0,
+            latestOffset - 3, headers),
         ConsumerRecord.create(
-            topicName, ByteString.copyFromUtf8("k2"), ByteString.copyFromUtf8("v2"), 0, latestOffset - 2),
+            topicName, ByteString.copyFromUtf8("k2"), ByteString.copyFromUtf8("v2"), 0,
+            latestOffset - 2, headers),
         ConsumerRecord.create(
-            topicName, ByteString.copyFromUtf8("k3"), ByteString.copyFromUtf8("v3"), 0, latestOffset - 1));
+            topicName, ByteString.copyFromUtf8("k3"), ByteString.copyFromUtf8("v3"), 0,
+            latestOffset - 1, headers));
   }
 
   private void schedulePoll() {

From 53dfd2108c96f83ddbf2d0bcc3f9d0b39b3c2d39 Mon Sep 17 00:00:00 2001
From: pastor
Date: Tue, 15 Sep 2020 18:04:15 +0300
Subject: [PATCH 2/2] FIX: NP_PARAMETER_MUST_BE_NONNULL_BUT_MARKED_AS_NULLABLE

---
 .../java/io/confluent/kafkarest/entities/ConsumerRecord.java | 1 +
 1 file changed, 1 insertion(+)

diff --git a/kafka-rest/src/main/java/io/confluent/kafkarest/entities/ConsumerRecord.java b/kafka-rest/src/main/java/io/confluent/kafkarest/entities/ConsumerRecord.java
index 8719790925..b96dd39103 100644
--- a/kafka-rest/src/main/java/io/confluent/kafkarest/entities/ConsumerRecord.java
+++ b/kafka-rest/src/main/java/io/confluent/kafkarest/entities/ConsumerRecord.java
@@ -38,6 +38,7 @@ public abstract class ConsumerRecord<K, V> {
 
   public abstract long getOffset();
 
+  @Nullable
   public abstract List<ForwardHeader> getHeaders();
 
   public static <K, V> ConsumerRecord<K, V> create(