diff --git a/api/pom.xml b/api/pom.xml index 2298d1bb5..138db7487 100644 --- a/api/pom.xml +++ b/api/pom.xml @@ -58,15 +58,15 @@ io.quarkus - quarkus-resteasy-reactive + quarkus-rest io.quarkus - quarkus-resteasy-reactive-jackson + quarkus-rest-jackson io.quarkus - quarkus-rest-client-reactive + quarkus-rest-client io.quarkus @@ -84,6 +84,10 @@ io.quarkus quarkus-kubernetes-client + + io.quarkus + quarkus-apicurio-registry-avro + io.smallrye.common smallrye-common-annotation @@ -109,6 +113,24 @@ kafka-oauth-client + + io.apicurio + apicurio-registry-serdes-avro-serde + + + io.apicurio + apicurio-registry-serdes-jsonschema-serde + + + io.apicurio + apicurio-registry-serdes-protobuf-serde + 2.5.8.Final + + + com.google.protobuf + protobuf-java-util + + com.fasterxml.jackson.core jackson-annotations diff --git a/api/src/main/java/com/github/streamshub/console/api/ClientFactory.java b/api/src/main/java/com/github/streamshub/console/api/ClientFactory.java index cccbd5e54..91c78153b 100644 --- a/api/src/main/java/com/github/streamshub/console/api/ClientFactory.java +++ b/api/src/main/java/com/github/streamshub/console/api/ClientFactory.java @@ -16,6 +16,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; +import java.util.function.BiFunction; import java.util.function.Function; import java.util.function.Predicate; import java.util.function.Supplier; @@ -53,6 +54,8 @@ import org.apache.kafka.common.security.plain.PlainLoginModule; import org.apache.kafka.common.security.scram.ScramLoginModule; import org.apache.kafka.common.serialization.ByteArrayDeserializer; +import org.apache.kafka.common.serialization.Deserializer; +import org.apache.kafka.common.serialization.Serializer; import org.apache.kafka.common.serialization.StringSerializer; import org.eclipse.microprofile.config.Config; import org.eclipse.microprofile.config.inject.ConfigProperty; @@ -65,9 +68,11 @@ import com.github.streamshub.console.api.support.KafkaContext; import com.github.streamshub.console.api.support.TrustAllCertificateManager; import com.github.streamshub.console.api.support.ValidationProxy; +import com.github.streamshub.console.api.support.serdes.RecordData; import com.github.streamshub.console.config.ConsoleConfig; import com.github.streamshub.console.config.KafkaClusterConfig; +import io.apicurio.registry.serde.SerdeConfig; import io.fabric8.kubernetes.client.informers.ResourceEventHandler; import io.fabric8.kubernetes.client.informers.SharedIndexInformer; import io.fabric8.kubernetes.client.informers.cache.Cache; @@ -420,6 +425,7 @@ Map requiredConsumerConfig() { configs.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 50_000); configs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); configs.put(ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, 5000); + configs.put(SerdeConfig.ENABLE_HEADERS, "true"); return configs; } @@ -520,26 +526,16 @@ public void disposeKafkaContext(@Disposes KafkaContext context, Map> consumerSupplier(ConsoleConfig consoleConfig, KafkaContext context) { + public BiFunction, Deserializer, Consumer> consumerSupplier(ConsoleConfig consoleConfig, KafkaContext context) { var configs = maybeAuthenticate(context, Consumer.class); - Consumer client = new KafkaConsumer<>(configs); // NOSONAR / closed in consumerDisposer - return () -> client; - } - - public void consumerDisposer(@Disposes Supplier> consumer) { - consumer.get().close(); + return (keyDeser, valueDeser) -> new KafkaConsumer<>(configs, keyDeser, 
valueDeser); // NOSONAR / closed in consumerDisposer } @Produces @RequestScoped - public Supplier> producerSupplier(ConsoleConfig consoleConfig, KafkaContext context) { + public BiFunction, Serializer, Producer> producerSupplier(ConsoleConfig consoleConfig, KafkaContext context) { var configs = maybeAuthenticate(context, Producer.class); - Producer client = new KafkaProducer<>(configs); // NOSONAR / closed in producerDisposer - return () -> client; - } - - public void producerDisposer(@Disposes Supplier> producer) { - producer.get().close(); + return (keySer, valueSer) -> new KafkaProducer<>(configs, keySer, valueSer); // NOSONAR / closed by service code } Map maybeAuthenticate(KafkaContext context, Class clientType) { @@ -571,6 +567,12 @@ Map buildConfig(Set configNames, .map(Optional::get) .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue, (k1, k2) -> k1, TreeMap::new)); + // Ensure no given properties are skipped. The previous stream processing allows + // for the standard config names to be obtained from the given maps, but also from + // config overrides via MicroProfile Config. + clientProperties.get().forEach(cfg::putIfAbsent); + config.getProperties().forEach(cfg::putIfAbsent); + var listenerSpec = cluster.map(Kafka::getSpec) .map(KafkaSpec::getKafka) .map(KafkaClusterSpec::getListeners) diff --git a/api/src/main/java/com/github/streamshub/console/api/RecordsResource.java b/api/src/main/java/com/github/streamshub/console/api/RecordsResource.java index ecd388ea5..0eff125ac 100644 --- a/api/src/main/java/com/github/streamshub/console/api/RecordsResource.java +++ b/api/src/main/java/com/github/streamshub/console/api/RecordsResource.java @@ -67,12 +67,12 @@ public class RecordsResource { summary = "Consume records from a topic", description = "Consume a limited number of records from a topic, optionally specifying a partition and an absolute offset or timestamp as the starting point for message retrieval.") @APIResponseSchema( - value = KafkaRecord.ListResponse.class, + value = KafkaRecord.KafkaRecordDataList.class, responseDescription = "List of records matching the request query parameters.") @APIResponse(responseCode = "404", ref = "NotFound") @APIResponse(responseCode = "500", ref = "ServerError") @APIResponse(responseCode = "504", ref = "ServerTimeout") - public Response consumeRecords( + public CompletionStage consumeRecords( @Parameter(description = "Cluster identifier") @PathParam("clusterId") String clusterId, @@ -121,10 +121,20 @@ public Response consumeRecords( List fields) { requestedFields.accept(fields); - var result = recordService.consumeRecords(topicId, params.getPartition(), params.getOffset(), params.getTimestamp(), params.getLimit(), fields, params.getMaxValueLength()); - CacheControl noStore = RuntimeDelegate.getInstance().createHeaderDelegate(CacheControl.class).fromString("no-store"); - return Response.ok(new KafkaRecord.ListResponse(result)).cacheControl(noStore).build(); + + return recordService.consumeRecords( + topicId, + params.getPartition(), + params.getOffset(), + params.getTimestamp(), + params.getLimit(), + fields, + params.getMaxValueLength()) + .thenApply(KafkaRecord.KafkaRecordDataList::new) + .thenApply(Response::ok) + .thenApply(response -> response.cacheControl(noStore)) + .thenApply(Response.ResponseBuilder::build); } @POST @@ -135,7 +145,7 @@ public Response consumeRecords( description = "Produce (write) a single record to a topic") @APIResponseSchema( responseCode = "201", - value = KafkaRecord.KafkaRecordDocument.class, + value 
= KafkaRecord.KafkaRecordData.class, responseDescription = "Record was successfully sent to the topic") @APIResponse(responseCode = "404", ref = "NotFound") @APIResponse(responseCode = "500", ref = "ServerError") @@ -151,20 +161,19 @@ public CompletionStage produceRecord( String topicId, @Valid - KafkaRecord.KafkaRecordDocument message) { + KafkaRecord.KafkaRecordData message) { final UriBuilder location = uriInfo.getRequestUriBuilder(); requestedFields.accept(KafkaRecord.Fields.ALL); - return recordService.produceRecord(topicId, message.getData().getAttributes()) - .thenApply(KafkaRecord.KafkaRecordDocument::new) + return recordService.produceRecord(topicId, message.getData()) + .thenApply(KafkaRecord.KafkaRecordData::new) .thenApply(entity -> Response.status(Status.CREATED) .entity(entity) .location(location - .queryParam("filter[partition]", entity.getData().getAttributes().getPartition()) - .queryParam("filter[offset]", entity.getData().getAttributes().getOffset()) + .queryParam("filter[partition]", entity.getData().partition()) + .queryParam("filter[offset]", entity.getData().offset()) .build())) .thenApply(Response.ResponseBuilder::build); - } } diff --git a/api/src/main/java/com/github/streamshub/console/api/model/KafkaRecord.java b/api/src/main/java/com/github/streamshub/console/api/model/KafkaRecord.java index 1b27fca62..d3375fc7b 100644 --- a/api/src/main/java/com/github/streamshub/console/api/model/KafkaRecord.java +++ b/api/src/main/java/com/github/streamshub/console/api/model/KafkaRecord.java @@ -19,9 +19,17 @@ import io.xlate.validation.constraints.Expression; -@Schema(name = "KafkaRecordAttributes") -@JsonFilter("fieldFilter") -public class KafkaRecord { +@Schema(name = "KafkaRecord") +@Expression( + when = "self.type != null", + value = "self.type == '" + KafkaRecord.API_TYPE + "'", + message = "resource type conflicts with operation", + node = "type", + payload = ErrorCategory.ResourceConflict.class) +public class KafkaRecord extends Resource { + + public static final String API_TYPE = "records"; + public static final String FIELDS_PARAM = "fields[" + API_TYPE + "]"; public static final class Fields { public static final String PARTITION = "partition"; @@ -58,173 +66,163 @@ private Fields() { } } - @Schema(name = "KafkaRecordListResponse") - public static final class ListResponse extends DataList { - public ListResponse(List data) { - super(data.stream().map(RecordResource::new).toList()); - } - } - - @Schema(name = "KafkaRecordDocument") - public static final class KafkaRecordDocument extends DataSingleton { - @JsonCreator - public KafkaRecordDocument(@JsonProperty("data") RecordResource data) { + @Schema(name = "KafkaRecordDataList") + public static final class KafkaRecordDataList extends DataList { + public KafkaRecordDataList(List data) { super(data); } - - public KafkaRecordDocument(KafkaRecord data) { - super(new RecordResource(data)); - } } - @Schema(name = "KafkaRecord") - @Expression( - when = "self.type != null", - value = "self.type == 'records'", - message = "resource type conflicts with operation", - node = "type", - payload = ErrorCategory.ResourceConflict.class - ) - public static final class RecordResource extends Resource { + @Schema(name = "KafkaRecordData") + public static final class KafkaRecordData extends DataSingleton { @JsonCreator - public RecordResource(String type, KafkaRecord attributes) { - super(null, type, attributes); - } - - public RecordResource(KafkaRecord attributes) { - super(null, "records", attributes); + public 
KafkaRecordData(@JsonProperty("data") KafkaRecord data) { + super(data); } } - @JsonIgnore - String topic; + @JsonFilter("fieldFilter") + @Schema(name = "KafkaRecordAttributes") + static class Attributes { + @JsonIgnore + String topic; - @Schema(description = "The record's partition within the topic") - Integer partition; + @JsonProperty + @Schema(description = "The record's partition within the topic") + Integer partition; - @Schema(readOnly = true, description = "The record's offset within the topic partition") - Long offset; + @JsonProperty + @Schema(readOnly = true, description = "The record's offset within the topic partition") + Long offset; - @Schema(description = "Timestamp associated with the record. The type is indicated by `timestampType`. When producing a record, this value will be used as the record's `CREATE_TIME`.", format = "date-time") - Instant timestamp; + @JsonProperty + @Schema(description = "Timestamp associated with the record. The type is indicated by `timestampType`. When producing a record, this value will be used as the record's `CREATE_TIME`.", format = "date-time") + Instant timestamp; - @Schema(readOnly = true, description = "Type of timestamp associated with the record") - String timestampType; + @JsonProperty + @Schema(readOnly = true, description = "Type of timestamp associated with the record") + String timestampType; - @Schema(description = "Record headers, key/value pairs") - Map headers; + @JsonProperty + @Schema(description = "Record headers, key/value pairs") + Map headers; - @Schema(description = "Record key") - String key; + @JsonProperty + @Schema(description = "Record key") + String key; - @NotNull - @Schema(description = "Record value") - String value; + @JsonProperty + @NotNull + @Schema(description = "Record value") + String value; - @Schema(readOnly = true, description = "Size of the uncompressed record, not including the overhead of the record in the log segment.") - Long size; + @JsonProperty + @Schema(readOnly = true, description = "Size of the uncompressed record, not including the overhead of the record in the log segment.") + Long size; + } + @JsonCreator public KafkaRecord() { - super(); + super(null, API_TYPE, new Attributes()); } public KafkaRecord(String topic) { - this.topic = topic; + this(); + attributes.topic = topic; } public KafkaRecord(String topic, Integer partition, Instant timestamp, Map headers, String key, String value, Long size) { this(topic); - this.partition = partition; - this.timestamp = timestamp; - this.headers = headers; - this.key = key; - this.value = value; - this.size = size; + attributes.partition = partition; + attributes.timestamp = timestamp; + attributes.headers = headers; + attributes.key = key; + attributes.value = value; + attributes.size = size; } @JsonIgnore public URI buildUri(UriBuilder builder, String topicName) { - builder.queryParam(Fields.PARTITION, partition); - builder.queryParam(Fields.OFFSET, offset); + builder.queryParam(Fields.PARTITION, attributes.partition); + builder.queryParam(Fields.OFFSET, attributes.offset); return builder.build(topicName); } @AssertTrue(message = "invalid timestamp") @JsonIgnore public boolean isTimestampValid() { - if (timestamp == null) { + if (attributes.timestamp == null) { return true; } try { - return timestamp.isAfter(Instant.ofEpochMilli(-1)); + return attributes.timestamp.isAfter(Instant.ofEpochMilli(-1)); } catch (Exception e) { return false; } } - public Integer getPartition() { - return partition; + public Integer partition() { + return 
attributes.partition; } - public void setPartition(Integer partition) { - this.partition = partition; + public void partition(Integer partition) { + attributes.partition = partition; } - public Long getOffset() { - return offset; + public Long offset() { + return attributes.offset; } - public void setOffset(Long offset) { - this.offset = offset; + public void offset(Long offset) { + attributes.offset = offset; } - public Instant getTimestamp() { - return timestamp; + public Instant timestamp() { + return attributes.timestamp; } - public void setTimestamp(Instant timestamp) { - this.timestamp = timestamp; + public void timestamp(Instant timestamp) { + attributes.timestamp = timestamp; } - public String getTimestampType() { - return timestampType; + public String timestampType() { + return attributes.timestampType; } - public void setTimestampType(String timestampType) { - this.timestampType = timestampType; + public void timestampType(String timestampType) { + attributes.timestampType = timestampType; } - public Map getHeaders() { - return headers; + public Map headers() { + return attributes.headers; } - public void setHeaders(Map headers) { - this.headers = headers; + public void headers(Map headers) { + attributes.headers = headers; } - public String getKey() { - return key; + public String key() { + return attributes.key; } - public void setKey(String key) { - this.key = key; + public void key(String key) { + attributes.key = key; } - public String getValue() { - return value; + public String value() { + return attributes.value; } - public void setValue(String value) { - this.value = value; + public void value(String value) { + attributes.value = value; } - public Long getSize() { - return size; + public Long size() { + return attributes.size; } - public void setSize(Long size) { - this.size = size; + public void size(Long size) { + attributes.size = size; } - } diff --git a/api/src/main/java/com/github/streamshub/console/api/model/RecordFilterParams.java b/api/src/main/java/com/github/streamshub/console/api/model/RecordFilterParams.java index 75d0ad24e..7fd62f51e 100644 --- a/api/src/main/java/com/github/streamshub/console/api/model/RecordFilterParams.java +++ b/api/src/main/java/com/github/streamshub/console/api/model/RecordFilterParams.java @@ -19,12 +19,18 @@ @Expression( when = "self.rawTimestamp != null", value = "self.rawOffset == null", - node = "filter[offset]", + node = RecordFilterParams.FILTER_OFFSET, message = "Parameter `filter[offset]` must not be used when `filter[timestamp]` is present.", payload = ErrorCategory.InvalidQueryParameter.class) public class RecordFilterParams { - @QueryParam("filter[partition]") + static final String FILTER_PARTITION = "filter[partition]"; + static final String FILTER_OFFSET = "filter[offset]"; + static final String FILTER_TIMESTAMP = "filter[timestamp]"; + static final String PAGE_SIZE = "page[size]"; + static final String MAX_VALUE_LENGTH = "maxValueLength"; + + @QueryParam(FILTER_PARTITION) @Parameter( description = """ Retrieve messages only from the partition identified by this parameter. 
@@ -39,23 +45,23 @@ public class RecordFilterParams { value = "self.operator == 'eq'", message = "unsupported filter operator, supported values: [ 'eq' ]", payload = ErrorCategory.InvalidQueryParameter.class, - node = "filter[partition]") + node = FILTER_PARTITION) @Expression( when = "self != null", value = "self.operands.size() == 1", message = "exactly 1 operand is required", payload = ErrorCategory.InvalidQueryParameter.class, - node = "filter[partition]") + node = FILTER_PARTITION) @Expression( when = "self != null && self.operator == 'eq' && self.operands.size() == 1", value = "val = Integer.parseInt(self.firstOperand); val >= 0 && val <= Integer.MAX_VALUE", exceptionalValue = ExceptionalValue.FALSE, message = "operand must be an integer between 0 and " + Integer.MAX_VALUE + ", inclusive", payload = ErrorCategory.InvalidQueryParameter.class, - node = "filter[partition]") + node = FILTER_PARTITION) FetchFilter partition; - @QueryParam("filter[offset]") + @QueryParam(FILTER_OFFSET) @Parameter( description = """ Retrieve messages with an offset greater than or equal to the filter @@ -80,23 +86,23 @@ public class RecordFilterParams { exceptionalValue = ExceptionalValue.FALSE, message = "unsupported filter operator, supported values: [ 'gte' ]", payload = ErrorCategory.InvalidQueryParameter.class, - node = "filter[offset]") + node = FILTER_OFFSET) @Expression( when = "self != null", value = "self.operands.size() == 1", message = "exactly 1 operand is required", payload = ErrorCategory.InvalidQueryParameter.class, - node = "filter[offset]") + node = FILTER_OFFSET) @Expression( when = "self != null && self.operator == 'gte'", value = "val = Long.parseLong(self.firstOperand); val >= 0 && val <= Long.MAX_VALUE", exceptionalValue = ExceptionalValue.FALSE, message = "operand must be an integer between 0 and " + Long.MAX_VALUE + ", inclusive", payload = ErrorCategory.InvalidQueryParameter.class, - node = "filter[offset]") + node = FILTER_OFFSET) FetchFilter offset; - @QueryParam("filter[timestamp]") + @QueryParam(FILTER_TIMESTAMP) @Parameter( description = """ Retrieve messages with a timestamp greater than or equal to the filter @@ -120,13 +126,13 @@ public class RecordFilterParams { value = "self.operator == 'gte'", message = "unsupported filter operator, supported values: [ 'gte' ]", payload = ErrorCategory.InvalidQueryParameter.class, - node = "filter[timestamp]") + node = FILTER_TIMESTAMP) @Expression( when = "self != null", value = "self.operands.size() == 1", message = "exactly 1 operand is required", payload = ErrorCategory.InvalidQueryParameter.class, - node = "filter[timestamp]") + node = FILTER_TIMESTAMP) @Expression( when = "self != null && self.operator == 'gte'", classImports = "java.time.Instant", @@ -134,10 +140,10 @@ public class RecordFilterParams { exceptionalValue = ExceptionalValue.FALSE, message = "operand must be a valid RFC 3339 date-time no earlier than `1970-01-01T00:00:00Z`", payload = ErrorCategory.InvalidQueryParameter.class, - node = "filter[timestamp]") + node = FILTER_TIMESTAMP) FetchFilter timestamp; - @QueryParam("page[size]") + @QueryParam(PAGE_SIZE) @DefaultValue(ListFetchParams.PAGE_SIZE_DEFAULT + "") @Parameter( description = "Limit the number of records fetched and returned", @@ -152,10 +158,10 @@ public class RecordFilterParams { exceptionalValue = ExceptionalValue.FALSE, message = "must be an integer between 1 and " + ListFetchParams.PAGE_SIZE_MAX + ", inclusive", payload = ErrorCategory.InvalidQueryParameter.class, - node = "page[size]") + node = 
PAGE_SIZE) String pageSize; - @QueryParam("maxValueLength") + @QueryParam(MAX_VALUE_LENGTH) @Parameter( description = """ Maximum length of string values returned in the response. @@ -169,7 +175,7 @@ public class RecordFilterParams { exceptionalValue = ExceptionalValue.FALSE, message = "must be an integer between 1 and " + Integer.MAX_VALUE + ", inclusive", payload = ErrorCategory.InvalidQueryParameter.class, - node = "maxValueLength") + node = MAX_VALUE_LENGTH) String maxValueLength; public String getRawOffset() { diff --git a/api/src/main/java/com/github/streamshub/console/api/service/RecordService.java b/api/src/main/java/com/github/streamshub/console/api/service/RecordService.java index 0b8c40a3d..380ca4455 100644 --- a/api/src/main/java/com/github/streamshub/console/api/service/RecordService.java +++ b/api/src/main/java/com/github/streamshub/console/api/service/RecordService.java @@ -1,12 +1,8 @@ package com.github.streamshub.console.api.service; -import java.io.ByteArrayInputStream; -import java.io.IOException; -import java.io.InputStreamReader; -import java.io.Reader; -import java.nio.charset.StandardCharsets; import java.time.Duration; import java.time.Instant; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.Collections; @@ -23,11 +19,13 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; import java.util.concurrent.Executor; +import java.util.function.BiFunction; import java.util.function.Function; import java.util.function.Supplier; import java.util.stream.Collectors; import java.util.stream.StreamSupport; +import jakarta.annotation.PostConstruct; import jakarta.enterprise.context.ApplicationScoped; import jakarta.inject.Inject; @@ -38,6 +36,7 @@ import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.Uuid; @@ -45,21 +44,28 @@ import org.apache.kafka.common.errors.UnknownTopicIdException; import org.apache.kafka.common.header.Header; import org.apache.kafka.common.header.Headers; +import org.apache.kafka.common.serialization.Deserializer; +import org.apache.kafka.common.serialization.Serializer; +import org.eclipse.microprofile.config.ConfigProvider; import org.eclipse.microprofile.context.ThreadContext; import org.jboss.logging.Logger; +import com.fasterxml.jackson.databind.ObjectMapper; import com.github.streamshub.console.api.model.KafkaRecord; import com.github.streamshub.console.api.support.KafkaContext; import com.github.streamshub.console.api.support.SizeLimitedSortedSet; +import com.github.streamshub.console.api.support.serdes.MultiformatDeserializer; +import com.github.streamshub.console.api.support.serdes.MultiformatSerializer; +import com.github.streamshub.console.api.support.serdes.RecordData; + +import io.apicurio.registry.rest.client.RegistryClient; +import io.apicurio.registry.rest.client.RegistryClientFactory; import static java.util.Objects.requireNonNullElse; @ApplicationScoped public class RecordService { - public static final String BINARY_DATA_MESSAGE = "Binary or non-UTF-8 encoded data cannot be displayed"; - static final int REPLACEMENT_CHARACTER = '\uFFFD'; - @Inject Logger logger; @@ -67,15 +73,55 @@ public class RecordService { KafkaContext kafkaContext; @Inject - Supplier> 
consumerSupplier; + BiFunction< + Deserializer, + Deserializer, + Consumer + > consumerFactory; @Inject - Supplier> producerSupplier; + BiFunction< + Serializer, + Serializer, + Producer + > producerFactory; @Inject ThreadContext threadContext; - public List consumeRecords(String topicId, + @Inject + ObjectMapper objectMapper; + + RegistryClient registryClient; + + MultiformatDeserializer keyDeserializer; + MultiformatDeserializer valueDeserializer; + + MultiformatSerializer keySerializer; + MultiformatSerializer valueSerializer; + + @PostConstruct + public void initialize() { + String registryUrl = ConfigProvider.getConfig().getOptionalValue("console.registry.endpoint", String.class) + // TODO: remove default + .orElse("http://localhost:9080"); + + registryClient = RegistryClientFactory.create(registryUrl); + + keyDeserializer = new MultiformatDeserializer(registryClient); + keyDeserializer.configure(kafkaContext.configs(Consumer.class), true); + + valueDeserializer = new MultiformatDeserializer(registryClient); + valueDeserializer.configure(kafkaContext.configs(Consumer.class), false); + + keySerializer = new MultiformatSerializer(registryClient, objectMapper); + keySerializer.configure(kafkaContext.configs(Producer.class), true); + + valueSerializer = new MultiformatSerializer(registryClient, objectMapper); + valueSerializer.configure(kafkaContext.configs(Producer.class), false); + } + + public CompletionStage> consumeRecords(String topicId, Integer partition, Long offset, Instant timestamp, @@ -83,72 +129,65 @@ public List consumeRecords(String topicId, List include, Integer maxValueLength) { - List partitions = topicNameForId(topicId) - .thenApplyAsync( - topicName -> consumerSupplier.get().partitionsFor(topicName), - threadContext.currentContextExecutor()) - .toCompletableFuture() - .join(); + return topicNameForId(topicId).thenApplyAsync(topicName -> { + Consumer consumer = consumerFactory.apply(keyDeserializer, valueDeserializer); + List partitions = consumer.partitionsFor(topicName); + List assignments = partitions.stream() + .filter(p -> partition == null || partition.equals(p.partition())) + .map(p -> new TopicPartition(p.topic(), p.partition())) + .toList(); - List assignments = partitions.stream() - .filter(p -> partition == null || partition.equals(p.partition())) - .map(p -> new TopicPartition(p.topic(), p.partition())) - .toList(); - - if (assignments.isEmpty()) { - return Collections.emptyList(); - } + if (assignments.isEmpty()) { + return Collections.emptyList(); + } - Consumer consumer = consumerSupplier.get(); - consumer.assign(assignments); - var endOffsets = consumer.endOffsets(assignments); + consumer.assign(assignments); + var endOffsets = consumer.endOffsets(assignments); - if (timestamp != null) { - seekToTimestamp(consumer, assignments, timestamp); - } else { - seekToOffset(consumer, assignments, endOffsets, offset, limit); - } + if (timestamp != null) { + seekToTimestamp(consumer, assignments, timestamp); + } else { + seekToOffset(consumer, assignments, endOffsets, offset, limit); + } - Iterable> poll = () -> new ConsumerRecordsIterator<>(consumer, endOffsets, limit); - var limitSet = new SizeLimitedSortedSet>(buildComparator(timestamp, offset), limit); + Iterable> poll = () -> new ConsumerRecordsIterator<>(consumer, endOffsets, limit); + var limitSet = new SizeLimitedSortedSet>(buildComparator(timestamp, offset), limit); - return StreamSupport.stream(poll.spliterator(), false) - .flatMap(records -> StreamSupport.stream(records.spliterator(), false)) - 
.collect(Collectors.toCollection(() -> limitSet)) - .stream() - .map(rec -> getItems(rec, topicId, include, maxValueLength)) - .toList(); + return StreamSupport.stream(poll.spliterator(), false) + .flatMap(records -> StreamSupport.stream(records.spliterator(), false)) + .collect(Collectors.toCollection(() -> limitSet)) + .stream() + .map(rec -> getItems(rec, topicId, include, maxValueLength)) + .toList(); + }, threadContext.currentContextExecutor()); } public CompletionStage produceRecord(String topicId, KafkaRecord input) { CompletableFuture promise = new CompletableFuture<>(); Executor asyncExec = threadContext.currentContextExecutor(); - topicNameForId(topicId) - .thenApplyAsync( - topicName -> producerSupplier.get().partitionsFor(topicName), - asyncExec) - .thenAcceptAsync( - partitions -> { - Producer producer = producerSupplier.get(); - String topicName = partitions.iterator().next().topic(); - Integer partition = input.getPartition(); - - if (partition != null && partitions.stream().noneMatch(p -> partition.equals(p.partition()))) { - promise.completeExceptionally(invalidPartition(topicId, partition)); - } else { - send(topicName, input, producer, promise); - } - }, - asyncExec); + topicNameForId(topicId).thenAcceptAsync(topicName -> { + Producer producer = producerFactory.apply(keySerializer, valueSerializer); + List partitions = producer.partitionsFor(topicName); + Integer partition = input.partition(); + + if (partition != null && partitions.stream().noneMatch(p -> partition.equals(p.partition()))) { + promise.completeExceptionally(invalidPartition(topicId, partition)); + } else { + send(topicName, (String) input.getMeta("format-value"), input, producer, promise); + } + + promise.whenComplete((kafkaRecord, error) -> producer.close()); + }, asyncExec).exceptionally(e -> { + promise.completeExceptionally(e); + return null; + }); return promise; } - void send(String topicName, KafkaRecord input, Producer producer, CompletableFuture promise) { - String key = input.getKey(); - - List
headers = Optional.ofNullable(input.getHeaders())
+    void send(String topicName, String format, KafkaRecord input, Producer<RecordData, RecordData> producer, CompletableFuture<KafkaRecord> promise) {
+        List<Header>
headers = Optional.ofNullable(input.headers()) .orElseGet(Collections::emptyMap) .entrySet() .stream() @@ -164,15 +203,31 @@ public byte[] value() { } }) .map(Header.class::cast) - .toList(); + .collect(Collectors.toCollection(ArrayList::new)); - Long timestamp = Optional.ofNullable(input.getTimestamp()).map(Instant::toEpochMilli).orElse(null); + if (format != null) { + headers.add(new Header() { + @Override + public String key() { + return "com.github.streamshub.console.message-format-value"; + } + + @Override + public byte[] value() { + return format != null ? format.getBytes() : null; + } + }); + } - ProducerRecord request = new ProducerRecord<>(topicName, - input.getPartition(), + Long timestamp = Optional.ofNullable(input.timestamp()).map(Instant::toEpochMilli).orElse(null); + var key = new RecordData(input.key()); + var value = new RecordData(input.value()); + + ProducerRecord request = new ProducerRecord<>(topicName, + input.partition(), timestamp, key, - input.getValue(), + value, headers); producer.send(request, (meta, exception) -> { @@ -180,16 +235,17 @@ public byte[] value() { promise.completeExceptionally(exception); } else { KafkaRecord result = new KafkaRecord(); - result.setPartition(meta.partition()); + result.partition(meta.partition()); if (meta.hasOffset()) { - result.setOffset(meta.offset()); + result.offset(meta.offset()); } if (meta.hasTimestamp()) { - result.setTimestamp(Instant.ofEpochMilli(meta.timestamp())); + result.timestamp(Instant.ofEpochMilli(meta.timestamp())); } - result.setKey(input.getKey()); - result.setValue(input.getValue()); - result.setHeaders(input.getHeaders()); + result.key(key.dataString(null)); + result.value(value.dataString(null)); + result.headers(input.headers()); + result.size(sizeOf(meta, request.headers())); promise.complete(result); } }); @@ -210,7 +266,7 @@ CompletionStage topicNameForId(String topicId) { .orElseThrow(() -> noSuchTopic(topicId))); } - void seekToTimestamp(Consumer consumer, List assignments, Instant timestamp) { + void seekToTimestamp(Consumer consumer, List assignments, Instant timestamp) { Long tsMillis = timestamp.toEpochMilli(); Map timestampsToSearch = assignments.stream() .collect(Collectors.toMap(Function.identity(), p -> tsMillis)); @@ -237,7 +293,7 @@ void seekToTimestamp(Consumer consumer, List ass }); } - void seekToOffset(Consumer consumer, List assignments, Map endOffsets, Long offset, int limit) { + void seekToOffset(Consumer consumer, List assignments, Map endOffsets, Long offset, int limit) { var beginningOffsets = consumer.beginningOffsets(assignments); assignments.forEach(p -> { @@ -263,9 +319,9 @@ void seekToOffset(Consumer consumer, List assign }); } - Comparator> buildComparator(Instant timestamp, Long offset) { - Comparator> comparator = Comparator - .>comparingLong(ConsumerRecord::timestamp) + Comparator> buildComparator(Instant timestamp, Long offset) { + Comparator> comparator = Comparator + .>comparingLong(ConsumerRecord::timestamp) .thenComparingInt(ConsumerRecord::partition) .thenComparingLong(ConsumerRecord::offset); @@ -277,66 +333,49 @@ Comparator> buildComparator(Instant timestamp, Lo return comparator; } - KafkaRecord getItems(ConsumerRecord rec, String topicId, List include, Integer maxValueLength) { + KafkaRecord getItems(ConsumerRecord rec, String topicId, List include, Integer maxValueLength) { KafkaRecord item = new KafkaRecord(topicId); - setProperty(KafkaRecord.Fields.PARTITION, include, rec::partition, item::setPartition); - setProperty(KafkaRecord.Fields.OFFSET, include, 
rec::offset, item::setOffset); - setProperty(KafkaRecord.Fields.TIMESTAMP, include, () -> Instant.ofEpochMilli(rec.timestamp()), item::setTimestamp); - setProperty(KafkaRecord.Fields.TIMESTAMP_TYPE, include, rec.timestampType()::name, item::setTimestampType); - setProperty(KafkaRecord.Fields.KEY, include, rec::key, k -> item.setKey(bytesToString(k, maxValueLength))); - setProperty(KafkaRecord.Fields.VALUE, include, rec::value, v -> item.setValue(bytesToString(v, maxValueLength))); - setProperty(KafkaRecord.Fields.HEADERS, include, () -> headersToMap(rec.headers(), maxValueLength), item::setHeaders); - setProperty(KafkaRecord.Fields.SIZE, include, () -> sizeOf(rec), item::setSize); + Optional.ofNullable(rec.key()).map(RecordData::type) + .ifPresent(fmt -> item.addMeta("key-format", fmt)); + Optional.ofNullable(rec.value()).map(RecordData::type) + .ifPresent(fmt -> item.addMeta("value-format", fmt)); + + setProperty(KafkaRecord.Fields.PARTITION, include, rec::partition, item::partition); + setProperty(KafkaRecord.Fields.OFFSET, include, rec::offset, item::offset); + setProperty(KafkaRecord.Fields.TIMESTAMP, include, () -> Instant.ofEpochMilli(rec.timestamp()), item::timestamp); + setProperty(KafkaRecord.Fields.TIMESTAMP_TYPE, include, rec.timestampType()::name, item::timestampType); + setProperty(KafkaRecord.Fields.KEY, include, rec::key, k -> item.key(k.dataString(maxValueLength))); + setProperty(KafkaRecord.Fields.VALUE, include, rec::value, v -> item.value(v.dataString(maxValueLength))); + setProperty(KafkaRecord.Fields.HEADERS, include, () -> headersToMap(rec.headers(), maxValueLength), item::headers); + setProperty(KafkaRecord.Fields.SIZE, include, () -> sizeOf(rec), item::size); return item; } void setProperty(String fieldName, List include, Supplier source, java.util.function.Consumer target) { if (include.contains(fieldName)) { - target.accept(source.get()); - } - } - - String bytesToString(byte[] bytes, Integer maxValueLength) { - if (bytes == null) { - return null; - } - - if (bytes.length == 0) { - return ""; - } - - int bufferSize = maxValueLength != null ? 
Math.min(maxValueLength, bytes.length) : bytes.length; - StringBuilder buffer = new StringBuilder(bufferSize); - - try (Reader reader = new InputStreamReader(new ByteArrayInputStream(bytes), StandardCharsets.UTF_8)) { - int input; - - while ((input = reader.read()) > -1) { - if (input == REPLACEMENT_CHARACTER || !Character.isDefined(input)) { - return BINARY_DATA_MESSAGE; - } - - buffer.append((char) input); - - if (maxValueLength != null && buffer.length() == maxValueLength) { - break; - } + T value = source.get(); + if (value != null) { + target.accept(value); } - - return buffer.toString(); - } catch (IOException e) { - return BINARY_DATA_MESSAGE; } } Map headersToMap(Headers headers, Integer maxValueLength) { Map headerMap = new LinkedHashMap<>(); - headers.iterator().forEachRemaining(h -> headerMap.put(h.key(), bytesToString(h.value(), maxValueLength))); + headers.iterator().forEachRemaining(h -> headerMap.put(h.key(), RecordData.bytesToString(h.value(), maxValueLength))); return headerMap; } + long sizeOf(RecordMetadata meta, Headers headers) { + return meta.serializedKeySize() + + meta.serializedValueSize() + + Arrays.stream(headers.toArray()) + .mapToLong(h -> h.key().length() + h.value().length) + .sum(); + } + long sizeOf(ConsumerRecord rec) { return rec.serializedKeySize() + rec.serializedValueSize() + diff --git a/api/src/main/java/com/github/streamshub/console/api/support/serdes/AvroDatumProvider.java b/api/src/main/java/com/github/streamshub/console/api/support/serdes/AvroDatumProvider.java new file mode 100644 index 000000000..0e256ec89 --- /dev/null +++ b/api/src/main/java/com/github/streamshub/console/api/support/serdes/AvroDatumProvider.java @@ -0,0 +1,73 @@ +package com.github.streamshub.console.api.support.serdes; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; + +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericDatumReader; +import org.apache.avro.generic.GenericDatumWriter; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.io.DatumReader; +import org.apache.avro.io.DatumWriter; +import org.apache.avro.io.Decoder; +import org.apache.avro.io.DecoderFactory; +import org.apache.avro.io.Encoder; +import org.apache.avro.io.EncoderFactory; + +import io.apicurio.registry.serde.avro.DefaultAvroDatumProvider; +import io.apicurio.registry.types.ArtifactType; + +public class AvroDatumProvider extends DefaultAvroDatumProvider { + @Override + public DatumWriter createDatumWriter(RecordData data, Schema schema) { + GenericDatumWriter writer = new GenericDatumWriter<>(schema); + + return new DatumWriter() { + @Override + public void write(RecordData data, org.apache.avro.io.Encoder out) throws IOException { + final DatumReader reader = new GenericDatumReader<>(schema); + final InputStream dataStream = new ByteArrayInputStream(data.data); + final Decoder jsonDecoder = DecoderFactory.get().jsonDecoder(schema, dataStream); + final Object datum = reader.read(null, jsonDecoder); + writer.write(datum, out); + + final ByteArrayOutputStream buffer = new ByteArrayOutputStream(); + final Encoder jsonEncoder = EncoderFactory.get().jsonEncoder(schema, buffer); + writer.write(datum, jsonEncoder); + jsonEncoder.flush(); + data.data = buffer.toByteArray(); + } + + @Override + public void setSchema(Schema schema) { + // No-op + } + }; + } + + @Override + public DatumReader createDatumReader(Schema schema) { + GenericDatumReader target = new GenericDatumReader<>(schema); 
+ + return new DatumReader() { + @Override + public RecordData read(RecordData reuse, Decoder in) throws IOException { + final Object datum = target.read(reuse, in); + final ByteArrayOutputStream buffer = new ByteArrayOutputStream(); + final DatumWriter writer = new GenericDatumWriter<>(schema); + final Encoder jsonEncoder = EncoderFactory.get().jsonEncoder(schema, buffer); + writer.write(datum, jsonEncoder); + jsonEncoder.flush(); + + return new RecordData(ArtifactType.AVRO, buffer.toByteArray(), null); + } + + @Override + public void setSchema(Schema schema) { + // No-op + } + }; + } +} diff --git a/api/src/main/java/com/github/streamshub/console/api/support/serdes/AvroDeserializer.java b/api/src/main/java/com/github/streamshub/console/api/support/serdes/AvroDeserializer.java new file mode 100644 index 000000000..906c997b8 --- /dev/null +++ b/api/src/main/java/com/github/streamshub/console/api/support/serdes/AvroDeserializer.java @@ -0,0 +1,31 @@ +package com.github.streamshub.console.api.support.serdes; + +import java.nio.ByteBuffer; + +import org.apache.avro.Schema; +import org.apache.kafka.common.header.Headers; + +import io.apicurio.registry.resolver.ParsedSchema; +import io.apicurio.registry.resolver.SchemaResolver; +import io.apicurio.registry.serde.avro.AvroKafkaDeserializer; + +class AvroDeserializer extends AvroKafkaDeserializer { + AvroDeserializer(SchemaResolver schemaResolver) { + super(); + setSchemaResolver(schemaResolver); + } + + @Override + public RecordData readData(ParsedSchema schema, ByteBuffer buffer, int start, int length) { + return super.readData(schema, buffer, start, length); + } + + @Override + public RecordData readData(Headers headers, + ParsedSchema schema, + ByteBuffer buffer, + int start, + int length) { + return super.readData(headers, schema, buffer, start, length); + } +} diff --git a/api/src/main/java/com/github/streamshub/console/api/support/serdes/AvroSerializer.java b/api/src/main/java/com/github/streamshub/console/api/support/serdes/AvroSerializer.java new file mode 100644 index 000000000..264ad8290 --- /dev/null +++ b/api/src/main/java/com/github/streamshub/console/api/support/serdes/AvroSerializer.java @@ -0,0 +1,29 @@ +package com.github.streamshub.console.api.support.serdes; + +import java.io.IOException; +import java.io.OutputStream; + +import org.apache.avro.Schema; +import org.apache.kafka.common.header.Headers; + +import io.apicurio.registry.resolver.ParsedSchema; +import io.apicurio.registry.resolver.SchemaResolver; +import io.apicurio.registry.serde.avro.AvroKafkaSerializer; + +class AvroSerializer extends AvroKafkaSerializer { + AvroSerializer(SchemaResolver schemaResolver) { + super(); + setSchemaResolver(schemaResolver); + } + + @Override + public void serializeData(ParsedSchema schema, RecordData data, OutputStream out) throws IOException { + super.serializeData(schema, data, out); + } + + @Override + public void serializeData(Headers headers, ParsedSchema schema, RecordData data, OutputStream out) + throws IOException { + super.serializeData(headers, schema, data, out); + } +} diff --git a/api/src/main/java/com/github/streamshub/console/api/support/serdes/MultiformatDeserializer.java b/api/src/main/java/com/github/streamshub/console/api/support/serdes/MultiformatDeserializer.java new file mode 100644 index 000000000..8f7272eb2 --- /dev/null +++ b/api/src/main/java/com/github/streamshub/console/api/support/serdes/MultiformatDeserializer.java @@ -0,0 +1,205 @@ +package com.github.streamshub.console.api.support.serdes; + +import 
java.nio.ByteBuffer; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; + +import org.apache.avro.Schema; +import org.apache.kafka.common.header.Headers; + +import com.google.protobuf.Message; + +import io.apicurio.registry.resolver.DefaultSchemaResolver; +import io.apicurio.registry.resolver.ParsedSchema; +import io.apicurio.registry.resolver.SchemaLookupResult; +import io.apicurio.registry.resolver.SchemaParser; +import io.apicurio.registry.resolver.SchemaResolver; +import io.apicurio.registry.resolver.strategy.ArtifactReference; +import io.apicurio.registry.rest.client.RegistryClient; +import io.apicurio.registry.serde.AbstractKafkaDeserializer; +import io.apicurio.registry.serde.avro.AvroKafkaSerdeConfig; +import io.apicurio.registry.serde.config.BaseKafkaSerDeConfig; +import io.apicurio.registry.types.ArtifactType; +import io.apicurio.registry.utils.protobuf.schema.ProtobufSchema; + +public class MultiformatDeserializer extends AbstractKafkaDeserializer { + + AvroDeserializer avroDeserializer; + ProtobufDeserializer protobufDeserializer; + SchemaParser parser; + + public MultiformatDeserializer(RegistryClient client) { + super(); + setSchemaResolver(newResolver(client)); + this.avroDeserializer = new AvroDeserializer(newResolver(client)); + this.protobufDeserializer = new ProtobufDeserializer(newResolver(client)); + } + + static SchemaResolver newResolver(RegistryClient client) { + var resolver = new DefaultSchemaResolver(); + resolver.setClient(client); + return resolver; + } + + @Override + public void configure(Map configs, boolean isKey) { + Map avroConfigs = new HashMap<>(configs); + avroConfigs.put(AvroKafkaSerdeConfig.AVRO_DATUM_PROVIDER, AvroDatumProvider.class.getName()); + avroDeserializer.configure(avroConfigs, isKey); + + protobufDeserializer.configure(configs, isKey); + + parser = new MultiformatSchemaParser<>(Set.of( + avroDeserializer.schemaParser(), + protobufDeserializer.schemaParser() + )); + + super.configure(new BaseKafkaSerDeConfig(configs), isKey); + } + + @Override + public void close() { + // don't close - deserializer will be reused + } + + @Override + public SchemaParser schemaParser() { + return parser; + } + + @Override + protected RecordData readData(ParsedSchema schema, ByteBuffer buffer, int start, int length) { + Object parsedSchema = schema != null ? schema.getParsedSchema() : null; + RecordData result; + + if (parsedSchema instanceof Schema) { + try { + result = avroDeserializer.readData(cast(schema), buffer, start, length); + result.schema = schema; + } catch (Exception e) { + result = new RecordData(null, null, schema); + result.meta.put("error", e.getMessage()); + } + } else if (parsedSchema instanceof ProtobufSchema) { + try { + Message msg = protobufDeserializer.readData(cast(schema), buffer, start, length); + byte[] data = com.google.protobuf.util.JsonFormat.printer().print(msg).getBytes(); + result = new RecordData(ArtifactType.PROTOBUF, data, schema); + result.schema = schema; + } catch (Exception e) { + result = new RecordData(null, null, schema); + result.meta.put("error", e.getMessage()); + } + } else { + byte[] bytes = new byte[length]; + System.arraycopy(buffer.array(), start, bytes, 0, length); + result = new RecordData(null, bytes, null); + } + + return result; + } + + @Override + protected RecordData readData(Headers headers, ParsedSchema schema, ByteBuffer buffer, int start, int length) { + Object parsedSchema = schema != null ? 
schema.getParsedSchema() : null; + RecordData result; + + if (parsedSchema instanceof Schema) { + try { + result = avroDeserializer.readData(headers, cast(schema), buffer, start, length); + } catch (Exception e) { + result = new RecordData(null, null, schema); + result.meta.put("error", e.getMessage()); + } + } else if (parsedSchema instanceof ProtobufSchema) { + try { + Message msg = protobufDeserializer.readData(headers, cast(schema), buffer, start, length); + byte[] data = com.google.protobuf.util.JsonFormat.printer().print(msg).getBytes(); + result = new RecordData(ArtifactType.PROTOBUF, data, schema); + } catch (Exception e) { + result = new RecordData(null, null, schema); + result.meta.put("error", e.getMessage()); + } + } else { + byte[] bytes = new byte[length]; + System.arraycopy(buffer.array(), start, bytes, 0, length); + result = new RecordData(null, bytes, null); + } + + return result; + } + + @SuppressWarnings("unchecked") + static T cast(Object object) { + return (T) object; + } + + @Override + public RecordData deserialize(String topic, byte[] data) { + if (data == null) { + return null; + } + + ByteBuffer buffer; + SchemaLookupResult schema; + int length; + + if (data[0] == MAGIC_BYTE) { + buffer = getByteBuffer(data); + ArtifactReference artifactReference = getIdHandler().readId(buffer); + schema = resolve(artifactReference); + length = buffer.limit() - 1 - getIdHandler().idSize(); + } else { + buffer = ByteBuffer.wrap(data); + // Empty schema + schema = SchemaLookupResult.builder().build(); + length = buffer.limit() - 1; + } + + int start = buffer.position() + buffer.arrayOffset(); + + return readData(schema.getParsedSchema(), buffer, start, length); + } + + @Override + public RecordData deserialize(String topic, Headers headers, byte[] data) { + if (data == null) { + return null; + } + ArtifactReference artifactReference = null; + if (headersHandler != null && headers != null) { + artifactReference = headersHandler.readHeaders(headers); + + if (artifactReference.hasValue()) { + return readData(headers, data, artifactReference); + } + } + + if (headers == null) { + return deserialize(topic, data); + } else { + //try to read data even if artifactReference has no value, maybe there is a fallbackArtifactProvider configured + return readData(headers, data, artifactReference); + } + } + + private RecordData readData(Headers headers, byte[] data, ArtifactReference artifactReference) { + SchemaLookupResult schema = resolve(artifactReference); + + ByteBuffer buffer = ByteBuffer.wrap(data); + int length = buffer.limit(); + int start = buffer.position(); + + return readData(headers, schema.getParsedSchema(), buffer, start, length); + } + + private SchemaLookupResult resolve(ArtifactReference artifactReference) { + try { + return getSchemaResolver().resolveSchemaByArtifactReference(artifactReference); + } catch (RuntimeException e) { + // Empty result + return SchemaLookupResult.builder().build(); + } + } +} diff --git a/api/src/main/java/com/github/streamshub/console/api/support/serdes/MultiformatSchemaParser.java b/api/src/main/java/com/github/streamshub/console/api/support/serdes/MultiformatSchemaParser.java new file mode 100644 index 000000000..b2f07cf27 --- /dev/null +++ b/api/src/main/java/com/github/streamshub/console/api/support/serdes/MultiformatSchemaParser.java @@ -0,0 +1,118 @@ +package com.github.streamshub.console.api.support.serdes; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.InputStreamReader; +import java.io.Reader; +import 
java.util.Arrays; +import java.util.LinkedHashMap; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; + +import io.apicurio.registry.resolver.ParsedSchema; +import io.apicurio.registry.resolver.SchemaParser; +import io.apicurio.registry.resolver.data.Record; +import io.apicurio.registry.types.ArtifactType; + +public class MultiformatSchemaParser implements SchemaParser { + + private static final int CACHE_LIMIT = 20; + private static final char[] PROTOBUF_INTRO = "message ".toCharArray(); + + private final Map> delegates; + private final Map schemaCache = new LinkedHashMap<>(CACHE_LIMIT); + + public MultiformatSchemaParser(Set> delegates) { + this.delegates = delegates.stream() + .map(p -> Map.entry(p.artifactType(), p)) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + } + + @Override + public String artifactType() { + throw new UnsupportedOperationException("MultiformatSchemaParser#artifactType()"); + } + + @Override + public Object parseSchema(byte[] rawSchema, Map> resolvedReferences) { + if (schemaCache.containsKey(rawSchema)) { + return schemaCache.get(rawSchema); + } + + Object parsedSchema = loadSchema(rawSchema, resolvedReferences); + + if (schemaCache.size() == CACHE_LIMIT) { + // Remove the oldest entry + var iter = schemaCache.entrySet().iterator(); + iter.next(); + iter.remove(); + } + + schemaCache.put(rawSchema, parsedSchema); + + return parsedSchema; + } + + @Override + public boolean supportsExtractSchemaFromData() { + return false; + } + + @Override + public ParsedSchema getSchemaFromData(Record data) { + throw new UnsupportedOperationException("MultiformatSchemaParser#getSchemaFromData(Record)"); + } + + @Override + public ParsedSchema getSchemaFromData(Record data, boolean dereference) { + throw new UnsupportedOperationException("MultiformatSchemaParser#getSchemaFromData(Record,boolean)"); + } + + @SuppressWarnings("unchecked") + Object loadSchema(byte[] rawSchema, Map> resolvedReferences) { + Object parsedSchema = null; + SchemaParser delegate = null; + + if (looksLikeProtobuf(rawSchema)) { + if (delegates.containsKey(ArtifactType.PROTOBUF)) { + delegate = (SchemaParser) delegates.get(ArtifactType.PROTOBUF); + } + } else { + if (delegates.containsKey(ArtifactType.AVRO)) { + delegate = (SchemaParser) delegates.get(ArtifactType.AVRO); + } + } + + if (delegate != null) { + try { + parsedSchema = delegate.parseSchema(rawSchema, resolvedReferences); + } catch (Exception e) { + // Schema is not valid, will be cached as null + } + } + + return parsedSchema; + } + + boolean looksLikeProtobuf(byte[] rawSchema) { + try (Reader reader = new InputStreamReader(new ByteArrayInputStream(rawSchema))) { + int input; + + while ((input = reader.read()) != -1) { + if (Character.isWhitespace(input)) { + continue; + } + char[] buffer = new char[8]; + buffer[0] = (char) input; + + if (reader.read(buffer, 1, 7) == 7 && Arrays.equals(PROTOBUF_INTRO, buffer)) { + return true; + } + } + } catch (IOException e) { + // Ignore + } + return false; + } +} diff --git a/api/src/main/java/com/github/streamshub/console/api/support/serdes/MultiformatSerializer.java b/api/src/main/java/com/github/streamshub/console/api/support/serdes/MultiformatSerializer.java new file mode 100644 index 000000000..6bd567ba3 --- /dev/null +++ b/api/src/main/java/com/github/streamshub/console/api/support/serdes/MultiformatSerializer.java @@ -0,0 +1,150 @@ +package com.github.streamshub.console.api.support.serdes; + +import java.io.OutputStream; +import java.util.HashMap; 
+import java.util.Map; +import java.util.Optional; +import java.util.Set; + +import jakarta.ws.rs.BadRequestException; + +import org.apache.kafka.common.header.Header; +import org.apache.kafka.common.header.Headers; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.protobuf.InvalidProtocolBufferException; +import com.google.protobuf.Struct; + +import io.apicurio.registry.resolver.DefaultSchemaResolver; +import io.apicurio.registry.resolver.ParsedSchema; +import io.apicurio.registry.resolver.SchemaParser; +import io.apicurio.registry.resolver.SchemaResolver; +import io.apicurio.registry.resolver.SchemaResolverConfig; +import io.apicurio.registry.resolver.data.Record; +import io.apicurio.registry.resolver.strategy.ArtifactReference; +import io.apicurio.registry.resolver.strategy.ArtifactReferenceResolverStrategy; +import io.apicurio.registry.rest.client.RegistryClient; +import io.apicurio.registry.serde.AbstractKafkaSerializer; +import io.apicurio.registry.serde.avro.AvroKafkaSerdeConfig; +import io.apicurio.registry.serde.config.BaseKafkaSerDeConfig; +import io.apicurio.registry.serde.data.KafkaSerdeMetadata; +import io.apicurio.registry.serde.data.KafkaSerdeRecord; +import io.apicurio.registry.types.ArtifactType; + +public class MultiformatSerializer extends AbstractKafkaSerializer implements ArtifactReferenceResolverStrategy { + + final ObjectMapper objectMapper; + AvroSerializer avroSerializer; + ProtobufSerializer protobufSerializer; + SchemaParser parser; + + public MultiformatSerializer(RegistryClient client, ObjectMapper objectMapper) { + super(); + setSchemaResolver(newResolver(client)); + this.objectMapper = objectMapper; + this.avroSerializer = new AvroSerializer(newResolver(client)); + this.protobufSerializer = new ProtobufSerializer(newResolver(client)); + } + + static SchemaResolver newResolver(RegistryClient client) { + var resolver = new DefaultSchemaResolver(); + resolver.setClient(client); + return resolver; + } + + @Override + public void configure(Map configs, boolean isKey) { + Map avroConfigs = new HashMap<>(configs); + avroConfigs.put(AvroKafkaSerdeConfig.AVRO_DATUM_PROVIDER, AvroDatumProvider.class.getName()); + avroConfigs.put(SchemaResolverConfig.ARTIFACT_RESOLVER_STRATEGY, this); + avroConfigs.put(SchemaResolverConfig.FIND_LATEST_ARTIFACT, true); + avroSerializer.configure(avroConfigs, isKey); + + Map protobufConfigs = new HashMap<>(configs); + protobufConfigs.put(SchemaResolverConfig.ARTIFACT_RESOLVER_STRATEGY, this); + protobufConfigs.put(SchemaResolverConfig.FIND_LATEST_ARTIFACT, true); + protobufSerializer.configure(configs, isKey); + + parser = new MultiformatSchemaParser<>(Set.of( + avroSerializer.schemaParser(), + protobufSerializer.schemaParser() + )); + + super.configure(new BaseKafkaSerDeConfig(configs), isKey); + } + + @Override + public void close() { + // don't close - serializer will be reused + } + + @Override + public SchemaParser schemaParser() { + return parser; + } + + @Override + public byte[] serialize(String topic, Headers headers, RecordData data) { + // just return null + if (data == null) { + return null; // NOSONAR - we want to return null and not an empty array + } + + String formatHeaderName = "com.github.streamshub.console.message-format" + (isKey() ? 
"-key" : "-value"); + Header formatHeader = headers.lastHeader(formatHeaderName); + String format = Optional.ofNullable(formatHeader) + .map(Header::value) + .map(String::new) + .orElse(""); + headers.remove(formatHeaderName); + + if (ArtifactType.AVRO.equals(format)) { + try { + return avroSerializer.serialize(topic, headers, data); + } catch (Exception e) { + throw new BadRequestException(e.getMessage()); + } + } else if (ArtifactType.PROTOBUF.equals(format)) { + var structBuilder = Struct.newBuilder(); + try { + com.google.protobuf.util.JsonFormat.parser() + .ignoringUnknownFields() + .merge(data.dataString(null), structBuilder); + } catch (InvalidProtocolBufferException e) { + throw new BadRequestException(e.getMessage()); + } + return protobufSerializer.serialize(topic, headers, structBuilder.build()); + } else { + return data.data; + } + } + + /** + * @see io.apicurio.registry.resolver.strategy.ArtifactReferenceResolverStrategy#artifactReference(io.apicurio.registry.resolver.data.Record, io.apicurio.registry.resolver.ParsedSchema) + */ + @Override + public ArtifactReference artifactReference(Record data, ParsedSchema parsedSchema) { + KafkaSerdeRecord kdata = (KafkaSerdeRecord) data; + KafkaSerdeMetadata metadata = kdata.metadata(); + + return ArtifactReference.builder() + .groupId("default") + .artifactId(String.format("%s-%s", metadata.getTopic(), metadata.isKey() ? "key" : "value")) + .build(); + } + + @Override + public boolean loadSchema() { + return false; + } + + @Override + protected void serializeData(ParsedSchema schema, RecordData data, OutputStream out) { + throw new UnsupportedOperationException(); + } + + @Override + protected void serializeData(Headers headers, ParsedSchema schema, RecordData data, OutputStream out) { + throw new UnsupportedOperationException(); + } +} diff --git a/api/src/main/java/com/github/streamshub/console/api/support/serdes/ProtobufDeserializer.java b/api/src/main/java/com/github/streamshub/console/api/support/serdes/ProtobufDeserializer.java new file mode 100644 index 000000000..1fe3f5d55 --- /dev/null +++ b/api/src/main/java/com/github/streamshub/console/api/support/serdes/ProtobufDeserializer.java @@ -0,0 +1,33 @@ +package com.github.streamshub.console.api.support.serdes; + +import java.nio.ByteBuffer; + +import org.apache.kafka.common.header.Headers; + +import com.google.protobuf.Message; + +import io.apicurio.registry.resolver.ParsedSchema; +import io.apicurio.registry.resolver.SchemaResolver; +import io.apicurio.registry.serde.protobuf.ProtobufKafkaDeserializer; +import io.apicurio.registry.utils.protobuf.schema.ProtobufSchema; + +class ProtobufDeserializer extends ProtobufKafkaDeserializer { + ProtobufDeserializer(SchemaResolver schemaResolver) { + super(); + setSchemaResolver(schemaResolver); + } + + @Override + public Message readData(ParsedSchema schema, ByteBuffer buffer, int start, int length) { + return super.readData(schema, buffer, start, length); + } + + @Override + public Message readData(Headers headers, + ParsedSchema schema, + ByteBuffer buffer, + int start, + int length) { + return super.readData(headers, schema, buffer, start, length); + } +} diff --git a/api/src/main/java/com/github/streamshub/console/api/support/serdes/ProtobufSerializer.java b/api/src/main/java/com/github/streamshub/console/api/support/serdes/ProtobufSerializer.java new file mode 100644 index 000000000..8b583f3e4 --- /dev/null +++ b/api/src/main/java/com/github/streamshub/console/api/support/serdes/ProtobufSerializer.java @@ -0,0 +1,32 @@ +package 
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+import org.apache.kafka.common.header.Headers;
+
+import com.google.protobuf.Message;
+
+import io.apicurio.registry.resolver.ParsedSchema;
+import io.apicurio.registry.resolver.SchemaResolver;
+import io.apicurio.registry.serde.protobuf.ProtobufKafkaSerializer;
+import io.apicurio.registry.utils.protobuf.schema.ProtobufSchema;
+
+class ProtobufSerializer extends ProtobufKafkaSerializer {
+    ProtobufSerializer(SchemaResolver schemaResolver) {
+        super();
+        setSchemaResolver(schemaResolver);
+    }
+
+    @Override
+    public void serializeData(ParsedSchema schema, Message data, OutputStream out)
+            throws IOException {
+        super.serializeData(schema, data, out);
+    }
+
+    @Override
+    public void serializeData(Headers headers, ParsedSchema schema, Message data, OutputStream out)
+            throws IOException {
+        super.serializeData(headers, schema, data, out);
+    }
+}
diff --git a/api/src/main/java/com/github/streamshub/console/api/support/serdes/RecordData.java b/api/src/main/java/com/github/streamshub/console/api/support/serdes/RecordData.java
new file mode 100644
index 000000000..a55cb4fbc
--- /dev/null
+++ b/api/src/main/java/com/github/streamshub/console/api/support/serdes/RecordData.java
@@ -0,0 +1,86 @@
+package com.github.streamshub.console.api.support.serdes;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.Reader;
+import java.nio.charset.StandardCharsets;
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+import io.apicurio.registry.resolver.ParsedSchema;
+
+public class RecordData {
+
+    public static final String BINARY_DATA_MESSAGE = "Binary or non-UTF-8 encoded data cannot be displayed";
+    static final int REPLACEMENT_CHARACTER = '\uFFFD';
+
+    public final Map meta = new LinkedHashMap<>(1);
+    public final String type;
+    byte[] data;
+    ParsedSchema schema;
+
+    public RecordData(String type, byte[] data, ParsedSchema schema) {
+        super();
+        this.type = type;
+        this.data = data;
+        this.schema = schema;
+    }
+
+    public RecordData(byte[] data) {
+        super();
+        this.type = null;
+        this.data = data;
+        this.schema = null;
+    }
+
+    public RecordData(String data) {
+        this(data != null ? data.getBytes(StandardCharsets.UTF_8) : null);
+    }
+
+    public String type() {
+        return type;
+    }
+
+    public ParsedSchema schema() {
+        return schema;
+    }
+
+    public String dataString(Integer maxValueLength) {
+        return bytesToString(data, maxValueLength);
+    }
+
+    public static String bytesToString(byte[] bytes, Integer maxValueLength) {
+        if (bytes == null) {
+            return null;
+        }
+
+        if (bytes.length == 0) {
+            return "";
+        }
+
+        int bufferSize = maxValueLength != null ? Math.min(maxValueLength, bytes.length) : bytes.length;
+        StringBuilder buffer = new StringBuilder(bufferSize);
+
+        try (Reader reader = new InputStreamReader(new ByteArrayInputStream(bytes), StandardCharsets.UTF_8)) {
+            int input;
+
+            while ((input = reader.read()) > -1) {
+                if (input == REPLACEMENT_CHARACTER || !Character.isDefined(input)) {
+                    return BINARY_DATA_MESSAGE;
+                }
+
+                buffer.append((char) input);
+
+                if (maxValueLength != null && buffer.length() == maxValueLength) {
+                    break;
+                }
+            }
+
+            return buffer.toString();
+        } catch (IOException e) {
+            return BINARY_DATA_MESSAGE;
+        }
+    }
+
+}
diff --git a/api/src/main/resources/application.properties b/api/src/main/resources/application.properties
index 0378be55a..a96088f0d 100644
--- a/api/src/main/resources/application.properties
+++ b/api/src/main/resources/application.properties
@@ -25,6 +25,7 @@
 quarkus.kafka.devservices.enabled=false
 quarkus.kubernetes-client.devservices.enabled=false
 quarkus.devservices.enabled=false
+quarkus.vertx.event-loops-pool-size=5
 quarkus.vertx.max-event-loop-execute-time=4000
 
 mp.openapi.scan.disable=false
diff --git a/api/src/test/java/com/github/streamshub/console/api/RecordsResourceIT.java b/api/src/test/java/com/github/streamshub/console/api/RecordsResourceIT.java
index a8285ac6a..ae4e8f134 100644
--- a/api/src/test/java/com/github/streamshub/console/api/RecordsResourceIT.java
+++ b/api/src/test/java/com/github/streamshub/console/api/RecordsResourceIT.java
@@ -34,7 +34,7 @@
 import org.junit.jupiter.params.provider.CsvSource;
 import org.junit.jupiter.params.provider.ValueSource;
 
-import com.github.streamshub.console.api.service.RecordService;
+import com.github.streamshub.console.api.support.serdes.RecordData;
 import com.github.streamshub.console.config.ConsoleConfig;
 import com.github.streamshub.console.kafka.systemtest.TestPlainProfile;
 import com.github.streamshub.console.kafka.systemtest.deployment.DeploymentManager;
@@ -376,7 +376,7 @@ void testConsumeRecordWithBinaryValue() throws NoSuchAlgorithmException {
             .assertThat()
             .statusCode(is(Status.OK.getStatusCode()))
             .body("data", hasSize(1))
-            .body("data[0].attributes.value", is(equalTo(RecordService.BINARY_DATA_MESSAGE)));
+            .body("data[0].attributes.value", is(equalTo(RecordData.BINARY_DATA_MESSAGE)));
     }
 
     @ParameterizedTest
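For context, the sketch below shows one way a producer could exercise the header-driven format selection added in MultiformatSerializer: the console-specific `com.github.streamshub.console.message-format-value` header names the requested format ("AVRO" or "PROTOBUF"), is read and removed by the serializer, and any other value falls through to the raw bytes. This is an illustrative sketch only, not part of the change set: the class name, registry URL, bootstrap servers, topic name, and JSON payload are placeholders, and the schema-resolving paths additionally assume a compatible `<topic>-value` artifact is already registered under the `default` group (per artifactReference above).

// Hypothetical usage sketch (not part of this PR). Registry URL, bootstrap
// servers, topic name, and payload are placeholders.
import java.nio.charset.StandardCharsets;
import java.util.Map;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.github.streamshub.console.api.support.serdes.MultiformatSerializer;
import com.github.streamshub.console.api.support.serdes.RecordData;

import io.apicurio.registry.rest.client.RegistryClientFactory;
import io.apicurio.registry.serde.SerdeConfig;

public class FormatHeaderExample {

    public static void main(String[] args) {
        String registryUrl = "http://localhost:8080/apis/registry/v2"; // placeholder

        // The serializer's constructor expects a registry client and a Jackson mapper
        var serializer = new MultiformatSerializer(RegistryClientFactory.create(registryUrl), new ObjectMapper());
        serializer.configure(Map.of(SerdeConfig.REGISTRY_URL, registryUrl), false); // false = value serializer

        Map<String, Object> configs =
                Map.<String, Object>of(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder

        try (KafkaProducer<String, RecordData> producer =
                new KafkaProducer<>(configs, new StringSerializer(), serializer)) {
            var record = new ProducerRecord<String, RecordData>(
                    "example-topic", new RecordData("{\"greeting\":\"hello\"}"));

            // The serializer reads and removes this header; "AVRO" or "PROTOBUF" selects the
            // schema-resolving path, any other value passes the bytes through unchanged.
            record.headers().add("com.github.streamshub.console.message-format-value",
                    "AVRO".getBytes(StandardCharsets.UTF_8));

            producer.send(record);
        }
    }
}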