diff --git a/api/pom.xml b/api/pom.xml
index 2298d1bb5..138db7487 100644
--- a/api/pom.xml
+++ b/api/pom.xml
@@ -58,15 +58,15 @@
         </dependency>
         <dependency>
             <groupId>io.quarkus</groupId>
-            <artifactId>quarkus-resteasy-reactive</artifactId>
+            <artifactId>quarkus-rest</artifactId>
         </dependency>
         <dependency>
             <groupId>io.quarkus</groupId>
-            <artifactId>quarkus-resteasy-reactive-jackson</artifactId>
+            <artifactId>quarkus-rest-jackson</artifactId>
         </dependency>
         <dependency>
             <groupId>io.quarkus</groupId>
-            <artifactId>quarkus-rest-client-reactive</artifactId>
+            <artifactId>quarkus-rest-client</artifactId>
         </dependency>
         <dependency>
             <groupId>io.quarkus</groupId>
@@ -84,6 +84,10 @@
             <groupId>io.quarkus</groupId>
             <artifactId>quarkus-kubernetes-client</artifactId>
         </dependency>
+        <dependency>
+            <groupId>io.quarkus</groupId>
+            <artifactId>quarkus-apicurio-registry-avro</artifactId>
+        </dependency>
         <dependency>
             <groupId>io.smallrye.common</groupId>
             <artifactId>smallrye-common-annotation</artifactId>
@@ -109,6 +113,24 @@
             <groupId>io.strimzi</groupId>
             <artifactId>kafka-oauth-client</artifactId>
         </dependency>
+        <dependency>
+            <groupId>io.apicurio</groupId>
+            <artifactId>apicurio-registry-serdes-avro-serde</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>io.apicurio</groupId>
+            <artifactId>apicurio-registry-serdes-jsonschema-serde</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>io.apicurio</groupId>
+            <artifactId>apicurio-registry-serdes-protobuf-serde</artifactId>
+            <version>2.5.8.Final</version>
+        </dependency>
+        <dependency>
+            <groupId>com.google.protobuf</groupId>
+            <artifactId>protobuf-java-util</artifactId>
+        </dependency>
+
         <dependency>
             <groupId>com.fasterxml.jackson.core</groupId>
             <artifactId>jackson-annotations</artifactId>
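
The Apicurio Registry serde artifacts added above let Kafka clients resolve Avro, JSON Schema, and Protobuf schemas from a registry at (de)serialization time. As a rough illustration of what they enable — not part of this PR, with placeholder broker and registry URLs — a plain consumer can be pointed at a registry-backed Avro topic like this:

```java
import java.time.Duration;
import java.util.List;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import io.apicurio.registry.serde.SerdeConfig;
import io.apicurio.registry.serde.avro.AvroKafkaDeserializer;

public class AvroConsumeExample {
    public static void main(String[] args) {
        // Placeholder bootstrap servers, registry URL, and topic; adjust for a real environment.
        Map<String, Object> props = Map.of(
                ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092",
                ConsumerConfig.GROUP_ID_CONFIG, "example-group",
                ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName(),
                ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, AvroKafkaDeserializer.class.getName(),
                // Tell the Apicurio deserializer where the registry lives and that the
                // schema (artifact) identifier is carried in record headers.
                SerdeConfig.REGISTRY_URL, "http://localhost:8080/apis/registry/v2",
                SerdeConfig.ENABLE_HEADERS, "true");

        try (var consumer = new KafkaConsumer<String, Object>(props)) {
            consumer.subscribe(List.of("example-topic"));
            consumer.poll(Duration.ofSeconds(5))
                    .forEach(r -> System.out.println(r.value()));
        }
    }
}
```
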
diff --git a/api/src/main/java/com/github/streamshub/console/api/ClientFactory.java b/api/src/main/java/com/github/streamshub/console/api/ClientFactory.java
index cccbd5e54..91c78153b 100644
--- a/api/src/main/java/com/github/streamshub/console/api/ClientFactory.java
+++ b/api/src/main/java/com/github/streamshub/console/api/ClientFactory.java
@@ -16,6 +16,7 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
+import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.function.Supplier;
@@ -53,6 +54,8 @@
import org.apache.kafka.common.security.plain.PlainLoginModule;
import org.apache.kafka.common.security.scram.ScramLoginModule;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.eclipse.microprofile.config.Config;
import org.eclipse.microprofile.config.inject.ConfigProperty;
@@ -65,9 +68,11 @@
import com.github.streamshub.console.api.support.KafkaContext;
import com.github.streamshub.console.api.support.TrustAllCertificateManager;
import com.github.streamshub.console.api.support.ValidationProxy;
+import com.github.streamshub.console.api.support.serdes.RecordData;
import com.github.streamshub.console.config.ConsoleConfig;
import com.github.streamshub.console.config.KafkaClusterConfig;
+import io.apicurio.registry.serde.SerdeConfig;
import io.fabric8.kubernetes.client.informers.ResourceEventHandler;
import io.fabric8.kubernetes.client.informers.SharedIndexInformer;
import io.fabric8.kubernetes.client.informers.cache.Cache;
@@ -420,6 +425,7 @@ Map<String, Object> requiredConsumerConfig() {
configs.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 50_000);
configs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
configs.put(ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, 5000);
+ configs.put(SerdeConfig.ENABLE_HEADERS, "true");
return configs;
}
@@ -520,26 +526,16 @@ public void disposeKafkaContext(@Disposes KafkaContext context, Map<String, KafkaContext> contexts) {
-    public Supplier<Consumer<byte[], byte[]>> consumerSupplier(ConsoleConfig consoleConfig, KafkaContext context) {
+    public BiFunction<Deserializer<RecordData>, Deserializer<RecordData>, Consumer<RecordData, RecordData>> consumerSupplier(ConsoleConfig consoleConfig, KafkaContext context) {
         var configs = maybeAuthenticate(context, Consumer.class);
-        Consumer<byte[], byte[]> client = new KafkaConsumer<>(configs); // NOSONAR / closed in consumerDisposer
-        return () -> client;
-    }
-
-    public void consumerDisposer(@Disposes Supplier<Consumer<byte[], byte[]>> consumer) {
-        consumer.get().close();
+        return (keyDeser, valueDeser) -> new KafkaConsumer<>(configs, keyDeser, valueDeser); // NOSONAR / closed in consumerDisposer
     }
@Produces
@RequestScoped
-    public Supplier<Producer<String, String>> producerSupplier(ConsoleConfig consoleConfig, KafkaContext context) {
+    public BiFunction<Serializer<RecordData>, Serializer<RecordData>, Producer<RecordData, RecordData>> producerSupplier(ConsoleConfig consoleConfig, KafkaContext context) {
         var configs = maybeAuthenticate(context, Producer.class);
-        Producer<String, String> client = new KafkaProducer<>(configs); // NOSONAR / closed in producerDisposer
-        return () -> client;
-    }
-
-    public void producerDisposer(@Disposes Supplier<Producer<String, String>> producer) {
-        producer.get().close();
+        return (keySer, valueSer) -> new KafkaProducer<>(configs, keySer, valueSer); // NOSONAR / closed by service code
     }
     Map<String, Object> maybeAuthenticate(KafkaContext context, Class<?> clientType) {
@@ -571,6 +567,12 @@ Map<String, Object> buildConfig(Set<String> configNames,
.map(Optional::get)
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue, (k1, k2) -> k1, TreeMap::new));
+ // Ensure no given properties are skipped. The previous stream processing allows
+ // for the standard config names to be obtained from the given maps, but also from
+ // config overrides via MicroProfile Config.
+ clientProperties.get().forEach(cfg::putIfAbsent);
+ config.getProperties().forEach(cfg::putIfAbsent);
+
var listenerSpec = cluster.map(Kafka::getSpec)
.map(KafkaSpec::getKafka)
.map(KafkaClusterSpec::getListeners)
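
With the CDI disposer methods removed, the request-scoped beans now expose factories rather than shared clients: callers pass in the key and value (de)serializers they need and take ownership of closing the resulting Kafka client. A hedged sketch of how consuming code might use the injected factory follows; the `ExampleRecordReader` class and its `read` method are illustrative names, not code from this PR, while `RecordData` is the type introduced in the new serdes package.

```java
import java.util.function.BiFunction;

import jakarta.enterprise.context.RequestScoped;
import jakarta.inject.Inject;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.serialization.Deserializer;

import com.github.streamshub.console.api.support.serdes.RecordData;

@RequestScoped
class ExampleRecordReader {

    // Produced by ClientFactory#consumerSupplier in the diff above.
    @Inject
    BiFunction<Deserializer<RecordData>, Deserializer<RecordData>, Consumer<RecordData, RecordData>> consumerFactory;

    void read(Deserializer<RecordData> keyDeser, Deserializer<RecordData> valueDeser) {
        // The factory builds a new KafkaConsumer per call; the caller owns its lifecycle.
        try (Consumer<RecordData, RecordData> consumer = consumerFactory.apply(keyDeser, valueDeser)) {
            // ... assign, seek, and poll as needed ...
        }
    }
}
```
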
diff --git a/api/src/main/java/com/github/streamshub/console/api/RecordsResource.java b/api/src/main/java/com/github/streamshub/console/api/RecordsResource.java
index ecd388ea5..a82df5829 100644
--- a/api/src/main/java/com/github/streamshub/console/api/RecordsResource.java
+++ b/api/src/main/java/com/github/streamshub/console/api/RecordsResource.java
@@ -67,12 +67,12 @@ public class RecordsResource {
summary = "Consume records from a topic",
description = "Consume a limited number of records from a topic, optionally specifying a partition and an absolute offset or timestamp as the starting point for message retrieval.")
@APIResponseSchema(
- value = KafkaRecord.ListResponse.class,
+ value = KafkaRecord.KafkaRecordDataList.class,
responseDescription = "List of records matching the request query parameters.")
@APIResponse(responseCode = "404", ref = "NotFound")
@APIResponse(responseCode = "500", ref = "ServerError")
@APIResponse(responseCode = "504", ref = "ServerTimeout")
- public Response consumeRecords(
+    public CompletionStage<Response> consumeRecords(
@Parameter(description = "Cluster identifier")
@PathParam("clusterId")
String clusterId,
@@ -98,7 +98,9 @@ public Response consumeRecords(
KafkaRecord.Fields.HEADERS,
KafkaRecord.Fields.KEY,
KafkaRecord.Fields.VALUE,
- KafkaRecord.Fields.SIZE
+ KafkaRecord.Fields.SIZE,
+ KafkaRecord.Fields.KEY_SCHEMA,
+ KafkaRecord.Fields.VALUE_SCHEMA,
},
payload = ErrorCategory.InvalidQueryParameter.class)
@Parameter(
@@ -116,15 +118,27 @@ public Response consumeRecords(
KafkaRecord.Fields.HEADERS,
KafkaRecord.Fields.KEY,
KafkaRecord.Fields.VALUE,
- KafkaRecord.Fields.SIZE
+ KafkaRecord.Fields.SIZE,
+ KafkaRecord.Fields.KEY_SCHEMA,
+ KafkaRecord.Fields.VALUE_SCHEMA,
}))
            List<String> fields) {
requestedFields.accept(fields);
- var result = recordService.consumeRecords(topicId, params.getPartition(), params.getOffset(), params.getTimestamp(), params.getLimit(), fields, params.getMaxValueLength());
-
CacheControl noStore = RuntimeDelegate.getInstance().createHeaderDelegate(CacheControl.class).fromString("no-store");
- return Response.ok(new KafkaRecord.ListResponse(result)).cacheControl(noStore).build();
+
+ return recordService.consumeRecords(
+ topicId,
+ params.getPartition(),
+ params.getOffset(),
+ params.getTimestamp(),
+ params.getLimit(),
+ fields,
+ params.getMaxValueLength())
+ .thenApply(KafkaRecord.KafkaRecordDataList::new)
+ .thenApply(Response::ok)
+ .thenApply(response -> response.cacheControl(noStore))
+ .thenApply(Response.ResponseBuilder::build);
}
@POST
@@ -135,7 +149,7 @@ public Response consumeRecords(
description = "Produce (write) a single record to a topic")
@APIResponseSchema(
responseCode = "201",
- value = KafkaRecord.KafkaRecordDocument.class,
+ value = KafkaRecord.KafkaRecordData.class,
responseDescription = "Record was successfully sent to the topic")
@APIResponse(responseCode = "404", ref = "NotFound")
@APIResponse(responseCode = "500", ref = "ServerError")
@@ -151,20 +165,19 @@ public CompletionStage<Response> produceRecord(
String topicId,
@Valid
- KafkaRecord.KafkaRecordDocument message) {
+ KafkaRecord.KafkaRecordData message) {
final UriBuilder location = uriInfo.getRequestUriBuilder();
requestedFields.accept(KafkaRecord.Fields.ALL);
- return recordService.produceRecord(topicId, message.getData().getAttributes())
- .thenApply(KafkaRecord.KafkaRecordDocument::new)
+ return recordService.produceRecord(topicId, message.getData())
+ .thenApply(KafkaRecord.KafkaRecordData::new)
.thenApply(entity -> Response.status(Status.CREATED)
.entity(entity)
.location(location
- .queryParam("filter[partition]", entity.getData().getAttributes().getPartition())
- .queryParam("filter[offset]", entity.getData().getAttributes().getOffset())
+ .queryParam("filter[partition]", entity.getData().partition())
+ .queryParam("filter[offset]", entity.getData().offset())
.build()))
.thenApply(Response.ResponseBuilder::build);
-
}
}
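
`consumeRecords` now returns a `CompletionStage<Response>` built by mapping over the service-layer future, so the container completes the HTTP exchange when the result arrives. Below is a minimal standalone sketch of that async JAX-RS pattern under an assumed `/example` resource; the `CompletableFuture` stands in for `recordService.consumeRecords(...)`.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

import jakarta.ws.rs.GET;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.core.CacheControl;
import jakarta.ws.rs.core.Response;
import jakarta.ws.rs.ext.RuntimeDelegate;

@Path("/example")
public class ExampleAsyncResource {

    @GET
    public CompletionStage<Response> get() {
        // Same no-store cache header construction as in RecordsResource.
        CacheControl noStore = RuntimeDelegate.getInstance()
                .createHeaderDelegate(CacheControl.class)
                .fromString("no-store");

        // Stand-in for the asynchronous service call.
        CompletionStage<List<String>> pending = CompletableFuture.supplyAsync(() -> List.of("a", "b"));

        return pending
                .thenApply(Response::ok)
                .thenApply(builder -> builder.cacheControl(noStore))
                .thenApply(Response.ResponseBuilder::build);
    }
}
```
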
diff --git a/api/src/main/java/com/github/streamshub/console/api/SchemasResource.java b/api/src/main/java/com/github/streamshub/console/api/SchemasResource.java
new file mode 100644
index 000000000..6182bb191
--- /dev/null
+++ b/api/src/main/java/com/github/streamshub/console/api/SchemasResource.java
@@ -0,0 +1,75 @@
+package com.github.streamshub.console.api;
+
+import java.util.Collections;
+import java.util.Optional;
+
+import jakarta.annotation.PostConstruct;
+import jakarta.inject.Inject;
+import jakarta.ws.rs.GET;
+import jakarta.ws.rs.NotFoundException;
+import jakarta.ws.rs.Path;
+import jakarta.ws.rs.PathParam;
+import jakarta.ws.rs.Produces;
+import jakarta.ws.rs.core.MediaType;
+import jakarta.ws.rs.core.Response;
+
+import org.eclipse.microprofile.config.ConfigProvider;
+import org.eclipse.microprofile.openapi.annotations.media.Content;
+import org.eclipse.microprofile.openapi.annotations.parameters.Parameter;
+import org.eclipse.microprofile.openapi.annotations.responses.APIResponse;
+import org.eclipse.microprofile.openapi.annotations.tags.Tag;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.github.streamshub.console.api.support.serdes.ArtifactReferences;
+import com.github.streamshub.console.api.support.serdes.MultiformatSchemaParser;
+
+import io.apicurio.registry.resolver.DefaultSchemaResolver;
+import io.apicurio.registry.resolver.SchemaResolver;
+import io.apicurio.registry.rest.client.RegistryClient;
+import io.apicurio.registry.rest.client.RegistryClientFactory;
+
+@Path("/api/schemas/{schemaId}")
+@Tag(name = "Schema Registry Resources")
+public class SchemasResource {
+
+ @Inject
+ ObjectMapper objectMapper;
+
+ SchemaResolver