Apicurio Registry integration with Avro+Protobuf ser/des support
Signed-off-by: Michael Edgar <[email protected]>
MikeEdgar committed Oct 11, 2024
1 parent 38a23c7 commit ceaf4a2
Showing 19 changed files with 1,455 additions and 277 deletions.
28 changes: 25 additions & 3 deletions api/pom.xml
@@ -58,15 +58,15 @@
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-resteasy-reactive</artifactId>
<artifactId>quarkus-rest</artifactId>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-resteasy-reactive-jackson</artifactId>
<artifactId>quarkus-rest-jackson</artifactId>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-rest-client-reactive</artifactId>
<artifactId>quarkus-rest-client</artifactId>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
@@ -84,6 +84,10 @@
<groupId>io.quarkus</groupId>
<artifactId>quarkus-kubernetes-client</artifactId>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-apicurio-registry-avro</artifactId>
</dependency>
<dependency>
<groupId>io.smallrye.common</groupId>
<artifactId>smallrye-common-annotation</artifactId>
@@ -109,6 +113,24 @@
<artifactId>kafka-oauth-client</artifactId>
</dependency>

<dependency>
<groupId>io.apicurio</groupId>
<artifactId>apicurio-registry-serdes-avro-serde</artifactId>
</dependency>
<dependency>
<groupId>io.apicurio</groupId>
<artifactId>apicurio-registry-serdes-jsonschema-serde</artifactId>
</dependency>
<dependency>
<groupId>io.apicurio</groupId>
<artifactId>apicurio-registry-serdes-protobuf-serde</artifactId>
<version>2.5.8.Final</version>
</dependency>
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java-util</artifactId>
</dependency>

<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-annotations</artifactId>
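The serde artifacts added above are what let the console read and write records against schemas held in Apicurio Registry. As a rough sketch (not part of this commit) of how the Avro serializer from these dependencies is typically wired onto a plain Kafka producer — the bootstrap servers, registry URL, and topic are placeholders:

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;

import io.apicurio.registry.serde.SerdeConfig;
import io.apicurio.registry.serde.avro.AvroKafkaSerializer;

class AvroSerdeExample {
    static Producer<String, GenericRecord> newProducer() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");          // placeholder
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, AvroKafkaSerializer.class.getName());
        props.put(SerdeConfig.REGISTRY_URL, "http://localhost:9080/apis/registry/v2"); // placeholder
        props.put(SerdeConfig.ENABLE_HEADERS, "true"); // schema reference travels in record headers
        // Values are serialized against schemas resolved from (or registered in) the registry
        return new KafkaProducer<>(props);
    }
}
```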
@@ -16,6 +16,7 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.function.Supplier;
@@ -53,6 +54,8 @@
import org.apache.kafka.common.security.plain.PlainLoginModule;
import org.apache.kafka.common.security.scram.ScramLoginModule;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.eclipse.microprofile.config.Config;
import org.eclipse.microprofile.config.inject.ConfigProperty;
@@ -65,9 +68,11 @@
import com.github.streamshub.console.api.support.KafkaContext;
import com.github.streamshub.console.api.support.TrustAllCertificateManager;
import com.github.streamshub.console.api.support.ValidationProxy;
import com.github.streamshub.console.api.support.serdes.RecordData;
import com.github.streamshub.console.config.ConsoleConfig;
import com.github.streamshub.console.config.KafkaClusterConfig;

import io.apicurio.registry.serde.SerdeConfig;
import io.fabric8.kubernetes.client.informers.ResourceEventHandler;
import io.fabric8.kubernetes.client.informers.SharedIndexInformer;
import io.fabric8.kubernetes.client.informers.cache.Cache;
@@ -420,6 +425,7 @@ Map<String, Object> requiredConsumerConfig() {
configs.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 50_000);
configs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
configs.put(ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, 5000);
configs.put(SerdeConfig.ENABLE_HEADERS, "true");
return configs;
}
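SerdeConfig.ENABLE_HEADERS controls where Apicurio serializers place the schema (artifact) reference: with headers enabled it rides in Kafka record headers rather than as a payload prefix, so a consumer can discover which schema a record was written with. A minimal sketch of checking for that reference — the header name follows the Apicurio 2.x convention and is an assumption, not taken from this commit:

```java
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.header.Header;

class HeaderInspection {
    // Assumed header key used by Apicurio 2.x serializers when ENABLE_HEADERS is set
    static final String VALUE_GLOBAL_ID = "apicurio.value.globalId";

    static boolean hasSchemaReference(ConsumerRecord<byte[], byte[]> record) {
        for (Header header : record.headers()) {
            if (VALUE_GLOBAL_ID.equals(header.key())) {
                return true; // value can be resolved against the registry by global id
            }
        }
        return false;
    }
}
```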

@@ -520,26 +526,16 @@ public void disposeKafkaContext(@Disposes KafkaContext context, Map<String, Kafk

@Produces
@RequestScoped
public Supplier<Consumer<byte[], byte[]>> consumerSupplier(ConsoleConfig consoleConfig, KafkaContext context) {
public BiFunction<Deserializer<RecordData>, Deserializer<RecordData>, Consumer<RecordData, RecordData>> consumerSupplier(ConsoleConfig consoleConfig, KafkaContext context) {
var configs = maybeAuthenticate(context, Consumer.class);
Consumer<byte[], byte[]> client = new KafkaConsumer<>(configs); // NOSONAR / closed in consumerDisposer
return () -> client;
}

public void consumerDisposer(@Disposes Supplier<Consumer<byte[], byte[]>> consumer) {
consumer.get().close();
return (keyDeser, valueDeser) -> new KafkaConsumer<>(configs, keyDeser, valueDeser); // NOSONAR / closed in consumerDisposer
}

@Produces
@RequestScoped
public Supplier<Producer<String, String>> producerSupplier(ConsoleConfig consoleConfig, KafkaContext context) {
public BiFunction<Serializer<RecordData>, Serializer<RecordData>, Producer<RecordData, RecordData>> producerSupplier(ConsoleConfig consoleConfig, KafkaContext context) {
var configs = maybeAuthenticate(context, Producer.class);
Producer<String, String> client = new KafkaProducer<>(configs); // NOSONAR / closed in producerDisposer
return () -> client;
}

public void producerDisposer(@Disposes Supplier<Producer<String, String>> producer) {
producer.get().close();
return (keySer, valueSer) -> new KafkaProducer<>(configs, keySer, valueSer); // NOSONAR / closed by service code
}

Map<String, Object> maybeAuthenticate(KafkaContext context, Class<?> clientType) {
@@ -571,6 +567,12 @@ Map<String, Object> buildConfig(Set<String> configNames,
.map(Optional::get)
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue, (k1, k2) -> k1, TreeMap::new));

// Ensure no given properties are skipped. The previous stream processing allows
// for the standard config names to be obtained from the given maps, but also from
// config overrides via MicroProfile Config.
clientProperties.get().forEach(cfg::putIfAbsent);
config.getProperties().forEach(cfg::putIfAbsent);

var listenerSpec = cluster.map(Kafka::getSpec)
.map(KafkaSpec::getKafka)
.map(KafkaClusterSpec::getListeners)
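With the disposer methods removed, the request-scoped producers now hand back factories rather than ready-made clients: the caller supplies the key/value serializers or deserializers (for example the multiformat RecordData serdes) and owns the client's lifecycle. A sketch of how service code might use the new consumer factory — the surrounding class and method are illustrative, not part of the commit:

```java
import java.util.function.BiFunction;

import jakarta.inject.Inject;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.serialization.Deserializer;

import com.github.streamshub.console.api.support.serdes.RecordData;

class RecordReadExample {
    @Inject
    BiFunction<Deserializer<RecordData>, Deserializer<RecordData>, Consumer<RecordData, RecordData>> consumerFactory;

    void readWith(Deserializer<RecordData> keyDeser, Deserializer<RecordData> valueDeser) {
        // The factory builds the consumer with per-request deserializers; closing is the caller's job
        try (Consumer<RecordData, RecordData> consumer = consumerFactory.apply(keyDeser, valueDeser)) {
            // subscribe/assign and poll as usual
        }
    }
}
```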
@@ -67,12 +67,12 @@ public class RecordsResource {
summary = "Consume records from a topic",
description = "Consume a limited number of records from a topic, optionally specifying a partition and an absolute offset or timestamp as the starting point for message retrieval.")
@APIResponseSchema(
value = KafkaRecord.ListResponse.class,
value = KafkaRecord.KafkaRecordDataList.class,
responseDescription = "List of records matching the request query parameters.")
@APIResponse(responseCode = "404", ref = "NotFound")
@APIResponse(responseCode = "500", ref = "ServerError")
@APIResponse(responseCode = "504", ref = "ServerTimeout")
public Response consumeRecords(
public CompletionStage<Response> consumeRecords(
@Parameter(description = "Cluster identifier")
@PathParam("clusterId")
String clusterId,
@@ -98,7 +98,9 @@ public Response consumeRecords(
KafkaRecord.Fields.HEADERS,
KafkaRecord.Fields.KEY,
KafkaRecord.Fields.VALUE,
KafkaRecord.Fields.SIZE
KafkaRecord.Fields.SIZE,
KafkaRecord.Fields.KEY_SCHEMA,
KafkaRecord.Fields.VALUE_SCHEMA,
},
payload = ErrorCategory.InvalidQueryParameter.class)
@Parameter(
@@ -116,15 +118,27 @@
KafkaRecord.Fields.HEADERS,
KafkaRecord.Fields.KEY,
KafkaRecord.Fields.VALUE,
KafkaRecord.Fields.SIZE
KafkaRecord.Fields.SIZE,
KafkaRecord.Fields.KEY_SCHEMA,
KafkaRecord.Fields.VALUE_SCHEMA,
}))
List<String> fields) {

requestedFields.accept(fields);
var result = recordService.consumeRecords(topicId, params.getPartition(), params.getOffset(), params.getTimestamp(), params.getLimit(), fields, params.getMaxValueLength());

CacheControl noStore = RuntimeDelegate.getInstance().createHeaderDelegate(CacheControl.class).fromString("no-store");
return Response.ok(new KafkaRecord.ListResponse(result)).cacheControl(noStore).build();

return recordService.consumeRecords(
topicId,
params.getPartition(),
params.getOffset(),
params.getTimestamp(),
params.getLimit(),
fields,
params.getMaxValueLength())
.thenApply(KafkaRecord.KafkaRecordDataList::new)
.thenApply(Response::ok)
.thenApply(response -> response.cacheControl(noStore))
.thenApply(Response.ResponseBuilder::build);
}

@POST
@@ -135,7 +149,7 @@ public Response consumeRecords(
description = "Produce (write) a single record to a topic")
@APIResponseSchema(
responseCode = "201",
value = KafkaRecord.KafkaRecordDocument.class,
value = KafkaRecord.KafkaRecordData.class,
responseDescription = "Record was successfully sent to the topic")
@APIResponse(responseCode = "404", ref = "NotFound")
@APIResponse(responseCode = "500", ref = "ServerError")
@@ -151,20 +165,19 @@ public CompletionStage<Response> produceRecord(
String topicId,

@Valid
KafkaRecord.KafkaRecordDocument message) {
KafkaRecord.KafkaRecordData message) {

final UriBuilder location = uriInfo.getRequestUriBuilder();
requestedFields.accept(KafkaRecord.Fields.ALL);

return recordService.produceRecord(topicId, message.getData().getAttributes())
.thenApply(KafkaRecord.KafkaRecordDocument::new)
return recordService.produceRecord(topicId, message.getData())
.thenApply(KafkaRecord.KafkaRecordData::new)
.thenApply(entity -> Response.status(Status.CREATED)
.entity(entity)
.location(location
.queryParam("filter[partition]", entity.getData().getAttributes().getPartition())
.queryParam("filter[offset]", entity.getData().getAttributes().getOffset())
.queryParam("filter[partition]", entity.getData().partition())
.queryParam("filter[offset]", entity.getData().offset())
.build()))
.thenApply(Response.ResponseBuilder::build);

}
}
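Both endpoints now return CompletionStage<Response>, relying on JAX-RS's built-in support for asynchronous resource methods: the container completes the HTTP exchange when the stage completes. Stripped of the record-specific details, the pattern looks roughly like this — the service and type names are illustrative, not from the commit:

```java
import java.util.List;
import java.util.concurrent.CompletionStage;

import jakarta.ws.rs.core.CacheControl;
import jakarta.ws.rs.core.Response;
import jakarta.ws.rs.ext.RuntimeDelegate;

class AsyncResourcePattern {
    interface ExampleService {
        CompletionStage<List<String>> fetch();
    }

    CompletionStage<Response> list(ExampleService service) {
        // Same no-store cache control construction used in consumeRecords above
        CacheControl noStore = RuntimeDelegate.getInstance()
                .createHeaderDelegate(CacheControl.class)
                .fromString("no-store");
        return service.fetch()
                .thenApply(items -> Response.ok(items).cacheControl(noStore).build());
    }
}
```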
@@ -0,0 +1,109 @@
package com.github.streamshub.console.api;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.Base64;
import java.util.Collections;
import java.util.Optional;

import jakarta.annotation.PostConstruct;
import jakarta.inject.Inject;
import jakarta.ws.rs.BadRequestException;
import jakarta.ws.rs.GET;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.PathParam;
import jakarta.ws.rs.Produces;
import jakarta.ws.rs.core.MediaType;
import jakarta.ws.rs.core.Response;
import jakarta.ws.rs.core.Response.Status;

import org.eclipse.microprofile.config.ConfigProvider;
import org.eclipse.microprofile.openapi.annotations.media.Content;
import org.eclipse.microprofile.openapi.annotations.parameters.Parameter;
import org.eclipse.microprofile.openapi.annotations.responses.APIResponse;
import org.eclipse.microprofile.openapi.annotations.tags.Tag;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.github.streamshub.console.api.support.serdes.MultiformatSchemaParser;

import io.apicurio.registry.resolver.DefaultSchemaResolver;
import io.apicurio.registry.resolver.SchemaResolver;
import io.apicurio.registry.rest.client.RegistryClient;
import io.apicurio.registry.rest.client.RegistryClientFactory;
import io.apicurio.registry.serde.strategy.ArtifactReference;

@Path("/api/schemas/{schemaId}")
@Tag(name = "Schema Registry Resources")
public class SchemasResource {

@Inject
ObjectMapper objectMapper;

SchemaResolver<Object, ?> schemaResolver;

@PostConstruct
void initialize() {
String registryUrl = ConfigProvider.getConfig().getOptionalValue("console.registry.endpoint", String.class)
// TODO: remove default
.orElse("http://localhost:9080");

RegistryClient registryClient = RegistryClientFactory.create(registryUrl);
schemaResolver = new DefaultSchemaResolver<>();
schemaResolver.setClient(registryClient);
schemaResolver.configure(Collections.emptyMap(), new MultiformatSchemaParser<>(Collections.emptySet()));
}

@GET
@Produces(MediaType.APPLICATION_JSON)
@APIResponse(responseCode = "200", ref = "Configurations", content = @Content())
@APIResponse(responseCode = "404", ref = "NotFound")
@APIResponse(responseCode = "500", ref = "ServerError")
@APIResponse(responseCode = "504", ref = "ServerTimeout")
public Response describeConfigs(
@Parameter(description = "Schema identifier")
@PathParam("schemaId")
String schemaId) {



InputStream in = new ByteArrayInputStream(Base64.getUrlDecoder().decode(schemaId));
JsonNode id;

try {
id = objectMapper.readTree(in);
} catch (IOException e) {
throw new BadRequestException("Schema id could not be parsed");
}

var builder = ArtifactReference.builder();

if (id.has("globalId")) {
builder.globalId(id.get("globalId").asLong());
}
if (id.has("contentId")) {
builder.contentId(id.get("contentId").asLong());
}
if (id.has("groupId")) {
builder.groupId(id.get("groupId").asText());
}
if (id.has("artifactId")) {
builder.artifactId(id.get("artifactId").asText());
}
if (id.has("version")) {
builder.version(id.get("version").asText());
}

var schema = schemaResolver.resolveSchemaByArtifactReference(builder.build());

var response = Optional.ofNullable(schema)
.map(s -> s.getParsedSchema())
.map(s -> s.getRawSchema())
.map(Response::ok)
.orElseGet(() -> Response.status(Status.NOT_FOUND));

return response.build();
}

}
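The schemaId path parameter is expected to be a Base64URL-encoded JSON object naming the Apicurio artifact by globalId, contentId, groupId, artifactId, and/or version. A sketch of how a caller might build one — the globalId value is made up for illustration:

```java
import java.util.Base64;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;

class SchemaIdExample {
    static String encode() throws Exception {
        ObjectMapper mapper = new ObjectMapper();
        ObjectNode id = mapper.createObjectNode();
        id.put("globalId", 42L); // could instead set contentId, groupId, artifactId, version
        // Base64URL-encode the JSON; SchemasResource decodes it and resolves the artifact
        return Base64.getUrlEncoder().withoutPadding().encodeToString(mapper.writeValueAsBytes(id));
    }
}
```

A GET to /api/schemas/{encoded} then returns the raw schema, or 404 if the reference cannot be resolved.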
@@ -4,8 +4,8 @@
import java.util.Map;

import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonInclude.Include;
import com.fasterxml.jackson.annotation.JsonProperty;

/**
* Base class for all JSON API request and response bodies.