Skip to content

Commit

Permalink
Apicurio Registry integration with Avro/Protobuf ser/des support
Browse files Browse the repository at this point in the history
Signed-off-by: Michael Edgar <[email protected]>
  • Loading branch information
MikeEdgar committed Oct 10, 2024
1 parent 45f2f5a commit 9359be7
Show file tree
Hide file tree
Showing 17 changed files with 1,099 additions and 266 deletions.
28 changes: 25 additions & 3 deletions api/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -58,15 +58,15 @@
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-resteasy-reactive</artifactId>
<artifactId>quarkus-rest</artifactId>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-resteasy-reactive-jackson</artifactId>
<artifactId>quarkus-rest-jackson</artifactId>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-rest-client-reactive</artifactId>
<artifactId>quarkus-rest-client</artifactId>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
Expand All @@ -84,6 +84,10 @@
<groupId>io.quarkus</groupId>
<artifactId>quarkus-kubernetes-client</artifactId>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-apicurio-registry-avro</artifactId>
</dependency>
<dependency>
<groupId>io.smallrye.common</groupId>
<artifactId>smallrye-common-annotation</artifactId>
Expand All @@ -109,6 +113,24 @@
<artifactId>kafka-oauth-client</artifactId>
</dependency>

<dependency>
<groupId>io.apicurio</groupId>
<artifactId>apicurio-registry-serdes-avro-serde</artifactId>
</dependency>
<dependency>
<groupId>io.apicurio</groupId>
<artifactId>apicurio-registry-serdes-jsonschema-serde</artifactId>
</dependency>
<dependency>
<groupId>io.apicurio</groupId>
<artifactId>apicurio-registry-serdes-protobuf-serde</artifactId>
<version>2.5.8.Final</version>
</dependency>
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java-util</artifactId>
</dependency>

<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-annotations</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.function.Supplier;
Expand Down Expand Up @@ -53,6 +54,8 @@
import org.apache.kafka.common.security.plain.PlainLoginModule;
import org.apache.kafka.common.security.scram.ScramLoginModule;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.eclipse.microprofile.config.Config;
import org.eclipse.microprofile.config.inject.ConfigProperty;
Expand All @@ -65,9 +68,11 @@
import com.github.streamshub.console.api.support.KafkaContext;
import com.github.streamshub.console.api.support.TrustAllCertificateManager;
import com.github.streamshub.console.api.support.ValidationProxy;
import com.github.streamshub.console.api.support.serdes.RecordData;
import com.github.streamshub.console.config.ConsoleConfig;
import com.github.streamshub.console.config.KafkaClusterConfig;

import io.apicurio.registry.serde.SerdeConfig;
import io.fabric8.kubernetes.client.informers.ResourceEventHandler;
import io.fabric8.kubernetes.client.informers.SharedIndexInformer;
import io.fabric8.kubernetes.client.informers.cache.Cache;
Expand Down Expand Up @@ -420,6 +425,7 @@ Map<String, Object> requiredConsumerConfig() {
configs.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 50_000);
configs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
configs.put(ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, 5000);
configs.put(SerdeConfig.ENABLE_HEADERS, "true");
return configs;
}

Expand Down Expand Up @@ -520,26 +526,16 @@ public void disposeKafkaContext(@Disposes KafkaContext context, Map<String, Kafk

@Produces
@RequestScoped
public Supplier<Consumer<byte[], byte[]>> consumerSupplier(ConsoleConfig consoleConfig, KafkaContext context) {
public BiFunction<Deserializer<RecordData>, Deserializer<RecordData>, Consumer<RecordData, RecordData>> consumerSupplier(ConsoleConfig consoleConfig, KafkaContext context) {
var configs = maybeAuthenticate(context, Consumer.class);
Consumer<byte[], byte[]> client = new KafkaConsumer<>(configs); // NOSONAR / closed in consumerDisposer
return () -> client;
}

public void consumerDisposer(@Disposes Supplier<Consumer<byte[], byte[]>> consumer) {
consumer.get().close();
return (keyDeser, valueDeser) -> new KafkaConsumer<>(configs, keyDeser, valueDeser); // NOSONAR / closed in consumerDisposer
}

@Produces
@RequestScoped
public Supplier<Producer<String, String>> producerSupplier(ConsoleConfig consoleConfig, KafkaContext context) {
public BiFunction<Serializer<RecordData>, Serializer<RecordData>, Producer<RecordData, RecordData>> producerSupplier(ConsoleConfig consoleConfig, KafkaContext context) {
var configs = maybeAuthenticate(context, Producer.class);
Producer<String, String> client = new KafkaProducer<>(configs); // NOSONAR / closed in producerDisposer
return () -> client;
}

public void producerDisposer(@Disposes Supplier<Producer<String, String>> producer) {
producer.get().close();
return (keySer, valueSer) -> new KafkaProducer<>(configs, keySer, valueSer); // NOSONAR / closed by service code
}

Map<String, Object> maybeAuthenticate(KafkaContext context, Class<?> clientType) {
Expand Down Expand Up @@ -571,6 +567,12 @@ Map<String, Object> buildConfig(Set<String> configNames,
.map(Optional::get)
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue, (k1, k2) -> k1, TreeMap::new));

// Ensure no given properties are skipped. The previous stream processing allows
// for the standard config names to be obtained from the given maps, but also from
// config overrides via MicroProfile Config.
clientProperties.get().forEach(cfg::putIfAbsent);
config.getProperties().forEach(cfg::putIfAbsent);

var listenerSpec = cluster.map(Kafka::getSpec)
.map(KafkaSpec::getKafka)
.map(KafkaClusterSpec::getListeners)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,12 +67,12 @@ public class RecordsResource {
summary = "Consume records from a topic",
description = "Consume a limited number of records from a topic, optionally specifying a partition and an absolute offset or timestamp as the starting point for message retrieval.")
@APIResponseSchema(
value = KafkaRecord.ListResponse.class,
value = KafkaRecord.KafkaRecordDataList.class,
responseDescription = "List of records matching the request query parameters.")
@APIResponse(responseCode = "404", ref = "NotFound")
@APIResponse(responseCode = "500", ref = "ServerError")
@APIResponse(responseCode = "504", ref = "ServerTimeout")
public Response consumeRecords(
public CompletionStage<Response> consumeRecords(
@Parameter(description = "Cluster identifier")
@PathParam("clusterId")
String clusterId,
Expand Down Expand Up @@ -121,10 +121,20 @@ public Response consumeRecords(
List<String> fields) {

requestedFields.accept(fields);
var result = recordService.consumeRecords(topicId, params.getPartition(), params.getOffset(), params.getTimestamp(), params.getLimit(), fields, params.getMaxValueLength());

CacheControl noStore = RuntimeDelegate.getInstance().createHeaderDelegate(CacheControl.class).fromString("no-store");
return Response.ok(new KafkaRecord.ListResponse(result)).cacheControl(noStore).build();

return recordService.consumeRecords(
topicId,
params.getPartition(),
params.getOffset(),
params.getTimestamp(),
params.getLimit(),
fields,
params.getMaxValueLength())
.thenApply(KafkaRecord.KafkaRecordDataList::new)
.thenApply(Response::ok)
.thenApply(response -> response.cacheControl(noStore))
.thenApply(Response.ResponseBuilder::build);
}

@POST
Expand All @@ -135,7 +145,7 @@ public Response consumeRecords(
description = "Produce (write) a single record to a topic")
@APIResponseSchema(
responseCode = "201",
value = KafkaRecord.KafkaRecordDocument.class,
value = KafkaRecord.KafkaRecordData.class,
responseDescription = "Record was successfully sent to the topic")
@APIResponse(responseCode = "404", ref = "NotFound")
@APIResponse(responseCode = "500", ref = "ServerError")
Expand All @@ -151,20 +161,19 @@ public CompletionStage<Response> produceRecord(
String topicId,

@Valid
KafkaRecord.KafkaRecordDocument message) {
KafkaRecord.KafkaRecordData message) {

final UriBuilder location = uriInfo.getRequestUriBuilder();
requestedFields.accept(KafkaRecord.Fields.ALL);

return recordService.produceRecord(topicId, message.getData().getAttributes())
.thenApply(KafkaRecord.KafkaRecordDocument::new)
return recordService.produceRecord(topicId, message.getData())
.thenApply(KafkaRecord.KafkaRecordData::new)
.thenApply(entity -> Response.status(Status.CREATED)
.entity(entity)
.location(location
.queryParam("filter[partition]", entity.getData().getAttributes().getPartition())
.queryParam("filter[offset]", entity.getData().getAttributes().getOffset())
.queryParam("filter[partition]", entity.getData().partition())
.queryParam("filter[offset]", entity.getData().offset())
.build()))
.thenApply(Response.ResponseBuilder::build);

}
}
Loading

0 comments on commit 9359be7

Please sign in to comment.