diff --git a/api/pom.xml b/api/pom.xml
index 2298d1bb5..e084f8a58 100644
--- a/api/pom.xml
+++ b/api/pom.xml
@@ -58,15 +58,15 @@
     <groupId>io.quarkus</groupId>
-    <artifactId>quarkus-resteasy-reactive</artifactId>
+    <artifactId>quarkus-rest</artifactId>
 </dependency>
 <dependency>
     <groupId>io.quarkus</groupId>
-    <artifactId>quarkus-resteasy-reactive-jackson</artifactId>
+    <artifactId>quarkus-rest-jackson</artifactId>
 </dependency>
 <dependency>
     <groupId>io.quarkus</groupId>
-    <artifactId>quarkus-rest-client-reactive</artifactId>
+    <artifactId>quarkus-rest-client</artifactId>
 </dependency>
 <dependency>
     <groupId>io.quarkus</groupId>
@@ -84,6 +84,10 @@
     <groupId>io.quarkus</groupId>
     <artifactId>quarkus-kubernetes-client</artifactId>
 </dependency>
+<dependency>
+    <groupId>io.quarkus</groupId>
+    <artifactId>quarkus-apicurio-registry-avro</artifactId>
+</dependency>
 <dependency>
     <groupId>io.smallrye.common</groupId>
     <artifactId>smallrye-common-annotation</artifactId>
@@ -109,6 +113,20 @@
     <artifactId>kafka-oauth-client</artifactId>
 </dependency>
+<dependency>
+    <groupId>io.apicurio</groupId>
+    <artifactId>apicurio-registry-serdes-avro-serde</artifactId>
+</dependency>
+<dependency>
+    <groupId>io.apicurio</groupId>
+    <artifactId>apicurio-registry-serdes-protobuf-serde</artifactId>
+    <version>${apicurio-registry.version}</version>
+</dependency>
+<dependency>
+    <groupId>com.google.protobuf</groupId>
+    <artifactId>protobuf-java-util</artifactId>
+</dependency>
 <dependency>
     <groupId>com.fasterxml.jackson.core</groupId>
     <artifactId>jackson-annotations</artifactId>
@@ -358,6 +376,7 @@
${keycloak.image}
${strimzi-kafka.tag}
+ ${apicurio-registry.version}
org.jboss.logmanager.LogManager
${maven.home}
true
diff --git a/api/src/main/java/com/github/streamshub/console/api/ClientFactory.java b/api/src/main/java/com/github/streamshub/console/api/ClientFactory.java
index cccbd5e54..45f57cd64 100644
--- a/api/src/main/java/com/github/streamshub/console/api/ClientFactory.java
+++ b/api/src/main/java/com/github/streamshub/console/api/ClientFactory.java
@@ -65,9 +65,13 @@
import com.github.streamshub.console.api.support.KafkaContext;
import com.github.streamshub.console.api.support.TrustAllCertificateManager;
import com.github.streamshub.console.api.support.ValidationProxy;
+import com.github.streamshub.console.api.support.serdes.RecordData;
import com.github.streamshub.console.config.ConsoleConfig;
import com.github.streamshub.console.config.KafkaClusterConfig;
+import io.apicurio.registry.rest.client.RegistryClient;
+import io.apicurio.registry.rest.client.RegistryClientFactory;
+import io.apicurio.registry.serde.SerdeConfig;
import io.fabric8.kubernetes.client.informers.ResourceEventHandler;
import io.fabric8.kubernetes.client.informers.SharedIndexInformer;
import io.fabric8.kubernetes.client.informers.cache.Cache;
@@ -192,6 +196,10 @@ public ConsoleConfig produceConsoleConfig() {
})
.map(consoleConfig -> {
consoleConfig.getKafka().getClusters().stream().forEach(cluster -> {
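+ // Resolve placeholders in the registry URL the same way as the other cluster properties below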
+ if (cluster.getSchemaRegistry() != null) {
+ var registryConfig = cluster.getSchemaRegistry();
+ registryConfig.setUrl(resolveValue(registryConfig.getUrl()));
+ }
resolveValues(cluster.getProperties());
resolveValues(cluster.getAdminProperties());
resolveValues(cluster.getProducerProperties());
@@ -277,7 +285,7 @@ public void onDelete(Kafka kafka, boolean deletedFinalStateUnknown) {
findConfig(kafka).ifPresentOrElse(
clusterConfig -> {
String clusterKey = clusterConfig.clusterKey();
- String clusterId = clusterId(clusterConfig, Optional.of(kafka));
+ String clusterId = KafkaContext.clusterId(clusterConfig, Optional.of(kafka));
log.infof("Removing KafkaContext for cluster %s, id=%s", clusterKey, clusterId);
log.debugf("Known KafkaContext identifiers: %s", contexts.keySet());
KafkaContext previous = contexts.remove(clusterId);
@@ -337,7 +345,14 @@ void putKafkaContext(Map<String, KafkaContext> contexts,
}
String clusterKey = clusterConfig.clusterKey();
- String clusterId = clusterId(clusterConfig, kafkaResource);
+ String clusterId = KafkaContext.clusterId(clusterConfig, kafkaResource);
+
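+ // An Apicurio Registry client is created only when the cluster configuration provides a schema registry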
+ RegistryClient schemaRegistryClient = null;
+
+ if (clusterConfig.getSchemaRegistry() != null) {
+ String registryUrl = clusterConfig.getSchemaRegistry().getUrl();
+ schemaRegistryClient = RegistryClientFactory.create(registryUrl); // NOSONAR - closed elsewhere
+ }
if (!replace && contexts.containsKey(clusterId)) {
log.warnf("""
@@ -359,7 +374,10 @@ Connection requires trusted certificate(s) which are not present \
}
KafkaContext ctx = new KafkaContext(clusterConfig, kafkaResource.orElse(null), clientConfigs, admin);
+ ctx.schemaRegistryClient(schemaRegistryClient);
+
log.infof("%s KafkaContext for cluster %s, id=%s", replace ? "Replacing" : "Adding", clusterKey, clusterId);
+
KafkaContext previous = contexts.put(clusterId, ctx);
Optional.ofNullable(previous).ifPresent(KafkaContext::close);
}
@@ -370,12 +388,6 @@ boolean defaultedClusterId(KafkaClusterConfig clusterConfig, Optional<Kafka> kaf
return clusterConfig.getId() == null && kafkaResource.map(Kafka::getStatus).map(KafkaStatus::getClusterId).isEmpty();
}
- String clusterId(KafkaClusterConfig clusterConfig, Optional<Kafka> kafkaResource) {
- return Optional.ofNullable(clusterConfig.getId())
- .or(() -> kafkaResource.map(Kafka::getStatus).map(KafkaStatus::getClusterId))
- .orElseGet(clusterConfig::getName);
- }
-
/**
* Checks whether the previous KafkaContext contained TLS trusted certificates, but due to them being
* removed from the Strimzi Kafka CR being in a transient state, they are no longer present. We will ignore
@@ -420,6 +432,7 @@ Map<String, Object> requiredConsumerConfig() {
configs.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 50_000);
configs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
configs.put(ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, 5000);
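+ // Use Kafka record headers to carry the Apicurio schema artifact references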
+ configs.put(SerdeConfig.ENABLE_HEADERS, "true");
return configs;
}
@@ -520,26 +533,31 @@ public void disposeKafkaContext(@Disposes KafkaContext context, Map<String, Kaf
@Produces
@RequestScoped
- public Supplier<Consumer<byte[], byte[]>> consumerSupplier(ConsoleConfig consoleConfig, KafkaContext context) {
+ public Consumer<RecordData, RecordData> consumerSupplier(ConsoleConfig consoleConfig, KafkaContext context) {
var configs = maybeAuthenticate(context, Consumer.class);
- Consumer<byte[], byte[]> client = new KafkaConsumer<>(configs); // NOSONAR / closed in consumerDisposer
- return () -> client;
+
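+ // Build the consumer with the registry context's schema-aware key/value deserializers rather than raw byte[] deserializers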
+ return new KafkaConsumer<>(
+ configs,
+ context.schemaRegistryContext().keyDeserializer(),
+ context.schemaRegistryContext().valueDeserializer());
}
- public void consumerDisposer(@Disposes Supplier<Consumer<byte[], byte[]>> consumer) {
- consumer.get().close();
+ public void disposeConsumerSupplier(@Disposes Consumer<RecordData, RecordData> consumer) {
+ consumer.close();
}
@Produces
@RequestScoped
- public Supplier<Producer<String, String>> producerSupplier(ConsoleConfig consoleConfig, KafkaContext context) {
+ public Producer<RecordData, RecordData> producerSupplier(ConsoleConfig consoleConfig, KafkaContext context) {
var configs = maybeAuthenticate(context, Producer.class);
- Producer<String, String> client = new KafkaProducer<>(configs); // NOSONAR / closed in producerDisposer
- return () -> client;
+ return new KafkaProducer<>(
+ configs,
+ context.schemaRegistryContext().keySerializer(),
+ context.schemaRegistryContext().valueSerializer());
}
- public void producerDisposer(@Disposes Supplier<Producer<String, String>> producer) {
- producer.get().close();
+ public void disposeProducerSupplier(@Disposes Producer<RecordData, RecordData> producer) {
+ producer.close();
}
Map<String, Object> maybeAuthenticate(KafkaContext context, Class<?> clientType) {
@@ -571,6 +589,12 @@ Map<String, Object> buildConfig(Set<String> configNames,
.map(Optional::get)
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue, (k1, k2) -> k1, TreeMap::new));
+ // Ensure no given properties are skipped. The previous stream processing allows
+ // for the standard config names to be obtained from the given maps, but also from
+ // config overrides via MicroProfile Config.
+ clientProperties.get().forEach(cfg::putIfAbsent);
+ config.getProperties().forEach(cfg::putIfAbsent);
+
var listenerSpec = cluster.map(Kafka::getSpec)
.map(KafkaSpec::getKafka)
.map(KafkaClusterSpec::getListeners)
diff --git a/api/src/main/java/com/github/streamshub/console/api/RecordsResource.java b/api/src/main/java/com/github/streamshub/console/api/RecordsResource.java
index ecd388ea5..a16498521 100644
--- a/api/src/main/java/com/github/streamshub/console/api/RecordsResource.java
+++ b/api/src/main/java/com/github/streamshub/console/api/RecordsResource.java
@@ -19,10 +19,10 @@
import jakarta.ws.rs.core.CacheControl;
import jakarta.ws.rs.core.MediaType;
import jakarta.ws.rs.core.Response;
+import jakarta.ws.rs.core.Response.Status;
import jakarta.ws.rs.core.UriBuilder;
import jakarta.ws.rs.core.UriInfo;
import jakarta.ws.rs.ext.RuntimeDelegate;
-import jakarta.ws.rs.core.Response.Status;
import org.eclipse.microprofile.openapi.annotations.Operation;
import org.eclipse.microprofile.openapi.annotations.enums.Explode;
@@ -67,12 +67,12 @@ public class RecordsResource {
summary = "Consume records from a topic",
description = "Consume a limited number of records from a topic, optionally specifying a partition and an absolute offset or timestamp as the starting point for message retrieval.")
@APIResponseSchema(
- value = KafkaRecord.ListResponse.class,
+ value = KafkaRecord.KafkaRecordDataList.class,
responseDescription = "List of records matching the request query parameters.")
@APIResponse(responseCode = "404", ref = "NotFound")
@APIResponse(responseCode = "500", ref = "ServerError")
@APIResponse(responseCode = "504", ref = "ServerTimeout")
- public Response consumeRecords(
+ public CompletionStage<Response> consumeRecords(
@Parameter(description = "Cluster identifier")
@PathParam("clusterId")
String clusterId,
@@ -98,7 +98,9 @@ public Response consumeRecords(
KafkaRecord.Fields.HEADERS,
KafkaRecord.Fields.KEY,
KafkaRecord.Fields.VALUE,
- KafkaRecord.Fields.SIZE
+ KafkaRecord.Fields.SIZE,
+ KafkaRecord.Fields.KEY_SCHEMA,
+ KafkaRecord.Fields.VALUE_SCHEMA,
},
payload = ErrorCategory.InvalidQueryParameter.class)
@Parameter(
@@ -116,15 +118,27 @@ public Response consumeRecords(
KafkaRecord.Fields.HEADERS,
KafkaRecord.Fields.KEY,
KafkaRecord.Fields.VALUE,
- KafkaRecord.Fields.SIZE
+ KafkaRecord.Fields.SIZE,
+ KafkaRecord.Fields.KEY_SCHEMA,
+ KafkaRecord.Fields.VALUE_SCHEMA,
}))
List<String> fields) {
requestedFields.accept(fields);
- var result = recordService.consumeRecords(topicId, params.getPartition(), params.getOffset(), params.getTimestamp(), params.getLimit(), fields, params.getMaxValueLength());
-
CacheControl noStore = RuntimeDelegate.getInstance().createHeaderDelegate(CacheControl.class).fromString("no-store");
- return Response.ok(new KafkaRecord.ListResponse(result)).cacheControl(noStore).build();
+
+ return recordService.consumeRecords(
+ topicId,
+ params.getPartition(),
+ params.getOffset(),
+ params.getTimestamp(),
+ params.getLimit(),
+ fields,
+ params.getMaxValueLength())
+ .thenApply(KafkaRecord.KafkaRecordDataList::new)
+ .thenApply(Response::ok)
+ .thenApply(response -> response.cacheControl(noStore))
+ .thenApply(Response.ResponseBuilder::build);
}
@POST
@@ -135,7 +149,7 @@ public Response consumeRecords(
description = "Produce (write) a single record to a topic")
@APIResponseSchema(
responseCode = "201",
- value = KafkaRecord.KafkaRecordDocument.class,
+ value = KafkaRecord.KafkaRecordData.class,
responseDescription = "Record was successfully sent to the topic")
@APIResponse(responseCode = "404", ref = "NotFound")
@APIResponse(responseCode = "500", ref = "ServerError")
@@ -151,20 +165,19 @@ public CompletionStage<Response> produceRecord(
String topicId,
@Valid
- KafkaRecord.KafkaRecordDocument message) {
+ KafkaRecord.KafkaRecordData message) {
final UriBuilder location = uriInfo.getRequestUriBuilder();
requestedFields.accept(KafkaRecord.Fields.ALL);
- return recordService.produceRecord(topicId, message.getData().getAttributes())
- .thenApply(KafkaRecord.KafkaRecordDocument::new)
+ return recordService.produceRecord(topicId, message.getData())
+ .thenApply(KafkaRecord.KafkaRecordData::new)
.thenApply(entity -> Response.status(Status.CREATED)
.entity(entity)
.location(location
- .queryParam("filter[partition]", entity.getData().getAttributes().getPartition())
- .queryParam("filter[offset]", entity.getData().getAttributes().getOffset())
+ .queryParam("filter[partition]", entity.getData().partition())
+ .queryParam("filter[offset]", entity.getData().offset())
.build()))
.thenApply(Response.ResponseBuilder::build);
-
}
}
diff --git a/api/src/main/java/com/github/streamshub/console/api/SchemasResource.java b/api/src/main/java/com/github/streamshub/console/api/SchemasResource.java
new file mode 100644
index 000000000..bd44fd98d
--- /dev/null
+++ b/api/src/main/java/com/github/streamshub/console/api/SchemasResource.java
@@ -0,0 +1,93 @@
+package com.github.streamshub.console.api;
+
+import java.util.Base64;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Optional;
+
+import jakarta.inject.Inject;
+import jakarta.ws.rs.GET;
+import jakarta.ws.rs.NotFoundException;
+import jakarta.ws.rs.Path;
+import jakarta.ws.rs.PathParam;
+import jakarta.ws.rs.Produces;
+import jakarta.ws.rs.core.MediaType;
+import jakarta.ws.rs.core.Response;
+
+import org.eclipse.microprofile.openapi.annotations.media.Content;
+import org.eclipse.microprofile.openapi.annotations.parameters.Parameter;
+import org.eclipse.microprofile.openapi.annotations.responses.APIResponse;
+import org.eclipse.microprofile.openapi.annotations.tags.Tag;
+import org.jboss.logging.Logger;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.github.streamshub.console.api.support.KafkaContext;
+import com.github.streamshub.console.api.support.serdes.ArtifactReferences;
+import com.github.streamshub.console.api.support.serdes.MultiformatSchemaParser;
+
+import io.apicurio.registry.resolver.DefaultSchemaResolver;
+import io.apicurio.registry.resolver.SchemaResolver;
+import io.apicurio.registry.rest.client.RegistryClient;
+
+@Path("/api/schemas/{schemaId}")
+@Tag(name = "Schema Registry Resources")
+public class SchemasResource {
+
+ @Inject
+ Logger logger;
+
+ @Inject
+ ObjectMapper objectMapper;
+
+ @Inject
+ Map<String, KafkaContext> kafkaContexts;
+
+ @GET
+ @Produces(MediaType.APPLICATION_JSON)
+ @APIResponse(responseCode = "200", ref = "Configurations", content = @Content())
+ @APIResponse(responseCode = "404", ref = "NotFound")
+ @APIResponse(responseCode = "500", ref = "ServerError")
+ @APIResponse(responseCode = "504", ref = "ServerTimeout")
+ public Response describeConfigs(
+ @Parameter(description = "Schema identifier")
+ @PathParam("schemaId")
+ String schemaId) {
+
+ int clusterTerm = schemaId.indexOf('.');
+
+ if (clusterTerm < 0) {
+ throw new NotFoundException("No such schema");
+ }
+
+ String clusterId = new String(Base64.getUrlDecoder().decode(schemaId.substring(0, clusterTerm)));
+
+ if (!kafkaContexts.containsKey(clusterId)) {
+ throw new NotFoundException("No such schema");
+ }
+
+ RegistryClient registryClient = kafkaContexts.get(clusterId).schemaRegistryContext().registryClient();
+
+ if (registryClient == null) {
+ throw new NotFoundException("Schema not found, no registry is configured");
+ }
+
+ @SuppressWarnings("resource")
+ SchemaResolver