diff --git a/api/pom.xml b/api/pom.xml
index 2298d1bb5..138db7487 100644
--- a/api/pom.xml
+++ b/api/pom.xml
@@ -58,15 +58,15 @@
<groupId>io.quarkus</groupId>
- <artifactId>quarkus-resteasy-reactive</artifactId>
+ <artifactId>quarkus-rest</artifactId>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
- <artifactId>quarkus-resteasy-reactive-jackson</artifactId>
+ <artifactId>quarkus-rest-jackson</artifactId>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
- <artifactId>quarkus-rest-client-reactive</artifactId>
+ <artifactId>quarkus-rest-client</artifactId>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
@@ -84,6 +84,10 @@
<groupId>io.quarkus</groupId>
<artifactId>quarkus-kubernetes-client</artifactId>
</dependency>
+ <dependency>
+ <groupId>io.quarkus</groupId>
+ <artifactId>quarkus-apicurio-registry-avro</artifactId>
+ </dependency>
<dependency>
<groupId>io.smallrye.common</groupId>
<artifactId>smallrye-common-annotation</artifactId>
@@ -109,6 +113,24 @@
<artifactId>kafka-oauth-client</artifactId>
</dependency>
+ <dependency>
+ <groupId>io.apicurio</groupId>
+ <artifactId>apicurio-registry-serdes-avro-serde</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>io.apicurio</groupId>
+ <artifactId>apicurio-registry-serdes-jsonschema-serde</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>io.apicurio</groupId>
+ <artifactId>apicurio-registry-serdes-protobuf-serde</artifactId>
+ <version>2.5.8.Final</version>
+ </dependency>
+ <dependency>
+ <groupId>com.google.protobuf</groupId>
+ <artifactId>protobuf-java-util</artifactId>
+ </dependency>
+
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-annotations</artifactId>
diff --git a/api/src/main/java/com/github/streamshub/console/api/ClientFactory.java b/api/src/main/java/com/github/streamshub/console/api/ClientFactory.java
index cccbd5e54..91c78153b 100644
--- a/api/src/main/java/com/github/streamshub/console/api/ClientFactory.java
+++ b/api/src/main/java/com/github/streamshub/console/api/ClientFactory.java
@@ -16,6 +16,7 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
+import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.function.Supplier;
@@ -53,6 +54,8 @@
import org.apache.kafka.common.security.plain.PlainLoginModule;
import org.apache.kafka.common.security.scram.ScramLoginModule;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.eclipse.microprofile.config.Config;
import org.eclipse.microprofile.config.inject.ConfigProperty;
@@ -65,9 +68,11 @@
import com.github.streamshub.console.api.support.KafkaContext;
import com.github.streamshub.console.api.support.TrustAllCertificateManager;
import com.github.streamshub.console.api.support.ValidationProxy;
+import com.github.streamshub.console.api.support.serdes.RecordData;
import com.github.streamshub.console.config.ConsoleConfig;
import com.github.streamshub.console.config.KafkaClusterConfig;
+import io.apicurio.registry.serde.SerdeConfig;
import io.fabric8.kubernetes.client.informers.ResourceEventHandler;
import io.fabric8.kubernetes.client.informers.SharedIndexInformer;
import io.fabric8.kubernetes.client.informers.cache.Cache;
@@ -420,6 +425,7 @@ Map<String, Object> requiredConsumerConfig() {
configs.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 50_000);
configs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
configs.put(ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, 5000);
+ configs.put(SerdeConfig.ENABLE_HEADERS, "true");
return configs;
}
@@ -520,26 +526,16 @@ public void disposeKafkaContext(@Disposes KafkaContext context, Map<String, KafkaContext> contexts) {
@Produces
@RequestScoped
- public Supplier<Consumer<byte[], byte[]>> consumerSupplier(ConsoleConfig consoleConfig, KafkaContext context) {
+ public BiFunction<Deserializer<RecordData>, Deserializer<RecordData>, Consumer<RecordData, RecordData>> consumerSupplier(ConsoleConfig consoleConfig, KafkaContext context) {
var configs = maybeAuthenticate(context, Consumer.class);
- Consumer<byte[], byte[]> client = new KafkaConsumer<>(configs); // NOSONAR / closed in consumerDisposer
- return () -> client;
- }
-
- public void consumerDisposer(@Disposes Supplier<Consumer<byte[], byte[]>> consumer) {
- consumer.get().close();
+ return (keyDeser, valueDeser) -> new KafkaConsumer<>(configs, keyDeser, valueDeser); // NOSONAR / closed in consumerDisposer
}
@Produces
@RequestScoped
- public Supplier<Producer<String, String>> producerSupplier(ConsoleConfig consoleConfig, KafkaContext context) {
+ public BiFunction<Serializer<RecordData>, Serializer<RecordData>, Producer<RecordData, RecordData>> producerSupplier(ConsoleConfig consoleConfig, KafkaContext context) {
var configs = maybeAuthenticate(context, Producer.class);
- Producer<String, String> client = new KafkaProducer<>(configs); // NOSONAR / closed in producerDisposer
- return () -> client;
- }
-
- public void producerDisposer(@Disposes Supplier<Producer<String, String>> producer) {
- producer.get().close();
+ return (keySer, valueSer) -> new KafkaProducer<>(configs, keySer, valueSer); // NOSONAR / closed by service code
}
Map<String, Object> maybeAuthenticate(KafkaContext context, Class<?> clientType) {
@@ -571,6 +567,12 @@ Map<String, Object> buildConfig(Set<String> configNames,
.map(Optional::get)
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue, (k1, k2) -> k1, TreeMap::new));
+ // Ensure no given properties are skipped. The previous stream processing allows
+ // for the standard config names to be obtained from the given maps, but also from
+ // config overrides via MicroProfile Config.
+ clientProperties.get().forEach(cfg::putIfAbsent);
+ config.getProperties().forEach(cfg::putIfAbsent);
+
var listenerSpec = cluster.map(Kafka::getSpec)
.map(KafkaSpec::getKafka)
.map(KafkaClusterSpec::getListeners)
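Note on the change above (not part of the patch): the consumer and producer CDI producers no longer hand out a shared, request-scoped client behind a `Supplier`; they now return a `BiFunction` that builds a fresh client from caller-supplied key/value serdes, and no CDI disposer closes it any more. A minimal usage sketch, with the injected types and `RecordData` serdes taken from this patch and the surrounding class assumed for illustration:

```java
import java.time.Duration;
import java.util.function.BiFunction;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.serialization.Deserializer;

import jakarta.inject.Inject;

import com.github.streamshub.console.api.support.serdes.RecordData;

class ConsumerFactoryUsage {

    @Inject
    BiFunction<Deserializer<RecordData>, Deserializer<RecordData>, Consumer<RecordData, RecordData>> consumerFactory;

    void poll(Deserializer<RecordData> keyDeser, Deserializer<RecordData> valueDeser) {
        // The factory captures the per-cluster configs resolved by ClientFactory and
        // builds a new KafkaConsumer wired to the supplied serdes. Since the disposer
        // was removed, the calling code is responsible for closing the client.
        try (Consumer<RecordData, RecordData> consumer = consumerFactory.apply(keyDeser, valueDeser)) {
            consumer.poll(Duration.ofMillis(100)); // assignment/seeking omitted for brevity
        }
    }
}
```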
diff --git a/api/src/main/java/com/github/streamshub/console/api/RecordsResource.java b/api/src/main/java/com/github/streamshub/console/api/RecordsResource.java
index ecd388ea5..b922304cb 100644
--- a/api/src/main/java/com/github/streamshub/console/api/RecordsResource.java
+++ b/api/src/main/java/com/github/streamshub/console/api/RecordsResource.java
@@ -72,7 +72,7 @@ public class RecordsResource {
@APIResponse(responseCode = "404", ref = "NotFound")
@APIResponse(responseCode = "500", ref = "ServerError")
@APIResponse(responseCode = "504", ref = "ServerTimeout")
- public Response consumeRecords(
+ public CompletionStage<Response> consumeRecords(
@Parameter(description = "Cluster identifier")
@PathParam("clusterId")
String clusterId,
@@ -121,10 +121,20 @@ public Response consumeRecords(
List<String> fields) {
requestedFields.accept(fields);
- var result = recordService.consumeRecords(topicId, params.getPartition(), params.getOffset(), params.getTimestamp(), params.getLimit(), fields, params.getMaxValueLength());
-
CacheControl noStore = RuntimeDelegate.getInstance().createHeaderDelegate(CacheControl.class).fromString("no-store");
- return Response.ok(new KafkaRecord.ListResponse(result)).cacheControl(noStore).build();
+
+ return recordService.consumeRecords(
+ topicId,
+ params.getPartition(),
+ params.getOffset(),
+ params.getTimestamp(),
+ params.getLimit(),
+ fields,
+ params.getMaxValueLength())
+ .thenApply(KafkaRecord.ListResponse::new)
+ .thenApply(Response::ok)
+ .thenApply(response -> response.cacheControl(noStore))
+ .thenApply(Response.ResponseBuilder::build);
}
@POST
@@ -156,7 +166,7 @@ public CompletionStage<Response> produceRecord(
final UriBuilder location = uriInfo.getRequestUriBuilder();
requestedFields.accept(KafkaRecord.Fields.ALL);
- return recordService.produceRecord(topicId, message.getData().getAttributes())
+ return recordService.produceRecord(topicId, message.getData().getAttributes(), message.getData().getMeta())
.thenApply(KafkaRecord.KafkaRecordDocument::new)
.thenApply(entity -> Response.status(Status.CREATED)
.entity(entity)
diff --git a/api/src/main/java/com/github/streamshub/console/api/model/RecordFilterParams.java b/api/src/main/java/com/github/streamshub/console/api/model/RecordFilterParams.java
index 75d0ad24e..7fd62f51e 100644
--- a/api/src/main/java/com/github/streamshub/console/api/model/RecordFilterParams.java
+++ b/api/src/main/java/com/github/streamshub/console/api/model/RecordFilterParams.java
@@ -19,12 +19,18 @@
@Expression(
when = "self.rawTimestamp != null",
value = "self.rawOffset == null",
- node = "filter[offset]",
+ node = RecordFilterParams.FILTER_OFFSET,
message = "Parameter `filter[offset]` must not be used when `filter[timestamp]` is present.",
payload = ErrorCategory.InvalidQueryParameter.class)
public class RecordFilterParams {
- @QueryParam("filter[partition]")
+ static final String FILTER_PARTITION = "filter[partition]";
+ static final String FILTER_OFFSET = "filter[offset]";
+ static final String FILTER_TIMESTAMP = "filter[timestamp]";
+ static final String PAGE_SIZE = "page[size]";
+ static final String MAX_VALUE_LENGTH = "maxValueLength";
+
+ @QueryParam(FILTER_PARTITION)
@Parameter(
description = """
Retrieve messages only from the partition identified by this parameter.
@@ -39,23 +45,23 @@ public class RecordFilterParams {
value = "self.operator == 'eq'",
message = "unsupported filter operator, supported values: [ 'eq' ]",
payload = ErrorCategory.InvalidQueryParameter.class,
- node = "filter[partition]")
+ node = FILTER_PARTITION)
@Expression(
when = "self != null",
value = "self.operands.size() == 1",
message = "exactly 1 operand is required",
payload = ErrorCategory.InvalidQueryParameter.class,
- node = "filter[partition]")
+ node = FILTER_PARTITION)
@Expression(
when = "self != null && self.operator == 'eq' && self.operands.size() == 1",
value = "val = Integer.parseInt(self.firstOperand); val >= 0 && val <= Integer.MAX_VALUE",
exceptionalValue = ExceptionalValue.FALSE,
message = "operand must be an integer between 0 and " + Integer.MAX_VALUE + ", inclusive",
payload = ErrorCategory.InvalidQueryParameter.class,
- node = "filter[partition]")
+ node = FILTER_PARTITION)
FetchFilter partition;
- @QueryParam("filter[offset]")
+ @QueryParam(FILTER_OFFSET)
@Parameter(
description = """
Retrieve messages with an offset greater than or equal to the filter
@@ -80,23 +86,23 @@ public class RecordFilterParams {
exceptionalValue = ExceptionalValue.FALSE,
message = "unsupported filter operator, supported values: [ 'gte' ]",
payload = ErrorCategory.InvalidQueryParameter.class,
- node = "filter[offset]")
+ node = FILTER_OFFSET)
@Expression(
when = "self != null",
value = "self.operands.size() == 1",
message = "exactly 1 operand is required",
payload = ErrorCategory.InvalidQueryParameter.class,
- node = "filter[offset]")
+ node = FILTER_OFFSET)
@Expression(
when = "self != null && self.operator == 'gte'",
value = "val = Long.parseLong(self.firstOperand); val >= 0 && val <= Long.MAX_VALUE",
exceptionalValue = ExceptionalValue.FALSE,
message = "operand must be an integer between 0 and " + Long.MAX_VALUE + ", inclusive",
payload = ErrorCategory.InvalidQueryParameter.class,
- node = "filter[offset]")
+ node = FILTER_OFFSET)
FetchFilter offset;
- @QueryParam("filter[timestamp]")
+ @QueryParam(FILTER_TIMESTAMP)
@Parameter(
description = """
Retrieve messages with a timestamp greater than or equal to the filter
@@ -120,13 +126,13 @@ public class RecordFilterParams {
value = "self.operator == 'gte'",
message = "unsupported filter operator, supported values: [ 'gte' ]",
payload = ErrorCategory.InvalidQueryParameter.class,
- node = "filter[timestamp]")
+ node = FILTER_TIMESTAMP)
@Expression(
when = "self != null",
value = "self.operands.size() == 1",
message = "exactly 1 operand is required",
payload = ErrorCategory.InvalidQueryParameter.class,
- node = "filter[timestamp]")
+ node = FILTER_TIMESTAMP)
@Expression(
when = "self != null && self.operator == 'gte'",
classImports = "java.time.Instant",
@@ -134,10 +140,10 @@ public class RecordFilterParams {
exceptionalValue = ExceptionalValue.FALSE,
message = "operand must be a valid RFC 3339 date-time no earlier than `1970-01-01T00:00:00Z`",
payload = ErrorCategory.InvalidQueryParameter.class,
- node = "filter[timestamp]")
+ node = FILTER_TIMESTAMP)
FetchFilter timestamp;
- @QueryParam("page[size]")
+ @QueryParam(PAGE_SIZE)
@DefaultValue(ListFetchParams.PAGE_SIZE_DEFAULT + "")
@Parameter(
description = "Limit the number of records fetched and returned",
@@ -152,10 +158,10 @@ public class RecordFilterParams {
exceptionalValue = ExceptionalValue.FALSE,
message = "must be an integer between 1 and " + ListFetchParams.PAGE_SIZE_MAX + ", inclusive",
payload = ErrorCategory.InvalidQueryParameter.class,
- node = "page[size]")
+ node = PAGE_SIZE)
String pageSize;
- @QueryParam("maxValueLength")
+ @QueryParam(MAX_VALUE_LENGTH)
@Parameter(
description = """
Maximum length of string values returned in the response.
@@ -169,7 +175,7 @@ public class RecordFilterParams {
exceptionalValue = ExceptionalValue.FALSE,
message = "must be an integer between 1 and " + Integer.MAX_VALUE + ", inclusive",
payload = ErrorCategory.InvalidQueryParameter.class,
- node = "maxValueLength")
+ node = MAX_VALUE_LENGTH)
String maxValueLength;
public String getRawOffset() {
diff --git a/api/src/main/java/com/github/streamshub/console/api/service/RecordService.java b/api/src/main/java/com/github/streamshub/console/api/service/RecordService.java
index 0b8c40a3d..c11f504ea 100644
--- a/api/src/main/java/com/github/streamshub/console/api/service/RecordService.java
+++ b/api/src/main/java/com/github/streamshub/console/api/service/RecordService.java
@@ -1,12 +1,8 @@
package com.github.streamshub.console.api.service;
-import java.io.ByteArrayInputStream;
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.io.Reader;
-import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.time.Instant;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
@@ -23,11 +19,13 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Executor;
+import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
+import jakarta.annotation.PostConstruct;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
@@ -38,6 +36,7 @@
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
@@ -45,21 +44,29 @@
import org.apache.kafka.common.errors.UnknownTopicIdException;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serializer;
+import org.eclipse.microprofile.config.ConfigProvider;
import org.eclipse.microprofile.context.ThreadContext;
import org.jboss.logging.Logger;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.github.streamshub.console.api.model.JsonApiMeta;
import com.github.streamshub.console.api.model.KafkaRecord;
import com.github.streamshub.console.api.support.KafkaContext;
import com.github.streamshub.console.api.support.SizeLimitedSortedSet;
+import com.github.streamshub.console.api.support.serdes.MultiformatDeserializer;
+import com.github.streamshub.console.api.support.serdes.MultiformatSerializer;
+import com.github.streamshub.console.api.support.serdes.RecordData;
+
+import io.apicurio.registry.rest.client.RegistryClient;
+import io.apicurio.registry.rest.client.RegistryClientFactory;
import static java.util.Objects.requireNonNullElse;
@ApplicationScoped
public class RecordService {
- public static final String BINARY_DATA_MESSAGE = "Binary or non-UTF-8 encoded data cannot be displayed";
- static final int REPLACEMENT_CHARACTER = '\uFFFD';
-
@Inject
Logger logger;
@@ -67,15 +74,55 @@ public class RecordService {
KafkaContext kafkaContext;
@Inject
- Supplier<Consumer<byte[], byte[]>> consumerSupplier;
+ BiFunction<
+ Deserializer<RecordData>,
+ Deserializer<RecordData>,
+ Consumer<RecordData, RecordData>
+ > consumerFactory;
@Inject
- Supplier<Producer<String, String>> producerSupplier;
+ BiFunction<
+ Serializer<RecordData>,
+ Serializer<RecordData>,
+ Producer<RecordData, RecordData>
+ > producerFactory;
@Inject
ThreadContext threadContext;
- public List<KafkaRecord> consumeRecords(String topicId,
+ @Inject
+ ObjectMapper objectMapper;
+
+ RegistryClient registryClient;
+
+ MultiformatDeserializer keyDeserializer;
+ MultiformatDeserializer valueDeserializer;
+
+ MultiformatSerializer keySerializer;
+ MultiformatSerializer valueSerializer;
+
+ @PostConstruct
+ public void initialize() {
+ String registryUrl = ConfigProvider.getConfig().getOptionalValue("console.registry.endpoint", String.class)
+ // TODO: remove default
+ .orElse("http://localhost:9080");
+
+ registryClient = RegistryClientFactory.create(registryUrl);
+
+ keyDeserializer = new MultiformatDeserializer(registryClient);
+ keyDeserializer.configure(kafkaContext.configs(Consumer.class), true);
+
+ valueDeserializer = new MultiformatDeserializer(registryClient);
+ valueDeserializer.configure(kafkaContext.configs(Consumer.class), false);
+
+ keySerializer = new MultiformatSerializer(registryClient, objectMapper);
+ keySerializer.configure(kafkaContext.configs(Producer.class), true);
+
+ valueSerializer = new MultiformatSerializer(registryClient, objectMapper);
+ valueSerializer.configure(kafkaContext.configs(Producer.class), false);
+ }
+
+ public CompletionStage<List<KafkaRecord>> consumeRecords(String topicId,
Integer partition,
Long offset,
Instant timestamp,
@@ -83,71 +130,64 @@ public List<KafkaRecord> consumeRecords(String topicId,
List<String> include,
Integer maxValueLength) {
- List<PartitionInfo> partitions = topicNameForId(topicId)
- .thenApplyAsync(
- topicName -> consumerSupplier.get().partitionsFor(topicName),
- threadContext.currentContextExecutor())
- .toCompletableFuture()
- .join();
+ return topicNameForId(topicId).thenApplyAsync(topicName -> {
+ Consumer<RecordData, RecordData> consumer = consumerFactory.apply(keyDeserializer, valueDeserializer);
+ List<PartitionInfo> partitions = consumer.partitionsFor(topicName);
+ List<TopicPartition> assignments = partitions.stream()
+ .filter(p -> partition == null || partition.equals(p.partition()))
+ .map(p -> new TopicPartition(p.topic(), p.partition()))
+ .toList();
- List<TopicPartition> assignments = partitions.stream()
- .filter(p -> partition == null || partition.equals(p.partition()))
- .map(p -> new TopicPartition(p.topic(), p.partition()))
- .toList();
-
- if (assignments.isEmpty()) {
- return Collections.emptyList();
- }
+ if (assignments.isEmpty()) {
+ return Collections.emptyList();
+ }
- Consumer<byte[], byte[]> consumer = consumerSupplier.get();
- consumer.assign(assignments);
- var endOffsets = consumer.endOffsets(assignments);
+ consumer.assign(assignments);
+ var endOffsets = consumer.endOffsets(assignments);
- if (timestamp != null) {
- seekToTimestamp(consumer, assignments, timestamp);
- } else {
- seekToOffset(consumer, assignments, endOffsets, offset, limit);
- }
+ if (timestamp != null) {
+ seekToTimestamp(consumer, assignments, timestamp);
+ } else {
+ seekToOffset(consumer, assignments, endOffsets, offset, limit);
+ }
- Iterable<ConsumerRecords<byte[], byte[]>> poll = () -> new ConsumerRecordsIterator<>(consumer, endOffsets, limit);
- var limitSet = new SizeLimitedSortedSet<ConsumerRecord<byte[], byte[]>>(buildComparator(timestamp, offset), limit);
+ Iterable<ConsumerRecords<RecordData, RecordData>> poll = () -> new ConsumerRecordsIterator<>(consumer, endOffsets, limit);
+ var limitSet = new SizeLimitedSortedSet<ConsumerRecord<RecordData, RecordData>>(buildComparator(timestamp, offset), limit);
- return StreamSupport.stream(poll.spliterator(), false)
- .flatMap(records -> StreamSupport.stream(records.spliterator(), false))
- .collect(Collectors.toCollection(() -> limitSet))
- .stream()
- .map(rec -> getItems(rec, topicId, include, maxValueLength))
- .toList();
+ return StreamSupport.stream(poll.spliterator(), false)
+ .flatMap(records -> StreamSupport.stream(records.spliterator(), false))
+ .collect(Collectors.toCollection(() -> limitSet))
+ .stream()
+ .map(rec -> getItems(rec, topicId, include, maxValueLength))
+ .toList();
+ }, threadContext.currentContextExecutor());
}
- public CompletionStage<KafkaRecord> produceRecord(String topicId, KafkaRecord input) {
+ public CompletionStage<KafkaRecord> produceRecord(String topicId, KafkaRecord input, JsonApiMeta meta) {
CompletableFuture<KafkaRecord> promise = new CompletableFuture<>();
Executor asyncExec = threadContext.currentContextExecutor();
- topicNameForId(topicId)
- .thenApplyAsync(
- topicName -> producerSupplier.get().partitionsFor(topicName),
- asyncExec)
- .thenAcceptAsync(
- partitions -> {
- Producer<String, String> producer = producerSupplier.get();
- String topicName = partitions.iterator().next().topic();
- Integer partition = input.getPartition();
-
- if (partition != null && partitions.stream().noneMatch(p -> partition.equals(p.partition()))) {
- promise.completeExceptionally(invalidPartition(topicId, partition));
- } else {
- send(topicName, input, producer, promise);
- }
- },
- asyncExec);
+ topicNameForId(topicId).thenAcceptAsync(topicName -> {
+ Producer<RecordData, RecordData> producer = producerFactory.apply(keySerializer, valueSerializer);
+ List<PartitionInfo> partitions = producer.partitionsFor(topicName);
+ Integer partition = input.getPartition();
+
+ if (partition != null && partitions.stream().noneMatch(p -> partition.equals(p.partition()))) {
+ promise.completeExceptionally(invalidPartition(topicId, partition));
+ } else {
+ send(topicName, (String) meta.get("format-value"), input, producer, promise);
+ }
+
+ promise.whenComplete((kafkaRecord, error) -> producer.close());
+ }, asyncExec).exceptionally(e -> {
+ promise.completeExceptionally(e);
+ return null;
+ });
return promise;
}
- void send(String topicName, KafkaRecord input, Producer<String, String> producer, CompletableFuture<KafkaRecord> promise) {
- String key = input.getKey();
-
+ void send(String topicName, String format, KafkaRecord input, Producer<RecordData, RecordData> producer, CompletableFuture<KafkaRecord> promise) {
List<Header> headers = Optional.ofNullable(input.getHeaders())
.orElseGet(Collections::emptyMap)
.entrySet()
@@ -164,15 +204,31 @@ public byte[] value() {
}
})
.map(Header.class::cast)
- .toList();
+ .collect(Collectors.toCollection(ArrayList::new));
+
+ if (format != null) {
+ headers.add(new Header() {
+ @Override
+ public String key() {
+ return "com.github.streamshub.console.message-format-value";
+ }
+
+ @Override
+ public byte[] value() {
+ return format != null ? format.getBytes() : null;
+ }
+ });
+ }
Long timestamp = Optional.ofNullable(input.getTimestamp()).map(Instant::toEpochMilli).orElse(null);
+ var key = new RecordData(input.getKey());
+ var value = new RecordData(input.getValue());
- ProducerRecord<String, String> request = new ProducerRecord<>(topicName,
+ ProducerRecord<RecordData, RecordData> request = new ProducerRecord<>(topicName,
input.getPartition(),
timestamp,
key,
- input.getValue(),
+ value,
headers);
producer.send(request, (meta, exception) -> {
@@ -187,9 +243,10 @@ public byte[] value() {
if (meta.hasTimestamp()) {
result.setTimestamp(Instant.ofEpochMilli(meta.timestamp()));
}
- result.setKey(input.getKey());
- result.setValue(input.getValue());
+ result.setKey(key.dataString(null));
+ result.setValue(value.dataString(null));
result.setHeaders(input.getHeaders());
+ result.setSize(sizeOf(meta, request.headers()));
promise.complete(result);
}
});
@@ -210,7 +267,7 @@ CompletionStage<String> topicNameForId(String topicId) {
.orElseThrow(() -> noSuchTopic(topicId)));
}
- void seekToTimestamp(Consumer<byte[], byte[]> consumer, List<TopicPartition> assignments, Instant timestamp) {
+ void seekToTimestamp(Consumer<RecordData, RecordData> consumer, List<TopicPartition> assignments, Instant timestamp) {
Long tsMillis = timestamp.toEpochMilli();
Map<TopicPartition, Long> timestampsToSearch = assignments.stream()
.collect(Collectors.toMap(Function.identity(), p -> tsMillis));
@@ -237,7 +294,7 @@ void seekToTimestamp(Consumer<byte[], byte[]> consumer, List<TopicPartition> assignments, Instant timestamp) {
});
}
- void seekToOffset(Consumer<byte[], byte[]> consumer, List<TopicPartition> assignments, Map<TopicPartition, Long> endOffsets, Long offset, int limit) {
+ void seekToOffset(Consumer<RecordData, RecordData> consumer, List<TopicPartition> assignments, Map<TopicPartition, Long> endOffsets, Long offset, int limit) {
var beginningOffsets = consumer.beginningOffsets(assignments);
assignments.forEach(p -> {
@@ -263,9 +320,9 @@ void seekToOffset(Consumer<byte[], byte[]> consumer, List<TopicPartition> assignments, Map<TopicPartition, Long> endOffsets, Long offset, int limit) {
});
}
- Comparator<ConsumerRecord<byte[], byte[]>> buildComparator(Instant timestamp, Long offset) {
- Comparator<ConsumerRecord<byte[], byte[]>> comparator = Comparator
- .<ConsumerRecord<byte[], byte[]>>comparingLong(ConsumerRecord::timestamp)
+ Comparator<ConsumerRecord<RecordData, RecordData>> buildComparator(Instant timestamp, Long offset) {
+ Comparator<ConsumerRecord<RecordData, RecordData>> comparator = Comparator
+ .<ConsumerRecord<RecordData, RecordData>>comparingLong(ConsumerRecord::timestamp)
.thenComparingInt(ConsumerRecord::partition)
.thenComparingLong(ConsumerRecord::offset);
@@ -277,15 +334,15 @@ Comparator<ConsumerRecord<byte[], byte[]>> buildComparator(Instant timestamp, Long offset) {
return comparator;
}
- KafkaRecord getItems(ConsumerRecord<byte[], byte[]> rec, String topicId, List<String> include, Integer maxValueLength) {
+ KafkaRecord getItems(ConsumerRecord<RecordData, RecordData> rec, String topicId, List<String> include, Integer maxValueLength) {
KafkaRecord item = new KafkaRecord(topicId);
setProperty(KafkaRecord.Fields.PARTITION, include, rec::partition, item::setPartition);
setProperty(KafkaRecord.Fields.OFFSET, include, rec::offset, item::setOffset);
setProperty(KafkaRecord.Fields.TIMESTAMP, include, () -> Instant.ofEpochMilli(rec.timestamp()), item::setTimestamp);
setProperty(KafkaRecord.Fields.TIMESTAMP_TYPE, include, rec.timestampType()::name, item::setTimestampType);
- setProperty(KafkaRecord.Fields.KEY, include, rec::key, k -> item.setKey(bytesToString(k, maxValueLength)));
- setProperty(KafkaRecord.Fields.VALUE, include, rec::value, v -> item.setValue(bytesToString(v, maxValueLength)));
+ setProperty(KafkaRecord.Fields.KEY, include, rec::key, k -> item.setKey(k.dataString(maxValueLength)));
+ setProperty(KafkaRecord.Fields.VALUE, include, rec::value, v -> item.setValue(v.dataString(maxValueLength)));
setProperty(KafkaRecord.Fields.HEADERS, include, () -> headersToMap(rec.headers(), maxValueLength), item::setHeaders);
setProperty(KafkaRecord.Fields.SIZE, include, () -> sizeOf(rec), item::setSize);
@@ -294,49 +351,27 @@ KafkaRecord getItems(ConsumerRecord<byte[], byte[]> rec, String topicId, List<String> include, Integer maxValueLength) {
<T> void setProperty(String fieldName, List<String> include, Supplier<T> source, java.util.function.Consumer<T> target) {
if (include.contains(fieldName)) {
- target.accept(source.get());
- }
- }
-
- String bytesToString(byte[] bytes, Integer maxValueLength) {
- if (bytes == null) {
- return null;
- }
-
- if (bytes.length == 0) {
- return "";
- }
-
- int bufferSize = maxValueLength != null ? Math.min(maxValueLength, bytes.length) : bytes.length;
- StringBuilder buffer = new StringBuilder(bufferSize);
-
- try (Reader reader = new InputStreamReader(new ByteArrayInputStream(bytes), StandardCharsets.UTF_8)) {
- int input;
-
- while ((input = reader.read()) > -1) {
- if (input == REPLACEMENT_CHARACTER || !Character.isDefined(input)) {
- return BINARY_DATA_MESSAGE;
- }
-
- buffer.append((char) input);
-
- if (maxValueLength != null && buffer.length() == maxValueLength) {
- break;
- }
+ T value = source.get();
+ if (value != null) {
+ target.accept(value);
}
-
- return buffer.toString();
- } catch (IOException e) {
- return BINARY_DATA_MESSAGE;
}
}
Map<String, String> headersToMap(Headers headers, Integer maxValueLength) {
Map<String, String> headerMap = new LinkedHashMap<>();
- headers.iterator().forEachRemaining(h -> headerMap.put(h.key(), bytesToString(h.value(), maxValueLength)));
+ headers.iterator().forEachRemaining(h -> headerMap.put(h.key(), RecordData.bytesToString(h.value(), maxValueLength)));
return headerMap;
}
+ long sizeOf(RecordMetadata meta, Headers headers) {
+ return meta.serializedKeySize() +
+ meta.serializedValueSize() +
+ Arrays.stream(headers.toArray())
+ .mapToLong(h -> h.key().length() + h.value().length)
+ .sum();
+ }
+
long sizeOf(ConsumerRecord<?, ?> rec) {
return rec.serializedKeySize() +
rec.serializedValueSize() +
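Side note on the `send()` change above (not part of the patch): when the request meta carries a `format-value` entry, it is stamped onto the outgoing record as the `com.github.streamshub.console.message-format-value` header. A small sketch of reading that hint back from consumed headers; the header key is taken from this patch, while the helper class and its use are assumptions:

```java
import java.nio.charset.StandardCharsets;

import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;

class FormatHint {

    // Header key defined by this patch in RecordService#send
    static final String FORMAT_VALUE_HEADER = "com.github.streamshub.console.message-format-value";

    // Returns the value format recorded at produce time, or null when no hint was attached.
    static String formatOf(Headers headers) {
        Header header = headers.lastHeader(FORMAT_VALUE_HEADER);
        return header != null && header.value() != null
                ? new String(header.value(), StandardCharsets.UTF_8)
                : null;
    }
}
```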
diff --git a/api/src/main/java/com/github/streamshub/console/api/support/serdes/AvroDatumProvider.java b/api/src/main/java/com/github/streamshub/console/api/support/serdes/AvroDatumProvider.java
new file mode 100644
index 000000000..0e256ec89
--- /dev/null
+++ b/api/src/main/java/com/github/streamshub/console/api/support/serdes/AvroDatumProvider.java
@@ -0,0 +1,73 @@
+package com.github.streamshub.console.api.support.serdes;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.DatumReader;
+import org.apache.avro.io.DatumWriter;
+import org.apache.avro.io.Decoder;
+import org.apache.avro.io.DecoderFactory;
+import org.apache.avro.io.Encoder;
+import org.apache.avro.io.EncoderFactory;
+
+import io.apicurio.registry.serde.avro.DefaultAvroDatumProvider;
+import io.apicurio.registry.types.ArtifactType;
+
+public class AvroDatumProvider extends DefaultAvroDatumProvider<RecordData> {
+ @Override
+ public DatumWriter<RecordData> createDatumWriter(RecordData data, Schema schema) {
+ GenericDatumWriter