From 532766e16d6ea177719e22a25b7d22a2f890e4ee Mon Sep 17 00:00:00 2001 From: Michael Edgar Date: Fri, 6 Sep 2024 07:21:22 -0400 Subject: [PATCH] Apicurio Registry integration with Avro+Protobuf ser/des support Signed-off-by: Michael Edgar --- api/pom.xml | 25 +- .../streamshub/console/api/ClientFactory.java | 60 +++- .../console/api/RecordsResource.java | 43 ++- .../console/api/SchemasResource.java | 93 ++++++ .../console/api/model/HasLinks.java | 26 ++ .../streamshub/console/api/model/HasMeta.java | 39 +++ .../console/api/model/JsonApiDocument.java | 28 +- .../api/model/JsonApiRelationship.java | 43 +++ .../console/api/model/KafkaCluster.java | 2 +- .../console/api/model/KafkaRebalance.java | 9 +- .../console/api/model/KafkaRecord.java | 221 ++++++------ .../console/api/model/RecordFilterParams.java | 40 ++- .../console/api/model/Resource.java | 37 +-- .../console/api/service/RecordService.java | 314 ++++++++++-------- .../console/api/support/KafkaContext.java | 105 ++++++ .../support/serdes/ArtifactReferences.java | 91 +++++ .../api/support/serdes/AvroDatumProvider.java | 72 ++++ .../api/support/serdes/AvroDeserializer.java | 31 ++ .../api/support/serdes/ForceCloseable.java | 9 + .../serdes/MultiformatDeserializer.java | 239 +++++++++++++ .../serdes/MultiformatSchemaParser.java | 114 +++++++ .../support/serdes/MultiformatSerializer.java | 239 +++++++++++++ .../support/serdes/ProtobufDeserializer.java | 33 ++ .../support/serdes/ProtobufSerializer.java | 44 +++ .../api/support/serdes/RecordData.java | 79 +++++ api/src/main/resources/application.properties | 7 + .../console/api/RecordsResourceIT.java | 255 +++++++++++++- .../kafka/systemtest/TestPlainProfile.java | 6 + .../console/config/KafkaClusterConfig.java | 9 + .../console/config/SchemaRegistryConfig.java | 17 + pom.xml | 1 + 31 files changed, 1992 insertions(+), 339 deletions(-) create mode 100644 api/src/main/java/com/github/streamshub/console/api/SchemasResource.java create mode 100644 api/src/main/java/com/github/streamshub/console/api/model/HasLinks.java create mode 100644 api/src/main/java/com/github/streamshub/console/api/model/HasMeta.java create mode 100644 api/src/main/java/com/github/streamshub/console/api/model/JsonApiRelationship.java create mode 100644 api/src/main/java/com/github/streamshub/console/api/support/serdes/ArtifactReferences.java create mode 100644 api/src/main/java/com/github/streamshub/console/api/support/serdes/AvroDatumProvider.java create mode 100644 api/src/main/java/com/github/streamshub/console/api/support/serdes/AvroDeserializer.java create mode 100644 api/src/main/java/com/github/streamshub/console/api/support/serdes/ForceCloseable.java create mode 100644 api/src/main/java/com/github/streamshub/console/api/support/serdes/MultiformatDeserializer.java create mode 100644 api/src/main/java/com/github/streamshub/console/api/support/serdes/MultiformatSchemaParser.java create mode 100644 api/src/main/java/com/github/streamshub/console/api/support/serdes/MultiformatSerializer.java create mode 100644 api/src/main/java/com/github/streamshub/console/api/support/serdes/ProtobufDeserializer.java create mode 100644 api/src/main/java/com/github/streamshub/console/api/support/serdes/ProtobufSerializer.java create mode 100644 api/src/main/java/com/github/streamshub/console/api/support/serdes/RecordData.java create mode 100644 common/src/main/java/com/github/streamshub/console/config/SchemaRegistryConfig.java diff --git a/api/pom.xml b/api/pom.xml index 2298d1bb5..e084f8a58 100644 --- a/api/pom.xml +++ b/api/pom.xml @@ -58,15 +58,15 @@ io.quarkus - quarkus-resteasy-reactive + quarkus-rest io.quarkus - quarkus-resteasy-reactive-jackson + quarkus-rest-jackson io.quarkus - quarkus-rest-client-reactive + quarkus-rest-client io.quarkus @@ -84,6 +84,10 @@ io.quarkus quarkus-kubernetes-client + + io.quarkus + quarkus-apicurio-registry-avro + io.smallrye.common smallrye-common-annotation @@ -109,6 +113,20 @@ kafka-oauth-client + + io.apicurio + apicurio-registry-serdes-avro-serde + + + io.apicurio + apicurio-registry-serdes-protobuf-serde + ${apicurio-registry.version} + + + com.google.protobuf + protobuf-java-util + + com.fasterxml.jackson.core jackson-annotations @@ -358,6 +376,7 @@ ${keycloak.image} ${strimzi-kafka.tag} + ${apicurio-registry.version} org.jboss.logmanager.LogManager ${maven.home} true diff --git a/api/src/main/java/com/github/streamshub/console/api/ClientFactory.java b/api/src/main/java/com/github/streamshub/console/api/ClientFactory.java index cccbd5e54..45f57cd64 100644 --- a/api/src/main/java/com/github/streamshub/console/api/ClientFactory.java +++ b/api/src/main/java/com/github/streamshub/console/api/ClientFactory.java @@ -65,9 +65,13 @@ import com.github.streamshub.console.api.support.KafkaContext; import com.github.streamshub.console.api.support.TrustAllCertificateManager; import com.github.streamshub.console.api.support.ValidationProxy; +import com.github.streamshub.console.api.support.serdes.RecordData; import com.github.streamshub.console.config.ConsoleConfig; import com.github.streamshub.console.config.KafkaClusterConfig; +import io.apicurio.registry.rest.client.RegistryClient; +import io.apicurio.registry.rest.client.RegistryClientFactory; +import io.apicurio.registry.serde.SerdeConfig; import io.fabric8.kubernetes.client.informers.ResourceEventHandler; import io.fabric8.kubernetes.client.informers.SharedIndexInformer; import io.fabric8.kubernetes.client.informers.cache.Cache; @@ -192,6 +196,10 @@ public ConsoleConfig produceConsoleConfig() { }) .map(consoleConfig -> { consoleConfig.getKafka().getClusters().stream().forEach(cluster -> { + if (cluster.getSchemaRegistry() != null) { + var registryConfig = cluster.getSchemaRegistry(); + registryConfig.setUrl(resolveValue(registryConfig.getUrl())); + } resolveValues(cluster.getProperties()); resolveValues(cluster.getAdminProperties()); resolveValues(cluster.getProducerProperties()); @@ -277,7 +285,7 @@ public void onDelete(Kafka kafka, boolean deletedFinalStateUnknown) { findConfig(kafka).ifPresentOrElse( clusterConfig -> { String clusterKey = clusterConfig.clusterKey(); - String clusterId = clusterId(clusterConfig, Optional.of(kafka)); + String clusterId = KafkaContext.clusterId(clusterConfig, Optional.of(kafka)); log.infof("Removing KafkaContext for cluster %s, id=%s", clusterKey, clusterId); log.debugf("Known KafkaContext identifiers: %s", contexts.keySet()); KafkaContext previous = contexts.remove(clusterId); @@ -337,7 +345,14 @@ void putKafkaContext(Map contexts, } String clusterKey = clusterConfig.clusterKey(); - String clusterId = clusterId(clusterConfig, kafkaResource); + String clusterId = KafkaContext.clusterId(clusterConfig, kafkaResource); + + RegistryClient schemaRegistryClient = null; + + if (clusterConfig.getSchemaRegistry() != null) { + String registryUrl = clusterConfig.getSchemaRegistry().getUrl(); + schemaRegistryClient = RegistryClientFactory.create(registryUrl); // NOSONAR - closed elsewhere + } if (!replace && contexts.containsKey(clusterId)) { log.warnf(""" @@ -359,7 +374,10 @@ Connection requires trusted certificate(s) which are not present \ } KafkaContext ctx = new KafkaContext(clusterConfig, kafkaResource.orElse(null), clientConfigs, admin); + ctx.schemaRegistryClient(schemaRegistryClient); + log.infof("%s KafkaContext for cluster %s, id=%s", replace ? "Replacing" : "Adding", clusterKey, clusterId); + KafkaContext previous = contexts.put(clusterId, ctx); Optional.ofNullable(previous).ifPresent(KafkaContext::close); } @@ -370,12 +388,6 @@ boolean defaultedClusterId(KafkaClusterConfig clusterConfig, Optional kaf return clusterConfig.getId() == null && kafkaResource.map(Kafka::getStatus).map(KafkaStatus::getClusterId).isEmpty(); } - String clusterId(KafkaClusterConfig clusterConfig, Optional kafkaResource) { - return Optional.ofNullable(clusterConfig.getId()) - .or(() -> kafkaResource.map(Kafka::getStatus).map(KafkaStatus::getClusterId)) - .orElseGet(clusterConfig::getName); - } - /** * Checks whether the previous KafkaContext contained TLS trusted certificates, but due to them being * removed from the Strimzi Kafka CR being in a transient state, they are no longer present. We will ignore @@ -420,6 +432,7 @@ Map requiredConsumerConfig() { configs.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 50_000); configs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); configs.put(ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, 5000); + configs.put(SerdeConfig.ENABLE_HEADERS, "true"); return configs; } @@ -520,26 +533,31 @@ public void disposeKafkaContext(@Disposes KafkaContext context, Map> consumerSupplier(ConsoleConfig consoleConfig, KafkaContext context) { + public Consumer consumerSupplier(ConsoleConfig consoleConfig, KafkaContext context) { var configs = maybeAuthenticate(context, Consumer.class); - Consumer client = new KafkaConsumer<>(configs); // NOSONAR / closed in consumerDisposer - return () -> client; + + return new KafkaConsumer<>( + configs, + context.schemaRegistryContext().keyDeserializer(), + context.schemaRegistryContext().valueDeserializer()); } - public void consumerDisposer(@Disposes Supplier> consumer) { - consumer.get().close(); + public void disposeConsumerSupplier(@Disposes Consumer consumer) { + consumer.close(); } @Produces @RequestScoped - public Supplier> producerSupplier(ConsoleConfig consoleConfig, KafkaContext context) { + public Producer producerSupplier(ConsoleConfig consoleConfig, KafkaContext context) { var configs = maybeAuthenticate(context, Producer.class); - Producer client = new KafkaProducer<>(configs); // NOSONAR / closed in producerDisposer - return () -> client; + return new KafkaProducer<>( + configs, + context.schemaRegistryContext().keySerializer(), + context.schemaRegistryContext().valueSerializer()); } - public void producerDisposer(@Disposes Supplier> producer) { - producer.get().close(); + public void disposeProducerSupplier(@Disposes Producer producer) { + producer.close(); } Map maybeAuthenticate(KafkaContext context, Class clientType) { @@ -571,6 +589,12 @@ Map buildConfig(Set configNames, .map(Optional::get) .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue, (k1, k2) -> k1, TreeMap::new)); + // Ensure no given properties are skipped. The previous stream processing allows + // for the standard config names to be obtained from the given maps, but also from + // config overrides via MicroProfile Config. + clientProperties.get().forEach(cfg::putIfAbsent); + config.getProperties().forEach(cfg::putIfAbsent); + var listenerSpec = cluster.map(Kafka::getSpec) .map(KafkaSpec::getKafka) .map(KafkaClusterSpec::getListeners) diff --git a/api/src/main/java/com/github/streamshub/console/api/RecordsResource.java b/api/src/main/java/com/github/streamshub/console/api/RecordsResource.java index ecd388ea5..a16498521 100644 --- a/api/src/main/java/com/github/streamshub/console/api/RecordsResource.java +++ b/api/src/main/java/com/github/streamshub/console/api/RecordsResource.java @@ -19,10 +19,10 @@ import jakarta.ws.rs.core.CacheControl; import jakarta.ws.rs.core.MediaType; import jakarta.ws.rs.core.Response; +import jakarta.ws.rs.core.Response.Status; import jakarta.ws.rs.core.UriBuilder; import jakarta.ws.rs.core.UriInfo; import jakarta.ws.rs.ext.RuntimeDelegate; -import jakarta.ws.rs.core.Response.Status; import org.eclipse.microprofile.openapi.annotations.Operation; import org.eclipse.microprofile.openapi.annotations.enums.Explode; @@ -67,12 +67,12 @@ public class RecordsResource { summary = "Consume records from a topic", description = "Consume a limited number of records from a topic, optionally specifying a partition and an absolute offset or timestamp as the starting point for message retrieval.") @APIResponseSchema( - value = KafkaRecord.ListResponse.class, + value = KafkaRecord.KafkaRecordDataList.class, responseDescription = "List of records matching the request query parameters.") @APIResponse(responseCode = "404", ref = "NotFound") @APIResponse(responseCode = "500", ref = "ServerError") @APIResponse(responseCode = "504", ref = "ServerTimeout") - public Response consumeRecords( + public CompletionStage consumeRecords( @Parameter(description = "Cluster identifier") @PathParam("clusterId") String clusterId, @@ -98,7 +98,9 @@ public Response consumeRecords( KafkaRecord.Fields.HEADERS, KafkaRecord.Fields.KEY, KafkaRecord.Fields.VALUE, - KafkaRecord.Fields.SIZE + KafkaRecord.Fields.SIZE, + KafkaRecord.Fields.KEY_SCHEMA, + KafkaRecord.Fields.VALUE_SCHEMA, }, payload = ErrorCategory.InvalidQueryParameter.class) @Parameter( @@ -116,15 +118,27 @@ public Response consumeRecords( KafkaRecord.Fields.HEADERS, KafkaRecord.Fields.KEY, KafkaRecord.Fields.VALUE, - KafkaRecord.Fields.SIZE + KafkaRecord.Fields.SIZE, + KafkaRecord.Fields.KEY_SCHEMA, + KafkaRecord.Fields.VALUE_SCHEMA, })) List fields) { requestedFields.accept(fields); - var result = recordService.consumeRecords(topicId, params.getPartition(), params.getOffset(), params.getTimestamp(), params.getLimit(), fields, params.getMaxValueLength()); - CacheControl noStore = RuntimeDelegate.getInstance().createHeaderDelegate(CacheControl.class).fromString("no-store"); - return Response.ok(new KafkaRecord.ListResponse(result)).cacheControl(noStore).build(); + + return recordService.consumeRecords( + topicId, + params.getPartition(), + params.getOffset(), + params.getTimestamp(), + params.getLimit(), + fields, + params.getMaxValueLength()) + .thenApply(KafkaRecord.KafkaRecordDataList::new) + .thenApply(Response::ok) + .thenApply(response -> response.cacheControl(noStore)) + .thenApply(Response.ResponseBuilder::build); } @POST @@ -135,7 +149,7 @@ public Response consumeRecords( description = "Produce (write) a single record to a topic") @APIResponseSchema( responseCode = "201", - value = KafkaRecord.KafkaRecordDocument.class, + value = KafkaRecord.KafkaRecordData.class, responseDescription = "Record was successfully sent to the topic") @APIResponse(responseCode = "404", ref = "NotFound") @APIResponse(responseCode = "500", ref = "ServerError") @@ -151,20 +165,19 @@ public CompletionStage produceRecord( String topicId, @Valid - KafkaRecord.KafkaRecordDocument message) { + KafkaRecord.KafkaRecordData message) { final UriBuilder location = uriInfo.getRequestUriBuilder(); requestedFields.accept(KafkaRecord.Fields.ALL); - return recordService.produceRecord(topicId, message.getData().getAttributes()) - .thenApply(KafkaRecord.KafkaRecordDocument::new) + return recordService.produceRecord(topicId, message.getData()) + .thenApply(KafkaRecord.KafkaRecordData::new) .thenApply(entity -> Response.status(Status.CREATED) .entity(entity) .location(location - .queryParam("filter[partition]", entity.getData().getAttributes().getPartition()) - .queryParam("filter[offset]", entity.getData().getAttributes().getOffset()) + .queryParam("filter[partition]", entity.getData().partition()) + .queryParam("filter[offset]", entity.getData().offset()) .build())) .thenApply(Response.ResponseBuilder::build); - } } diff --git a/api/src/main/java/com/github/streamshub/console/api/SchemasResource.java b/api/src/main/java/com/github/streamshub/console/api/SchemasResource.java new file mode 100644 index 000000000..bd44fd98d --- /dev/null +++ b/api/src/main/java/com/github/streamshub/console/api/SchemasResource.java @@ -0,0 +1,93 @@ +package com.github.streamshub.console.api; + +import java.util.Base64; +import java.util.Collections; +import java.util.Map; +import java.util.Optional; + +import jakarta.inject.Inject; +import jakarta.ws.rs.GET; +import jakarta.ws.rs.NotFoundException; +import jakarta.ws.rs.Path; +import jakarta.ws.rs.PathParam; +import jakarta.ws.rs.Produces; +import jakarta.ws.rs.core.MediaType; +import jakarta.ws.rs.core.Response; + +import org.eclipse.microprofile.openapi.annotations.media.Content; +import org.eclipse.microprofile.openapi.annotations.parameters.Parameter; +import org.eclipse.microprofile.openapi.annotations.responses.APIResponse; +import org.eclipse.microprofile.openapi.annotations.tags.Tag; +import org.jboss.logging.Logger; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.github.streamshub.console.api.support.KafkaContext; +import com.github.streamshub.console.api.support.serdes.ArtifactReferences; +import com.github.streamshub.console.api.support.serdes.MultiformatSchemaParser; + +import io.apicurio.registry.resolver.DefaultSchemaResolver; +import io.apicurio.registry.resolver.SchemaResolver; +import io.apicurio.registry.rest.client.RegistryClient; + +@Path("/api/schemas/{schemaId}") +@Tag(name = "Schema Registry Resources") +public class SchemasResource { + + @Inject + Logger logger; + + @Inject + ObjectMapper objectMapper; + + @Inject + Map kafkaContexts; + + @GET + @Produces(MediaType.APPLICATION_JSON) + @APIResponse(responseCode = "200", ref = "Configurations", content = @Content()) + @APIResponse(responseCode = "404", ref = "NotFound") + @APIResponse(responseCode = "500", ref = "ServerError") + @APIResponse(responseCode = "504", ref = "ServerTimeout") + public Response describeConfigs( + @Parameter(description = "Schema identifier") + @PathParam("schemaId") + String schemaId) { + + int clusterTerm = schemaId.indexOf('.'); + + if (clusterTerm < 0) { + throw new NotFoundException("No such schema"); + } + + String clusterId = new String(Base64.getUrlDecoder().decode(schemaId.substring(0, clusterTerm))); + + if (!kafkaContexts.containsKey(clusterId)) { + throw new NotFoundException("No such schema"); + } + + RegistryClient registryClient = kafkaContexts.get(clusterId).schemaRegistryContext().registryClient(); + + if (registryClient == null) { + throw new NotFoundException("Schema not found, no registry is configured"); + } + + @SuppressWarnings("resource") + SchemaResolver schemaResolver = new DefaultSchemaResolver<>(); + schemaResolver.setClient(registryClient); + schemaResolver.configure(Collections.emptyMap(), new MultiformatSchemaParser<>(Collections.emptySet())); + + schemaId = schemaId.substring(clusterTerm + 1); + + var reference = ArtifactReferences.fromSchemaId(schemaId, objectMapper); + var schema = schemaResolver.resolveSchemaByArtifactReference(reference); + + var response = Optional.ofNullable(schema) + .map(s -> s.getParsedSchema()) + .map(s -> s.getRawSchema()) + .map(Response::ok) + .orElseThrow(() -> new NotFoundException("No such schema")); + + return response.build(); + } + +} diff --git a/api/src/main/java/com/github/streamshub/console/api/model/HasLinks.java b/api/src/main/java/com/github/streamshub/console/api/model/HasLinks.java new file mode 100644 index 000000000..dde84d213 --- /dev/null +++ b/api/src/main/java/com/github/streamshub/console/api/model/HasLinks.java @@ -0,0 +1,26 @@ +package com.github.streamshub.console.api.model; + +import java.util.LinkedHashMap; +import java.util.Map; + +interface HasLinks { + + Map links(); + + void links(Map links); + + default T addLink(String key, String value) { + links(addEntry(links(), key, value)); + @SuppressWarnings("unchecked") + T t = (T) this; + return t; + } + + private static Map addEntry(Map map, K key, V value) { + if (map == null) { + map = new LinkedHashMap<>(); + } + map.put(key, value); + return map; + } +} diff --git a/api/src/main/java/com/github/streamshub/console/api/model/HasMeta.java b/api/src/main/java/com/github/streamshub/console/api/model/HasMeta.java new file mode 100644 index 000000000..ff3db623a --- /dev/null +++ b/api/src/main/java/com/github/streamshub/console/api/model/HasMeta.java @@ -0,0 +1,39 @@ +package com.github.streamshub.console.api.model; + +import com.fasterxml.jackson.annotation.JsonIgnore; + +interface HasMeta { + + JsonApiMeta meta(); + + void meta(JsonApiMeta meta); + + default JsonApiMeta metaFactory() { + return new JsonApiMeta(); + } + + @JsonIgnore + default JsonApiMeta getOrCreateMeta() { + JsonApiMeta meta = meta(); + + if (meta == null) { + meta = metaFactory(); + meta(meta); + } + + return meta; + } + + default Object meta(String key) { + JsonApiMeta meta = meta(); + return meta != null ? meta.get(key) : null; + } + + default T addMeta(String key, Object value) { + JsonApiMeta meta = getOrCreateMeta(); + meta.put(key, value); + @SuppressWarnings("unchecked") + T t = (T) this; + return t; + } +} diff --git a/api/src/main/java/com/github/streamshub/console/api/model/JsonApiDocument.java b/api/src/main/java/com/github/streamshub/console/api/model/JsonApiDocument.java index 0b2d9eb70..f0101d70b 100644 --- a/api/src/main/java/com/github/streamshub/console/api/model/JsonApiDocument.java +++ b/api/src/main/java/com/github/streamshub/console/api/model/JsonApiDocument.java @@ -1,11 +1,10 @@ package com.github.streamshub.console.api.model; -import java.util.LinkedHashMap; import java.util.Map; import com.fasterxml.jackson.annotation.JsonInclude; -import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonInclude.Include; +import com.fasterxml.jackson.annotation.JsonProperty; /** * Base class for all JSON API request and response bodies. @@ -13,31 +12,18 @@ * @see JSON API Document Structure, 7.1 Top Level */ @JsonInclude(value = Include.NON_NULL) -public abstract class JsonApiDocument { +public abstract class JsonApiDocument implements HasLinks, HasMeta { private JsonApiMeta meta; private Map links; - static Map addEntry(Map map, K key, V value) { - if (map == null) { - map = new LinkedHashMap<>(); - } - map.put(key, value); - return map; - } - @JsonProperty public JsonApiMeta meta() { return meta; } - public Object meta(String key) { - return meta != null ? meta.get(key) : null; - } - - public JsonApiDocument addMeta(String key, Object value) { - meta = JsonApiMeta.put(meta, key, value); - return this; + public void meta(JsonApiMeta meta) { + this.meta = meta; } @JsonProperty @@ -45,9 +31,7 @@ public Map links() { return links; } - public JsonApiDocument addLink(String key, String value) { - links = addEntry(links, key, value); - return this; + public void links(Map links) { + this.links = links; } - } diff --git a/api/src/main/java/com/github/streamshub/console/api/model/JsonApiRelationship.java b/api/src/main/java/com/github/streamshub/console/api/model/JsonApiRelationship.java new file mode 100644 index 000000000..49ccf44df --- /dev/null +++ b/api/src/main/java/com/github/streamshub/console/api/model/JsonApiRelationship.java @@ -0,0 +1,43 @@ +package com.github.streamshub.console.api.model; + +import java.util.Map; + +import com.fasterxml.jackson.annotation.JsonInclude; +import com.fasterxml.jackson.annotation.JsonInclude.Include; +import com.fasterxml.jackson.annotation.JsonProperty; + +@JsonInclude(value = Include.NON_NULL) +public class JsonApiRelationship implements HasLinks, HasMeta { + + private JsonApiMeta meta; + private Identifier data; + private Map links; + + @JsonProperty + public JsonApiMeta meta() { + return meta; + } + + public void meta(JsonApiMeta meta) { + this.meta = meta; + } + + @JsonProperty + public Map links() { + return links; + } + + public void links(Map links) { + this.links = links; + } + + @JsonProperty + public Identifier data() { + return data; + } + + public void data(Identifier data) { + this.data = data; + } + +} diff --git a/api/src/main/java/com/github/streamshub/console/api/model/KafkaCluster.java b/api/src/main/java/com/github/streamshub/console/api/model/KafkaCluster.java index 64795ff0d..c40db318b 100644 --- a/api/src/main/java/com/github/streamshub/console/api/model/KafkaCluster.java +++ b/api/src/main/java/com/github/streamshub/console/api/model/KafkaCluster.java @@ -244,7 +244,7 @@ public void conditions(List conditions) { @JsonIgnore public boolean isConfigured() { - return Boolean.TRUE.equals(getMeta("configured")); + return Boolean.TRUE.equals(meta("configured")); } @JsonIgnore diff --git a/api/src/main/java/com/github/streamshub/console/api/model/KafkaRebalance.java b/api/src/main/java/com/github/streamshub/console/api/model/KafkaRebalance.java index 8fbb3df3a..c4cf514a5 100644 --- a/api/src/main/java/com/github/streamshub/console/api/model/KafkaRebalance.java +++ b/api/src/main/java/com/github/streamshub/console/api/model/KafkaRebalance.java @@ -254,7 +254,7 @@ static class Attributes { } public KafkaRebalance(String id) { - super(id, API_TYPE, Meta::new, new Attributes()); + super(id, API_TYPE, new Attributes()); } @JsonCreator @@ -270,6 +270,11 @@ public static KafkaRebalance fromCursor(JsonObject cursor) { return PaginatedKubeResource.fromCursor(cursor, KafkaRebalance::new); } + @Override + public JsonApiMeta metaFactory() { + return new Meta(); + } + public List allowedActions() { return ((Meta) getOrCreateMeta()).allowedActions; } @@ -279,7 +284,7 @@ public void autoApproval(boolean autoApproval) { } public String action() { - return Optional.ofNullable(getMeta()) + return Optional.ofNullable(meta()) .map(Meta.class::cast) .map(meta -> meta.action) .orElse(null); diff --git a/api/src/main/java/com/github/streamshub/console/api/model/KafkaRecord.java b/api/src/main/java/com/github/streamshub/console/api/model/KafkaRecord.java index 1b27fca62..b2788977e 100644 --- a/api/src/main/java/com/github/streamshub/console/api/model/KafkaRecord.java +++ b/api/src/main/java/com/github/streamshub/console/api/model/KafkaRecord.java @@ -19,9 +19,17 @@ import io.xlate.validation.constraints.Expression; -@Schema(name = "KafkaRecordAttributes") -@JsonFilter("fieldFilter") -public class KafkaRecord { +@Schema(name = "KafkaRecord") +@Expression( + when = "self.type != null", + value = "self.type == '" + KafkaRecord.API_TYPE + "'", + message = "resource type conflicts with operation", + node = "type", + payload = ErrorCategory.ResourceConflict.class) +public class KafkaRecord extends RelatableResource { + + public static final String API_TYPE = "records"; + public static final String FIELDS_PARAM = "fields[" + API_TYPE + "]"; public static final class Fields { public static final String PARTITION = "partition"; @@ -32,6 +40,8 @@ public static final class Fields { public static final String KEY = "key"; public static final String VALUE = "value"; public static final String SIZE = "size"; + public static final String KEY_SCHEMA = "keySchema"; + public static final String VALUE_SCHEMA = "valueSchema"; public static final String DEFAULT = PARTITION + @@ -41,7 +51,9 @@ public static final class Fields { ", " + HEADERS + ", " + KEY + ", " + VALUE + - ", " + SIZE; + ", " + SIZE + + ", " + KEY_SCHEMA + + ", " + VALUE_SCHEMA; public static final List ALL = List.of( PARTITION, @@ -51,180 +63,197 @@ public static final class Fields { HEADERS, KEY, VALUE, - SIZE); + SIZE, + KEY_SCHEMA, + VALUE_SCHEMA); private Fields() { // Prevent instances } } - @Schema(name = "KafkaRecordListResponse") - public static final class ListResponse extends DataList { - public ListResponse(List data) { - super(data.stream().map(RecordResource::new).toList()); + @Schema(name = "KafkaRecordDataList") + public static final class KafkaRecordDataList extends DataList { + public KafkaRecordDataList(List data) { + super(data); } } - @Schema(name = "KafkaRecordDocument") - public static final class KafkaRecordDocument extends DataSingleton { + @Schema(name = "KafkaRecordData") + public static final class KafkaRecordData extends DataSingleton { @JsonCreator - public KafkaRecordDocument(@JsonProperty("data") RecordResource data) { + public KafkaRecordData(@JsonProperty("data") KafkaRecord data) { super(data); } - - public KafkaRecordDocument(KafkaRecord data) { - super(new RecordResource(data)); - } } - @Schema(name = "KafkaRecord") - @Expression( - when = "self.type != null", - value = "self.type == 'records'", - message = "resource type conflicts with operation", - node = "type", - payload = ErrorCategory.ResourceConflict.class - ) - public static final class RecordResource extends Resource { - @JsonCreator - public RecordResource(String type, KafkaRecord attributes) { - super(null, type, attributes); - } + @JsonFilter("fieldFilter") + @Schema(name = "KafkaRecordAttributes") + static class Attributes { + @JsonIgnore + String topic; - public RecordResource(KafkaRecord attributes) { - super(null, "records", attributes); - } - } + @JsonProperty + @Schema(description = "The record's partition within the topic") + Integer partition; - @JsonIgnore - String topic; + @JsonProperty + @Schema(readOnly = true, description = "The record's offset within the topic partition") + Long offset; - @Schema(description = "The record's partition within the topic") - Integer partition; + @JsonProperty + @Schema(description = "Timestamp associated with the record. The type is indicated by `timestampType`. When producing a record, this value will be used as the record's `CREATE_TIME`.", format = "date-time") + Instant timestamp; - @Schema(readOnly = true, description = "The record's offset within the topic partition") - Long offset; + @JsonProperty + @Schema(readOnly = true, description = "Type of timestamp associated with the record") + String timestampType; - @Schema(description = "Timestamp associated with the record. The type is indicated by `timestampType`. When producing a record, this value will be used as the record's `CREATE_TIME`.", format = "date-time") - Instant timestamp; + @JsonProperty + @Schema(description = "Record headers, key/value pairs") + Map headers; - @Schema(readOnly = true, description = "Type of timestamp associated with the record") - String timestampType; + @JsonProperty + @Schema(description = "Record key") + String key; - @Schema(description = "Record headers, key/value pairs") - Map headers; + @JsonProperty + @NotNull + @Schema(description = "Record value") + String value; - @Schema(description = "Record key") - String key; + @JsonProperty + @Schema(readOnly = true, description = "Size of the uncompressed record, not including the overhead of the record in the log segment.") + Long size; + } - @NotNull - @Schema(description = "Record value") - String value; + @JsonFilter("fieldFilter") + static class Relationships { + @JsonProperty + JsonApiRelationship keySchema; - @Schema(readOnly = true, description = "Size of the uncompressed record, not including the overhead of the record in the log segment.") - Long size; + @JsonProperty + JsonApiRelationship valueSchema; + } + @JsonCreator public KafkaRecord() { - super(); + super(null, API_TYPE, new Attributes(), new Relationships()); } public KafkaRecord(String topic) { - this.topic = topic; + this(); + attributes.topic = topic; } public KafkaRecord(String topic, Integer partition, Instant timestamp, Map headers, String key, String value, Long size) { this(topic); - this.partition = partition; - this.timestamp = timestamp; - this.headers = headers; - this.key = key; - this.value = value; - this.size = size; + attributes.partition = partition; + attributes.timestamp = timestamp; + attributes.headers = headers; + attributes.key = key; + attributes.value = value; + attributes.size = size; } @JsonIgnore public URI buildUri(UriBuilder builder, String topicName) { - builder.queryParam(Fields.PARTITION, partition); - builder.queryParam(Fields.OFFSET, offset); + builder.queryParam(Fields.PARTITION, attributes.partition); + builder.queryParam(Fields.OFFSET, attributes.offset); return builder.build(topicName); } @AssertTrue(message = "invalid timestamp") @JsonIgnore public boolean isTimestampValid() { - if (timestamp == null) { + if (attributes.timestamp == null) { return true; } try { - return timestamp.isAfter(Instant.ofEpochMilli(-1)); + return attributes.timestamp.isAfter(Instant.ofEpochMilli(-1)); } catch (Exception e) { return false; } } - public Integer getPartition() { - return partition; + public Integer partition() { + return attributes.partition; + } + + public void partition(Integer partition) { + attributes.partition = partition; + } + + public Long offset() { + return attributes.offset; + } + + public void offset(Long offset) { + attributes.offset = offset; } - public void setPartition(Integer partition) { - this.partition = partition; + public Instant timestamp() { + return attributes.timestamp; } - public Long getOffset() { - return offset; + public void timestamp(Instant timestamp) { + attributes.timestamp = timestamp; } - public void setOffset(Long offset) { - this.offset = offset; + public String timestampType() { + return attributes.timestampType; } - public Instant getTimestamp() { - return timestamp; + public void timestampType(String timestampType) { + attributes.timestampType = timestampType; } - public void setTimestamp(Instant timestamp) { - this.timestamp = timestamp; + public Map headers() { + return attributes.headers; } - public String getTimestampType() { - return timestampType; + public void headers(Map headers) { + attributes.headers = headers; } - public void setTimestampType(String timestampType) { - this.timestampType = timestampType; + public String key() { + return attributes.key; } - public Map getHeaders() { - return headers; + public void key(String key) { + attributes.key = key; } - public void setHeaders(Map headers) { - this.headers = headers; + public String value() { + return attributes.value; } - public String getKey() { - return key; + public void value(String value) { + attributes.value = value; } - public void setKey(String key) { - this.key = key; + public Long size() { + return attributes.size; } - public String getValue() { - return value; + public void size(Long size) { + attributes.size = size; } - public void setValue(String value) { - this.value = value; + public JsonApiRelationship keySchema() { + return relationships.keySchema; } - public Long getSize() { - return size; + public void keySchema(JsonApiRelationship keySchema) { + relationships.keySchema = keySchema; } - public void setSize(Long size) { - this.size = size; + public JsonApiRelationship valueSchema() { + return relationships.valueSchema; } + public void valueSchema(JsonApiRelationship valueSchema) { + relationships.valueSchema = valueSchema; + } } diff --git a/api/src/main/java/com/github/streamshub/console/api/model/RecordFilterParams.java b/api/src/main/java/com/github/streamshub/console/api/model/RecordFilterParams.java index 75d0ad24e..7fd62f51e 100644 --- a/api/src/main/java/com/github/streamshub/console/api/model/RecordFilterParams.java +++ b/api/src/main/java/com/github/streamshub/console/api/model/RecordFilterParams.java @@ -19,12 +19,18 @@ @Expression( when = "self.rawTimestamp != null", value = "self.rawOffset == null", - node = "filter[offset]", + node = RecordFilterParams.FILTER_OFFSET, message = "Parameter `filter[offset]` must not be used when `filter[timestamp]` is present.", payload = ErrorCategory.InvalidQueryParameter.class) public class RecordFilterParams { - @QueryParam("filter[partition]") + static final String FILTER_PARTITION = "filter[partition]"; + static final String FILTER_OFFSET = "filter[offset]"; + static final String FILTER_TIMESTAMP = "filter[timestamp]"; + static final String PAGE_SIZE = "page[size]"; + static final String MAX_VALUE_LENGTH = "maxValueLength"; + + @QueryParam(FILTER_PARTITION) @Parameter( description = """ Retrieve messages only from the partition identified by this parameter. @@ -39,23 +45,23 @@ public class RecordFilterParams { value = "self.operator == 'eq'", message = "unsupported filter operator, supported values: [ 'eq' ]", payload = ErrorCategory.InvalidQueryParameter.class, - node = "filter[partition]") + node = FILTER_PARTITION) @Expression( when = "self != null", value = "self.operands.size() == 1", message = "exactly 1 operand is required", payload = ErrorCategory.InvalidQueryParameter.class, - node = "filter[partition]") + node = FILTER_PARTITION) @Expression( when = "self != null && self.operator == 'eq' && self.operands.size() == 1", value = "val = Integer.parseInt(self.firstOperand); val >= 0 && val <= Integer.MAX_VALUE", exceptionalValue = ExceptionalValue.FALSE, message = "operand must be an integer between 0 and " + Integer.MAX_VALUE + ", inclusive", payload = ErrorCategory.InvalidQueryParameter.class, - node = "filter[partition]") + node = FILTER_PARTITION) FetchFilter partition; - @QueryParam("filter[offset]") + @QueryParam(FILTER_OFFSET) @Parameter( description = """ Retrieve messages with an offset greater than or equal to the filter @@ -80,23 +86,23 @@ public class RecordFilterParams { exceptionalValue = ExceptionalValue.FALSE, message = "unsupported filter operator, supported values: [ 'gte' ]", payload = ErrorCategory.InvalidQueryParameter.class, - node = "filter[offset]") + node = FILTER_OFFSET) @Expression( when = "self != null", value = "self.operands.size() == 1", message = "exactly 1 operand is required", payload = ErrorCategory.InvalidQueryParameter.class, - node = "filter[offset]") + node = FILTER_OFFSET) @Expression( when = "self != null && self.operator == 'gte'", value = "val = Long.parseLong(self.firstOperand); val >= 0 && val <= Long.MAX_VALUE", exceptionalValue = ExceptionalValue.FALSE, message = "operand must be an integer between 0 and " + Long.MAX_VALUE + ", inclusive", payload = ErrorCategory.InvalidQueryParameter.class, - node = "filter[offset]") + node = FILTER_OFFSET) FetchFilter offset; - @QueryParam("filter[timestamp]") + @QueryParam(FILTER_TIMESTAMP) @Parameter( description = """ Retrieve messages with a timestamp greater than or equal to the filter @@ -120,13 +126,13 @@ public class RecordFilterParams { value = "self.operator == 'gte'", message = "unsupported filter operator, supported values: [ 'gte' ]", payload = ErrorCategory.InvalidQueryParameter.class, - node = "filter[timestamp]") + node = FILTER_TIMESTAMP) @Expression( when = "self != null", value = "self.operands.size() == 1", message = "exactly 1 operand is required", payload = ErrorCategory.InvalidQueryParameter.class, - node = "filter[timestamp]") + node = FILTER_TIMESTAMP) @Expression( when = "self != null && self.operator == 'gte'", classImports = "java.time.Instant", @@ -134,10 +140,10 @@ public class RecordFilterParams { exceptionalValue = ExceptionalValue.FALSE, message = "operand must be a valid RFC 3339 date-time no earlier than `1970-01-01T00:00:00Z`", payload = ErrorCategory.InvalidQueryParameter.class, - node = "filter[timestamp]") + node = FILTER_TIMESTAMP) FetchFilter timestamp; - @QueryParam("page[size]") + @QueryParam(PAGE_SIZE) @DefaultValue(ListFetchParams.PAGE_SIZE_DEFAULT + "") @Parameter( description = "Limit the number of records fetched and returned", @@ -152,10 +158,10 @@ public class RecordFilterParams { exceptionalValue = ExceptionalValue.FALSE, message = "must be an integer between 1 and " + ListFetchParams.PAGE_SIZE_MAX + ", inclusive", payload = ErrorCategory.InvalidQueryParameter.class, - node = "page[size]") + node = PAGE_SIZE) String pageSize; - @QueryParam("maxValueLength") + @QueryParam(MAX_VALUE_LENGTH) @Parameter( description = """ Maximum length of string values returned in the response. @@ -169,7 +175,7 @@ public class RecordFilterParams { exceptionalValue = ExceptionalValue.FALSE, message = "must be an integer between 1 and " + Integer.MAX_VALUE + ", inclusive", payload = ErrorCategory.InvalidQueryParameter.class, - node = "maxValueLength") + node = MAX_VALUE_LENGTH) String maxValueLength; public String getRawOffset() { diff --git a/api/src/main/java/com/github/streamshub/console/api/model/Resource.java b/api/src/main/java/com/github/streamshub/console/api/model/Resource.java index 84beccea5..aed51567b 100644 --- a/api/src/main/java/com/github/streamshub/console/api/model/Resource.java +++ b/api/src/main/java/com/github/streamshub/console/api/model/Resource.java @@ -1,13 +1,11 @@ package com.github.streamshub.console.api.model; -import java.util.function.Supplier; - import jakarta.validation.Valid; import jakarta.validation.constraints.NotNull; -import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonInclude; import com.fasterxml.jackson.annotation.JsonInclude.Include; +import com.fasterxml.jackson.annotation.JsonProperty; import com.github.streamshub.console.api.support.ErrorCategory; /** @@ -18,7 +16,7 @@ * @param the type of the attribute model */ @JsonInclude(Include.NON_NULL) -public abstract class Resource { +public abstract class Resource implements HasMeta { protected String id; @@ -32,28 +30,19 @@ public abstract class Resource { @NotNull(payload = ErrorCategory.InvalidResource.class) protected final T attributes; - @JsonIgnore - private final Supplier metaFactory; - protected Resource(String id, String type, JsonApiMeta meta, T attributes) { this.id = id; this.type = type; this.meta = meta; - this.metaFactory = JsonApiMeta::new; this.attributes = attributes; } - protected Resource(String id, String type, Supplier metaFactory, T attributes) { + protected Resource(String id, String type, T attributes) { this.id = id; this.type = type; - this.metaFactory = metaFactory; this.attributes = attributes; } - protected Resource(String id, String type, T attributes) { - this(id, type, JsonApiMeta::new, attributes); - } - public String getId() { return id; } @@ -66,24 +55,12 @@ public T getAttributes() { return attributes; } - public JsonApiMeta getMeta() { + @JsonProperty + public JsonApiMeta meta() { return meta; } - @JsonIgnore - public JsonApiMeta getOrCreateMeta() { - if (meta == null) { - meta = metaFactory.get(); - } - return meta; - } - - public Object getMeta(String key) { - return meta != null ? meta.get(key) : null; - } - - public Resource addMeta(String key, Object value) { - getOrCreateMeta().put(key, value); - return this; + public void meta(JsonApiMeta meta) { + this.meta = meta; } } diff --git a/api/src/main/java/com/github/streamshub/console/api/service/RecordService.java b/api/src/main/java/com/github/streamshub/console/api/service/RecordService.java index 0b8c40a3d..0a2fcd670 100644 --- a/api/src/main/java/com/github/streamshub/console/api/service/RecordService.java +++ b/api/src/main/java/com/github/streamshub/console/api/service/RecordService.java @@ -1,13 +1,10 @@ package com.github.streamshub.console.api.service; -import java.io.ByteArrayInputStream; -import java.io.IOException; -import java.io.InputStreamReader; -import java.io.Reader; -import java.nio.charset.StandardCharsets; import java.time.Duration; import java.time.Instant; +import java.util.ArrayList; import java.util.Arrays; +import java.util.Base64; import java.util.Collection; import java.util.Collections; import java.util.Comparator; @@ -18,11 +15,12 @@ import java.util.List; import java.util.Map; import java.util.NoSuchElementException; +import java.util.Objects; import java.util.Optional; import java.util.Set; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; import java.util.concurrent.CompletionStage; -import java.util.concurrent.Executor; import java.util.function.Function; import java.util.function.Supplier; import java.util.stream.Collectors; @@ -38,6 +36,7 @@ import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.Uuid; @@ -48,18 +47,18 @@ import org.eclipse.microprofile.context.ThreadContext; import org.jboss.logging.Logger; +import com.github.streamshub.console.api.model.Identifier; +import com.github.streamshub.console.api.model.JsonApiRelationship; import com.github.streamshub.console.api.model.KafkaRecord; import com.github.streamshub.console.api.support.KafkaContext; import com.github.streamshub.console.api.support.SizeLimitedSortedSet; +import com.github.streamshub.console.api.support.serdes.RecordData; import static java.util.Objects.requireNonNullElse; @ApplicationScoped public class RecordService { - public static final String BINARY_DATA_MESSAGE = "Binary or non-UTF-8 encoded data cannot be displayed"; - static final int REPLACEMENT_CHARACTER = '\uFFFD'; - @Inject Logger logger; @@ -67,15 +66,15 @@ public class RecordService { KafkaContext kafkaContext; @Inject - Supplier> consumerSupplier; + Consumer consumer; @Inject - Supplier> producerSupplier; + Producer producer; @Inject ThreadContext threadContext; - public List consumeRecords(String topicId, + public CompletionStage> consumeRecords(String topicId, Integer partition, Long offset, Instant timestamp, @@ -83,72 +82,62 @@ public List consumeRecords(String topicId, List include, Integer maxValueLength) { - List partitions = topicNameForId(topicId) - .thenApplyAsync( - topicName -> consumerSupplier.get().partitionsFor(topicName), - threadContext.currentContextExecutor()) - .toCompletableFuture() - .join(); - - List assignments = partitions.stream() - .filter(p -> partition == null || partition.equals(p.partition())) - .map(p -> new TopicPartition(p.topic(), p.partition())) - .toList(); + return topicNameForId(topicId).thenApplyAsync(topicName -> { + List partitions = consumer.partitionsFor(topicName); + List assignments = partitions.stream() + .filter(p -> partition == null || partition.equals(p.partition())) + .map(p -> new TopicPartition(p.topic(), p.partition())) + .toList(); - if (assignments.isEmpty()) { - return Collections.emptyList(); - } + if (assignments.isEmpty()) { + return Collections.emptyList(); + } - Consumer consumer = consumerSupplier.get(); - consumer.assign(assignments); - var endOffsets = consumer.endOffsets(assignments); + consumer.assign(assignments); + var endOffsets = consumer.endOffsets(assignments); - if (timestamp != null) { - seekToTimestamp(consumer, assignments, timestamp); - } else { - seekToOffset(consumer, assignments, endOffsets, offset, limit); - } + if (timestamp != null) { + seekToTimestamp(consumer, assignments, timestamp); + } else { + seekToOffset(consumer, assignments, endOffsets, offset, limit); + } - Iterable> poll = () -> new ConsumerRecordsIterator<>(consumer, endOffsets, limit); - var limitSet = new SizeLimitedSortedSet>(buildComparator(timestamp, offset), limit); + Iterable> poll = () -> new ConsumerRecordsIterator<>(consumer, endOffsets, limit); + var limitSet = new SizeLimitedSortedSet>(buildComparator(timestamp, offset), limit); - return StreamSupport.stream(poll.spliterator(), false) - .flatMap(records -> StreamSupport.stream(records.spliterator(), false)) - .collect(Collectors.toCollection(() -> limitSet)) - .stream() - .map(rec -> getItems(rec, topicId, include, maxValueLength)) - .toList(); + return StreamSupport.stream(poll.spliterator(), false) + .flatMap(records -> StreamSupport.stream(records.spliterator(), false)) + .collect(Collectors.toCollection(() -> limitSet)) + .stream() + .map(rec -> getItems(rec, topicId, include, maxValueLength)) + .toList(); + }, threadContext.currentContextExecutor()); } public CompletionStage produceRecord(String topicId, KafkaRecord input) { CompletableFuture promise = new CompletableFuture<>(); - Executor asyncExec = threadContext.currentContextExecutor(); - - topicNameForId(topicId) - .thenApplyAsync( - topicName -> producerSupplier.get().partitionsFor(topicName), - asyncExec) - .thenAcceptAsync( - partitions -> { - Producer producer = producerSupplier.get(); - String topicName = partitions.iterator().next().topic(); - Integer partition = input.getPartition(); - - if (partition != null && partitions.stream().noneMatch(p -> partition.equals(p.partition()))) { - promise.completeExceptionally(invalidPartition(topicId, partition)); - } else { - send(topicName, input, producer, promise); - } - }, - asyncExec); + + topicNameForId(topicId).thenAcceptAsync(topicName -> { + List partitions = producer.partitionsFor(topicName); + Integer partition = input.partition(); + + if (partition != null && partitions.stream().noneMatch(p -> partition.equals(p.partition()))) { + promise.completeExceptionally(invalidPartition(topicId, partition)); + } else { + send(topicName, input, producer, promise); + } + + promise.whenComplete((kafkaRecord, error) -> producer.close()); + }, threadContext.currentContextExecutor()).exceptionally(e -> { + promise.completeExceptionally(e); + return null; + }); return promise; } - void send(String topicName, KafkaRecord input, Producer producer, CompletableFuture promise) { - String key = input.getKey(); - - List
headers = Optional.ofNullable(input.getHeaders()) + void send(String topicName, KafkaRecord input, Producer producer, CompletableFuture promise) { + List
headers = Optional.ofNullable(input.headers()) .orElseGet(Collections::emptyMap) .entrySet() .stream() @@ -164,35 +153,78 @@ public byte[] value() { } }) .map(Header.class::cast) - .toList(); + .collect(Collectors.toCollection(ArrayList::new)); - Long timestamp = Optional.ofNullable(input.getTimestamp()).map(Instant::toEpochMilli).orElse(null); + Long timestamp = Optional.ofNullable(input.timestamp()).map(Instant::toEpochMilli).orElse(null); + var key = new RecordData(input.key()); + setSchemaMeta(input.keySchema(), key); - ProducerRecord request = new ProducerRecord<>(topicName, - input.getPartition(), + var value = new RecordData(input.value()); + setSchemaMeta(input.valueSchema(), value); + + ProducerRecord request = new ProducerRecord<>(topicName, + input.partition(), timestamp, key, - input.getValue(), + value, headers); - producer.send(request, (meta, exception) -> { - if (exception != null) { - promise.completeExceptionally(exception); - } else { + CompletableFuture.completedFuture(null) + .thenApplyAsync(nothing -> { + try { + return producer.send(request).get(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new CompletionException("Error occurred while sending record to Kafka cluster", e); + } catch (Exception e) { + throw new CompletionException("Error occurred while sending record to Kafka cluster", e); + } + }, threadContext.currentContextExecutor()) + .thenApply(meta -> { KafkaRecord result = new KafkaRecord(); - result.setPartition(meta.partition()); + result.partition(meta.partition()); + if (meta.hasOffset()) { - result.setOffset(meta.offset()); + result.offset(meta.offset()); } + if (meta.hasTimestamp()) { - result.setTimestamp(Instant.ofEpochMilli(meta.timestamp())); + result.timestamp(Instant.ofEpochMilli(meta.timestamp())); } - result.setKey(input.getKey()); - result.setValue(input.getValue()); - result.setHeaders(input.getHeaders()); - promise.complete(result); - } - }); + + result.key(key.dataString(null)); + result.value(value.dataString(null)); + result.headers(Arrays.stream(request.headers().toArray()) + .collect( + // Duplicate headers will be overwritten + LinkedHashMap::new, + (map, hdr) -> map.put(hdr.key(), headerValue(hdr, null)), + HashMap::putAll)); + result.size(sizeOf(meta, request.headers())); + maybeRelate(key).ifPresent(result::keySchema); + maybeRelate(value).ifPresent(result::valueSchema); + + return result; + }) + .thenAccept(promise::complete) + .exceptionally(exception -> { + promise.completeExceptionally(exception); + return null; + }); + } + + void setSchemaMeta(JsonApiRelationship schemaRelationship, RecordData data) { + schemaMeta(schemaRelationship, "coordinates").ifPresent(gav -> data.meta.put("schema-gav", gav)); + schemaMeta(schemaRelationship, "messageType").ifPresent(type -> data.meta.put("message-type", type)); + } + + Optional schemaMeta(JsonApiRelationship schemaRelationship, String key) { + return Optional.ofNullable(schemaRelationship) + .map(JsonApiRelationship::meta) + .map(meta -> { + Object value = meta.get(key); + return (value instanceof String stringValue) ? stringValue : null; + }); } CompletionStage topicNameForId(String topicId) { @@ -210,7 +242,7 @@ CompletionStage topicNameForId(String topicId) { .orElseThrow(() -> noSuchTopic(topicId))); } - void seekToTimestamp(Consumer consumer, List assignments, Instant timestamp) { + void seekToTimestamp(Consumer consumer, List assignments, Instant timestamp) { Long tsMillis = timestamp.toEpochMilli(); Map timestampsToSearch = assignments.stream() .collect(Collectors.toMap(Function.identity(), p -> tsMillis)); @@ -237,7 +269,7 @@ void seekToTimestamp(Consumer consumer, List ass }); } - void seekToOffset(Consumer consumer, List assignments, Map endOffsets, Long offset, int limit) { + void seekToOffset(Consumer consumer, List assignments, Map endOffsets, Long offset, int limit) { var beginningOffsets = consumer.beginningOffsets(assignments); assignments.forEach(p -> { @@ -263,9 +295,9 @@ void seekToOffset(Consumer consumer, List assign }); } - Comparator> buildComparator(Instant timestamp, Long offset) { - Comparator> comparator = Comparator - .>comparingLong(ConsumerRecord::timestamp) + Comparator> buildComparator(Instant timestamp, Long offset) { + Comparator> comparator = Comparator + .>comparingLong(ConsumerRecord::timestamp) .thenComparingInt(ConsumerRecord::partition) .thenComparingLong(ConsumerRecord::offset); @@ -277,71 +309,89 @@ Comparator> buildComparator(Instant timestamp, Lo return comparator; } - KafkaRecord getItems(ConsumerRecord rec, String topicId, List include, Integer maxValueLength) { + KafkaRecord getItems(ConsumerRecord rec, String topicId, List include, Integer maxValueLength) { KafkaRecord item = new KafkaRecord(topicId); - setProperty(KafkaRecord.Fields.PARTITION, include, rec::partition, item::setPartition); - setProperty(KafkaRecord.Fields.OFFSET, include, rec::offset, item::setOffset); - setProperty(KafkaRecord.Fields.TIMESTAMP, include, () -> Instant.ofEpochMilli(rec.timestamp()), item::setTimestamp); - setProperty(KafkaRecord.Fields.TIMESTAMP_TYPE, include, rec.timestampType()::name, item::setTimestampType); - setProperty(KafkaRecord.Fields.KEY, include, rec::key, k -> item.setKey(bytesToString(k, maxValueLength))); - setProperty(KafkaRecord.Fields.VALUE, include, rec::value, v -> item.setValue(bytesToString(v, maxValueLength))); - setProperty(KafkaRecord.Fields.HEADERS, include, () -> headersToMap(rec.headers(), maxValueLength), item::setHeaders); - setProperty(KafkaRecord.Fields.SIZE, include, () -> sizeOf(rec), item::setSize); + setProperty(KafkaRecord.Fields.PARTITION, include, rec::partition, item::partition); + setProperty(KafkaRecord.Fields.OFFSET, include, rec::offset, item::offset); + setProperty(KafkaRecord.Fields.TIMESTAMP, include, () -> Instant.ofEpochMilli(rec.timestamp()), item::timestamp); + setProperty(KafkaRecord.Fields.TIMESTAMP_TYPE, include, rec.timestampType()::name, item::timestampType); + setProperty(KafkaRecord.Fields.KEY, include, rec::key, k -> item.key(k.dataString(maxValueLength))); + setProperty(KafkaRecord.Fields.VALUE, include, rec::value, v -> item.value(v.dataString(maxValueLength))); + setProperty(KafkaRecord.Fields.HEADERS, include, () -> headersToMap(rec.headers(), maxValueLength), item::headers); + setProperty(KafkaRecord.Fields.SIZE, include, () -> sizeOf(rec), item::size); + + maybeRelate(rec.key()).ifPresent(item::keySchema); + maybeRelate(rec.value()).ifPresent(item::valueSchema); return item; } + Optional maybeRelate(RecordData data) { + return Optional.ofNullable(data) + .map(d -> d.meta) + .filter(recordMeta -> recordMeta.containsKey("schema-id")) + .map(recordMeta -> { + String artifactType = recordMeta.get("schema-type"); + String clusterIdEnc = Base64.getUrlEncoder().encodeToString(kafkaContext.clusterId().getBytes()); + String schemaId = clusterIdEnc + '.' + recordMeta.get("schema-id"); + String name = recordMeta.get("schema-name"); + + var relationship = new JsonApiRelationship(); + relationship.addMeta("artifactType", artifactType); + relationship.addMeta("name", name); + relationship.data(new Identifier("schemas", schemaId)); + relationship.addLink("content", "/api/schemas/" + schemaId); + + return relationship; + }) + .filter(Objects::nonNull); + } + void setProperty(String fieldName, List include, Supplier source, java.util.function.Consumer target) { if (include.contains(fieldName)) { - target.accept(source.get()); + T value = source.get(); + if (value != null) { + target.accept(value); + } } } - String bytesToString(byte[] bytes, Integer maxValueLength) { - if (bytes == null) { - return null; - } - - if (bytes.length == 0) { - return ""; - } - - int bufferSize = maxValueLength != null ? Math.min(maxValueLength, bytes.length) : bytes.length; - StringBuilder buffer = new StringBuilder(bufferSize); - - try (Reader reader = new InputStreamReader(new ByteArrayInputStream(bytes), StandardCharsets.UTF_8)) { - int input; + Map headersToMap(Headers headers, Integer maxValueLength) { + Map headerMap = new LinkedHashMap<>(); + headers.iterator().forEachRemaining(h -> headerMap.put(h.key(), headerValue(h, maxValueLength))); + return headerMap; + } - while ((input = reader.read()) > -1) { - if (input == REPLACEMENT_CHARACTER || !Character.isDefined(input)) { - return BINARY_DATA_MESSAGE; - } + static String headerValue(Header header, Integer maxValueLength) { + byte[] value = header.value(); - buffer.append((char) input); + if (value != null) { + int length; - if (maxValueLength != null && buffer.length() == maxValueLength) { - break; - } + if (maxValueLength == null) { + length = value.length; + } else { + length = Integer.min(maxValueLength, value.length); } - return buffer.toString(); - } catch (IOException e) { - return BINARY_DATA_MESSAGE; + return new String(value, 0, length); } + + return null; } - Map headersToMap(Headers headers, Integer maxValueLength) { - Map headerMap = new LinkedHashMap<>(); - headers.iterator().forEachRemaining(h -> headerMap.put(h.key(), bytesToString(h.value(), maxValueLength))); - return headerMap; + static long sizeOf(RecordMetadata meta, Headers headers) { + return sizeOf(meta.serializedKeySize(), meta.serializedValueSize(), headers); + } + + static long sizeOf(ConsumerRecord rec) { + return sizeOf(rec.serializedKeySize(), rec.serializedValueSize(), rec.headers()); } - long sizeOf(ConsumerRecord rec) { - return rec.serializedKeySize() + - rec.serializedValueSize() + - Arrays.stream(rec.headers().toArray()) - .mapToLong(h -> h.key().length() + h.value().length) + static long sizeOf(int keySize, int valueSize, Headers headers) { + return keySize + valueSize + Arrays.stream(headers.toArray()) + .mapToLong(h -> h.key().length() + (h.value() != null ? h.value().length : 0)) .sum(); } diff --git a/api/src/main/java/com/github/streamshub/console/api/support/KafkaContext.java b/api/src/main/java/com/github/streamshub/console/api/support/KafkaContext.java index 2c505f1f4..51dec27ac 100644 --- a/api/src/main/java/com/github/streamshub/console/api/support/KafkaContext.java +++ b/api/src/main/java/com/github/streamshub/console/api/support/KafkaContext.java @@ -1,6 +1,7 @@ package com.github.streamshub.console.api.support; import java.io.Closeable; +import java.io.IOException; import java.util.Collection; import java.util.Collections; import java.util.Map; @@ -9,13 +10,22 @@ import java.util.stream.Stream; import org.apache.kafka.clients.admin.Admin; +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.common.config.SaslConfigs; +import org.jboss.logging.Logger; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.github.streamshub.console.api.support.serdes.ForceCloseable; +import com.github.streamshub.console.api.support.serdes.MultiformatDeserializer; +import com.github.streamshub.console.api.support.serdes.MultiformatSerializer; import com.github.streamshub.console.config.KafkaClusterConfig; +import io.apicurio.registry.rest.client.RegistryClient; import io.strimzi.api.kafka.model.kafka.Kafka; import io.strimzi.api.kafka.model.kafka.KafkaClusterSpec; import io.strimzi.api.kafka.model.kafka.KafkaSpec; +import io.strimzi.api.kafka.model.kafka.KafkaStatus; import io.strimzi.api.kafka.model.kafka.listener.GenericKafkaListener; import io.strimzi.api.kafka.model.kafka.listener.KafkaListenerAuthenticationOAuth; import io.strimzi.kafka.oauth.client.ClientConfig; @@ -23,12 +33,14 @@ public class KafkaContext implements Closeable { public static final KafkaContext EMPTY = new KafkaContext(null, null, Collections.emptyMap(), null); + private static final Logger LOGGER = Logger.getLogger(KafkaContext.class); final KafkaClusterConfig clusterConfig; final Kafka resource; final Map, Map> configs; final Admin admin; boolean applicationScoped; + SchemaRegistryContext schemaRegistryContext; public KafkaContext(KafkaClusterConfig clusterConfig, Kafka resource, Map, Map> configs, Admin admin) { this.clusterConfig = clusterConfig; @@ -41,6 +53,13 @@ public KafkaContext(KafkaClusterConfig clusterConfig, Kafka resource, Map kafkaResource) { + return Optional.ofNullable(clusterConfig.getId()) + .or(() -> kafkaResource.map(Kafka::getStatus).map(KafkaStatus::getClusterId)) + .orElseGet(clusterConfig::getName); } @Override @@ -66,6 +85,17 @@ public void close() { if (admin != null) { admin.close(); } + if (applicationScoped && schemaRegistryContext != null) { + try { + schemaRegistryContext.close(); + } catch (IOException e) { + LOGGER.warnf("Exception closing schema registry context: %s", e.getMessage()); + } + } + } + + public String clusterId() { + return clusterId(clusterConfig, Optional.ofNullable(resource)); } public KafkaClusterConfig clusterConfig() { @@ -92,6 +122,14 @@ public boolean applicationScoped() { return applicationScoped; } + public void schemaRegistryClient(RegistryClient schemaRegistryClient) { + schemaRegistryContext = new SchemaRegistryContext(schemaRegistryClient); + } + + public SchemaRegistryContext schemaRegistryContext() { + return schemaRegistryContext; + } + public String saslMechanism(Class clientType) { return configs(clientType).get(SaslConfigs.SASL_MECHANISM) instanceof String auth ? auth : ""; } @@ -115,4 +153,71 @@ public Optional tokenUrl() { .map(KafkaListenerAuthenticationOAuth.class::cast) .map(KafkaListenerAuthenticationOAuth::getTokenEndpointUri)); } + + public class SchemaRegistryContext implements Closeable { + private final RegistryClient registryClient; + private final MultiformatDeserializer keyDeserializer; + private final MultiformatDeserializer valueDeserializer; + private final MultiformatSerializer keySerializer; + private final MultiformatSerializer valueSerializer; + + SchemaRegistryContext(RegistryClient schemaRegistryClient) { + this.registryClient = schemaRegistryClient; + + ObjectMapper objectMapper = new ObjectMapper(); + + keyDeserializer = new MultiformatDeserializer(registryClient, objectMapper); + keyDeserializer.configure(configs(Consumer.class), true); + + valueDeserializer = new MultiformatDeserializer(registryClient, objectMapper); + valueDeserializer.configure(configs(Consumer.class), false); + + keySerializer = new MultiformatSerializer(registryClient, objectMapper); + keySerializer.configure(configs(Producer.class), true); + + valueSerializer = new MultiformatSerializer(registryClient, objectMapper); + valueSerializer.configure(configs(Producer.class), false); + } + + public RegistryClient registryClient() { + return registryClient; + } + + public MultiformatDeserializer keyDeserializer() { + return keyDeserializer; + } + + public MultiformatDeserializer valueDeserializer() { + return valueDeserializer; + } + + public MultiformatSerializer keySerializer() { + return keySerializer; + } + + public MultiformatSerializer valueSerializer() { + return valueSerializer; + } + + @Override + public void close() throws IOException { + if (registryClient != null) { + // nothing to close otherwise + closeOptionally("key deserializer", keyDeserializer); + closeOptionally("value deserializer", valueDeserializer); + closeOptionally("key serializer", keySerializer); + closeOptionally("value serializer", valueSerializer); + } + } + + private void closeOptionally(String name, ForceCloseable closeable) { + if (closeable != null) { + try { + closeable.forceClose(); + } catch (Exception e) { + LOGGER.infof("Exception closing resource %s: %s", name, e.getMessage()); + } + } + } + } } diff --git a/api/src/main/java/com/github/streamshub/console/api/support/serdes/ArtifactReferences.java b/api/src/main/java/com/github/streamshub/console/api/support/serdes/ArtifactReferences.java new file mode 100644 index 000000000..74cff29c4 --- /dev/null +++ b/api/src/main/java/com/github/streamshub/console/api/support/serdes/ArtifactReferences.java @@ -0,0 +1,91 @@ +package com.github.streamshub.console.api.support.serdes; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.UncheckedIOException; +import java.util.Base64; + +import jakarta.ws.rs.NotFoundException; + +import org.jboss.logging.Logger; + +import com.fasterxml.jackson.core.JsonGenerator; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; + +import io.apicurio.registry.resolver.strategy.ArtifactReference; + +public class ArtifactReferences { + + private static final Logger LOGGER = Logger.getLogger(ArtifactReferences.class); + private static final String GLOBAL_ID = "globalId"; + private static final String CONTENT_ID = "contentId"; + private static final String GROUP_ID = "groupId"; + private static final String ARTIFACT_ID = "artifactId"; + private static final String VERSION = "version"; + + private ArtifactReferences() { + } + + public static String toSchemaId(ArtifactReference reference, ObjectMapper objectMapper) { + ByteArrayOutputStream out = new ByteArrayOutputStream(); + + try (var generator = objectMapper.createGenerator(out)) { + generator.writeStartObject(); + putEntry(generator, GLOBAL_ID, reference.getGlobalId()); + putEntry(generator, CONTENT_ID, reference.getContentId()); + putEntry(generator, GROUP_ID, reference.getGroupId()); + putEntry(generator, ARTIFACT_ID, reference.getArtifactId()); + putEntry(generator, VERSION, reference.getVersion()); + generator.writeEndObject(); + generator.flush(); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + + return Base64.getUrlEncoder().encodeToString(out.toByteArray()); + } + + public static ArtifactReference fromSchemaId(String schemaId, ObjectMapper objectMapper) { + JsonNode id; + + try { + InputStream in = new ByteArrayInputStream(Base64.getUrlDecoder().decode(schemaId)); + id = objectMapper.readTree(in); + } catch (IOException e) { + LOGGER.debugf("Failed to decode or parse schemaId '%s': %s", schemaId, e.getMessage()); + throw new NotFoundException("No such schema"); + } + + var builder = ArtifactReference.builder(); + + if (id.has(GLOBAL_ID)) { + builder.globalId(id.get(GLOBAL_ID).asLong()); + } + if (id.has(CONTENT_ID)) { + builder.contentId(id.get(CONTENT_ID).asLong()); + } + if (id.has(GROUP_ID)) { + builder.groupId(id.get(GROUP_ID).asText()); + } + if (id.has(ARTIFACT_ID)) { + builder.artifactId(id.get(ARTIFACT_ID).asText()); + } + if (id.has(VERSION)) { + builder.version(id.get(VERSION).asText()); + } + + return builder.build(); + } + + static void putEntry(JsonGenerator generator, String key, Object value) throws IOException { + if (value instanceof String str) { + generator.writeStringField(key, str); + } else if (value instanceof Long nbr && nbr != 0) { + generator.writeNumberField(key, nbr); + } + } + +} diff --git a/api/src/main/java/com/github/streamshub/console/api/support/serdes/AvroDatumProvider.java b/api/src/main/java/com/github/streamshub/console/api/support/serdes/AvroDatumProvider.java new file mode 100644 index 000000000..f486ecb2d --- /dev/null +++ b/api/src/main/java/com/github/streamshub/console/api/support/serdes/AvroDatumProvider.java @@ -0,0 +1,72 @@ +package com.github.streamshub.console.api.support.serdes; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; + +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericDatumReader; +import org.apache.avro.generic.GenericDatumWriter; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.io.DatumReader; +import org.apache.avro.io.DatumWriter; +import org.apache.avro.io.Decoder; +import org.apache.avro.io.DecoderFactory; +import org.apache.avro.io.Encoder; +import org.apache.avro.io.EncoderFactory; + +import io.apicurio.registry.serde.avro.DefaultAvroDatumProvider; + +public class AvroDatumProvider extends DefaultAvroDatumProvider { + @Override + public DatumWriter createDatumWriter(RecordData data, Schema schema) { + GenericDatumWriter writer = new GenericDatumWriter<>(schema); + + return new DatumWriter() { + @Override + public void write(RecordData data, org.apache.avro.io.Encoder out) throws IOException { + final DatumReader reader = new GenericDatumReader<>(schema); + final InputStream dataStream = new ByteArrayInputStream(data.data); + final Decoder jsonDecoder = DecoderFactory.get().jsonDecoder(schema, dataStream); + final Object datum = reader.read(null, jsonDecoder); + writer.write(datum, out); + + final ByteArrayOutputStream buffer = new ByteArrayOutputStream(); + final Encoder jsonEncoder = EncoderFactory.get().jsonEncoder(schema, buffer); + writer.write(datum, jsonEncoder); + jsonEncoder.flush(); + data.data = buffer.toByteArray(); + } + + @Override + public void setSchema(Schema schema) { + // No-op + } + }; + } + + @Override + public DatumReader createDatumReader(Schema schema) { + GenericDatumReader target = new GenericDatumReader<>(schema); + + return new DatumReader() { + @Override + public RecordData read(RecordData reuse, Decoder in) throws IOException { + final Object datum = target.read(reuse, in); + final ByteArrayOutputStream buffer = new ByteArrayOutputStream(); + final DatumWriter writer = new GenericDatumWriter<>(schema); + final Encoder jsonEncoder = EncoderFactory.get().jsonEncoder(schema, buffer); + writer.write(datum, jsonEncoder); + jsonEncoder.flush(); + + return new RecordData(buffer.toByteArray(), null); + } + + @Override + public void setSchema(Schema schema) { + // No-op + } + }; + } +} diff --git a/api/src/main/java/com/github/streamshub/console/api/support/serdes/AvroDeserializer.java b/api/src/main/java/com/github/streamshub/console/api/support/serdes/AvroDeserializer.java new file mode 100644 index 000000000..906c997b8 --- /dev/null +++ b/api/src/main/java/com/github/streamshub/console/api/support/serdes/AvroDeserializer.java @@ -0,0 +1,31 @@ +package com.github.streamshub.console.api.support.serdes; + +import java.nio.ByteBuffer; + +import org.apache.avro.Schema; +import org.apache.kafka.common.header.Headers; + +import io.apicurio.registry.resolver.ParsedSchema; +import io.apicurio.registry.resolver.SchemaResolver; +import io.apicurio.registry.serde.avro.AvroKafkaDeserializer; + +class AvroDeserializer extends AvroKafkaDeserializer { + AvroDeserializer(SchemaResolver schemaResolver) { + super(); + setSchemaResolver(schemaResolver); + } + + @Override + public RecordData readData(ParsedSchema schema, ByteBuffer buffer, int start, int length) { + return super.readData(schema, buffer, start, length); + } + + @Override + public RecordData readData(Headers headers, + ParsedSchema schema, + ByteBuffer buffer, + int start, + int length) { + return super.readData(headers, schema, buffer, start, length); + } +} diff --git a/api/src/main/java/com/github/streamshub/console/api/support/serdes/ForceCloseable.java b/api/src/main/java/com/github/streamshub/console/api/support/serdes/ForceCloseable.java new file mode 100644 index 000000000..905590d1a --- /dev/null +++ b/api/src/main/java/com/github/streamshub/console/api/support/serdes/ForceCloseable.java @@ -0,0 +1,9 @@ +package com.github.streamshub.console.api.support.serdes; + +import java.io.IOException; + +public interface ForceCloseable { + + void forceClose() throws IOException; + +} diff --git a/api/src/main/java/com/github/streamshub/console/api/support/serdes/MultiformatDeserializer.java b/api/src/main/java/com/github/streamshub/console/api/support/serdes/MultiformatDeserializer.java new file mode 100644 index 000000000..05266c516 --- /dev/null +++ b/api/src/main/java/com/github/streamshub/console/api/support/serdes/MultiformatDeserializer.java @@ -0,0 +1,239 @@ +package com.github.streamshub.console.api.support.serdes; + +import java.nio.ByteBuffer; +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; +import java.util.Set; + +import org.apache.avro.Schema; +import org.apache.kafka.common.header.Headers; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.protobuf.DynamicMessage; +import com.google.protobuf.Message; + +import io.apicurio.registry.resolver.DefaultSchemaResolver; +import io.apicurio.registry.resolver.ParsedSchema; +import io.apicurio.registry.resolver.SchemaLookupResult; +import io.apicurio.registry.resolver.SchemaParser; +import io.apicurio.registry.resolver.SchemaResolver; +import io.apicurio.registry.resolver.strategy.ArtifactReference; +import io.apicurio.registry.rest.client.RegistryClient; +import io.apicurio.registry.serde.AbstractKafkaDeserializer; +import io.apicurio.registry.serde.SerdeConfig; +import io.apicurio.registry.serde.avro.AvroKafkaSerdeConfig; +import io.apicurio.registry.serde.config.BaseKafkaSerDeConfig; +import io.apicurio.registry.types.ArtifactType; +import io.apicurio.registry.utils.protobuf.schema.ProtobufSchema; + +public class MultiformatDeserializer extends AbstractKafkaDeserializer implements ForceCloseable { + + private static final SchemaLookupResult EMPTY_RESULT = SchemaLookupResult.builder().build(); + + private final ObjectMapper objectMapper; + AvroDeserializer avroDeserializer; + ProtobufDeserializer protobufDeserializer; + SchemaParser parser; + + public MultiformatDeserializer(RegistryClient client, ObjectMapper objectMapper) { + super(); + this.objectMapper = objectMapper; + + if (client != null) { + setSchemaResolver(newResolver(client)); + avroDeserializer = new AvroDeserializer(newResolver(client)); + protobufDeserializer = new ProtobufDeserializer(newResolver(client)); + } + } + + static SchemaResolver newResolver(RegistryClient client) { + var resolver = new DefaultSchemaResolver(); + resolver.setClient(client); + return resolver; + } + + @Override + public void configure(Map configs, boolean isKey) { + if (getSchemaResolver() == null) { + // Do not attempt to configure anything if we will not be making remote calls to registry + return; + } + + Map avroConfigs = new HashMap<>(configs); + avroConfigs.put(AvroKafkaSerdeConfig.AVRO_DATUM_PROVIDER, AvroDatumProvider.class.getName()); + avroDeserializer.configure(avroConfigs, isKey); + + Map protobufConfigs = new HashMap<>(configs); + protobufConfigs.put(SerdeConfig.DESERIALIZER_SPECIFIC_KEY_RETURN_CLASS, DynamicMessage.class); + protobufConfigs.put(SerdeConfig.DESERIALIZER_SPECIFIC_VALUE_RETURN_CLASS, DynamicMessage.class); + protobufDeserializer.configure(protobufConfigs, isKey); + + parser = new MultiformatSchemaParser<>(Set.of( + avroDeserializer.schemaParser(), + protobufDeserializer.schemaParser() + )); + + super.configure(new BaseKafkaSerDeConfig(configs), isKey); + } + + @Override + public void close() { + // don't close - deserializer will be reused + } + + public void forceClose() { + super.close(); + } + + @Override + public SchemaParser schemaParser() { + return parser; + } + + protected RecordData readData(Headers headers, SchemaLookupResult schemaResult, ByteBuffer buffer, int start, int length) { + ParsedSchema schema = Optional.ofNullable(schemaResult) + .map(s -> s.getParsedSchema()) + .orElse(null); + Object parsedSchema = null; + + if (schemaResult != null && schemaResult.getParsedSchema() != null) { + parsedSchema = schemaResult.getParsedSchema().getParsedSchema(); + } + + RecordData result; + + if (parsedSchema instanceof Schema avroSchema) { + try { + if (headers != null) { + result = avroDeserializer.readData(headers, cast(schema), buffer, start, length); + } else { + result = avroDeserializer.readData(cast(schema), buffer, start, length); + } + result.schema = schema; + result.meta.put("schema-type", ArtifactType.AVRO); + result.meta.put("schema-id", ArtifactReferences.toSchemaId(schemaResult.toArtifactReference(), objectMapper)); + result.meta.put("schema-name", avroSchema.getFullName()); + } catch (Exception e) { + result = new RecordData(null, schema); + result.meta.put("error", e.getMessage()); + } + } else if (parsedSchema instanceof ProtobufSchema) { + try { + Message msg; + if (headers != null) { + msg = protobufDeserializer.readData(headers, cast(schema), buffer, start, length); + } else { + msg = protobufDeserializer.readData(cast(schema), buffer, start, length); + } + byte[] data = com.google.protobuf.util.JsonFormat.printer() + .omittingInsignificantWhitespace() + .print(msg) + .getBytes(); + result = new RecordData(data, schema); + result.meta.put("schema-type", ArtifactType.PROTOBUF); + result.meta.put("schema-id", ArtifactReferences.toSchemaId(schemaResult.toArtifactReference(), objectMapper)); + result.meta.put("schema-name", msg.getDescriptorForType().getFullName()); + } catch (Exception e) { + result = new RecordData(null, schema); + result.meta.put("error", e.getMessage()); + } + } else { + byte[] bytes = new byte[length]; + System.arraycopy(buffer.array(), start, bytes, 0, length); + result = new RecordData(bytes, null); + } + + return result; + } + + @SuppressWarnings("unchecked") + static T cast(Object object) { + return (T) object; + } + + @Override + public RecordData deserialize(String topic, byte[] data) { + if (data == null) { + return null; + } + + ByteBuffer buffer; + SchemaLookupResult schema; + int length; + + if (data[0] == MAGIC_BYTE) { + buffer = getByteBuffer(data); + ArtifactReference artifactReference = getIdHandler().readId(buffer); + schema = resolve(artifactReference); + length = buffer.limit() - 1 - getIdHandler().idSize(); + } else { + buffer = ByteBuffer.wrap(data); + // Empty schema + schema = EMPTY_RESULT; + length = buffer.limit() - 1; + } + + int start = buffer.position() + buffer.arrayOffset(); + + return readData(null, schema, buffer, start, length); + } + + @Override + public RecordData deserialize(String topic, Headers headers, byte[] data) { + if (data == null) { + return null; + } + ArtifactReference artifactReference = null; + if (headersHandler != null && headers != null) { + artifactReference = headersHandler.readHeaders(headers); + + if (artifactReference.hasValue()) { + return readData(headers, data, artifactReference); + } + } + + if (headers == null) { + return deserialize(topic, data); + } else { + //try to read data even if artifactReference has no value, maybe there is a fallbackArtifactProvider configured + return readData(headers, data, artifactReference); + } + } + + private RecordData readData(Headers headers, byte[] data, ArtifactReference artifactReference) { + SchemaLookupResult schema = resolve(artifactReference); + + ByteBuffer buffer = ByteBuffer.wrap(data); + int length = buffer.limit(); + int start = buffer.position(); + + return readData(headers, schema, buffer, start, length); + } + + private SchemaLookupResult resolve(ArtifactReference artifactReference) { + var schemaResolver = getSchemaResolver(); + + if (schemaResolver == null) { + // Empty result + return EMPTY_RESULT; + } + + try { + return getSchemaResolver().resolveSchemaByArtifactReference(artifactReference); + } catch (RuntimeException e) { + // Empty result + return EMPTY_RESULT; + } + } + + @Override + protected RecordData readData(ParsedSchema schema, ByteBuffer buffer, int start, int length) { + throw new UnsupportedOperationException(); + } + + @Override + protected RecordData readData(Headers headers, ParsedSchema schema, ByteBuffer buffer, int start, int length) { + throw new UnsupportedOperationException(); + } +} diff --git a/api/src/main/java/com/github/streamshub/console/api/support/serdes/MultiformatSchemaParser.java b/api/src/main/java/com/github/streamshub/console/api/support/serdes/MultiformatSchemaParser.java new file mode 100644 index 000000000..a7ccf57bf --- /dev/null +++ b/api/src/main/java/com/github/streamshub/console/api/support/serdes/MultiformatSchemaParser.java @@ -0,0 +1,114 @@ +package com.github.streamshub.console.api.support.serdes; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.InputStreamReader; +import java.io.Reader; +import java.util.Arrays; +import java.util.LinkedHashMap; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; + +import io.apicurio.registry.resolver.ParsedSchema; +import io.apicurio.registry.resolver.SchemaParser; +import io.apicurio.registry.resolver.data.Record; +import io.apicurio.registry.types.ArtifactType; + +public class MultiformatSchemaParser implements SchemaParser { + + private static final int CACHE_LIMIT = 20; + private static final char[] PROTOBUF_INTRO = "message ".toCharArray(); + + private final Map> delegates; + private final Map schemaCache = new LinkedHashMap<>(CACHE_LIMIT); + + public MultiformatSchemaParser(Set> delegates) { + this.delegates = delegates.stream() + .map(p -> Map.entry(p.artifactType(), p)) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + } + + @Override + public String artifactType() { + throw new UnsupportedOperationException("MultiformatSchemaParser#artifactType()"); + } + + @Override + public Object parseSchema(byte[] rawSchema, Map> resolvedReferences) { + if (schemaCache.containsKey(rawSchema)) { + return schemaCache.get(rawSchema); + } + + Object parsedSchema = loadSchema(rawSchema, resolvedReferences); + + if (schemaCache.size() == CACHE_LIMIT) { + // Remove the oldest entry + var iter = schemaCache.entrySet().iterator(); + iter.next(); + iter.remove(); + } + + schemaCache.put(rawSchema, parsedSchema); + + return parsedSchema; + } + + @Override + public boolean supportsExtractSchemaFromData() { + return false; + } + + @Override + public ParsedSchema getSchemaFromData(Record data) { + throw new UnsupportedOperationException("MultiformatSchemaParser#getSchemaFromData(Record)"); + } + + @Override + public ParsedSchema getSchemaFromData(Record data, boolean dereference) { + throw new UnsupportedOperationException("MultiformatSchemaParser#getSchemaFromData(Record,boolean)"); + } + + @SuppressWarnings("unchecked") + Object loadSchema(byte[] rawSchema, Map> resolvedReferences) { + Object parsedSchema = null; + SchemaParser delegate = null; + + if (delegates.containsKey(ArtifactType.PROTOBUF) && looksLikeProtobuf(rawSchema)) { + delegate = (SchemaParser) delegates.get(ArtifactType.PROTOBUF); + } else if (delegates.containsKey(ArtifactType.AVRO)) { + delegate = (SchemaParser) delegates.get(ArtifactType.AVRO); + } + + if (delegate != null) { + try { + parsedSchema = delegate.parseSchema(rawSchema, resolvedReferences); + } catch (Exception e) { + // Schema is not valid, will be cached as null + } + } + + return parsedSchema; + } + + boolean looksLikeProtobuf(byte[] rawSchema) { + try (Reader reader = new InputStreamReader(new ByteArrayInputStream(rawSchema))) { + int input; + + while ((input = reader.read()) != -1) { + if (Character.isWhitespace(input)) { + continue; + } + char[] buffer = new char[8]; + buffer[0] = (char) input; + + if (reader.read(buffer, 1, 7) == 7 && Arrays.equals(PROTOBUF_INTRO, buffer)) { + return true; + } + } + } catch (IOException e) { + // Ignore + } + return false; + } +} diff --git a/api/src/main/java/com/github/streamshub/console/api/support/serdes/MultiformatSerializer.java b/api/src/main/java/com/github/streamshub/console/api/support/serdes/MultiformatSerializer.java new file mode 100644 index 000000000..4e29199bc --- /dev/null +++ b/api/src/main/java/com/github/streamshub/console/api/support/serdes/MultiformatSerializer.java @@ -0,0 +1,239 @@ +package com.github.streamshub.console.api.support.serdes; + +import java.io.OutputStream; +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; +import java.util.Set; + +import jakarta.ws.rs.BadRequestException; + +import org.apache.avro.Schema; +import org.apache.kafka.common.header.Headers; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.protobuf.Descriptors.Descriptor; +import com.google.protobuf.DynamicMessage; +import com.google.protobuf.InvalidProtocolBufferException; +import com.google.protobuf.Message; + +import io.apicurio.registry.resolver.DefaultSchemaResolver; +import io.apicurio.registry.resolver.ParsedSchema; +import io.apicurio.registry.resolver.SchemaLookupResult; +import io.apicurio.registry.resolver.SchemaParser; +import io.apicurio.registry.resolver.SchemaResolver; +import io.apicurio.registry.resolver.SchemaResolverConfig; +import io.apicurio.registry.resolver.data.Record; +import io.apicurio.registry.resolver.strategy.ArtifactReference; +import io.apicurio.registry.resolver.strategy.ArtifactReferenceResolverStrategy; +import io.apicurio.registry.rest.client.RegistryClient; +import io.apicurio.registry.serde.AbstractKafkaSerializer; +import io.apicurio.registry.serde.SerdeConfig; +import io.apicurio.registry.serde.avro.AvroKafkaSerdeConfig; +import io.apicurio.registry.serde.avro.AvroKafkaSerializer; +import io.apicurio.registry.serde.config.BaseKafkaSerDeConfig; +import io.apicurio.registry.serde.data.KafkaSerdeMetadata; +import io.apicurio.registry.serde.data.KafkaSerdeRecord; +import io.apicurio.registry.types.ArtifactType; +import io.apicurio.registry.utils.protobuf.schema.ProtobufSchema; + +public class MultiformatSerializer extends AbstractKafkaSerializer implements ArtifactReferenceResolverStrategy, ForceCloseable { + + private static final SchemaLookupResult EMPTY_RESULT = SchemaLookupResult.builder().build(); + + final ObjectMapper objectMapper; + AvroKafkaSerializer avroSerializer; + ProtobufSerializer protobufSerializer; + SchemaParser parser; + + public MultiformatSerializer(RegistryClient client, ObjectMapper objectMapper) { + super(); + this.objectMapper = objectMapper; + + if (client != null) { + setSchemaResolver(newResolver(client)); + avroSerializer = new AvroKafkaSerializer<>(); + avroSerializer.setSchemaResolver(newResolver(client)); + protobufSerializer = new ProtobufSerializer(newResolver(client)); + } + } + + static SchemaResolver newResolver(RegistryClient client) { + var resolver = new DefaultSchemaResolver(); + resolver.setClient(client); + return resolver; + } + + @Override + public void configure(Map configs, boolean isKey) { + if (getSchemaResolver() == null) { + // Do not attempt to configure anything if we will not be making remote calls to registry + return; + } + + Map serConfigs = new HashMap<>(configs); + serConfigs.put(SchemaResolverConfig.ARTIFACT_RESOLVER_STRATEGY, this); + + Map avroConfigs = new HashMap<>(serConfigs); + avroConfigs.put(AvroKafkaSerdeConfig.AVRO_DATUM_PROVIDER, AvroDatumProvider.class.getName()); + avroConfigs.put(SchemaResolverConfig.FIND_LATEST_ARTIFACT, Boolean.TRUE); + avroSerializer.configure(avroConfigs, isKey); + + Map protobufConfigs = new HashMap<>(serConfigs); + protobufConfigs.put(SchemaResolverConfig.FIND_LATEST_ARTIFACT, Boolean.TRUE); + protobufConfigs.put(SerdeConfig.VALIDATION_ENABLED, Boolean.TRUE); + protobufSerializer.configure(protobufConfigs, isKey); + + parser = new MultiformatSchemaParser<>(Set.of( + avroSerializer.schemaParser(), + protobufSerializer.schemaParser() + )); + + super.configure(new BaseKafkaSerDeConfig(serConfigs), isKey); + } + + @Override + public void close() { + // don't close - serializer will be reused + } + + public void forceClose() { + super.close(); + } + + @Override + public SchemaParser schemaParser() { + return parser; + } + + @Override + public byte[] serialize(String topic, Headers headers, RecordData data) { + // just return null + if (data == null) { + return null; // NOSONAR - we want to return null and not an empty array + } + + SchemaLookupResult schema = resolveSchema(topic, headers, data); + Object parsedSchema = null; + + if (schema != null && schema.getParsedSchema() != null) { + parsedSchema = schema.getParsedSchema().getParsedSchema(); + } + + byte[] serialized; + + if (parsedSchema instanceof Schema avroSchema) { + try { + serialized = avroSerializer.serialize(topic, headers, data); + setSchemaMeta(data, schema, ArtifactType.AVRO, avroSchema.getFullName()); + } catch (Exception e) { + throw new BadRequestException(e.getMessage(), e); + } + } else if (parsedSchema instanceof ProtobufSchema protobufSchema) { + Message msg; + String schemaRef = schemaMeta(data, "schema-gav").orElseThrow(); // we know it's non-null because we have a schema + String messageType = schemaMeta(data, "message-type").orElse(null); + Descriptor descriptor; + + if (messageType != null) { + descriptor = protobufSchema.getFileDescriptor().findMessageTypeByName(messageType); + if (descriptor == null) { + throw new BadRequestException("No such message type %s for schema %s" + .formatted(messageType, schemaRef)); + } + } else if (protobufSchema.getFileDescriptor().getMessageTypes().size() == 1) { + descriptor = protobufSchema.getFileDescriptor().getMessageTypes().get(0); + } else { + throw new BadRequestException("Unable to determine message type to use from schema %s" + .formatted(schemaRef)); + } + + try { + var builder = DynamicMessage.newBuilder(descriptor); + com.google.protobuf.util.JsonFormat.parser() + .ignoringUnknownFields() + .merge(data.dataString(null), builder); + msg = builder.build(); + } catch (InvalidProtocolBufferException e) { + throw new BadRequestException(e.getMessage(), e); + } + + serialized = protobufSerializer.serialize(headers, msg, cast(schema)); + setSchemaMeta(data, schema, ArtifactType.PROTOBUF, descriptor.getFullName()); + } else { + data.meta.remove("schema"); // Remove schema meta so it is not returned with 201 response + serialized = data.data; + } + + return serialized; + } + + SchemaLookupResult resolveSchema(String topic, Headers headers, RecordData data) { + if (getSchemaResolver() == null) { + return EMPTY_RESULT; + } + + KafkaSerdeMetadata resolverMetadata = new KafkaSerdeMetadata(topic, isKey(), headers); + var reference = artifactReference(new KafkaSerdeRecord<>(resolverMetadata, data), null); + SchemaLookupResult schema = null; + + if (reference != null) { + try { + schema = getSchemaResolver().resolveSchemaByArtifactReference(reference); + } catch (Exception e) { + schema = EMPTY_RESULT; + } + } + + return schema; + } + + @SuppressWarnings("unchecked") + static T cast(Object object) { + return (T) object; + } + + private void setSchemaMeta(RecordData data, SchemaLookupResult schema, String type, String name) { + String id = ArtifactReferences.toSchemaId(schema.toArtifactReference(), objectMapper); + + data.meta.put("schema-type", type); + data.meta.put("schema-id", id); + data.meta.put("schema-name", name); + } + + @Override + public ArtifactReference artifactReference(Record data, ParsedSchema parsedSchema) { + KafkaSerdeRecord kdata = (KafkaSerdeRecord) data; + RecordData rData = kdata.payload(); + + return schemaMeta(rData, "schema-gav") + .map(schemaRef -> { + String[] gav = schemaRef.split(":"); + return ArtifactReference.builder() + .groupId(gav[0]) + .artifactId(gav[1]) + .version(gav.length > 2 ? gav[2] : null) + .build(); + }) + .orElse(null); + } + + private Optional schemaMeta(RecordData data, String metaProperty) { + return Optional.ofNullable(data.meta.get(metaProperty)); + } + + @Override + public boolean loadSchema() { + return false; + } + + @Override + protected void serializeData(ParsedSchema schema, RecordData data, OutputStream out) { + throw new UnsupportedOperationException(); + } + + @Override + protected void serializeData(Headers headers, ParsedSchema schema, RecordData data, OutputStream out) { + throw new UnsupportedOperationException(); + } +} diff --git a/api/src/main/java/com/github/streamshub/console/api/support/serdes/ProtobufDeserializer.java b/api/src/main/java/com/github/streamshub/console/api/support/serdes/ProtobufDeserializer.java new file mode 100644 index 000000000..1fe3f5d55 --- /dev/null +++ b/api/src/main/java/com/github/streamshub/console/api/support/serdes/ProtobufDeserializer.java @@ -0,0 +1,33 @@ +package com.github.streamshub.console.api.support.serdes; + +import java.nio.ByteBuffer; + +import org.apache.kafka.common.header.Headers; + +import com.google.protobuf.Message; + +import io.apicurio.registry.resolver.ParsedSchema; +import io.apicurio.registry.resolver.SchemaResolver; +import io.apicurio.registry.serde.protobuf.ProtobufKafkaDeserializer; +import io.apicurio.registry.utils.protobuf.schema.ProtobufSchema; + +class ProtobufDeserializer extends ProtobufKafkaDeserializer { + ProtobufDeserializer(SchemaResolver schemaResolver) { + super(); + setSchemaResolver(schemaResolver); + } + + @Override + public Message readData(ParsedSchema schema, ByteBuffer buffer, int start, int length) { + return super.readData(schema, buffer, start, length); + } + + @Override + public Message readData(Headers headers, + ParsedSchema schema, + ByteBuffer buffer, + int start, + int length) { + return super.readData(headers, schema, buffer, start, length); + } +} diff --git a/api/src/main/java/com/github/streamshub/console/api/support/serdes/ProtobufSerializer.java b/api/src/main/java/com/github/streamshub/console/api/support/serdes/ProtobufSerializer.java new file mode 100644 index 000000000..c82e793d1 --- /dev/null +++ b/api/src/main/java/com/github/streamshub/console/api/support/serdes/ProtobufSerializer.java @@ -0,0 +1,44 @@ +package com.github.streamshub.console.api.support.serdes; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.UncheckedIOException; + +import org.apache.kafka.common.header.Headers; + +import com.google.protobuf.Message; + +import io.apicurio.registry.resolver.SchemaLookupResult; +import io.apicurio.registry.resolver.SchemaResolver; +import io.apicurio.registry.serde.protobuf.ProtobufKafkaSerializer; +import io.apicurio.registry.utils.protobuf.schema.ProtobufSchema; + +class ProtobufSerializer extends ProtobufKafkaSerializer { + ProtobufSerializer(SchemaResolver schemaResolver) { + super(); + setSchemaResolver(schemaResolver); + } + + public byte[] serialize(Headers headers, Message data, SchemaLookupResult schema) { + // just return null + if (data == null) { + return null; // NOSONAR + } + + try { + ByteArrayOutputStream out = new ByteArrayOutputStream(); + + if (headersHandler != null && headers != null) { + headersHandler.writeHeaders(headers, schema.toArtifactReference()); + serializeData(headers, schema.getParsedSchema(), data, out); + } else { + out.write(MAGIC_BYTE); + getIdHandler().writeId(schema.toArtifactReference(), out); + serializeData(schema.getParsedSchema(), data, out); + } + return out.toByteArray(); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } +} diff --git a/api/src/main/java/com/github/streamshub/console/api/support/serdes/RecordData.java b/api/src/main/java/com/github/streamshub/console/api/support/serdes/RecordData.java new file mode 100644 index 000000000..4f47f8c83 --- /dev/null +++ b/api/src/main/java/com/github/streamshub/console/api/support/serdes/RecordData.java @@ -0,0 +1,79 @@ +package com.github.streamshub.console.api.support.serdes; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.InputStreamReader; +import java.io.Reader; +import java.nio.charset.StandardCharsets; +import java.util.LinkedHashMap; +import java.util.Map; + +import io.apicurio.registry.resolver.ParsedSchema; + +public class RecordData { + + public static final String BINARY_DATA_MESSAGE = "Binary or non-UTF-8 encoded data cannot be displayed"; + static final int REPLACEMENT_CHARACTER = '\uFFFD'; + + public final Map meta = new LinkedHashMap<>(1); + byte[] data; + ParsedSchema schema; + + public RecordData(byte[] data, ParsedSchema schema) { + super(); + this.data = data; + this.schema = schema; + } + + public RecordData(byte[] data) { + super(); + this.data = data; + this.schema = null; + } + + public RecordData(String data) { + this(data != null ? data.getBytes(StandardCharsets.UTF_8) : null); + } + + public ParsedSchema schema() { + return schema; + } + + public String dataString(Integer maxValueLength) { + return bytesToString(data, maxValueLength); + } + + public static String bytesToString(byte[] bytes, Integer maxValueLength) { + if (bytes == null) { + return null; + } + + if (bytes.length == 0) { + return ""; + } + + int bufferSize = maxValueLength != null ? Math.min(maxValueLength, bytes.length) : bytes.length; + StringBuilder buffer = new StringBuilder(bufferSize); + + try (Reader reader = new InputStreamReader(new ByteArrayInputStream(bytes), StandardCharsets.UTF_8)) { + int input; + + while ((input = reader.read()) > -1) { + if (input == REPLACEMENT_CHARACTER || !Character.isDefined(input)) { + return BINARY_DATA_MESSAGE; + } + + buffer.append((char) input); + + if (maxValueLength != null && buffer.length() == maxValueLength) { + break; + } + } + + return buffer.toString(); + } catch (IOException e) { + return BINARY_DATA_MESSAGE; + } + } + +} diff --git a/api/src/main/resources/application.properties b/api/src/main/resources/application.properties index 0378be55a..ab851f035 100644 --- a/api/src/main/resources/application.properties +++ b/api/src/main/resources/application.properties @@ -25,6 +25,7 @@ quarkus.kafka.devservices.enabled=false quarkus.kubernetes-client.devservices.enabled=false quarkus.devservices.enabled=false +quarkus.vertx.event-loops-pool-size=5 quarkus.vertx.max-event-loop-execute-time=4000 mp.openapi.scan.disable=false @@ -67,10 +68,16 @@ console.kafka.admin.default.api.timeout.ms=10000 %dev.quarkus.log.category."io.vertx.core.impl.BlockedThreadChecker".level=OFF %dev.quarkus.log.category."com.github.streamshub.console".level=DEBUG +# %dev.quarkus.apicurio-registry.devservices.enabled=true +# %dev.apicurio.rest.client.disable-auto-basepath-append=true +# %dev.quarkus.apicurio-registry.devservices.image-name=quay.io/apicurio/apicurio-registry-mem:2.6.x-release + ######## %testplain.quarkus.devservices.enabled=true %testplain.quarkus.kubernetes-client.devservices.enabled=true %testplain.quarkus.kubernetes-client.devservices.override-kubeconfig=true +%testplain.quarkus.apicurio-registry.devservices.image-name=quay.io/apicurio/apicurio-registry-mem:2.6.x-release +%testplain.apicurio.rest.client.disable-auto-basepath-append=true #%testplain.quarkus.http.auth.proactive=false #%testplain.quarkus.http.auth.permission."oidc".policy=permit diff --git a/api/src/test/java/com/github/streamshub/console/api/RecordsResourceIT.java b/api/src/test/java/com/github/streamshub/console/api/RecordsResourceIT.java index a8285ac6a..3b5e31f81 100644 --- a/api/src/test/java/com/github/streamshub/console/api/RecordsResourceIT.java +++ b/api/src/test/java/com/github/streamshub/console/api/RecordsResourceIT.java @@ -1,5 +1,6 @@ package com.github.streamshub.console.api; +import java.io.ByteArrayInputStream; import java.io.IOException; import java.net.URI; import java.security.NoSuchAlgorithmException; @@ -13,10 +14,12 @@ import java.util.UUID; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.BiConsumer; import java.util.stream.Stream; import jakarta.inject.Inject; import jakarta.json.Json; +import jakarta.json.JsonObject; import jakarta.json.JsonValue; import jakarta.ws.rs.core.HttpHeaders; import jakarta.ws.rs.core.MediaType; @@ -33,8 +36,10 @@ import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.CsvSource; import org.junit.jupiter.params.provider.ValueSource; +import org.skyscreamer.jsonassert.JSONAssert; -import com.github.streamshub.console.api.service.RecordService; +import com.github.streamshub.console.api.support.KafkaContext; +import com.github.streamshub.console.api.support.serdes.RecordData; import com.github.streamshub.console.config.ConsoleConfig; import com.github.streamshub.console.kafka.systemtest.TestPlainProfile; import com.github.streamshub.console.kafka.systemtest.deployment.DeploymentManager; @@ -42,10 +47,13 @@ import com.github.streamshub.console.test.TestHelper; import com.github.streamshub.console.test.TopicHelper; +import io.apicurio.registry.types.ArtifactType; import io.fabric8.kubernetes.client.KubernetesClient; import io.quarkus.test.common.http.TestHTTPEndpoint; import io.quarkus.test.junit.QuarkusTest; import io.quarkus.test.junit.TestProfile; +import io.restassured.RestAssured; +import io.restassured.response.ExtractableResponse; import io.strimzi.api.kafka.model.kafka.Kafka; import static com.github.streamshub.console.test.TestHelper.whenRequesting; @@ -59,6 +67,7 @@ import static org.hamcrest.Matchers.hasKey; import static org.hamcrest.Matchers.hasSize; import static org.hamcrest.Matchers.is; +import static org.junit.jupiter.api.Assertions.assertEquals; @QuarkusTest @TestHTTPEndpoint(RecordsResource.class) @@ -71,6 +80,9 @@ class RecordsResourceIT { @Inject ConsoleConfig consoleConfig; + @Inject + Map kafkaContexts; + @Inject KubernetesClient client; @@ -340,7 +352,11 @@ void testConsumeRecordsIncludeOnlyHeaders() { .assertThat() .statusCode(is(Status.OK.getStatusCode())) .body("data", hasSize(3)) - .body("data", everyItem(allOf(is(aMapWithSize(2)), hasEntry("type", "records")))) + .body("data", everyItem(allOf( + is(aMapWithSize(3)), + hasEntry("type", "records"), + hasKey("attributes"), + hasKey("relationships")))) .body("data.attributes.headers", everyItem(hasKey("h1"))); } @@ -376,7 +392,7 @@ void testConsumeRecordWithBinaryValue() throws NoSuchAlgorithmException { .assertThat() .statusCode(is(Status.OK.getStatusCode())) .body("data", hasSize(1)) - .body("data[0].attributes.value", is(equalTo(RecordService.BINARY_DATA_MESSAGE))); + .body("data[0].attributes.value", is(equalTo(RecordData.BINARY_DATA_MESSAGE))); } @ParameterizedTest @@ -582,4 +598,237 @@ void testProduceRecordWithInvalidPartition() { .body("errors[0].code", is("4003")) .body("errors[0].source.pointer", is("/data/attributes/partition")); } + + @Test + void testProduceRecordWithAvroFormat() { + var registryClient = kafkaContexts.get(clusterId1).schemaRegistryContext().registryClient(); + + final String keyArtifactId = UUID.randomUUID().toString().replace("-", ""); + final String keySchema = """ + { + "namespace": "console.avro", + "type": "record", + "name": "name_%s", + "fields": [ + { + "name": "key1", + "type": "string" + } + ] + } + """ + .formatted(keyArtifactId); + + final String valueArtifactId = UUID.randomUUID().toString().replace("-", ""); + final String valueSchema = """ + { + "namespace": "console.avro", + "type": "record", + "name": "name_%s", + "fields": [ + { + "name": "value1", + "type": "string" + } + ] + } + """ + .formatted(valueArtifactId); + + registryClient.createArtifact("default", keyArtifactId, ArtifactType.AVRO, new ByteArrayInputStream(keySchema + .getBytes())); + + registryClient.createArtifact("default", valueArtifactId, ArtifactType.AVRO, new ByteArrayInputStream(valueSchema + .getBytes())); + + + final String topicName = UUID.randomUUID().toString(); + var topicIds = topicUtils.createTopics(clusterId1, List.of(topicName), 1); + JsonObject requestBody = Json.createObjectBuilder() + .add("data", Json.createObjectBuilder() + .add("type", "records") + .add("relationships", Json.createObjectBuilder() + .add("keySchema", Json.createObjectBuilder() + .add("meta", Json.createObjectBuilder() + .add("coordinates", "default:" + keyArtifactId))) + .add("valueSchema", Json.createObjectBuilder() + .add("meta", Json.createObjectBuilder() + .add("coordinates", "default:" + valueArtifactId)))) + .add("attributes", Json.createObjectBuilder() + .add("key", """ + { + "key1": "value-of-key1", + "field2": "field-not-in-the-schema" + }""") + .add("value", """ + { + "value1": "value-of-value1", + "field2": "field-not-in-the-schema" + }"""))) + .build(); + + whenRequesting(req -> req + .header(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON) + .body(requestBody.toString()) + .post("", clusterId1, topicIds.get(topicName))) + .assertThat() + .statusCode(is(Status.CREATED.getStatusCode())) + .body("data.attributes.partition", is(0)) + .body("data.attributes.offset", is(0)) + .body("data.relationships.keySchema.meta.artifactType", is(ArtifactType.AVRO)) + .body("data.relationships.keySchema.meta.name", + // fully qualified + is("console.avro.name_" + keyArtifactId)) + .body("data.relationships.valueSchema.meta.artifactType", is(ArtifactType.AVRO)) + .body("data.relationships.valueSchema.meta.name", + // fully qualified + is("console.avro.name_" + valueArtifactId)); + + var recordsResponse = whenRequesting(req -> req.get("", clusterId1, topicIds.get(topicName))) + .assertThat() + .statusCode(is(Status.OK.getStatusCode())) + .body("data", hasSize(1)) + .body("data[0].attributes.partition", is(0)) + .body("data[0].attributes.offset", is(0)) + .body("data[0].relationships.keySchema.meta.artifactType", is(ArtifactType.AVRO)) + .body("data[0].relationships.keySchema.meta.name", + // fully qualified + is("console.avro.name_" + keyArtifactId)) + .body("data[0].relationships.valueSchema.meta.artifactType", is(ArtifactType.AVRO)) + .body("data[0].relationships.valueSchema.meta.name", + // fully qualified + is("console.avro.name_" + valueArtifactId)) + .body("data[0].attributes.key", is(""" + {"key1":"value-of-key1"}""")) + .body("data[0].attributes.value", is(""" + {"value1":"value-of-value1"}""")) + .extract(); + + assertSchemaContent(keySchema, valueSchema, recordsResponse, (exp, act) -> { + JSONAssert.assertEquals(exp, act, true); + }); + } + + @Test + void testProduceRecordWithProtobufFormat() { + var registryClient = kafkaContexts.get(clusterId1).schemaRegistryContext().registryClient(); + + final String keyArtifactId = UUID.randomUUID().toString().replace("-", ""); + final String keySchema = """ + message name_%s { + optional string key1 = 1; + } + """ + .formatted(keyArtifactId); + + final String valueArtifactId = UUID.randomUUID().toString().replace("-", ""); + final String valueSchema = """ + message some_other_name_%s { + optional string field1 = 1; + } + message name_%s { + optional string value1 = 1; + } + """ + .formatted(valueArtifactId, valueArtifactId); + + registryClient.createArtifact("default", keyArtifactId, ArtifactType.PROTOBUF, new ByteArrayInputStream(keySchema + .getBytes())); + + registryClient.createArtifact("default", valueArtifactId, ArtifactType.PROTOBUF, new ByteArrayInputStream(valueSchema + .getBytes())); + + + final String topicName = UUID.randomUUID().toString(); + var topicIds = topicUtils.createTopics(clusterId1, List.of(topicName), 1); + JsonObject requestBody = Json.createObjectBuilder() + .add("data", Json.createObjectBuilder() + .add("type", "records") + .add("relationships", Json.createObjectBuilder() + .add("keySchema", Json.createObjectBuilder() + .add("meta", Json.createObjectBuilder() + // messageType omitted since there is only 1 in the key schema + .add("coordinates", "default:" + keyArtifactId))) + .add("valueSchema", Json.createObjectBuilder() + .add("meta", Json.createObjectBuilder() + .add("coordinates", "default:" + valueArtifactId) + .add("messageType", "name_" + valueArtifactId)))) + .add("attributes", Json.createObjectBuilder() + .add("key", """ + { + "key1": "value-of-key1", + "field2": "field-not-in-the-schema" + }""") + .add("value", """ + { + "value1": "value-of-value1", + "field2": "field-not-in-the-schema" + }"""))) + .build(); + + whenRequesting(req -> req + .header(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON) + .body(requestBody.toString()) + .post("", clusterId1, topicIds.get(topicName))) + .assertThat() + .statusCode(is(Status.CREATED.getStatusCode())) + .body("data.attributes.partition", is(0)) + .body("data.attributes.offset", is(0)) + .body("data.relationships.keySchema.meta.artifactType", is(ArtifactType.PROTOBUF)) + .body("data.relationships.keySchema.meta.name", is("name_" + keyArtifactId)) + .body("data.relationships.valueSchema.meta.artifactType", is(ArtifactType.PROTOBUF)) + .body("data.relationships.valueSchema.meta.name", is("name_" + valueArtifactId)); + + var recordsResponse = whenRequesting(req -> req.get("", clusterId1, topicIds.get(topicName))) + .assertThat() + .statusCode(is(Status.OK.getStatusCode())) + .body("data", hasSize(1)) + .body("data[0].attributes.partition", is(0)) + .body("data[0].attributes.offset", is(0)) + .body("data[0].relationships.keySchema.meta.artifactType", is(ArtifactType.PROTOBUF)) + .body("data[0].relationships.keySchema.meta.name", is("name_" + keyArtifactId)) + .body("data[0].attributes.key", is(""" + {"key1":"value-of-key1"}""")) + .body("data[0].relationships.valueSchema.meta.artifactType", is(ArtifactType.PROTOBUF)) + .body("data[0].relationships.valueSchema.meta.name", is("name_" + valueArtifactId)) + .body("data[0].attributes.value", is(""" + {"value1":"value-of-value1"}""")) + .extract(); + + assertSchemaContent(keySchema, valueSchema, recordsResponse, (exp, act) -> { + assertEquals(exp, act); + }); + } + + private void assertSchemaContent( + String expectedKeySchema, + String expectedValueSchema, + ExtractableResponse response, + BiConsumer assertion) { + String keySchemaLink = response.jsonPath().getString("data[0].relationships.keySchema.links.content"); + + String actualKeySchema = RestAssured.given() + .basePath(keySchemaLink) + .when().get() + .then() + .assertThat() + .statusCode(is(Status.OK.getStatusCode())) + .extract() + .asString(); + + assertion.accept(expectedKeySchema, actualKeySchema); + + String valueSchemaLink = response.jsonPath().getString("data[0].relationships.valueSchema.links.content"); + + String actualValueSchema = RestAssured.given() + .basePath(valueSchemaLink) + .when().get() + .then() + .assertThat() + .statusCode(is(Status.OK.getStatusCode())) + .extract() + .asString(); + + assertion.accept(expectedValueSchema, actualValueSchema); + } } diff --git a/api/src/test/java/com/github/streamshub/console/kafka/systemtest/TestPlainProfile.java b/api/src/test/java/com/github/streamshub/console/kafka/systemtest/TestPlainProfile.java index 2f1b5e290..607bd0f65 100644 --- a/api/src/test/java/com/github/streamshub/console/kafka/systemtest/TestPlainProfile.java +++ b/api/src/test/java/com/github/streamshub/console/kafka/systemtest/TestPlainProfile.java @@ -41,6 +41,12 @@ public Map getConfigOverrides() { - name: test-kafka1 namespace: default id: k1-id + schemaRegistry: + ### + # This is the property used by Dev Services for Apicurio Registry + # https://quarkus.io/guides/apicurio-registry-dev-services + ### + url: ${mp.messaging.connector.smallrye-kafka.apicurio.registry.url} properties: bootstrap.servers: ${console.test.external-bootstrap} - name: test-kafka2 diff --git a/common/src/main/java/com/github/streamshub/console/config/KafkaClusterConfig.java b/common/src/main/java/com/github/streamshub/console/config/KafkaClusterConfig.java index b5a8334a9..ca78ce5a8 100644 --- a/common/src/main/java/com/github/streamshub/console/config/KafkaClusterConfig.java +++ b/common/src/main/java/com/github/streamshub/console/config/KafkaClusterConfig.java @@ -14,6 +14,7 @@ public class KafkaClusterConfig { private String name; private String namespace; private String listener; + private SchemaRegistryConfig schemaRegistry; private Map properties = new LinkedHashMap<>(); private Map adminProperties = new LinkedHashMap<>(); private Map consumerProperties = new LinkedHashMap<>(); @@ -61,6 +62,14 @@ public void setListener(String listener) { this.listener = listener; } + public SchemaRegistryConfig getSchemaRegistry() { + return schemaRegistry; + } + + public void setSchemaRegistry(SchemaRegistryConfig schemaRegistry) { + this.schemaRegistry = schemaRegistry; + } + public Map getProperties() { return properties; } diff --git a/common/src/main/java/com/github/streamshub/console/config/SchemaRegistryConfig.java b/common/src/main/java/com/github/streamshub/console/config/SchemaRegistryConfig.java new file mode 100644 index 000000000..0bcff0b4c --- /dev/null +++ b/common/src/main/java/com/github/streamshub/console/config/SchemaRegistryConfig.java @@ -0,0 +1,17 @@ +package com.github.streamshub.console.config; + +import jakarta.validation.constraints.NotBlank; + +public class SchemaRegistryConfig { + + @NotBlank(message = "Schema registry `url` is required") + String url; + + public String getUrl() { + return url; + } + + public void setUrl(String url) { + this.url = url; + } +} diff --git a/pom.xml b/pom.xml index 77727c45f..60eca8bea 100644 --- a/pom.xml +++ b/pom.xml @@ -19,6 +19,7 @@ 3.15.1 0.43.0 0.15.0 + 2.6.2.Final 3.0