From 264cb1389c94f7c646b717d22703cad1edd2fe6c Mon Sep 17 00:00:00 2001 From: Michael Edgar Date: Fri, 6 Sep 2024 07:21:22 -0400 Subject: [PATCH] Apicurio Registry integration with Avro+Protobuf ser/des support Signed-off-by: Michael Edgar --- api/pom.xml | 28 +- .../streamshub/console/api/ClientFactory.java | 30 +- .../console/api/RecordsResource.java | 41 ++- .../console/api/SchemasResource.java | 109 ++++++ .../console/api/model/JsonApiDocument.java | 2 +- .../api/model/JsonApiRelationship.java | 57 +++ .../console/api/model/KafkaRecord.java | 213 ++++++----- .../console/api/model/RecordFilterParams.java | 40 ++- .../console/api/service/RecordService.java | 331 +++++++++++------- .../api/support/serdes/AvroDatumProvider.java | 73 ++++ .../api/support/serdes/AvroDeserializer.java | 31 ++ .../serdes/MultiformatDeserializer.java | 241 +++++++++++++ .../serdes/MultiformatSchemaParser.java | 114 ++++++ .../support/serdes/MultiformatSerializer.java | 222 ++++++++++++ .../support/serdes/ProtobufDeserializer.java | 33 ++ .../support/serdes/ProtobufSerializer.java | 44 +++ .../api/support/serdes/RecordData.java | 86 +++++ api/src/main/resources/application.properties | 1 + .../console/api/RecordsResourceIT.java | 10 +- 19 files changed, 1426 insertions(+), 280 deletions(-) create mode 100644 api/src/main/java/com/github/streamshub/console/api/SchemasResource.java create mode 100644 api/src/main/java/com/github/streamshub/console/api/model/JsonApiRelationship.java create mode 100644 api/src/main/java/com/github/streamshub/console/api/support/serdes/AvroDatumProvider.java create mode 100644 api/src/main/java/com/github/streamshub/console/api/support/serdes/AvroDeserializer.java create mode 100644 api/src/main/java/com/github/streamshub/console/api/support/serdes/MultiformatDeserializer.java create mode 100644 api/src/main/java/com/github/streamshub/console/api/support/serdes/MultiformatSchemaParser.java create mode 100644 api/src/main/java/com/github/streamshub/console/api/support/serdes/MultiformatSerializer.java create mode 100644 api/src/main/java/com/github/streamshub/console/api/support/serdes/ProtobufDeserializer.java create mode 100644 api/src/main/java/com/github/streamshub/console/api/support/serdes/ProtobufSerializer.java create mode 100644 api/src/main/java/com/github/streamshub/console/api/support/serdes/RecordData.java diff --git a/api/pom.xml b/api/pom.xml index 2298d1bb5..138db7487 100644 --- a/api/pom.xml +++ b/api/pom.xml @@ -58,15 +58,15 @@ io.quarkus - quarkus-resteasy-reactive + quarkus-rest io.quarkus - quarkus-resteasy-reactive-jackson + quarkus-rest-jackson io.quarkus - quarkus-rest-client-reactive + quarkus-rest-client io.quarkus @@ -84,6 +84,10 @@ io.quarkus quarkus-kubernetes-client + + io.quarkus + quarkus-apicurio-registry-avro + io.smallrye.common smallrye-common-annotation @@ -109,6 +113,24 @@ kafka-oauth-client + + io.apicurio + apicurio-registry-serdes-avro-serde + + + io.apicurio + apicurio-registry-serdes-jsonschema-serde + + + io.apicurio + apicurio-registry-serdes-protobuf-serde + 2.5.8.Final + + + com.google.protobuf + protobuf-java-util + + com.fasterxml.jackson.core jackson-annotations diff --git a/api/src/main/java/com/github/streamshub/console/api/ClientFactory.java b/api/src/main/java/com/github/streamshub/console/api/ClientFactory.java index cccbd5e54..91c78153b 100644 --- a/api/src/main/java/com/github/streamshub/console/api/ClientFactory.java +++ b/api/src/main/java/com/github/streamshub/console/api/ClientFactory.java @@ -16,6 +16,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; +import java.util.function.BiFunction; import java.util.function.Function; import java.util.function.Predicate; import java.util.function.Supplier; @@ -53,6 +54,8 @@ import org.apache.kafka.common.security.plain.PlainLoginModule; import org.apache.kafka.common.security.scram.ScramLoginModule; import org.apache.kafka.common.serialization.ByteArrayDeserializer; +import org.apache.kafka.common.serialization.Deserializer; +import org.apache.kafka.common.serialization.Serializer; import org.apache.kafka.common.serialization.StringSerializer; import org.eclipse.microprofile.config.Config; import org.eclipse.microprofile.config.inject.ConfigProperty; @@ -65,9 +68,11 @@ import com.github.streamshub.console.api.support.KafkaContext; import com.github.streamshub.console.api.support.TrustAllCertificateManager; import com.github.streamshub.console.api.support.ValidationProxy; +import com.github.streamshub.console.api.support.serdes.RecordData; import com.github.streamshub.console.config.ConsoleConfig; import com.github.streamshub.console.config.KafkaClusterConfig; +import io.apicurio.registry.serde.SerdeConfig; import io.fabric8.kubernetes.client.informers.ResourceEventHandler; import io.fabric8.kubernetes.client.informers.SharedIndexInformer; import io.fabric8.kubernetes.client.informers.cache.Cache; @@ -420,6 +425,7 @@ Map requiredConsumerConfig() { configs.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 50_000); configs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); configs.put(ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, 5000); + configs.put(SerdeConfig.ENABLE_HEADERS, "true"); return configs; } @@ -520,26 +526,16 @@ public void disposeKafkaContext(@Disposes KafkaContext context, Map> consumerSupplier(ConsoleConfig consoleConfig, KafkaContext context) { + public BiFunction, Deserializer, Consumer> consumerSupplier(ConsoleConfig consoleConfig, KafkaContext context) { var configs = maybeAuthenticate(context, Consumer.class); - Consumer client = new KafkaConsumer<>(configs); // NOSONAR / closed in consumerDisposer - return () -> client; - } - - public void consumerDisposer(@Disposes Supplier> consumer) { - consumer.get().close(); + return (keyDeser, valueDeser) -> new KafkaConsumer<>(configs, keyDeser, valueDeser); // NOSONAR / closed in consumerDisposer } @Produces @RequestScoped - public Supplier> producerSupplier(ConsoleConfig consoleConfig, KafkaContext context) { + public BiFunction, Serializer, Producer> producerSupplier(ConsoleConfig consoleConfig, KafkaContext context) { var configs = maybeAuthenticate(context, Producer.class); - Producer client = new KafkaProducer<>(configs); // NOSONAR / closed in producerDisposer - return () -> client; - } - - public void producerDisposer(@Disposes Supplier> producer) { - producer.get().close(); + return (keySer, valueSer) -> new KafkaProducer<>(configs, keySer, valueSer); // NOSONAR / closed by service code } Map maybeAuthenticate(KafkaContext context, Class clientType) { @@ -571,6 +567,12 @@ Map buildConfig(Set configNames, .map(Optional::get) .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue, (k1, k2) -> k1, TreeMap::new)); + // Ensure no given properties are skipped. The previous stream processing allows + // for the standard config names to be obtained from the given maps, but also from + // config overrides via MicroProfile Config. + clientProperties.get().forEach(cfg::putIfAbsent); + config.getProperties().forEach(cfg::putIfAbsent); + var listenerSpec = cluster.map(Kafka::getSpec) .map(KafkaSpec::getKafka) .map(KafkaClusterSpec::getListeners) diff --git a/api/src/main/java/com/github/streamshub/console/api/RecordsResource.java b/api/src/main/java/com/github/streamshub/console/api/RecordsResource.java index ecd388ea5..a82df5829 100644 --- a/api/src/main/java/com/github/streamshub/console/api/RecordsResource.java +++ b/api/src/main/java/com/github/streamshub/console/api/RecordsResource.java @@ -67,12 +67,12 @@ public class RecordsResource { summary = "Consume records from a topic", description = "Consume a limited number of records from a topic, optionally specifying a partition and an absolute offset or timestamp as the starting point for message retrieval.") @APIResponseSchema( - value = KafkaRecord.ListResponse.class, + value = KafkaRecord.KafkaRecordDataList.class, responseDescription = "List of records matching the request query parameters.") @APIResponse(responseCode = "404", ref = "NotFound") @APIResponse(responseCode = "500", ref = "ServerError") @APIResponse(responseCode = "504", ref = "ServerTimeout") - public Response consumeRecords( + public CompletionStage consumeRecords( @Parameter(description = "Cluster identifier") @PathParam("clusterId") String clusterId, @@ -98,7 +98,9 @@ public Response consumeRecords( KafkaRecord.Fields.HEADERS, KafkaRecord.Fields.KEY, KafkaRecord.Fields.VALUE, - KafkaRecord.Fields.SIZE + KafkaRecord.Fields.SIZE, + KafkaRecord.Fields.KEY_SCHEMA, + KafkaRecord.Fields.VALUE_SCHEMA, }, payload = ErrorCategory.InvalidQueryParameter.class) @Parameter( @@ -116,15 +118,27 @@ public Response consumeRecords( KafkaRecord.Fields.HEADERS, KafkaRecord.Fields.KEY, KafkaRecord.Fields.VALUE, - KafkaRecord.Fields.SIZE + KafkaRecord.Fields.SIZE, + KafkaRecord.Fields.KEY_SCHEMA, + KafkaRecord.Fields.VALUE_SCHEMA, })) List fields) { requestedFields.accept(fields); - var result = recordService.consumeRecords(topicId, params.getPartition(), params.getOffset(), params.getTimestamp(), params.getLimit(), fields, params.getMaxValueLength()); - CacheControl noStore = RuntimeDelegate.getInstance().createHeaderDelegate(CacheControl.class).fromString("no-store"); - return Response.ok(new KafkaRecord.ListResponse(result)).cacheControl(noStore).build(); + + return recordService.consumeRecords( + topicId, + params.getPartition(), + params.getOffset(), + params.getTimestamp(), + params.getLimit(), + fields, + params.getMaxValueLength()) + .thenApply(KafkaRecord.KafkaRecordDataList::new) + .thenApply(Response::ok) + .thenApply(response -> response.cacheControl(noStore)) + .thenApply(Response.ResponseBuilder::build); } @POST @@ -135,7 +149,7 @@ public Response consumeRecords( description = "Produce (write) a single record to a topic") @APIResponseSchema( responseCode = "201", - value = KafkaRecord.KafkaRecordDocument.class, + value = KafkaRecord.KafkaRecordData.class, responseDescription = "Record was successfully sent to the topic") @APIResponse(responseCode = "404", ref = "NotFound") @APIResponse(responseCode = "500", ref = "ServerError") @@ -151,20 +165,19 @@ public CompletionStage produceRecord( String topicId, @Valid - KafkaRecord.KafkaRecordDocument message) { + KafkaRecord.KafkaRecordData message) { final UriBuilder location = uriInfo.getRequestUriBuilder(); requestedFields.accept(KafkaRecord.Fields.ALL); - return recordService.produceRecord(topicId, message.getData().getAttributes()) - .thenApply(KafkaRecord.KafkaRecordDocument::new) + return recordService.produceRecord(topicId, message.getData()) + .thenApply(KafkaRecord.KafkaRecordData::new) .thenApply(entity -> Response.status(Status.CREATED) .entity(entity) .location(location - .queryParam("filter[partition]", entity.getData().getAttributes().getPartition()) - .queryParam("filter[offset]", entity.getData().getAttributes().getOffset()) + .queryParam("filter[partition]", entity.getData().partition()) + .queryParam("filter[offset]", entity.getData().offset()) .build())) .thenApply(Response.ResponseBuilder::build); - } } diff --git a/api/src/main/java/com/github/streamshub/console/api/SchemasResource.java b/api/src/main/java/com/github/streamshub/console/api/SchemasResource.java new file mode 100644 index 000000000..4c980eb0f --- /dev/null +++ b/api/src/main/java/com/github/streamshub/console/api/SchemasResource.java @@ -0,0 +1,109 @@ +package com.github.streamshub.console.api; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.util.Base64; +import java.util.Collections; +import java.util.Optional; + +import jakarta.annotation.PostConstruct; +import jakarta.inject.Inject; +import jakarta.ws.rs.BadRequestException; +import jakarta.ws.rs.GET; +import jakarta.ws.rs.Path; +import jakarta.ws.rs.PathParam; +import jakarta.ws.rs.Produces; +import jakarta.ws.rs.core.MediaType; +import jakarta.ws.rs.core.Response; +import jakarta.ws.rs.core.Response.Status; + +import org.eclipse.microprofile.config.ConfigProvider; +import org.eclipse.microprofile.openapi.annotations.media.Content; +import org.eclipse.microprofile.openapi.annotations.parameters.Parameter; +import org.eclipse.microprofile.openapi.annotations.responses.APIResponse; +import org.eclipse.microprofile.openapi.annotations.tags.Tag; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.github.streamshub.console.api.support.serdes.MultiformatSchemaParser; + +import io.apicurio.registry.resolver.DefaultSchemaResolver; +import io.apicurio.registry.resolver.SchemaResolver; +import io.apicurio.registry.rest.client.RegistryClient; +import io.apicurio.registry.rest.client.RegistryClientFactory; +import io.apicurio.registry.serde.strategy.ArtifactReference; + +@Path("/api/schemas/{schemaId}") +@Tag(name = "Schema Registry Resources") +public class SchemasResource { + + @Inject + ObjectMapper objectMapper; + + SchemaResolver schemaResolver; + + @PostConstruct + void initialize() { + String registryUrl = ConfigProvider.getConfig().getOptionalValue("console.registry.endpoint", String.class) + // TODO: remove default + .orElse("http://localhost:9080"); + + RegistryClient registryClient = RegistryClientFactory.create(registryUrl); + schemaResolver = new DefaultSchemaResolver<>(); + schemaResolver.setClient(registryClient); + schemaResolver.configure(Collections.emptyMap(), new MultiformatSchemaParser<>(Collections.emptySet())); + } + + @GET + @Produces(MediaType.APPLICATION_JSON) + @APIResponse(responseCode = "200", ref = "Configurations", content = @Content()) + @APIResponse(responseCode = "404", ref = "NotFound") + @APIResponse(responseCode = "500", ref = "ServerError") + @APIResponse(responseCode = "504", ref = "ServerTimeout") + public Response describeConfigs( + @Parameter(description = "Schema identifier") + @PathParam("schemaId") + String schemaId) { + + + + InputStream in = new ByteArrayInputStream(Base64.getUrlDecoder().decode(schemaId)); + JsonNode id; + + try { + id = objectMapper.readTree(in); + } catch (IOException e) { + throw new BadRequestException("Schema id could not be parsed"); + } + + var builder = ArtifactReference.builder(); + + if (id.has("globalId")) { + builder.globalId(id.get("globalId").asLong()); + } + if (id.has("contentId")) { + builder.contentId(id.get("contentId").asLong()); + } + if (id.has("groupId")) { + builder.groupId(id.get("groupId").asText()); + } + if (id.has("artifactId")) { + builder.artifactId(id.get("artifactId").asText()); + } + if (id.has("version")) { + builder.version(id.get("version").asText()); + } + + var schema = schemaResolver.resolveSchemaByArtifactReference(builder.build()); + + var response = Optional.ofNullable(schema) + .map(s -> s.getParsedSchema()) + .map(s -> s.getRawSchema()) + .map(Response::ok) + .orElseGet(() -> Response.status(Status.NOT_FOUND)); + + return response.build(); + } + +} diff --git a/api/src/main/java/com/github/streamshub/console/api/model/JsonApiDocument.java b/api/src/main/java/com/github/streamshub/console/api/model/JsonApiDocument.java index 0b2d9eb70..c942c5267 100644 --- a/api/src/main/java/com/github/streamshub/console/api/model/JsonApiDocument.java +++ b/api/src/main/java/com/github/streamshub/console/api/model/JsonApiDocument.java @@ -4,8 +4,8 @@ import java.util.Map; import com.fasterxml.jackson.annotation.JsonInclude; -import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonInclude.Include; +import com.fasterxml.jackson.annotation.JsonProperty; /** * Base class for all JSON API request and response bodies. diff --git a/api/src/main/java/com/github/streamshub/console/api/model/JsonApiRelationship.java b/api/src/main/java/com/github/streamshub/console/api/model/JsonApiRelationship.java new file mode 100644 index 000000000..a8ee3ae67 --- /dev/null +++ b/api/src/main/java/com/github/streamshub/console/api/model/JsonApiRelationship.java @@ -0,0 +1,57 @@ +package com.github.streamshub.console.api.model; + +import java.util.LinkedHashMap; +import java.util.Map; + +import com.fasterxml.jackson.annotation.JsonInclude; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.annotation.JsonInclude.Include; + +@JsonInclude(value = Include.NON_NULL) +public class JsonApiRelationship { + + private JsonApiMeta meta; + private Identifier data; + private Map links; + + static Map addEntry(Map map, K key, V value) { + if (map == null) { + map = new LinkedHashMap<>(); + } + map.put(key, value); + return map; + } + + @JsonProperty + public JsonApiMeta meta() { + return meta; + } + + public Object meta(String key) { + return meta != null ? meta.get(key) : null; + } + + public JsonApiRelationship addMeta(String key, Object value) { + meta = JsonApiMeta.put(meta, key, value); + return this; + } + + @JsonProperty + public Identifier data() { + return data; + } + + public void data(Identifier data) { + this.data = data; + } + + @JsonProperty + public Map links() { + return links; + } + + public JsonApiRelationship addLink(String key, String value) { + links = addEntry(links, key, value); + return this; + } +} diff --git a/api/src/main/java/com/github/streamshub/console/api/model/KafkaRecord.java b/api/src/main/java/com/github/streamshub/console/api/model/KafkaRecord.java index 1b27fca62..77e479080 100644 --- a/api/src/main/java/com/github/streamshub/console/api/model/KafkaRecord.java +++ b/api/src/main/java/com/github/streamshub/console/api/model/KafkaRecord.java @@ -19,9 +19,17 @@ import io.xlate.validation.constraints.Expression; -@Schema(name = "KafkaRecordAttributes") -@JsonFilter("fieldFilter") -public class KafkaRecord { +@Schema(name = "KafkaRecord") +@Expression( + when = "self.type != null", + value = "self.type == '" + KafkaRecord.API_TYPE + "'", + message = "resource type conflicts with operation", + node = "type", + payload = ErrorCategory.ResourceConflict.class) +public class KafkaRecord extends RelatableResource { + + public static final String API_TYPE = "records"; + public static final String FIELDS_PARAM = "fields[" + API_TYPE + "]"; public static final class Fields { public static final String PARTITION = "partition"; @@ -32,6 +40,8 @@ public static final class Fields { public static final String KEY = "key"; public static final String VALUE = "value"; public static final String SIZE = "size"; + public static final String KEY_SCHEMA = "keySchema"; + public static final String VALUE_SCHEMA = "valueSchema"; public static final String DEFAULT = PARTITION + @@ -41,7 +51,9 @@ public static final class Fields { ", " + HEADERS + ", " + KEY + ", " + VALUE + - ", " + SIZE; + ", " + SIZE + + ", " + KEY_SCHEMA + + ", " + VALUE_SCHEMA; public static final List ALL = List.of( PARTITION, @@ -51,180 +63,189 @@ public static final class Fields { HEADERS, KEY, VALUE, - SIZE); + SIZE, + KEY_SCHEMA, + VALUE_SCHEMA); private Fields() { // Prevent instances } } - @Schema(name = "KafkaRecordListResponse") - public static final class ListResponse extends DataList { - public ListResponse(List data) { - super(data.stream().map(RecordResource::new).toList()); + @Schema(name = "KafkaRecordDataList") + public static final class KafkaRecordDataList extends DataList { + public KafkaRecordDataList(List data) { + super(data); } } - @Schema(name = "KafkaRecordDocument") - public static final class KafkaRecordDocument extends DataSingleton { + @Schema(name = "KafkaRecordData") + public static final class KafkaRecordData extends DataSingleton { @JsonCreator - public KafkaRecordDocument(@JsonProperty("data") RecordResource data) { + public KafkaRecordData(@JsonProperty("data") KafkaRecord data) { super(data); } - - public KafkaRecordDocument(KafkaRecord data) { - super(new RecordResource(data)); - } } - @Schema(name = "KafkaRecord") - @Expression( - when = "self.type != null", - value = "self.type == 'records'", - message = "resource type conflicts with operation", - node = "type", - payload = ErrorCategory.ResourceConflict.class - ) - public static final class RecordResource extends Resource { - @JsonCreator - public RecordResource(String type, KafkaRecord attributes) { - super(null, type, attributes); - } + @JsonFilter("fieldFilter") + @Schema(name = "KafkaRecordAttributes") + static class Attributes { + @JsonIgnore + String topic; - public RecordResource(KafkaRecord attributes) { - super(null, "records", attributes); - } - } + @JsonProperty + @Schema(description = "The record's partition within the topic") + Integer partition; - @JsonIgnore - String topic; + @JsonProperty + @Schema(readOnly = true, description = "The record's offset within the topic partition") + Long offset; - @Schema(description = "The record's partition within the topic") - Integer partition; + @JsonProperty + @Schema(description = "Timestamp associated with the record. The type is indicated by `timestampType`. When producing a record, this value will be used as the record's `CREATE_TIME`.", format = "date-time") + Instant timestamp; - @Schema(readOnly = true, description = "The record's offset within the topic partition") - Long offset; + @JsonProperty + @Schema(readOnly = true, description = "Type of timestamp associated with the record") + String timestampType; - @Schema(description = "Timestamp associated with the record. The type is indicated by `timestampType`. When producing a record, this value will be used as the record's `CREATE_TIME`.", format = "date-time") - Instant timestamp; + @JsonProperty + @Schema(description = "Record headers, key/value pairs") + Map headers; - @Schema(readOnly = true, description = "Type of timestamp associated with the record") - String timestampType; + @JsonProperty + @Schema(description = "Record key") + String key; - @Schema(description = "Record headers, key/value pairs") - Map headers; + @JsonProperty + @NotNull + @Schema(description = "Record value") + String value; - @Schema(description = "Record key") - String key; + @JsonProperty + @Schema(readOnly = true, description = "Size of the uncompressed record, not including the overhead of the record in the log segment.") + Long size; + } - @NotNull - @Schema(description = "Record value") - String value; + @JsonFilter("fieldFilter") + static class Relationships { + @JsonProperty + JsonApiRelationship keySchema; - @Schema(readOnly = true, description = "Size of the uncompressed record, not including the overhead of the record in the log segment.") - Long size; + @JsonProperty + JsonApiRelationship valueSchema; + } + @JsonCreator public KafkaRecord() { - super(); + super(null, API_TYPE, new Attributes(), new Relationships()); } public KafkaRecord(String topic) { - this.topic = topic; + this(); + attributes.topic = topic; } public KafkaRecord(String topic, Integer partition, Instant timestamp, Map headers, String key, String value, Long size) { this(topic); - this.partition = partition; - this.timestamp = timestamp; - this.headers = headers; - this.key = key; - this.value = value; - this.size = size; + attributes.partition = partition; + attributes.timestamp = timestamp; + attributes.headers = headers; + attributes.key = key; + attributes.value = value; + attributes.size = size; } @JsonIgnore public URI buildUri(UriBuilder builder, String topicName) { - builder.queryParam(Fields.PARTITION, partition); - builder.queryParam(Fields.OFFSET, offset); + builder.queryParam(Fields.PARTITION, attributes.partition); + builder.queryParam(Fields.OFFSET, attributes.offset); return builder.build(topicName); } @AssertTrue(message = "invalid timestamp") @JsonIgnore public boolean isTimestampValid() { - if (timestamp == null) { + if (attributes.timestamp == null) { return true; } try { - return timestamp.isAfter(Instant.ofEpochMilli(-1)); + return attributes.timestamp.isAfter(Instant.ofEpochMilli(-1)); } catch (Exception e) { return false; } } - public Integer getPartition() { - return partition; + public Integer partition() { + return attributes.partition; } - public void setPartition(Integer partition) { - this.partition = partition; + public void partition(Integer partition) { + attributes.partition = partition; } - public Long getOffset() { - return offset; + public Long offset() { + return attributes.offset; } - public void setOffset(Long offset) { - this.offset = offset; + public void offset(Long offset) { + attributes.offset = offset; } - public Instant getTimestamp() { - return timestamp; + public Instant timestamp() { + return attributes.timestamp; } - public void setTimestamp(Instant timestamp) { - this.timestamp = timestamp; + public void timestamp(Instant timestamp) { + attributes.timestamp = timestamp; } - public String getTimestampType() { - return timestampType; + public String timestampType() { + return attributes.timestampType; } - public void setTimestampType(String timestampType) { - this.timestampType = timestampType; + public void timestampType(String timestampType) { + attributes.timestampType = timestampType; } - public Map getHeaders() { - return headers; + public Map headers() { + return attributes.headers; } - public void setHeaders(Map headers) { - this.headers = headers; + public void headers(Map headers) { + attributes.headers = headers; } - public String getKey() { - return key; + public String key() { + return attributes.key; } - public void setKey(String key) { - this.key = key; + public void key(String key) { + attributes.key = key; } - public String getValue() { - return value; + public String value() { + return attributes.value; } - public void setValue(String value) { - this.value = value; + public void value(String value) { + attributes.value = value; } - public Long getSize() { - return size; + public Long size() { + return attributes.size; } - public void setSize(Long size) { - this.size = size; + public void size(Long size) { + attributes.size = size; } + public void keySchema(JsonApiRelationship keySchema) { + relationships.keySchema = keySchema; + } + + public void valueSchema(JsonApiRelationship valueSchema) { + relationships.valueSchema = valueSchema; + } } diff --git a/api/src/main/java/com/github/streamshub/console/api/model/RecordFilterParams.java b/api/src/main/java/com/github/streamshub/console/api/model/RecordFilterParams.java index 75d0ad24e..7fd62f51e 100644 --- a/api/src/main/java/com/github/streamshub/console/api/model/RecordFilterParams.java +++ b/api/src/main/java/com/github/streamshub/console/api/model/RecordFilterParams.java @@ -19,12 +19,18 @@ @Expression( when = "self.rawTimestamp != null", value = "self.rawOffset == null", - node = "filter[offset]", + node = RecordFilterParams.FILTER_OFFSET, message = "Parameter `filter[offset]` must not be used when `filter[timestamp]` is present.", payload = ErrorCategory.InvalidQueryParameter.class) public class RecordFilterParams { - @QueryParam("filter[partition]") + static final String FILTER_PARTITION = "filter[partition]"; + static final String FILTER_OFFSET = "filter[offset]"; + static final String FILTER_TIMESTAMP = "filter[timestamp]"; + static final String PAGE_SIZE = "page[size]"; + static final String MAX_VALUE_LENGTH = "maxValueLength"; + + @QueryParam(FILTER_PARTITION) @Parameter( description = """ Retrieve messages only from the partition identified by this parameter. @@ -39,23 +45,23 @@ public class RecordFilterParams { value = "self.operator == 'eq'", message = "unsupported filter operator, supported values: [ 'eq' ]", payload = ErrorCategory.InvalidQueryParameter.class, - node = "filter[partition]") + node = FILTER_PARTITION) @Expression( when = "self != null", value = "self.operands.size() == 1", message = "exactly 1 operand is required", payload = ErrorCategory.InvalidQueryParameter.class, - node = "filter[partition]") + node = FILTER_PARTITION) @Expression( when = "self != null && self.operator == 'eq' && self.operands.size() == 1", value = "val = Integer.parseInt(self.firstOperand); val >= 0 && val <= Integer.MAX_VALUE", exceptionalValue = ExceptionalValue.FALSE, message = "operand must be an integer between 0 and " + Integer.MAX_VALUE + ", inclusive", payload = ErrorCategory.InvalidQueryParameter.class, - node = "filter[partition]") + node = FILTER_PARTITION) FetchFilter partition; - @QueryParam("filter[offset]") + @QueryParam(FILTER_OFFSET) @Parameter( description = """ Retrieve messages with an offset greater than or equal to the filter @@ -80,23 +86,23 @@ public class RecordFilterParams { exceptionalValue = ExceptionalValue.FALSE, message = "unsupported filter operator, supported values: [ 'gte' ]", payload = ErrorCategory.InvalidQueryParameter.class, - node = "filter[offset]") + node = FILTER_OFFSET) @Expression( when = "self != null", value = "self.operands.size() == 1", message = "exactly 1 operand is required", payload = ErrorCategory.InvalidQueryParameter.class, - node = "filter[offset]") + node = FILTER_OFFSET) @Expression( when = "self != null && self.operator == 'gte'", value = "val = Long.parseLong(self.firstOperand); val >= 0 && val <= Long.MAX_VALUE", exceptionalValue = ExceptionalValue.FALSE, message = "operand must be an integer between 0 and " + Long.MAX_VALUE + ", inclusive", payload = ErrorCategory.InvalidQueryParameter.class, - node = "filter[offset]") + node = FILTER_OFFSET) FetchFilter offset; - @QueryParam("filter[timestamp]") + @QueryParam(FILTER_TIMESTAMP) @Parameter( description = """ Retrieve messages with a timestamp greater than or equal to the filter @@ -120,13 +126,13 @@ public class RecordFilterParams { value = "self.operator == 'gte'", message = "unsupported filter operator, supported values: [ 'gte' ]", payload = ErrorCategory.InvalidQueryParameter.class, - node = "filter[timestamp]") + node = FILTER_TIMESTAMP) @Expression( when = "self != null", value = "self.operands.size() == 1", message = "exactly 1 operand is required", payload = ErrorCategory.InvalidQueryParameter.class, - node = "filter[timestamp]") + node = FILTER_TIMESTAMP) @Expression( when = "self != null && self.operator == 'gte'", classImports = "java.time.Instant", @@ -134,10 +140,10 @@ public class RecordFilterParams { exceptionalValue = ExceptionalValue.FALSE, message = "operand must be a valid RFC 3339 date-time no earlier than `1970-01-01T00:00:00Z`", payload = ErrorCategory.InvalidQueryParameter.class, - node = "filter[timestamp]") + node = FILTER_TIMESTAMP) FetchFilter timestamp; - @QueryParam("page[size]") + @QueryParam(PAGE_SIZE) @DefaultValue(ListFetchParams.PAGE_SIZE_DEFAULT + "") @Parameter( description = "Limit the number of records fetched and returned", @@ -152,10 +158,10 @@ public class RecordFilterParams { exceptionalValue = ExceptionalValue.FALSE, message = "must be an integer between 1 and " + ListFetchParams.PAGE_SIZE_MAX + ", inclusive", payload = ErrorCategory.InvalidQueryParameter.class, - node = "page[size]") + node = PAGE_SIZE) String pageSize; - @QueryParam("maxValueLength") + @QueryParam(MAX_VALUE_LENGTH) @Parameter( description = """ Maximum length of string values returned in the response. @@ -169,7 +175,7 @@ public class RecordFilterParams { exceptionalValue = ExceptionalValue.FALSE, message = "must be an integer between 1 and " + Integer.MAX_VALUE + ", inclusive", payload = ErrorCategory.InvalidQueryParameter.class, - node = "maxValueLength") + node = MAX_VALUE_LENGTH) String maxValueLength; public String getRawOffset() { diff --git a/api/src/main/java/com/github/streamshub/console/api/service/RecordService.java b/api/src/main/java/com/github/streamshub/console/api/service/RecordService.java index 0b8c40a3d..b1164a1fb 100644 --- a/api/src/main/java/com/github/streamshub/console/api/service/RecordService.java +++ b/api/src/main/java/com/github/streamshub/console/api/service/RecordService.java @@ -1,12 +1,8 @@ package com.github.streamshub.console.api.service; -import java.io.ByteArrayInputStream; -import java.io.IOException; -import java.io.InputStreamReader; -import java.io.Reader; -import java.nio.charset.StandardCharsets; import java.time.Duration; import java.time.Instant; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.Collections; @@ -18,16 +14,20 @@ import java.util.List; import java.util.Map; import java.util.NoSuchElementException; +import java.util.Objects; import java.util.Optional; import java.util.Set; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; import java.util.concurrent.CompletionStage; import java.util.concurrent.Executor; +import java.util.function.BiFunction; import java.util.function.Function; import java.util.function.Supplier; import java.util.stream.Collectors; import java.util.stream.StreamSupport; +import jakarta.annotation.PostConstruct; import jakarta.enterprise.context.ApplicationScoped; import jakarta.inject.Inject; @@ -38,6 +38,7 @@ import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.Uuid; @@ -45,21 +46,30 @@ import org.apache.kafka.common.errors.UnknownTopicIdException; import org.apache.kafka.common.header.Header; import org.apache.kafka.common.header.Headers; +import org.apache.kafka.common.serialization.Deserializer; +import org.apache.kafka.common.serialization.Serializer; +import org.eclipse.microprofile.config.ConfigProvider; import org.eclipse.microprofile.context.ThreadContext; import org.jboss.logging.Logger; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.github.streamshub.console.api.model.Identifier; +import com.github.streamshub.console.api.model.JsonApiRelationship; import com.github.streamshub.console.api.model.KafkaRecord; import com.github.streamshub.console.api.support.KafkaContext; import com.github.streamshub.console.api.support.SizeLimitedSortedSet; +import com.github.streamshub.console.api.support.serdes.MultiformatDeserializer; +import com.github.streamshub.console.api.support.serdes.MultiformatSerializer; +import com.github.streamshub.console.api.support.serdes.RecordData; + +import io.apicurio.registry.rest.client.RegistryClient; +import io.apicurio.registry.rest.client.RegistryClientFactory; import static java.util.Objects.requireNonNullElse; @ApplicationScoped public class RecordService { - public static final String BINARY_DATA_MESSAGE = "Binary or non-UTF-8 encoded data cannot be displayed"; - static final int REPLACEMENT_CHARACTER = '\uFFFD'; - @Inject Logger logger; @@ -67,15 +77,55 @@ public class RecordService { KafkaContext kafkaContext; @Inject - Supplier> consumerSupplier; + BiFunction< + Deserializer, + Deserializer, + Consumer + > consumerFactory; @Inject - Supplier> producerSupplier; + BiFunction< + Serializer, + Serializer, + Producer + > producerFactory; @Inject ThreadContext threadContext; - public List consumeRecords(String topicId, + @Inject + ObjectMapper objectMapper; + + RegistryClient registryClient; + + MultiformatDeserializer keyDeserializer; + MultiformatDeserializer valueDeserializer; + + MultiformatSerializer keySerializer; + MultiformatSerializer valueSerializer; + + @PostConstruct + public void initialize() { + String registryUrl = ConfigProvider.getConfig().getOptionalValue("console.registry.endpoint", String.class) + // TODO: remove default + .orElse("http://localhost:9080"); + + registryClient = RegistryClientFactory.create(registryUrl); + + keyDeserializer = new MultiformatDeserializer(registryClient); + keyDeserializer.configure(kafkaContext.configs(Consumer.class), true); + + valueDeserializer = new MultiformatDeserializer(registryClient); + valueDeserializer.configure(kafkaContext.configs(Consumer.class), false); + + keySerializer = new MultiformatSerializer(registryClient, objectMapper); + keySerializer.configure(kafkaContext.configs(Producer.class), true); + + valueSerializer = new MultiformatSerializer(registryClient, objectMapper); + valueSerializer.configure(kafkaContext.configs(Producer.class), false); + } + + public CompletionStage> consumeRecords(String topicId, Integer partition, Long offset, Instant timestamp, @@ -83,72 +133,65 @@ public List consumeRecords(String topicId, List include, Integer maxValueLength) { - List partitions = topicNameForId(topicId) - .thenApplyAsync( - topicName -> consumerSupplier.get().partitionsFor(topicName), - threadContext.currentContextExecutor()) - .toCompletableFuture() - .join(); - - List assignments = partitions.stream() - .filter(p -> partition == null || partition.equals(p.partition())) - .map(p -> new TopicPartition(p.topic(), p.partition())) - .toList(); + return topicNameForId(topicId).thenApplyAsync(topicName -> { + Consumer consumer = consumerFactory.apply(keyDeserializer, valueDeserializer); + List partitions = consumer.partitionsFor(topicName); + List assignments = partitions.stream() + .filter(p -> partition == null || partition.equals(p.partition())) + .map(p -> new TopicPartition(p.topic(), p.partition())) + .toList(); - if (assignments.isEmpty()) { - return Collections.emptyList(); - } + if (assignments.isEmpty()) { + return Collections.emptyList(); + } - Consumer consumer = consumerSupplier.get(); - consumer.assign(assignments); - var endOffsets = consumer.endOffsets(assignments); + consumer.assign(assignments); + var endOffsets = consumer.endOffsets(assignments); - if (timestamp != null) { - seekToTimestamp(consumer, assignments, timestamp); - } else { - seekToOffset(consumer, assignments, endOffsets, offset, limit); - } + if (timestamp != null) { + seekToTimestamp(consumer, assignments, timestamp); + } else { + seekToOffset(consumer, assignments, endOffsets, offset, limit); + } - Iterable> poll = () -> new ConsumerRecordsIterator<>(consumer, endOffsets, limit); - var limitSet = new SizeLimitedSortedSet>(buildComparator(timestamp, offset), limit); + Iterable> poll = () -> new ConsumerRecordsIterator<>(consumer, endOffsets, limit); + var limitSet = new SizeLimitedSortedSet>(buildComparator(timestamp, offset), limit); - return StreamSupport.stream(poll.spliterator(), false) - .flatMap(records -> StreamSupport.stream(records.spliterator(), false)) - .collect(Collectors.toCollection(() -> limitSet)) - .stream() - .map(rec -> getItems(rec, topicId, include, maxValueLength)) - .toList(); + return StreamSupport.stream(poll.spliterator(), false) + .flatMap(records -> StreamSupport.stream(records.spliterator(), false)) + .collect(Collectors.toCollection(() -> limitSet)) + .stream() + .map(rec -> getItems(rec, topicId, include, maxValueLength)) + .toList(); + }, threadContext.currentContextExecutor()); } public CompletionStage produceRecord(String topicId, KafkaRecord input) { CompletableFuture promise = new CompletableFuture<>(); Executor asyncExec = threadContext.currentContextExecutor(); - topicNameForId(topicId) - .thenApplyAsync( - topicName -> producerSupplier.get().partitionsFor(topicName), - asyncExec) - .thenAcceptAsync( - partitions -> { - Producer producer = producerSupplier.get(); - String topicName = partitions.iterator().next().topic(); - Integer partition = input.getPartition(); - - if (partition != null && partitions.stream().noneMatch(p -> partition.equals(p.partition()))) { - promise.completeExceptionally(invalidPartition(topicId, partition)); - } else { - send(topicName, input, producer, promise); - } - }, - asyncExec); + topicNameForId(topicId).thenAcceptAsync(topicName -> { + Producer producer = producerFactory.apply(keySerializer, valueSerializer); + List partitions = producer.partitionsFor(topicName); + Integer partition = input.partition(); + + if (partition != null && partitions.stream().noneMatch(p -> partition.equals(p.partition()))) { + promise.completeExceptionally(invalidPartition(topicId, partition)); + } else { + send(topicName, input, producer, promise); + } + + promise.whenComplete((kafkaRecord, error) -> producer.close()); + }, asyncExec).exceptionally(e -> { + promise.completeExceptionally(e); + return null; + }); return promise; } - void send(String topicName, KafkaRecord input, Producer producer, CompletableFuture promise) { - String key = input.getKey(); - - List
headers = Optional.ofNullable(input.getHeaders()) + void send(String topicName, KafkaRecord input, Producer producer, CompletableFuture promise) { + List
headers = Optional.ofNullable(input.headers()) .orElseGet(Collections::emptyMap) .entrySet() .stream() @@ -164,35 +207,59 @@ public byte[] value() { } }) .map(Header.class::cast) - .toList(); + .collect(Collectors.toCollection(ArrayList::new)); + + Long timestamp = Optional.ofNullable(input.timestamp()).map(Instant::toEpochMilli).orElse(null); + var key = new RecordData(input.key()); + key.meta.put("schema", input.getMeta("key-schema")); - Long timestamp = Optional.ofNullable(input.getTimestamp()).map(Instant::toEpochMilli).orElse(null); + var value = new RecordData(input.value()); + value.meta.put("schema", input.getMeta("value-schema")); - ProducerRecord request = new ProducerRecord<>(topicName, - input.getPartition(), + ProducerRecord request = new ProducerRecord<>(topicName, + input.partition(), timestamp, key, - input.getValue(), + value, headers); - producer.send(request, (meta, exception) -> { - if (exception != null) { - promise.completeExceptionally(exception); - } else { + CompletableFuture.completedFuture(null) + .thenApplyAsync(nothing -> { + try { + return producer.send(request).get(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new CompletionException("Error occurred while sending record to Kafka cluster", e); + } catch (Exception e) { + throw new CompletionException("Error occurred while sending record to Kafka cluster", e); + } + }) + .thenApply(meta -> { KafkaRecord result = new KafkaRecord(); - result.setPartition(meta.partition()); + result.partition(meta.partition()); if (meta.hasOffset()) { - result.setOffset(meta.offset()); + result.offset(meta.offset()); } if (meta.hasTimestamp()) { - result.setTimestamp(Instant.ofEpochMilli(meta.timestamp())); + result.timestamp(Instant.ofEpochMilli(meta.timestamp())); } - result.setKey(input.getKey()); - result.setValue(input.getValue()); - result.setHeaders(input.getHeaders()); - promise.complete(result); - } - }); + result.key(key.dataString(null)); + result.value(value.dataString(null)); + result.headers(Arrays.stream(request.headers().toArray()) + .collect(Collectors.toMap( + Header::key, + e -> new String(e.value())))); + result.size(sizeOf(meta, request.headers())); + maybeRelate(key).ifPresent(result::keySchema); + maybeRelate(value).ifPresent(result::valueSchema); + + return result; + }) + .thenAccept(promise::complete) + .exceptionally(exception -> { + promise.completeExceptionally(exception); + return null; + }); } CompletionStage topicNameForId(String topicId) { @@ -210,7 +277,7 @@ CompletionStage topicNameForId(String topicId) { .orElseThrow(() -> noSuchTopic(topicId))); } - void seekToTimestamp(Consumer consumer, List assignments, Instant timestamp) { + void seekToTimestamp(Consumer consumer, List assignments, Instant timestamp) { Long tsMillis = timestamp.toEpochMilli(); Map timestampsToSearch = assignments.stream() .collect(Collectors.toMap(Function.identity(), p -> tsMillis)); @@ -237,7 +304,7 @@ void seekToTimestamp(Consumer consumer, List ass }); } - void seekToOffset(Consumer consumer, List assignments, Map endOffsets, Long offset, int limit) { + void seekToOffset(Consumer consumer, List assignments, Map endOffsets, Long offset, int limit) { var beginningOffsets = consumer.beginningOffsets(assignments); assignments.forEach(p -> { @@ -263,9 +330,9 @@ void seekToOffset(Consumer consumer, List assign }); } - Comparator> buildComparator(Instant timestamp, Long offset) { - Comparator> comparator = Comparator - .>comparingLong(ConsumerRecord::timestamp) + Comparator> buildComparator(Instant timestamp, Long offset) { + Comparator> comparator = Comparator + .>comparingLong(ConsumerRecord::timestamp) .thenComparingInt(ConsumerRecord::partition) .thenComparingLong(ConsumerRecord::offset); @@ -277,71 +344,71 @@ Comparator> buildComparator(Instant timestamp, Lo return comparator; } - KafkaRecord getItems(ConsumerRecord rec, String topicId, List include, Integer maxValueLength) { + KafkaRecord getItems(ConsumerRecord rec, String topicId, List include, Integer maxValueLength) { KafkaRecord item = new KafkaRecord(topicId); - setProperty(KafkaRecord.Fields.PARTITION, include, rec::partition, item::setPartition); - setProperty(KafkaRecord.Fields.OFFSET, include, rec::offset, item::setOffset); - setProperty(KafkaRecord.Fields.TIMESTAMP, include, () -> Instant.ofEpochMilli(rec.timestamp()), item::setTimestamp); - setProperty(KafkaRecord.Fields.TIMESTAMP_TYPE, include, rec.timestampType()::name, item::setTimestampType); - setProperty(KafkaRecord.Fields.KEY, include, rec::key, k -> item.setKey(bytesToString(k, maxValueLength))); - setProperty(KafkaRecord.Fields.VALUE, include, rec::value, v -> item.setValue(bytesToString(v, maxValueLength))); - setProperty(KafkaRecord.Fields.HEADERS, include, () -> headersToMap(rec.headers(), maxValueLength), item::setHeaders); - setProperty(KafkaRecord.Fields.SIZE, include, () -> sizeOf(rec), item::setSize); + setProperty(KafkaRecord.Fields.PARTITION, include, rec::partition, item::partition); + setProperty(KafkaRecord.Fields.OFFSET, include, rec::offset, item::offset); + setProperty(KafkaRecord.Fields.TIMESTAMP, include, () -> Instant.ofEpochMilli(rec.timestamp()), item::timestamp); + setProperty(KafkaRecord.Fields.TIMESTAMP_TYPE, include, rec.timestampType()::name, item::timestampType); + setProperty(KafkaRecord.Fields.KEY, include, rec::key, k -> item.key(k.dataString(maxValueLength))); + setProperty(KafkaRecord.Fields.VALUE, include, rec::value, v -> item.value(v.dataString(maxValueLength))); + setProperty(KafkaRecord.Fields.HEADERS, include, () -> headersToMap(rec.headers(), maxValueLength), item::headers); + setProperty(KafkaRecord.Fields.SIZE, include, () -> sizeOf(rec), item::size); + + maybeRelate(rec.key()).ifPresent(item::keySchema); + maybeRelate(rec.value()).ifPresent(item::valueSchema); return item; } - void setProperty(String fieldName, List include, Supplier source, java.util.function.Consumer target) { - if (include.contains(fieldName)) { - target.accept(source.get()); - } + Optional maybeRelate(RecordData data) { + return Optional.ofNullable(data) + .map(d -> d.meta.get("schema")) + .filter(Map.class::isInstance) + .map(Map.class::cast) + .map(schemaMeta -> { + String artifactType = (String) schemaMeta.get("type"); + String schemaId = (String) schemaMeta.get("id"); + String name = (String) schemaMeta.get("name"); + + var relationship = new JsonApiRelationship(); + relationship.addMeta("artifactType", artifactType); + relationship.addMeta("name", name); + relationship.data(new Identifier("schemas", schemaId)); + relationship.addLink("content", "/api/schemas/" + schemaId); + + return relationship; + }) + .filter(Objects::nonNull); } - String bytesToString(byte[] bytes, Integer maxValueLength) { - if (bytes == null) { - return null; - } - - if (bytes.length == 0) { - return ""; - } - - int bufferSize = maxValueLength != null ? Math.min(maxValueLength, bytes.length) : bytes.length; - StringBuilder buffer = new StringBuilder(bufferSize); - - try (Reader reader = new InputStreamReader(new ByteArrayInputStream(bytes), StandardCharsets.UTF_8)) { - int input; - - while ((input = reader.read()) > -1) { - if (input == REPLACEMENT_CHARACTER || !Character.isDefined(input)) { - return BINARY_DATA_MESSAGE; - } - - buffer.append((char) input); - - if (maxValueLength != null && buffer.length() == maxValueLength) { - break; - } + void setProperty(String fieldName, List include, Supplier source, java.util.function.Consumer target) { + if (include.contains(fieldName)) { + T value = source.get(); + if (value != null) { + target.accept(value); } - - return buffer.toString(); - } catch (IOException e) { - return BINARY_DATA_MESSAGE; } } Map headersToMap(Headers headers, Integer maxValueLength) { Map headerMap = new LinkedHashMap<>(); - headers.iterator().forEachRemaining(h -> headerMap.put(h.key(), bytesToString(h.value(), maxValueLength))); + headers.iterator().forEachRemaining(h -> headerMap.put(h.key(), RecordData.bytesToString(h.value(), maxValueLength))); return headerMap; } + long sizeOf(RecordMetadata meta, Headers headers) { + return sizeOf(meta.serializedKeySize(), meta.serializedValueSize(), headers); + } + long sizeOf(ConsumerRecord rec) { - return rec.serializedKeySize() + - rec.serializedValueSize() + - Arrays.stream(rec.headers().toArray()) - .mapToLong(h -> h.key().length() + h.value().length) + return sizeOf(rec.serializedKeySize(), rec.serializedValueSize(), rec.headers()); + } + + long sizeOf(int keySize, int valueSize, Headers headers) { + return keySize + valueSize + Arrays.stream(headers.toArray()) + .mapToLong(h -> h.key().length() + (h.value() != null ? h.value().length : 0)) .sum(); } diff --git a/api/src/main/java/com/github/streamshub/console/api/support/serdes/AvroDatumProvider.java b/api/src/main/java/com/github/streamshub/console/api/support/serdes/AvroDatumProvider.java new file mode 100644 index 000000000..0e256ec89 --- /dev/null +++ b/api/src/main/java/com/github/streamshub/console/api/support/serdes/AvroDatumProvider.java @@ -0,0 +1,73 @@ +package com.github.streamshub.console.api.support.serdes; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; + +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericDatumReader; +import org.apache.avro.generic.GenericDatumWriter; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.io.DatumReader; +import org.apache.avro.io.DatumWriter; +import org.apache.avro.io.Decoder; +import org.apache.avro.io.DecoderFactory; +import org.apache.avro.io.Encoder; +import org.apache.avro.io.EncoderFactory; + +import io.apicurio.registry.serde.avro.DefaultAvroDatumProvider; +import io.apicurio.registry.types.ArtifactType; + +public class AvroDatumProvider extends DefaultAvroDatumProvider { + @Override + public DatumWriter createDatumWriter(RecordData data, Schema schema) { + GenericDatumWriter writer = new GenericDatumWriter<>(schema); + + return new DatumWriter() { + @Override + public void write(RecordData data, org.apache.avro.io.Encoder out) throws IOException { + final DatumReader reader = new GenericDatumReader<>(schema); + final InputStream dataStream = new ByteArrayInputStream(data.data); + final Decoder jsonDecoder = DecoderFactory.get().jsonDecoder(schema, dataStream); + final Object datum = reader.read(null, jsonDecoder); + writer.write(datum, out); + + final ByteArrayOutputStream buffer = new ByteArrayOutputStream(); + final Encoder jsonEncoder = EncoderFactory.get().jsonEncoder(schema, buffer); + writer.write(datum, jsonEncoder); + jsonEncoder.flush(); + data.data = buffer.toByteArray(); + } + + @Override + public void setSchema(Schema schema) { + // No-op + } + }; + } + + @Override + public DatumReader createDatumReader(Schema schema) { + GenericDatumReader target = new GenericDatumReader<>(schema); + + return new DatumReader() { + @Override + public RecordData read(RecordData reuse, Decoder in) throws IOException { + final Object datum = target.read(reuse, in); + final ByteArrayOutputStream buffer = new ByteArrayOutputStream(); + final DatumWriter writer = new GenericDatumWriter<>(schema); + final Encoder jsonEncoder = EncoderFactory.get().jsonEncoder(schema, buffer); + writer.write(datum, jsonEncoder); + jsonEncoder.flush(); + + return new RecordData(ArtifactType.AVRO, buffer.toByteArray(), null); + } + + @Override + public void setSchema(Schema schema) { + // No-op + } + }; + } +} diff --git a/api/src/main/java/com/github/streamshub/console/api/support/serdes/AvroDeserializer.java b/api/src/main/java/com/github/streamshub/console/api/support/serdes/AvroDeserializer.java new file mode 100644 index 000000000..906c997b8 --- /dev/null +++ b/api/src/main/java/com/github/streamshub/console/api/support/serdes/AvroDeserializer.java @@ -0,0 +1,31 @@ +package com.github.streamshub.console.api.support.serdes; + +import java.nio.ByteBuffer; + +import org.apache.avro.Schema; +import org.apache.kafka.common.header.Headers; + +import io.apicurio.registry.resolver.ParsedSchema; +import io.apicurio.registry.resolver.SchemaResolver; +import io.apicurio.registry.serde.avro.AvroKafkaDeserializer; + +class AvroDeserializer extends AvroKafkaDeserializer { + AvroDeserializer(SchemaResolver schemaResolver) { + super(); + setSchemaResolver(schemaResolver); + } + + @Override + public RecordData readData(ParsedSchema schema, ByteBuffer buffer, int start, int length) { + return super.readData(schema, buffer, start, length); + } + + @Override + public RecordData readData(Headers headers, + ParsedSchema schema, + ByteBuffer buffer, + int start, + int length) { + return super.readData(headers, schema, buffer, start, length); + } +} diff --git a/api/src/main/java/com/github/streamshub/console/api/support/serdes/MultiformatDeserializer.java b/api/src/main/java/com/github/streamshub/console/api/support/serdes/MultiformatDeserializer.java new file mode 100644 index 000000000..4aa699865 --- /dev/null +++ b/api/src/main/java/com/github/streamshub/console/api/support/serdes/MultiformatDeserializer.java @@ -0,0 +1,241 @@ +package com.github.streamshub.console.api.support.serdes; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.UncheckedIOException; +import java.nio.ByteBuffer; +import java.util.Base64; +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; +import java.util.Set; + +import org.apache.avro.Schema; +import org.apache.kafka.common.header.Headers; + +import com.fasterxml.jackson.core.JsonGenerator; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.protobuf.Message; + +import io.apicurio.registry.resolver.DefaultSchemaResolver; +import io.apicurio.registry.resolver.ParsedSchema; +import io.apicurio.registry.resolver.SchemaLookupResult; +import io.apicurio.registry.resolver.SchemaParser; +import io.apicurio.registry.resolver.SchemaResolver; +import io.apicurio.registry.resolver.strategy.ArtifactReference; +import io.apicurio.registry.rest.client.RegistryClient; +import io.apicurio.registry.serde.AbstractKafkaDeserializer; +import io.apicurio.registry.serde.avro.AvroKafkaSerdeConfig; +import io.apicurio.registry.serde.config.BaseKafkaSerDeConfig; +import io.apicurio.registry.types.ArtifactType; +import io.apicurio.registry.utils.protobuf.schema.ProtobufSchema; + +public class MultiformatDeserializer extends AbstractKafkaDeserializer { + + AvroDeserializer avroDeserializer; + ProtobufDeserializer protobufDeserializer; + SchemaParser parser; + + public MultiformatDeserializer(RegistryClient client) { + super(); + setSchemaResolver(newResolver(client)); + this.avroDeserializer = new AvroDeserializer(newResolver(client)); + this.protobufDeserializer = new ProtobufDeserializer(newResolver(client)); + } + + static SchemaResolver newResolver(RegistryClient client) { + var resolver = new DefaultSchemaResolver(); + resolver.setClient(client); + return resolver; + } + + @Override + public void configure(Map configs, boolean isKey) { + Map avroConfigs = new HashMap<>(configs); + avroConfigs.put(AvroKafkaSerdeConfig.AVRO_DATUM_PROVIDER, AvroDatumProvider.class.getName()); + avroDeserializer.configure(avroConfigs, isKey); + + protobufDeserializer.configure(configs, isKey); + + parser = new MultiformatSchemaParser<>(Set.of( + avroDeserializer.schemaParser(), + protobufDeserializer.schemaParser() + )); + + super.configure(new BaseKafkaSerDeConfig(configs), isKey); + } + + @Override + public void close() { + // don't close - deserializer will be reused + } + + @Override + public SchemaParser schemaParser() { + return parser; + } + + protected RecordData readData(Headers headers, SchemaLookupResult schemaResult, ByteBuffer buffer, int start, int length) { + ParsedSchema schema = Optional.ofNullable(schemaResult) + .map(s -> s.getParsedSchema()) + .orElse(null); + Object parsedSchema = Optional.ofNullable(schema) + .map(s -> s.getParsedSchema()) + .orElse(null); + RecordData result; + + if (parsedSchema instanceof Schema avroSchema) { + try { + if (headers != null) { + result = avroDeserializer.readData(headers, cast(schema), buffer, start, length); + } else { + result = avroDeserializer.readData(cast(schema), buffer, start, length); + } + result.schema = schema; + result.meta.put("schema", Map.of( + "type", ArtifactType.AVRO, + "id", toSchemaId(schemaResult), + "name", avroSchema.getFullName())); + } catch (Exception e) { + result = new RecordData(null, null, schema); + result.meta.put("error", e.getMessage()); + } + } else if (parsedSchema instanceof ProtobufSchema protobufSchema) { + try { + Message msg; + if (headers != null) { + msg = protobufDeserializer.readData(headers, cast(schema), buffer, start, length); + } else { + msg = protobufDeserializer.readData(cast(schema), buffer, start, length); + } + byte[] data = com.google.protobuf.util.JsonFormat.printer().print(msg).getBytes(); + result = new RecordData(ArtifactType.PROTOBUF, data, schema); + result.meta.put("schema", Map.of( + "type", ArtifactType.PROTOBUF, + "id", toSchemaId(schemaResult), + "name", protobufSchema.getFileDescriptor().getFullName())); + } catch (Exception e) { + result = new RecordData(null, null, schema); + result.meta.put("error", e.getMessage()); + } + } else { + byte[] bytes = new byte[length]; + System.arraycopy(buffer.array(), start, bytes, 0, length); + result = new RecordData(null, bytes, null); + } + + return result; + } + + @SuppressWarnings("unchecked") + static T cast(Object object) { + return (T) object; + } + + @Override + public RecordData deserialize(String topic, byte[] data) { + if (data == null) { + return null; + } + + ByteBuffer buffer; + SchemaLookupResult schema; + int length; + + if (data[0] == MAGIC_BYTE) { + buffer = getByteBuffer(data); + ArtifactReference artifactReference = getIdHandler().readId(buffer); + schema = resolve(artifactReference); + length = buffer.limit() - 1 - getIdHandler().idSize(); + } else { + buffer = ByteBuffer.wrap(data); + // Empty schema + schema = SchemaLookupResult.builder().build(); + length = buffer.limit() - 1; + } + + int start = buffer.position() + buffer.arrayOffset(); + + return readData(null, schema, buffer, start, length); + } + + @Override + public RecordData deserialize(String topic, Headers headers, byte[] data) { + if (data == null) { + return null; + } + ArtifactReference artifactReference = null; + if (headersHandler != null && headers != null) { + artifactReference = headersHandler.readHeaders(headers); + + if (artifactReference.hasValue()) { + return readData(headers, data, artifactReference); + } + } + + if (headers == null) { + return deserialize(topic, data); + } else { + //try to read data even if artifactReference has no value, maybe there is a fallbackArtifactProvider configured + return readData(headers, data, artifactReference); + } + } + + private RecordData readData(Headers headers, byte[] data, ArtifactReference artifactReference) { + SchemaLookupResult schema = resolve(artifactReference); + + ByteBuffer buffer = ByteBuffer.wrap(data); + int length = buffer.limit(); + int start = buffer.position(); + + return readData(headers, schema, buffer, start, length); + } + + private SchemaLookupResult resolve(ArtifactReference artifactReference) { + try { + return getSchemaResolver().resolveSchemaByArtifactReference(artifactReference); + } catch (RuntimeException e) { + // Empty result + return SchemaLookupResult.builder().build(); + } + } + + private String toSchemaId(SchemaLookupResult result) { + ObjectMapper mapper = new ObjectMapper(); + ArtifactReference ref = result.toArtifactReference(); + ByteArrayOutputStream out = new ByteArrayOutputStream(); + + try (var generator = mapper.createGenerator(out)) { + generator.writeStartObject(); + putEntry(generator, "globalId", ref.getGlobalId()); + putEntry(generator, "contentId", ref.getContentId()); + putEntry(generator, "groupId", ref.getGroupId()); + putEntry(generator, "artifactId", ref.getArtifactId()); + putEntry(generator, "version", ref.getVersion()); + generator.writeEndObject(); + generator.flush(); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + + return Base64.getUrlEncoder().encodeToString(out.toByteArray()); + } + + static void putEntry(JsonGenerator generator, String key, Object value) throws IOException { + if (value instanceof String str) { + generator.writeStringField(key, str); + } else if (value instanceof Long nbr && nbr != 0) { + generator.writeNumberField(key, nbr); + } + } + + @Override + protected RecordData readData(ParsedSchema schema, ByteBuffer buffer, int start, int length) { + throw new UnsupportedOperationException(); + } + + @Override + protected RecordData readData(Headers headers, ParsedSchema schema, ByteBuffer buffer, int start, int length) { + throw new UnsupportedOperationException(); + } +} diff --git a/api/src/main/java/com/github/streamshub/console/api/support/serdes/MultiformatSchemaParser.java b/api/src/main/java/com/github/streamshub/console/api/support/serdes/MultiformatSchemaParser.java new file mode 100644 index 000000000..a7ccf57bf --- /dev/null +++ b/api/src/main/java/com/github/streamshub/console/api/support/serdes/MultiformatSchemaParser.java @@ -0,0 +1,114 @@ +package com.github.streamshub.console.api.support.serdes; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.InputStreamReader; +import java.io.Reader; +import java.util.Arrays; +import java.util.LinkedHashMap; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; + +import io.apicurio.registry.resolver.ParsedSchema; +import io.apicurio.registry.resolver.SchemaParser; +import io.apicurio.registry.resolver.data.Record; +import io.apicurio.registry.types.ArtifactType; + +public class MultiformatSchemaParser implements SchemaParser { + + private static final int CACHE_LIMIT = 20; + private static final char[] PROTOBUF_INTRO = "message ".toCharArray(); + + private final Map> delegates; + private final Map schemaCache = new LinkedHashMap<>(CACHE_LIMIT); + + public MultiformatSchemaParser(Set> delegates) { + this.delegates = delegates.stream() + .map(p -> Map.entry(p.artifactType(), p)) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + } + + @Override + public String artifactType() { + throw new UnsupportedOperationException("MultiformatSchemaParser#artifactType()"); + } + + @Override + public Object parseSchema(byte[] rawSchema, Map> resolvedReferences) { + if (schemaCache.containsKey(rawSchema)) { + return schemaCache.get(rawSchema); + } + + Object parsedSchema = loadSchema(rawSchema, resolvedReferences); + + if (schemaCache.size() == CACHE_LIMIT) { + // Remove the oldest entry + var iter = schemaCache.entrySet().iterator(); + iter.next(); + iter.remove(); + } + + schemaCache.put(rawSchema, parsedSchema); + + return parsedSchema; + } + + @Override + public boolean supportsExtractSchemaFromData() { + return false; + } + + @Override + public ParsedSchema getSchemaFromData(Record data) { + throw new UnsupportedOperationException("MultiformatSchemaParser#getSchemaFromData(Record)"); + } + + @Override + public ParsedSchema getSchemaFromData(Record data, boolean dereference) { + throw new UnsupportedOperationException("MultiformatSchemaParser#getSchemaFromData(Record,boolean)"); + } + + @SuppressWarnings("unchecked") + Object loadSchema(byte[] rawSchema, Map> resolvedReferences) { + Object parsedSchema = null; + SchemaParser delegate = null; + + if (delegates.containsKey(ArtifactType.PROTOBUF) && looksLikeProtobuf(rawSchema)) { + delegate = (SchemaParser) delegates.get(ArtifactType.PROTOBUF); + } else if (delegates.containsKey(ArtifactType.AVRO)) { + delegate = (SchemaParser) delegates.get(ArtifactType.AVRO); + } + + if (delegate != null) { + try { + parsedSchema = delegate.parseSchema(rawSchema, resolvedReferences); + } catch (Exception e) { + // Schema is not valid, will be cached as null + } + } + + return parsedSchema; + } + + boolean looksLikeProtobuf(byte[] rawSchema) { + try (Reader reader = new InputStreamReader(new ByteArrayInputStream(rawSchema))) { + int input; + + while ((input = reader.read()) != -1) { + if (Character.isWhitespace(input)) { + continue; + } + char[] buffer = new char[8]; + buffer[0] = (char) input; + + if (reader.read(buffer, 1, 7) == 7 && Arrays.equals(PROTOBUF_INTRO, buffer)) { + return true; + } + } + } catch (IOException e) { + // Ignore + } + return false; + } +} diff --git a/api/src/main/java/com/github/streamshub/console/api/support/serdes/MultiformatSerializer.java b/api/src/main/java/com/github/streamshub/console/api/support/serdes/MultiformatSerializer.java new file mode 100644 index 000000000..a8e981f9c --- /dev/null +++ b/api/src/main/java/com/github/streamshub/console/api/support/serdes/MultiformatSerializer.java @@ -0,0 +1,222 @@ +package com.github.streamshub.console.api.support.serdes; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.OutputStream; +import java.io.UncheckedIOException; +import java.util.Base64; +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; +import java.util.Set; + +import jakarta.ws.rs.BadRequestException; + +import org.apache.avro.Schema; +import org.apache.kafka.common.header.Headers; + +import com.fasterxml.jackson.core.JsonGenerator; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.protobuf.InvalidProtocolBufferException; +import com.google.protobuf.Struct; + +import io.apicurio.registry.resolver.DefaultSchemaResolver; +import io.apicurio.registry.resolver.ParsedSchema; +import io.apicurio.registry.resolver.SchemaLookupResult; +import io.apicurio.registry.resolver.SchemaParser; +import io.apicurio.registry.resolver.SchemaResolver; +import io.apicurio.registry.resolver.SchemaResolverConfig; +import io.apicurio.registry.resolver.data.Record; +import io.apicurio.registry.resolver.strategy.ArtifactReference; +import io.apicurio.registry.resolver.strategy.ArtifactReferenceResolverStrategy; +import io.apicurio.registry.rest.client.RegistryClient; +import io.apicurio.registry.serde.AbstractKafkaSerializer; +import io.apicurio.registry.serde.SerdeConfig; +import io.apicurio.registry.serde.avro.AvroKafkaSerdeConfig; +import io.apicurio.registry.serde.avro.AvroKafkaSerializer; +import io.apicurio.registry.serde.config.BaseKafkaSerDeConfig; +import io.apicurio.registry.serde.data.KafkaSerdeMetadata; +import io.apicurio.registry.serde.data.KafkaSerdeRecord; +import io.apicurio.registry.types.ArtifactType; +import io.apicurio.registry.utils.protobuf.schema.ProtobufSchema; + +public class MultiformatSerializer extends AbstractKafkaSerializer implements ArtifactReferenceResolverStrategy { + + final ObjectMapper objectMapper; + AvroKafkaSerializer avroSerializer; + ProtobufSerializer protobufSerializer; + SchemaParser parser; + + public MultiformatSerializer(RegistryClient client, ObjectMapper objectMapper) { + super(); + setSchemaResolver(newResolver(client)); + this.objectMapper = objectMapper; + + avroSerializer = new AvroKafkaSerializer<>(); + avroSerializer.setSchemaResolver(newResolver(client)); + + this.protobufSerializer = new ProtobufSerializer(newResolver(client)); + } + + static SchemaResolver newResolver(RegistryClient client) { + var resolver = new DefaultSchemaResolver(); + resolver.setClient(client); + return resolver; + } + + @Override + public void configure(Map configs, boolean isKey) { + Map serConfigs = new HashMap<>(configs); + serConfigs.put(SchemaResolverConfig.ARTIFACT_RESOLVER_STRATEGY, this); + + Map avroConfigs = new HashMap<>(serConfigs); + avroConfigs.put(AvroKafkaSerdeConfig.AVRO_DATUM_PROVIDER, AvroDatumProvider.class.getName()); + avroConfigs.put(SchemaResolverConfig.FIND_LATEST_ARTIFACT, Boolean.TRUE); + avroSerializer.configure(avroConfigs, isKey); + + Map protobufConfigs = new HashMap<>(serConfigs); + protobufConfigs.put(SchemaResolverConfig.FIND_LATEST_ARTIFACT, Boolean.TRUE); + //protobufConfigs.put(SerdeConfig.AUTO_REGISTER_ARTIFACT, Boolean.TRUE); + protobufConfigs.put(SerdeConfig.VALIDATION_ENABLED, Boolean.TRUE); + protobufSerializer.configure(protobufConfigs, isKey); + + parser = new MultiformatSchemaParser<>(Set.of( + avroSerializer.schemaParser(), + protobufSerializer.schemaParser() + )); + + super.configure(new BaseKafkaSerDeConfig(serConfigs), isKey); + } + + @Override + public void close() { + // don't close - serializer will be reused + } + + @Override + public SchemaParser schemaParser() { + return parser; + } + + @Override + public byte[] serialize(String topic, Headers headers, RecordData data) { + // just return null + if (data == null) { + return null; // NOSONAR - we want to return null and not an empty array + } + + KafkaSerdeMetadata resolverMetadata = new KafkaSerdeMetadata(topic, isKey(), headers); + var reference = artifactReference(new KafkaSerdeRecord<>(resolverMetadata, data), null); + SchemaLookupResult schema = null; + Object parsedSchema = null; + + if (reference != null) { + try { + schema = getSchemaResolver().resolveSchemaByArtifactReference(reference); + if (schema != null && schema.getParsedSchema() instanceof ParsedSchema parsed) { + parsedSchema = parsed.getParsedSchema(); + } + } catch (Exception e) { + // bad request ? + } + } + + if (parsedSchema instanceof Schema avroSchema) { + try { + data.meta.put("schema", Map.of( + "type", ArtifactType.AVRO, + "id", toSchemaId(schema), + "name", avroSchema.getFullName())); + return avroSerializer.serialize(topic, headers, data); + } catch (Exception e) { + throw new BadRequestException(e.getMessage()); + } + } else if (parsedSchema instanceof ProtobufSchema protobufSchema) { + var structBuilder = Struct.newBuilder(); + try { + com.google.protobuf.util.JsonFormat.parser() + .ignoringUnknownFields() + .merge(data.dataString(null), structBuilder); + } catch (InvalidProtocolBufferException e) { + throw new BadRequestException(e.getMessage()); + } + data.meta.put("schema", Map.of( + "type", ArtifactType.PROTOBUF, + "id", toSchemaId(schema), + "name", protobufSchema.getFileDescriptor().getFullName())); + return protobufSerializer.serialize(headers, structBuilder.build(), cast(schema)); + } else { + return data.data; + } + } + + @SuppressWarnings("unchecked") + static T cast(Object object) { + return (T) object; + } + + @Override + public ArtifactReference artifactReference(Record data, ParsedSchema parsedSchema) { + KafkaSerdeRecord kdata = (KafkaSerdeRecord) data; + RecordData rData = kdata.payload(); + + return Optional.ofNullable(rData.meta.get("schema")) + .filter(Map.class::isInstance) + .map(Map.class::cast) + .map(schemaInfo -> (String) schemaInfo.get("ref")) + .map(schemaRef -> { + String[] gav = schemaRef.split(":"); + return ArtifactReference.builder() + .groupId(gav[0]) + .artifactId(gav[1]) + .version(gav.length > 2 ? gav[2] : null) + .build(); + }) + .orElse(null); + } + + + private String toSchemaId(SchemaLookupResult result) { + ObjectMapper mapper = new ObjectMapper(); + ArtifactReference ref = result.toArtifactReference(); + ByteArrayOutputStream out = new ByteArrayOutputStream(); + + try (var generator = mapper.createGenerator(out)) { + generator.writeStartObject(); + putEntry(generator, "globalId", ref.getGlobalId()); + putEntry(generator, "contentId", ref.getContentId()); + putEntry(generator, "groupId", ref.getGroupId()); + putEntry(generator, "artifactId", ref.getArtifactId()); + putEntry(generator, "version", ref.getVersion()); + generator.writeEndObject(); + generator.flush(); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + + return Base64.getUrlEncoder().encodeToString(out.toByteArray()); + } + + static void putEntry(JsonGenerator generator, String key, Object value) throws IOException { + if (value instanceof String str) { + generator.writeStringField(key, str); + } else if (value instanceof Long nbr && nbr != 0) { + generator.writeNumberField(key, nbr); + } + } + + @Override + public boolean loadSchema() { + return false; + } + + @Override + protected void serializeData(ParsedSchema schema, RecordData data, OutputStream out) { + throw new UnsupportedOperationException(); + } + + @Override + protected void serializeData(Headers headers, ParsedSchema schema, RecordData data, OutputStream out) { + throw new UnsupportedOperationException(); + } +} diff --git a/api/src/main/java/com/github/streamshub/console/api/support/serdes/ProtobufDeserializer.java b/api/src/main/java/com/github/streamshub/console/api/support/serdes/ProtobufDeserializer.java new file mode 100644 index 000000000..1fe3f5d55 --- /dev/null +++ b/api/src/main/java/com/github/streamshub/console/api/support/serdes/ProtobufDeserializer.java @@ -0,0 +1,33 @@ +package com.github.streamshub.console.api.support.serdes; + +import java.nio.ByteBuffer; + +import org.apache.kafka.common.header.Headers; + +import com.google.protobuf.Message; + +import io.apicurio.registry.resolver.ParsedSchema; +import io.apicurio.registry.resolver.SchemaResolver; +import io.apicurio.registry.serde.protobuf.ProtobufKafkaDeserializer; +import io.apicurio.registry.utils.protobuf.schema.ProtobufSchema; + +class ProtobufDeserializer extends ProtobufKafkaDeserializer { + ProtobufDeserializer(SchemaResolver schemaResolver) { + super(); + setSchemaResolver(schemaResolver); + } + + @Override + public Message readData(ParsedSchema schema, ByteBuffer buffer, int start, int length) { + return super.readData(schema, buffer, start, length); + } + + @Override + public Message readData(Headers headers, + ParsedSchema schema, + ByteBuffer buffer, + int start, + int length) { + return super.readData(headers, schema, buffer, start, length); + } +} diff --git a/api/src/main/java/com/github/streamshub/console/api/support/serdes/ProtobufSerializer.java b/api/src/main/java/com/github/streamshub/console/api/support/serdes/ProtobufSerializer.java new file mode 100644 index 000000000..c82e793d1 --- /dev/null +++ b/api/src/main/java/com/github/streamshub/console/api/support/serdes/ProtobufSerializer.java @@ -0,0 +1,44 @@ +package com.github.streamshub.console.api.support.serdes; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.UncheckedIOException; + +import org.apache.kafka.common.header.Headers; + +import com.google.protobuf.Message; + +import io.apicurio.registry.resolver.SchemaLookupResult; +import io.apicurio.registry.resolver.SchemaResolver; +import io.apicurio.registry.serde.protobuf.ProtobufKafkaSerializer; +import io.apicurio.registry.utils.protobuf.schema.ProtobufSchema; + +class ProtobufSerializer extends ProtobufKafkaSerializer { + ProtobufSerializer(SchemaResolver schemaResolver) { + super(); + setSchemaResolver(schemaResolver); + } + + public byte[] serialize(Headers headers, Message data, SchemaLookupResult schema) { + // just return null + if (data == null) { + return null; // NOSONAR + } + + try { + ByteArrayOutputStream out = new ByteArrayOutputStream(); + + if (headersHandler != null && headers != null) { + headersHandler.writeHeaders(headers, schema.toArtifactReference()); + serializeData(headers, schema.getParsedSchema(), data, out); + } else { + out.write(MAGIC_BYTE); + getIdHandler().writeId(schema.toArtifactReference(), out); + serializeData(schema.getParsedSchema(), data, out); + } + return out.toByteArray(); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } +} diff --git a/api/src/main/java/com/github/streamshub/console/api/support/serdes/RecordData.java b/api/src/main/java/com/github/streamshub/console/api/support/serdes/RecordData.java new file mode 100644 index 000000000..a55cb4fbc --- /dev/null +++ b/api/src/main/java/com/github/streamshub/console/api/support/serdes/RecordData.java @@ -0,0 +1,86 @@ +package com.github.streamshub.console.api.support.serdes; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.InputStreamReader; +import java.io.Reader; +import java.nio.charset.StandardCharsets; +import java.util.LinkedHashMap; +import java.util.Map; + +import io.apicurio.registry.resolver.ParsedSchema; + +public class RecordData { + + public static final String BINARY_DATA_MESSAGE = "Binary or non-UTF-8 encoded data cannot be displayed"; + static final int REPLACEMENT_CHARACTER = '\uFFFD'; + + public final Map meta = new LinkedHashMap<>(1); + public final String type; + byte[] data; + ParsedSchema schema; + + public RecordData(String type, byte[] data, ParsedSchema schema) { + super(); + this.type = type; + this.data = data; + this.schema = schema; + } + + public RecordData(byte[] data) { + super(); + this.type = null; + this.data = data; + this.schema = null; + } + + public RecordData(String data) { + this(data != null ? data.getBytes(StandardCharsets.UTF_8) : null); + } + + public String type() { + return type; + } + + public ParsedSchema schema() { + return schema; + } + + public String dataString(Integer maxValueLength) { + return bytesToString(data, maxValueLength); + } + + public static String bytesToString(byte[] bytes, Integer maxValueLength) { + if (bytes == null) { + return null; + } + + if (bytes.length == 0) { + return ""; + } + + int bufferSize = maxValueLength != null ? Math.min(maxValueLength, bytes.length) : bytes.length; + StringBuilder buffer = new StringBuilder(bufferSize); + + try (Reader reader = new InputStreamReader(new ByteArrayInputStream(bytes), StandardCharsets.UTF_8)) { + int input; + + while ((input = reader.read()) > -1) { + if (input == REPLACEMENT_CHARACTER || !Character.isDefined(input)) { + return BINARY_DATA_MESSAGE; + } + + buffer.append((char) input); + + if (maxValueLength != null && buffer.length() == maxValueLength) { + break; + } + } + + return buffer.toString(); + } catch (IOException e) { + return BINARY_DATA_MESSAGE; + } + } + +} diff --git a/api/src/main/resources/application.properties b/api/src/main/resources/application.properties index 0378be55a..a96088f0d 100644 --- a/api/src/main/resources/application.properties +++ b/api/src/main/resources/application.properties @@ -25,6 +25,7 @@ quarkus.kafka.devservices.enabled=false quarkus.kubernetes-client.devservices.enabled=false quarkus.devservices.enabled=false +quarkus.vertx.event-loops-pool-size=5 quarkus.vertx.max-event-loop-execute-time=4000 mp.openapi.scan.disable=false diff --git a/api/src/test/java/com/github/streamshub/console/api/RecordsResourceIT.java b/api/src/test/java/com/github/streamshub/console/api/RecordsResourceIT.java index a8285ac6a..9021ef14e 100644 --- a/api/src/test/java/com/github/streamshub/console/api/RecordsResourceIT.java +++ b/api/src/test/java/com/github/streamshub/console/api/RecordsResourceIT.java @@ -34,7 +34,7 @@ import org.junit.jupiter.params.provider.CsvSource; import org.junit.jupiter.params.provider.ValueSource; -import com.github.streamshub.console.api.service.RecordService; +import com.github.streamshub.console.api.support.serdes.RecordData; import com.github.streamshub.console.config.ConsoleConfig; import com.github.streamshub.console.kafka.systemtest.TestPlainProfile; import com.github.streamshub.console.kafka.systemtest.deployment.DeploymentManager; @@ -340,7 +340,11 @@ void testConsumeRecordsIncludeOnlyHeaders() { .assertThat() .statusCode(is(Status.OK.getStatusCode())) .body("data", hasSize(3)) - .body("data", everyItem(allOf(is(aMapWithSize(2)), hasEntry("type", "records")))) + .body("data", everyItem(allOf( + is(aMapWithSize(3)), + hasEntry("type", "records"), + hasKey("attributes"), + hasKey("relationships")))) .body("data.attributes.headers", everyItem(hasKey("h1"))); } @@ -376,7 +380,7 @@ void testConsumeRecordWithBinaryValue() throws NoSuchAlgorithmException { .assertThat() .statusCode(is(Status.OK.getStatusCode())) .body("data", hasSize(1)) - .body("data[0].attributes.value", is(equalTo(RecordService.BINARY_DATA_MESSAGE))); + .body("data[0].attributes.value", is(equalTo(RecordData.BINARY_DATA_MESSAGE))); } @ParameterizedTest