poc: Avro message serialization/de-serialization
Signed-off-by: Michael Edgar <[email protected]>
MikeEdgar committed Oct 9, 2024
1 parent 45f2f5a commit 8d5ecb5
Showing 17 changed files with 1,003 additions and 150 deletions.
28 changes: 25 additions & 3 deletions api/pom.xml
@@ -58,15 +58,15 @@
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-resteasy-reactive</artifactId>
<artifactId>quarkus-rest</artifactId>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-resteasy-reactive-jackson</artifactId>
<artifactId>quarkus-rest-jackson</artifactId>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-rest-client-reactive</artifactId>
<artifactId>quarkus-rest-client</artifactId>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
@@ -84,6 +84,10 @@
<groupId>io.quarkus</groupId>
<artifactId>quarkus-kubernetes-client</artifactId>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-apicurio-registry-avro</artifactId>
</dependency>
<dependency>
<groupId>io.smallrye.common</groupId>
<artifactId>smallrye-common-annotation</artifactId>
@@ -109,6 +113,24 @@
<artifactId>kafka-oauth-client</artifactId>
</dependency>

<dependency>
<groupId>io.apicurio</groupId>
<artifactId>apicurio-registry-serdes-avro-serde</artifactId>
</dependency>
<dependency>
<groupId>io.apicurio</groupId>
<artifactId>apicurio-registry-serdes-jsonschema-serde</artifactId>
</dependency>
<dependency>
<groupId>io.apicurio</groupId>
<artifactId>apicurio-registry-serdes-protobuf-serde</artifactId>
<version>2.5.8.Final</version>
</dependency>
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java-util</artifactId>
</dependency>

<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-annotations</artifactId>
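The Apicurio Registry serde artifacts added above are what let records be produced and consumed as Avro, with schemas resolved through a registry. As a rough standalone sketch (the bootstrap address, registry URL, topic, and schema are illustrative assumptions, and the console wires its own RecordData serdes rather than these generic ones), producing an Avro record through the registry serializer looks roughly like this:

import java.util.Properties;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import io.apicurio.registry.serde.SerdeConfig;
import io.apicurio.registry.serde.avro.AvroKafkaSerializer;

public class AvroProduceSketch {
    public static void main(String[] args) {
        // Illustrative Avro schema and record; any record schema works the same way.
        Schema schema = new Schema.Parser().parse(
                "{\"type\":\"record\",\"name\":\"Greeting\",\"fields\":[{\"name\":\"message\",\"type\":\"string\"}]}");
        GenericRecord value = new GenericData.Record(schema);
        value.put("message", "hello");

        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");          // assumption
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, AvroKafkaSerializer.class.getName());
        props.put(SerdeConfig.REGISTRY_URL, "http://localhost:8080/apis/registry/v2"); // assumption
        props.put(SerdeConfig.AUTO_REGISTER_ARTIFACT, "true"); // register the schema if it is not already present
        props.put(SerdeConfig.ENABLE_HEADERS, "true");         // carry the schema reference in record headers

        try (Producer<String, GenericRecord> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("greetings", "key-1", value));
        }
    }
}

Setting SerdeConfig.ENABLE_HEADERS matches the consumer default added in this commit, so the schema reference travels in record headers instead of a payload prefix.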
@@ -16,6 +16,7 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.function.Supplier;
@@ -53,6 +54,8 @@
import org.apache.kafka.common.security.plain.PlainLoginModule;
import org.apache.kafka.common.security.scram.ScramLoginModule;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.eclipse.microprofile.config.Config;
import org.eclipse.microprofile.config.inject.ConfigProperty;
@@ -65,9 +68,11 @@
import com.github.streamshub.console.api.support.KafkaContext;
import com.github.streamshub.console.api.support.TrustAllCertificateManager;
import com.github.streamshub.console.api.support.ValidationProxy;
import com.github.streamshub.console.api.support.serdes.RecordData;
import com.github.streamshub.console.config.ConsoleConfig;
import com.github.streamshub.console.config.KafkaClusterConfig;

import io.apicurio.registry.serde.SerdeConfig;
import io.fabric8.kubernetes.client.informers.ResourceEventHandler;
import io.fabric8.kubernetes.client.informers.SharedIndexInformer;
import io.fabric8.kubernetes.client.informers.cache.Cache;
@@ -420,6 +425,7 @@ Map<String, Object> requiredConsumerConfig() {
configs.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 50_000);
configs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
configs.put(ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, 5000);
configs.put(SerdeConfig.ENABLE_HEADERS, "true");
return configs;
}

@@ -520,26 +526,16 @@ public void disposeKafkaContext(@Disposes KafkaContext context, Map<String, Kafk

@Produces
@RequestScoped
public Supplier<Consumer<byte[], byte[]>> consumerSupplier(ConsoleConfig consoleConfig, KafkaContext context) {
public BiFunction<Deserializer<RecordData>, Deserializer<RecordData>, Consumer<RecordData, RecordData>> consumerSupplier(ConsoleConfig consoleConfig, KafkaContext context) {
var configs = maybeAuthenticate(context, Consumer.class);
Consumer<byte[], byte[]> client = new KafkaConsumer<>(configs); // NOSONAR / closed in consumerDisposer
return () -> client;
}

public void consumerDisposer(@Disposes Supplier<Consumer<byte[], byte[]>> consumer) {
consumer.get().close();
return (keyDeser, valueDeser) -> new KafkaConsumer<>(configs, keyDeser, valueDeser); // NOSONAR / closed in consumerDisposer
}

@Produces
@RequestScoped
public Supplier<Producer<String, String>> producerSupplier(ConsoleConfig consoleConfig, KafkaContext context) {
public BiFunction<Serializer<RecordData>, Serializer<RecordData>, Producer<RecordData, RecordData>> producerSupplier(ConsoleConfig consoleConfig, KafkaContext context) {
var configs = maybeAuthenticate(context, Producer.class);
Producer<String, String> client = new KafkaProducer<>(configs); // NOSONAR / closed in producerDisposer
return () -> client;
}

public void producerDisposer(@Disposes Supplier<Producer<String, String>> producer) {
producer.get().close();
return (keySer, valueSer) -> new KafkaProducer<>(configs, keySer, valueSer); // NOSONAR / closed by service code
}

Map<String, Object> maybeAuthenticate(KafkaContext context, Class<?> clientType) {
@@ -571,6 +567,12 @@ Map<String, Object> buildConfig(Set<String> configNames,
.map(Optional::get)
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue, (k1, k2) -> k1, TreeMap::new));

// Ensure no given properties are skipped. The previous stream processing allows
// for the standard config names to be obtained from the given maps, but also from
// config overrides via MicroProfile Config.
clientProperties.get().forEach(cfg::putIfAbsent);
config.getProperties().forEach(cfg::putIfAbsent);

var listenerSpec = cluster.map(Kafka::getSpec)
.map(KafkaSpec::getKafka)
.map(KafkaClusterSpec::getListeners)
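The reworked CDI producers above no longer hand out ready-made byte[]/String clients. They return factories that request-scoped service code completes with whichever key and value serdes it needs, relying on the KafkaConsumer/KafkaProducer constructors that accept pre-built (de)serializer instances. A minimal sketch of that mechanism, using Apicurio's generic Avro deserializer in place of the console's RecordData serdes (bootstrap address, group id, registry URL, and topic are assumptions):

import java.time.Duration;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.Deserializer;

import io.apicurio.registry.serde.SerdeConfig;
import io.apicurio.registry.serde.avro.AvroKafkaDeserializer;

public class AvroConsumeSketch {
    public static void main(String[] args) {
        Map<String, Object> configs = new HashMap<>();
        configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");          // assumption
        configs.put(ConsumerConfig.GROUP_ID_CONFIG, "console-example");                  // assumption
        configs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        configs.put(SerdeConfig.REGISTRY_URL, "http://localhost:8080/apis/registry/v2"); // assumption
        configs.put(SerdeConfig.ENABLE_HEADERS, "true");

        // Deserializers passed to the constructor are not configured by the client,
        // so configure them explicitly before use.
        Deserializer<GenericRecord> keyDeserializer = new AvroKafkaDeserializer<>();
        Deserializer<GenericRecord> valueDeserializer = new AvroKafkaDeserializer<>();
        keyDeserializer.configure(configs, true);
        valueDeserializer.configure(configs, false);

        try (Consumer<GenericRecord, GenericRecord> consumer =
                new KafkaConsumer<>(configs, keyDeserializer, valueDeserializer)) {
            consumer.subscribe(List.of("greetings"));                                    // assumption
            consumer.poll(Duration.ofSeconds(5))
                    .forEach(rec -> System.out.println(rec.key() + " -> " + rec.value()));
        }
    }
}

The producerSupplier change works the same way on the write path, presumably so the record service can pick the appropriate serde (Avro, JSON Schema, or Protobuf, per the new dependencies) for each request.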
@@ -72,7 +72,7 @@ public class RecordsResource {
@APIResponse(responseCode = "404", ref = "NotFound")
@APIResponse(responseCode = "500", ref = "ServerError")
@APIResponse(responseCode = "504", ref = "ServerTimeout")
public Response consumeRecords(
public CompletionStage<Response> consumeRecords(
@Parameter(description = "Cluster identifier")
@PathParam("clusterId")
String clusterId,
@@ -121,10 +121,20 @@ public Response consumeRecords(
List<String> fields) {

requestedFields.accept(fields);
var result = recordService.consumeRecords(topicId, params.getPartition(), params.getOffset(), params.getTimestamp(), params.getLimit(), fields, params.getMaxValueLength());

CacheControl noStore = RuntimeDelegate.getInstance().createHeaderDelegate(CacheControl.class).fromString("no-store");
return Response.ok(new KafkaRecord.ListResponse(result)).cacheControl(noStore).build();

return recordService.consumeRecords(
topicId,
params.getPartition(),
params.getOffset(),
params.getTimestamp(),
params.getLimit(),
fields,
params.getMaxValueLength())
.thenApply(KafkaRecord.ListResponse::new)
.thenApply(Response::ok)
.thenApply(response -> response.cacheControl(noStore))
.thenApply(Response.ResponseBuilder::build);
}

@POST
@@ -156,7 +166,7 @@ public CompletionStage<Response> produceRecord(
final UriBuilder location = uriInfo.getRequestUriBuilder();
requestedFields.accept(KafkaRecord.Fields.ALL);

return recordService.produceRecord(topicId, message.getData().getAttributes())
return recordService.produceRecord(topicId, message.getData().getAttributes(), message.getData().getMeta())
.thenApply(KafkaRecord.KafkaRecordDocument::new)
.thenApply(entity -> Response.status(Status.CREATED)
.entity(entity)
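With consumeRecords now returning CompletionStage<Response>, the resource composes the service result asynchronously instead of building the Response inline. A stripped-down sketch of the same pattern (the resource path, payload, and the supplyAsync stand-in for recordService are illustrative only):

import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

import jakarta.ws.rs.GET;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.Produces;
import jakarta.ws.rs.core.CacheControl;
import jakarta.ws.rs.core.MediaType;
import jakarta.ws.rs.core.Response;
import jakarta.ws.rs.ext.RuntimeDelegate;

@Path("/example/records")
public class ExampleRecordsResource {

    @GET
    @Produces(MediaType.APPLICATION_JSON)
    public CompletionStage<Response> listRecords() {
        CacheControl noStore = RuntimeDelegate.getInstance()
                .createHeaderDelegate(CacheControl.class)
                .fromString("no-store");

        // Stand-in for an asynchronous service call such as recordService.consumeRecords(...)
        CompletionStage<List<String>> pending =
                CompletableFuture.supplyAsync(() -> List.of("record-1", "record-2"));

        return pending
                .thenApply(records -> Response.ok(records))
                .thenApply(builder -> builder.cacheControl(noStore))
                .thenApply(Response.ResponseBuilder::build);
    }
}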
@@ -19,12 +19,18 @@
@Expression(
when = "self.rawTimestamp != null",
value = "self.rawOffset == null",
node = "filter[offset]",
node = RecordFilterParams.FILTER_OFFSET,
message = "Parameter `filter[offset]` must not be used when `filter[timestamp]` is present.",
payload = ErrorCategory.InvalidQueryParameter.class)
public class RecordFilterParams {

@QueryParam("filter[partition]")
static final String FILTER_PARTITION = "filter[partition]";
static final String FILTER_OFFSET = "filter[offset]";
static final String FILTER_TIMESTAMP = "filter[timestamp]";
static final String PAGE_SIZE = "page[size]";
static final String MAX_VALUE_LENGTH = "maxValueLength";

@QueryParam(FILTER_PARTITION)
@Parameter(
description = """
Retrieve messages only from the partition identified by this parameter.
@@ -39,23 +45,23 @@
value = "self.operator == 'eq'",
message = "unsupported filter operator, supported values: [ 'eq' ]",
payload = ErrorCategory.InvalidQueryParameter.class,
node = "filter[partition]")
node = FILTER_PARTITION)
@Expression(
when = "self != null",
value = "self.operands.size() == 1",
message = "exactly 1 operand is required",
payload = ErrorCategory.InvalidQueryParameter.class,
node = "filter[partition]")
node = FILTER_PARTITION)
@Expression(
when = "self != null && self.operator == 'eq' && self.operands.size() == 1",
value = "val = Integer.parseInt(self.firstOperand); val >= 0 && val <= Integer.MAX_VALUE",
exceptionalValue = ExceptionalValue.FALSE,
message = "operand must be an integer between 0 and " + Integer.MAX_VALUE + ", inclusive",
payload = ErrorCategory.InvalidQueryParameter.class,
node = "filter[partition]")
node = FILTER_PARTITION)
FetchFilter partition;

@QueryParam("filter[offset]")
@QueryParam(FILTER_OFFSET)
@Parameter(
description = """
Retrieve messages with an offset greater than or equal to the filter
@@ -80,23 +86,23 @@
exceptionalValue = ExceptionalValue.FALSE,
message = "unsupported filter operator, supported values: [ 'gte' ]",
payload = ErrorCategory.InvalidQueryParameter.class,
node = "filter[offset]")
node = FILTER_OFFSET)
@Expression(
when = "self != null",
value = "self.operands.size() == 1",
message = "exactly 1 operand is required",
payload = ErrorCategory.InvalidQueryParameter.class,
node = "filter[offset]")
node = FILTER_OFFSET)
@Expression(
when = "self != null && self.operator == 'gte'",
value = "val = Long.parseLong(self.firstOperand); val >= 0 && val <= Long.MAX_VALUE",
exceptionalValue = ExceptionalValue.FALSE,
message = "operand must be an integer between 0 and " + Long.MAX_VALUE + ", inclusive",
payload = ErrorCategory.InvalidQueryParameter.class,
node = "filter[offset]")
node = FILTER_OFFSET)
FetchFilter offset;

@QueryParam("filter[timestamp]")
@QueryParam(FILTER_TIMESTAMP)
@Parameter(
description = """
Retrieve messages with a timestamp greater than or equal to the filter
@@ -120,24 +126,24 @@
value = "self.operator == 'gte'",
message = "unsupported filter operator, supported values: [ 'gte' ]",
payload = ErrorCategory.InvalidQueryParameter.class,
node = "filter[timestamp]")
node = FILTER_TIMESTAMP)
@Expression(
when = "self != null",
value = "self.operands.size() == 1",
message = "exactly 1 operand is required",
payload = ErrorCategory.InvalidQueryParameter.class,
node = "filter[timestamp]")
node = FILTER_TIMESTAMP)
@Expression(
when = "self != null && self.operator == 'gte'",
classImports = "java.time.Instant",
value = "Instant.parse(self.firstOperand) >= Instant.EPOCH",
exceptionalValue = ExceptionalValue.FALSE,
message = "operand must be a valid RFC 3339 date-time no earlier than `1970-01-01T00:00:00Z`",
payload = ErrorCategory.InvalidQueryParameter.class,
node = "filter[timestamp]")
node = FILTER_TIMESTAMP)
FetchFilter timestamp;

@QueryParam("page[size]")
@QueryParam(PAGE_SIZE)
@DefaultValue(ListFetchParams.PAGE_SIZE_DEFAULT + "")
@Parameter(
description = "Limit the number of records fetched and returned",
@@ -152,10 +158,10 @@
exceptionalValue = ExceptionalValue.FALSE,
message = "must be an integer between 1 and " + ListFetchParams.PAGE_SIZE_MAX + ", inclusive",
payload = ErrorCategory.InvalidQueryParameter.class,
node = "page[size]")
node = PAGE_SIZE)
String pageSize;

@QueryParam("maxValueLength")
@QueryParam(MAX_VALUE_LENGTH)
@Parameter(
description = """
Maximum length of string values returned in the response.
@@ -169,7 +175,7 @@
exceptionalValue = ExceptionalValue.FALSE,
message = "must be an integer between 1 and " + Integer.MAX_VALUE + ", inclusive",
payload = ErrorCategory.InvalidQueryParameter.class,
node = "maxValueLength")
node = MAX_VALUE_LENGTH)
String maxValueLength;

public String getRawOffset() {
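The string literals previously repeated across @QueryParam and the @Expression node attributes are now shared constants, which keeps the query parameter name and the validation error pointer from drifting apart. Because static final String fields are compile-time constants, they are legal annotation values; a trimmed illustration of the pattern (validation annotations omitted):

import jakarta.ws.rs.DefaultValue;
import jakarta.ws.rs.QueryParam;

public class FilterParamsSketch {

    // Compile-time constants usable both in annotations and in validation messages/nodes.
    static final String FILTER_PARTITION = "filter[partition]";
    static final String PAGE_SIZE = "page[size]";

    @QueryParam(FILTER_PARTITION)
    String partition;

    @QueryParam(PAGE_SIZE)
    @DefaultValue("20")
    String pageSize;
}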