Apicurio Registry integration with Avro+Protobuf ser/des support
Signed-off-by: Michael Edgar <[email protected]>
MikeEdgar committed Oct 14, 2024
1 parent cf93f9c commit 532766e
Showing 31 changed files with 1,992 additions and 339 deletions.
25 changes: 22 additions & 3 deletions api/pom.xml
@@ -58,15 +58,15 @@
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-resteasy-reactive</artifactId>
<artifactId>quarkus-rest</artifactId>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-resteasy-reactive-jackson</artifactId>
<artifactId>quarkus-rest-jackson</artifactId>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-rest-client-reactive</artifactId>
<artifactId>quarkus-rest-client</artifactId>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
@@ -84,6 +84,10 @@
<groupId>io.quarkus</groupId>
<artifactId>quarkus-kubernetes-client</artifactId>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-apicurio-registry-avro</artifactId>
</dependency>
<dependency>
<groupId>io.smallrye.common</groupId>
<artifactId>smallrye-common-annotation</artifactId>
@@ -109,6 +113,20 @@
<artifactId>kafka-oauth-client</artifactId>
</dependency>

<dependency>
<groupId>io.apicurio</groupId>
<artifactId>apicurio-registry-serdes-avro-serde</artifactId>
</dependency>
<dependency>
<groupId>io.apicurio</groupId>
<artifactId>apicurio-registry-serdes-protobuf-serde</artifactId>
<version>${apicurio-registry.version}</version>
</dependency>
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java-util</artifactId>
</dependency>

<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-annotations</artifactId>
@@ -358,6 +376,7 @@
<systemProperties>
<keycloak.image>${keycloak.image}</keycloak.image>
<strimzi.test-container.kafka.custom.image>${strimzi-kafka.tag}</strimzi.test-container.kafka.custom.image>
<apicurio-registry.version>${apicurio-registry.version}</apicurio-registry.version>
<java.util.logging.manager>org.jboss.logmanager.LogManager</java.util.logging.manager>
<maven.home>${maven.home}</maven.home>
<quarkus.jacoco.reuse-data-file>true</quarkus.jacoco.reuse-data-file>
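The Apicurio serde dependencies added above plug into ordinary Kafka clients through SerdeConfig properties. Below is a minimal sketch, not taken from this commit, of an Avro-serializing producer wired that way; the broker address and registry URL are placeholder assumptions.

import java.util.Map;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;

import io.apicurio.registry.serde.SerdeConfig;
import io.apicurio.registry.serde.avro.AvroKafkaSerializer;

public class AvroSerdeSketch {
    public static void main(String[] args) {
        Map<String, Object> configs = Map.of(
                ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092",          // placeholder broker
                ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, AvroKafkaSerializer.class.getName(),
                ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, AvroKafkaSerializer.class.getName(),
                SerdeConfig.REGISTRY_URL, "http://localhost:8080/apis/registry/v2", // placeholder registry URL
                SerdeConfig.ENABLE_HEADERS, "true");                                // schema id travels in record headers

        try (var producer = new KafkaProducer<Object, Object>(configs)) {
            // Send Avro GenericRecord/SpecificRecord values here; the serializer looks up
            // (or, if configured, registers) the schema in Apicurio Registry.
        }
    }
}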
@@ -65,9 +65,13 @@
import com.github.streamshub.console.api.support.KafkaContext;
import com.github.streamshub.console.api.support.TrustAllCertificateManager;
import com.github.streamshub.console.api.support.ValidationProxy;
import com.github.streamshub.console.api.support.serdes.RecordData;
import com.github.streamshub.console.config.ConsoleConfig;
import com.github.streamshub.console.config.KafkaClusterConfig;

import io.apicurio.registry.rest.client.RegistryClient;
import io.apicurio.registry.rest.client.RegistryClientFactory;
import io.apicurio.registry.serde.SerdeConfig;
import io.fabric8.kubernetes.client.informers.ResourceEventHandler;
import io.fabric8.kubernetes.client.informers.SharedIndexInformer;
import io.fabric8.kubernetes.client.informers.cache.Cache;
@@ -192,6 +196,10 @@ public ConsoleConfig produceConsoleConfig() {
})
.map(consoleConfig -> {
consoleConfig.getKafka().getClusters().stream().forEach(cluster -> {
if (cluster.getSchemaRegistry() != null) {
var registryConfig = cluster.getSchemaRegistry();
registryConfig.setUrl(resolveValue(registryConfig.getUrl()));
}
resolveValues(cluster.getProperties());
resolveValues(cluster.getAdminProperties());
resolveValues(cluster.getProducerProperties());
@@ -277,7 +285,7 @@ public void onDelete(Kafka kafka, boolean deletedFinalStateUnknown) {
findConfig(kafka).ifPresentOrElse(
clusterConfig -> {
String clusterKey = clusterConfig.clusterKey();
String clusterId = clusterId(clusterConfig, Optional.of(kafka));
String clusterId = KafkaContext.clusterId(clusterConfig, Optional.of(kafka));
log.infof("Removing KafkaContext for cluster %s, id=%s", clusterKey, clusterId);
log.debugf("Known KafkaContext identifiers: %s", contexts.keySet());
KafkaContext previous = contexts.remove(clusterId);
@@ -337,7 +345,14 @@ void putKafkaContext(Map<String, KafkaContext> contexts,
}

String clusterKey = clusterConfig.clusterKey();
String clusterId = clusterId(clusterConfig, kafkaResource);
String clusterId = KafkaContext.clusterId(clusterConfig, kafkaResource);

RegistryClient schemaRegistryClient = null;

if (clusterConfig.getSchemaRegistry() != null) {
String registryUrl = clusterConfig.getSchemaRegistry().getUrl();
schemaRegistryClient = RegistryClientFactory.create(registryUrl); // NOSONAR - closed elsewhere
}

if (!replace && contexts.containsKey(clusterId)) {
log.warnf("""
@@ -359,7 +374,10 @@ Connection requires trusted certificate(s) which are not present \
}

KafkaContext ctx = new KafkaContext(clusterConfig, kafkaResource.orElse(null), clientConfigs, admin);
ctx.schemaRegistryClient(schemaRegistryClient);

log.infof("%s KafkaContext for cluster %s, id=%s", replace ? "Replacing" : "Adding", clusterKey, clusterId);

KafkaContext previous = contexts.put(clusterId, ctx);
Optional.ofNullable(previous).ifPresent(KafkaContext::close);
}
@@ -370,12 +388,6 @@ boolean defaultedClusterId(KafkaClusterConfig clusterConfig, Optional<Kafka> kaf
return clusterConfig.getId() == null && kafkaResource.map(Kafka::getStatus).map(KafkaStatus::getClusterId).isEmpty();
}

String clusterId(KafkaClusterConfig clusterConfig, Optional<Kafka> kafkaResource) {
return Optional.ofNullable(clusterConfig.getId())
.or(() -> kafkaResource.map(Kafka::getStatus).map(KafkaStatus::getClusterId))
.orElseGet(clusterConfig::getName);
}

/**
* Checks whether the previous KafkaContext contained TLS trusted certificates, but due to them being
* removed from the Strimzi Kafka CR being in a transient state, they are no longer present. We will ignore
@@ -420,6 +432,7 @@ Map<String, Object> requiredConsumerConfig() {
configs.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 50_000);
configs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
configs.put(ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, 5000);
configs.put(SerdeConfig.ENABLE_HEADERS, "true");
return configs;
}

@@ -520,26 +533,31 @@ public void disposeKafkaContext(@Disposes KafkaContext context, Map<String, Kafk

@Produces
@RequestScoped
public Supplier<Consumer<byte[], byte[]>> consumerSupplier(ConsoleConfig consoleConfig, KafkaContext context) {
public Consumer<RecordData, RecordData> consumerSupplier(ConsoleConfig consoleConfig, KafkaContext context) {
var configs = maybeAuthenticate(context, Consumer.class);
Consumer<byte[], byte[]> client = new KafkaConsumer<>(configs); // NOSONAR / closed in consumerDisposer
return () -> client;

return new KafkaConsumer<>(
configs,
context.schemaRegistryContext().keyDeserializer(),
context.schemaRegistryContext().valueDeserializer());
}

public void consumerDisposer(@Disposes Supplier<Consumer<byte[], byte[]>> consumer) {
consumer.get().close();
public void disposeConsumerSupplier(@Disposes Consumer<RecordData, RecordData> consumer) {
consumer.close();
}

@Produces
@RequestScoped
public Supplier<Producer<String, String>> producerSupplier(ConsoleConfig consoleConfig, KafkaContext context) {
public Producer<RecordData, RecordData> producerSupplier(ConsoleConfig consoleConfig, KafkaContext context) {
var configs = maybeAuthenticate(context, Producer.class);
Producer<String, String> client = new KafkaProducer<>(configs); // NOSONAR / closed in producerDisposer
return () -> client;
return new KafkaProducer<>(
configs,
context.schemaRegistryContext().keySerializer(),
context.schemaRegistryContext().valueSerializer());
}

public void producerDisposer(@Disposes Supplier<Producer<String, String>> producer) {
producer.get().close();
public void disposeProducerSupplier(@Disposes Producer<RecordData, RecordData> producer) {
producer.close();
}

Map<String, Object> maybeAuthenticate(KafkaContext context, Class<?> clientType) {
Expand Down Expand Up @@ -571,6 +589,12 @@ Map<String, Object> buildConfig(Set<String> configNames,
.map(Optional::get)
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue, (k1, k2) -> k1, TreeMap::new));

// Ensure no given properties are skipped. The previous stream processing allows
// for the standard config names to be obtained from the given maps, but also from
// config overrides via MicroProfile Config.
clientProperties.get().forEach(cfg::putIfAbsent);
config.getProperties().forEach(cfg::putIfAbsent);

var listenerSpec = cluster.map(Kafka::getSpec)
.map(KafkaSpec::getKafka)
.map(KafkaClusterSpec::getListeners)
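The consumer and producer above are now constructed with explicit serializer/deserializer instances built from the cluster's schema registry context, rather than class names in the config map. A minimal sketch of that constructor form, using StringDeserializer as a stand-in for the console's multiformat deserializers and placeholder broker/group values:

import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class ExplicitDeserializerSketch {
    public static void main(String[] args) {
        Map<String, Object> configs = Map.of(
                ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092", // placeholder broker
                ConsumerConfig.GROUP_ID_CONFIG, "sketch-group");           // placeholder group id

        // Passing deserializer instances to the constructor (instead of key/value.deserializer
        // class names) lets the caller build them with their own dependencies -- in the console,
        // from the per-cluster schema registry context.
        try (var consumer = new KafkaConsumer<>(configs, new StringDeserializer(), new StringDeserializer())) {
            // subscribe and poll as usual
        }
    }
}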
@@ -19,10 +19,10 @@
import jakarta.ws.rs.core.CacheControl;
import jakarta.ws.rs.core.MediaType;
import jakarta.ws.rs.core.Response;
import jakarta.ws.rs.core.Response.Status;
import jakarta.ws.rs.core.UriBuilder;
import jakarta.ws.rs.core.UriInfo;
import jakarta.ws.rs.ext.RuntimeDelegate;
import jakarta.ws.rs.core.Response.Status;

import org.eclipse.microprofile.openapi.annotations.Operation;
import org.eclipse.microprofile.openapi.annotations.enums.Explode;
@@ -67,12 +67,12 @@ public class RecordsResource {
summary = "Consume records from a topic",
description = "Consume a limited number of records from a topic, optionally specifying a partition and an absolute offset or timestamp as the starting point for message retrieval.")
@APIResponseSchema(
value = KafkaRecord.ListResponse.class,
value = KafkaRecord.KafkaRecordDataList.class,
responseDescription = "List of records matching the request query parameters.")
@APIResponse(responseCode = "404", ref = "NotFound")
@APIResponse(responseCode = "500", ref = "ServerError")
@APIResponse(responseCode = "504", ref = "ServerTimeout")
public Response consumeRecords(
public CompletionStage<Response> consumeRecords(
@Parameter(description = "Cluster identifier")
@PathParam("clusterId")
String clusterId,
@@ -98,7 +98,9 @@ public Response consumeRecords(
KafkaRecord.Fields.HEADERS,
KafkaRecord.Fields.KEY,
KafkaRecord.Fields.VALUE,
KafkaRecord.Fields.SIZE
KafkaRecord.Fields.SIZE,
KafkaRecord.Fields.KEY_SCHEMA,
KafkaRecord.Fields.VALUE_SCHEMA,
},
payload = ErrorCategory.InvalidQueryParameter.class)
@Parameter(
@@ -116,15 +118,27 @@ public Response consumeRecords(
KafkaRecord.Fields.HEADERS,
KafkaRecord.Fields.KEY,
KafkaRecord.Fields.VALUE,
KafkaRecord.Fields.SIZE
KafkaRecord.Fields.SIZE,
KafkaRecord.Fields.KEY_SCHEMA,
KafkaRecord.Fields.VALUE_SCHEMA,
}))
List<String> fields) {

requestedFields.accept(fields);
var result = recordService.consumeRecords(topicId, params.getPartition(), params.getOffset(), params.getTimestamp(), params.getLimit(), fields, params.getMaxValueLength());

CacheControl noStore = RuntimeDelegate.getInstance().createHeaderDelegate(CacheControl.class).fromString("no-store");
return Response.ok(new KafkaRecord.ListResponse(result)).cacheControl(noStore).build();

return recordService.consumeRecords(
topicId,
params.getPartition(),
params.getOffset(),
params.getTimestamp(),
params.getLimit(),
fields,
params.getMaxValueLength())
.thenApply(KafkaRecord.KafkaRecordDataList::new)
.thenApply(Response::ok)
.thenApply(response -> response.cacheControl(noStore))
.thenApply(Response.ResponseBuilder::build);
}

@POST
@@ -135,7 +149,7 @@ public Response consumeRecords(
description = "Produce (write) a single record to a topic")
@APIResponseSchema(
responseCode = "201",
value = KafkaRecord.KafkaRecordDocument.class,
value = KafkaRecord.KafkaRecordData.class,
responseDescription = "Record was successfully sent to the topic")
@APIResponse(responseCode = "404", ref = "NotFound")
@APIResponse(responseCode = "500", ref = "ServerError")
@@ -151,20 +165,19 @@ public CompletionStage<Response> produceRecord(
String topicId,

@Valid
KafkaRecord.KafkaRecordDocument message) {
KafkaRecord.KafkaRecordData message) {

final UriBuilder location = uriInfo.getRequestUriBuilder();
requestedFields.accept(KafkaRecord.Fields.ALL);

return recordService.produceRecord(topicId, message.getData().getAttributes())
.thenApply(KafkaRecord.KafkaRecordDocument::new)
return recordService.produceRecord(topicId, message.getData())
.thenApply(KafkaRecord.KafkaRecordData::new)
.thenApply(entity -> Response.status(Status.CREATED)
.entity(entity)
.location(location
.queryParam("filter[partition]", entity.getData().getAttributes().getPartition())
.queryParam("filter[offset]", entity.getData().getAttributes().getOffset())
.queryParam("filter[partition]", entity.getData().partition())
.queryParam("filter[offset]", entity.getData().offset())
.build()))
.thenApply(Response.ResponseBuilder::build);

}
}
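consumeRecords now returns a CompletionStage<Response> and assembles the HTTP response through a thenApply chain instead of blocking. The sketch below mirrors that shape with hypothetical stand-ins (RecordList, fetchRecordsAsync) for the console's service layer; it assumes a JAX-RS implementation is on the classpath to back Response.ok.

import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

import jakarta.ws.rs.core.Response;

public class AsyncResponseSketch {

    // Hypothetical stand-in for the envelope type returned to clients.
    record RecordList(List<String> data) { }

    // Hypothetical stand-in for the service call that fetches records asynchronously.
    static CompletionStage<List<String>> fetchRecordsAsync() {
        return CompletableFuture.completedFuture(List.of("record-1", "record-2"));
    }

    static CompletionStage<Response> consume() {
        return fetchRecordsAsync()
                .thenApply(RecordList::new)                  // wrap in the response envelope
                .thenApply(Response::ok)                     // 200 OK with the entity
                .thenApply(Response.ResponseBuilder::build); // finalize once the records arrive
    }
}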
@@ -0,0 +1,93 @@
package com.github.streamshub.console.api;

import java.util.Base64;
import java.util.Collections;
import java.util.Map;
import java.util.Optional;

import jakarta.inject.Inject;
import jakarta.ws.rs.GET;
import jakarta.ws.rs.NotFoundException;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.PathParam;
import jakarta.ws.rs.Produces;
import jakarta.ws.rs.core.MediaType;
import jakarta.ws.rs.core.Response;

import org.eclipse.microprofile.openapi.annotations.media.Content;
import org.eclipse.microprofile.openapi.annotations.parameters.Parameter;
import org.eclipse.microprofile.openapi.annotations.responses.APIResponse;
import org.eclipse.microprofile.openapi.annotations.tags.Tag;
import org.jboss.logging.Logger;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.github.streamshub.console.api.support.KafkaContext;
import com.github.streamshub.console.api.support.serdes.ArtifactReferences;
import com.github.streamshub.console.api.support.serdes.MultiformatSchemaParser;

import io.apicurio.registry.resolver.DefaultSchemaResolver;
import io.apicurio.registry.resolver.SchemaResolver;
import io.apicurio.registry.rest.client.RegistryClient;

@Path("/api/schemas/{schemaId}")
@Tag(name = "Schema Registry Resources")
public class SchemasResource {

@Inject
Logger logger;

@Inject
ObjectMapper objectMapper;

@Inject
Map<String, KafkaContext> kafkaContexts;

@GET
@Produces(MediaType.APPLICATION_JSON)
@APIResponse(responseCode = "200", ref = "Configurations", content = @Content())
@APIResponse(responseCode = "404", ref = "NotFound")
@APIResponse(responseCode = "500", ref = "ServerError")
@APIResponse(responseCode = "504", ref = "ServerTimeout")
public Response describeConfigs(
@Parameter(description = "Schema identifier")
@PathParam("schemaId")
String schemaId) {

int clusterTerm = schemaId.indexOf('.');

if (clusterTerm < 0) {
throw new NotFoundException("No such schema");
}

String clusterId = new String(Base64.getUrlDecoder().decode(schemaId.substring(0, clusterTerm)));

if (!kafkaContexts.containsKey(clusterId)) {
throw new NotFoundException("No such schema");
}

RegistryClient registryClient = kafkaContexts.get(clusterId).schemaRegistryContext().registryClient();

if (registryClient == null) {
throw new NotFoundException("Schema not found, no registry is configured");
}

@SuppressWarnings("resource")
SchemaResolver<Object, ?> schemaResolver = new DefaultSchemaResolver<>();
schemaResolver.setClient(registryClient);
schemaResolver.configure(Collections.emptyMap(), new MultiformatSchemaParser<>(Collections.emptySet()));

schemaId = schemaId.substring(clusterTerm + 1);

var reference = ArtifactReferences.fromSchemaId(schemaId, objectMapper);
var schema = schemaResolver.resolveSchemaByArtifactReference(reference);

var response = Optional.ofNullable(schema)
.map(s -> s.getParsedSchema())
.map(s -> s.getRawSchema())
.map(Response::ok)
.orElseThrow(() -> new NotFoundException("No such schema"));

return response.build();
}

}
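The schemaId path parameter this resource parses is a composite identifier: the URL-safe Base64 encoding of the Kafka cluster id, a '.', and then the registry-side schema reference handed to ArtifactReferences. A small sketch of constructing such an identifier; both ids below are placeholders.

import java.nio.charset.StandardCharsets;
import java.util.Base64;

public class SchemaIdSketch {
    public static void main(String[] args) {
        String clusterId = "my-cluster-id";  // placeholder Kafka cluster id
        String schemaRef = "123";            // placeholder registry schema reference

        String composite = Base64.getUrlEncoder()
                .encodeToString(clusterId.getBytes(StandardCharsets.UTF_8))
                + "." + schemaRef;

        // SchemasResource splits on the first '.', Base64-decodes the prefix to locate the
        // KafkaContext, and resolves the remainder against that cluster's Apicurio registry.
        System.out.println("GET /api/schemas/" + composite);
    }
}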