From 6dc57daec2b87d6838b45998766f46710d59dbfe Mon Sep 17 00:00:00 2001
From: Michael Edgar
Date: Thu, 24 Oct 2024 09:10:19 -0400
Subject: [PATCH] RecordData javadocs, app-scoped ObjectMapper, simplify schema
 parsing

Signed-off-by: Michael Edgar
---
 .../streamshub/console/api/ClientFactory.java  | 14 +---
 .../console/api/support/KafkaContext.java      |  8 +-
 .../serdes/MultiformatDeserializer.java        |  4 +-
 .../serdes/MultiformatSchemaParser.java        | 84 ++-----------------
 .../support/serdes/MultiformatSerializer.java  |  4 +-
 .../api/support/serdes/RecordData.java         | 18 ++++
 6 files changed, 38 insertions(+), 94 deletions(-)

diff --git a/api/src/main/java/com/github/streamshub/console/api/ClientFactory.java b/api/src/main/java/com/github/streamshub/console/api/ClientFactory.java
index d1fc69144..8db059f41 100644
--- a/api/src/main/java/com/github/streamshub/console/api/ClientFactory.java
+++ b/api/src/main/java/com/github/streamshub/console/api/ClientFactory.java
@@ -15,7 +15,6 @@
 import java.util.Set;
 import java.util.TreeMap;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Function;
 import java.util.function.Predicate;
@@ -61,7 +60,6 @@
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
-import com.github.streamshub.console.api.service.KafkaClusterService;
 import com.github.streamshub.console.api.support.Holder;
 import com.github.streamshub.console.api.support.KafkaContext;
 import com.github.streamshub.console.api.support.TrustAllCertificateManager;
@@ -127,7 +125,7 @@ public class ClientFactory {
     Config config;
 
     @Inject
-    ScheduledExecutorService scheduler;
+    ObjectMapper mapper;
 
     @Inject
     @ConfigProperty(name = "console.config-path")
@@ -136,9 +134,6 @@ public class ClientFactory {
     @Inject
     Holder<SharedIndexInformer<Kafka>> kafkaInformer;
 
-    @Inject
-    KafkaClusterService kafkaClusterService;
-
     @Inject
     ValidationProxy validationService;
 
@@ -186,11 +181,10 @@ public ConsoleConfig produceConsoleConfig() {
             .filter(Objects::nonNull)
             .map(url -> {
                 log.infof("Loading console configuration from %s", url);
-
-                ObjectMapper mapper = new ObjectMapper(new YAMLFactory());
+                ObjectMapper yamlMapper = mapper.copyWith(new YAMLFactory());
 
                 try (InputStream stream = url.openStream()) {
-                    return mapper.readValue(stream, ConsoleConfig.class);
+                    return yamlMapper.readValue(stream, ConsoleConfig.class);
                 } catch (IOException e) {
                     throw new UncheckedIOException(e);
                 }
@@ -362,7 +356,7 @@ void putKafkaContext(Map<String, KafkaContext> contexts,
         }
 
         KafkaContext ctx = new KafkaContext(clusterConfig, kafkaResource.orElse(null), clientConfigs, admin);
-        ctx.schemaRegistryClient(schemaRegistryClient);
+        ctx.schemaRegistryClient(schemaRegistryClient, mapper);
 
         KafkaContext previous = contexts.put(clusterId, ctx);
 
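A note on the produceConsoleConfig() change above: Jackson's ObjectMapper#copyWith(JsonFactory), available since Jackson 2.14, returns a copy of the mapper that keeps the source mapper's configuration (registered modules, enabled features) while swapping in the supplied factory. The injected application-scoped mapper's settings therefore carry over to YAML parsing instead of being lost to a freshly constructed ObjectMapper. A minimal sketch of that behavior; ServerConfig is a hypothetical type used only for illustration:

    import com.fasterxml.jackson.databind.ObjectMapper;
    import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;

    public class YamlCopyExample {
        // Hypothetical config type, for illustration only
        public record ServerConfig(String host, int port) { }

        public static void main(String[] args) throws Exception {
            ObjectMapper jsonMapper = new ObjectMapper();
            // The copy shares jsonMapper's configuration but reads YAML
            ObjectMapper yamlMapper = jsonMapper.copyWith(new YAMLFactory());

            ServerConfig config = yamlMapper.readValue(
                    "host: localhost\nport: 9092\n", ServerConfig.class);
            System.out.println(config); // ServerConfig[host=localhost, port=9092]
        }
    }
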
diff --git a/api/src/main/java/com/github/streamshub/console/api/support/KafkaContext.java b/api/src/main/java/com/github/streamshub/console/api/support/KafkaContext.java
index b5dae1e55..1a8498cec 100644
--- a/api/src/main/java/com/github/streamshub/console/api/support/KafkaContext.java
+++ b/api/src/main/java/com/github/streamshub/console/api/support/KafkaContext.java
@@ -127,8 +127,8 @@ public boolean applicationScoped() {
         return applicationScoped;
     }
 
-    public void schemaRegistryClient(RegistryClient schemaRegistryClient) {
-        schemaRegistryContext = new SchemaRegistryContext(schemaRegistryClient);
+    public void schemaRegistryClient(RegistryClient schemaRegistryClient, ObjectMapper objectMapper) {
+        schemaRegistryContext = new SchemaRegistryContext(schemaRegistryClient, objectMapper);
     }
 
     public SchemaRegistryContext schemaRegistryContext() {
@@ -173,11 +173,9 @@ public class SchemaRegistryContext implements Closeable {
         private final MultiformatSerializer keySerializer;
         private final MultiformatSerializer valueSerializer;
 
-        SchemaRegistryContext(RegistryClient schemaRegistryClient) {
+        SchemaRegistryContext(RegistryClient schemaRegistryClient, ObjectMapper objectMapper) {
             this.registryClient = schemaRegistryClient;
 
-            ObjectMapper objectMapper = new ObjectMapper();
-
             keyDeserializer = new MultiformatDeserializer(registryClient, objectMapper);
             keyDeserializer.configure(configs(Consumer.class), true);
 
diff --git a/api/src/main/java/com/github/streamshub/console/api/support/serdes/MultiformatDeserializer.java b/api/src/main/java/com/github/streamshub/console/api/support/serdes/MultiformatDeserializer.java
index 49173ce9a..59f65f77d 100644
--- a/api/src/main/java/com/github/streamshub/console/api/support/serdes/MultiformatDeserializer.java
+++ b/api/src/main/java/com/github/streamshub/console/api/support/serdes/MultiformatDeserializer.java
@@ -89,8 +89,8 @@ public void configure(Map<String, ?> configs, boolean isKey) {
         protobufDeserializer.configure(protobufConfigs, isKey);
 
         parser = new MultiformatSchemaParser<>(Set.of(
-            avroDeserializer.schemaParser(),
-            protobufDeserializer.schemaParser()
+            cast(avroDeserializer.schemaParser()),
+            cast(protobufDeserializer.schemaParser())
         ));
 
         super.configure(new BaseKafkaSerDeConfig(configs), isKey);
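The cast(...) helper called from configure(...) above is referenced but not defined anywhere in this patch. Assuming it is the usual unchecked-cast utility that narrows each Apicurio schema parser to the element type the MultiformatSchemaParser constructor expects, it would look something like this sketch:

    // Hypothetical sketch of the cast(...) helper; the patch calls it but
    // does not include its definition. An unchecked cast narrows the
    // delegate parsers to the constructor's expected type argument.
    @SuppressWarnings("unchecked")
    static <T> T cast(Object object) {
        return (T) object;
    }
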
diff --git a/api/src/main/java/com/github/streamshub/console/api/support/serdes/MultiformatSchemaParser.java b/api/src/main/java/com/github/streamshub/console/api/support/serdes/MultiformatSchemaParser.java
index 2de9cbf2e..79eb510d5 100644
--- a/api/src/main/java/com/github/streamshub/console/api/support/serdes/MultiformatSchemaParser.java
+++ b/api/src/main/java/com/github/streamshub/console/api/support/serdes/MultiformatSchemaParser.java
@@ -1,11 +1,5 @@
 package com.github.streamshub.console.api.support.serdes;
 
-import java.io.ByteArrayInputStream;
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.io.Reader;
-import java.util.Arrays;
-import java.util.LinkedHashMap;
 import java.util.Map;
 import java.util.Set;
 import java.util.stream.Collectors;
@@ -13,25 +7,15 @@
 import io.apicurio.registry.resolver.ParsedSchema;
 import io.apicurio.registry.resolver.SchemaParser;
 import io.apicurio.registry.resolver.data.Record;
-import io.apicurio.registry.types.ArtifactType;
 
 /**
  * Schema parser that delegates to either the Avro or Protobuf schema parser.
- * This class attempts to sniff the schema's type using a primitive check for a
- * Protobuf message header.
- *
- * This class can likely be improved with more a advanced/accurate detection
- * process.
  */
 public class MultiformatSchemaParser<D> implements SchemaParser<Object, D> {
-    private static final int CACHE_LIMIT = 20;
-    private static final char[] PROTOBUF_INTRO = "message ".toCharArray();
+    private final Map<String, SchemaParser<Object, D>> delegates;
 
-    private final Map<String, SchemaParser<?, ?>> delegates;
-    private final Map<byte[], Object> schemaCache = new LinkedHashMap<>(CACHE_LIMIT);
-
-    public MultiformatSchemaParser(Set<SchemaParser<?, ?>> delegates) {
+    public MultiformatSchemaParser(Set<SchemaParser<Object, D>> delegates) {
         this.delegates = delegates.stream()
                 .map(p -> Map.entry(p.artifactType(), p))
                 .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
     }
@@ -44,22 +28,15 @@ public String artifactType() {
 
     @Override
     public Object parseSchema(byte[] rawSchema, Map<String, ParsedSchema<Object>> resolvedReferences) {
-        if (schemaCache.containsKey(rawSchema)) {
-            return schemaCache.get(rawSchema);
-        }
-
-        Object parsedSchema = loadSchema(rawSchema, resolvedReferences);
-
-        if (schemaCache.size() == CACHE_LIMIT) {
-            // Remove the oldest entry
-            var iter = schemaCache.entrySet().iterator();
-            iter.next();
-            iter.remove();
+        for (SchemaParser<Object, D> delegate : delegates.values()) {
+            try {
+                return delegate.parseSchema(rawSchema, resolvedReferences);
+            } catch (Exception e) {
+                // Schema is not valid for the delegate parser
+            }
         }
 
-        schemaCache.put(rawSchema, parsedSchema);
-
-        return parsedSchema;
+        return null;
     }
 
     @Override
@@ -76,47 +53,4 @@ public ParsedSchema<Object> getSchemaFromData(Record<D> data) {
     public ParsedSchema<Object> getSchemaFromData(Record<D> data, boolean dereference) {
         throw new UnsupportedOperationException("MultiformatSchemaParser#getSchemaFromData(Record,boolean)");
     }
-
-    @SuppressWarnings("unchecked")
-    Object loadSchema(byte[] rawSchema, Map<String, ParsedSchema<Object>> resolvedReferences) {
-        Object parsedSchema = null;
-        SchemaParser<Object, D> delegate = null;
-
-        if (delegates.containsKey(ArtifactType.PROTOBUF) && looksLikeProtobuf(rawSchema)) {
-            delegate = (SchemaParser<Object, D>) delegates.get(ArtifactType.PROTOBUF);
-        } else if (delegates.containsKey(ArtifactType.AVRO)) {
-            delegate = (SchemaParser<Object, D>) delegates.get(ArtifactType.AVRO);
-        }
-
-        if (delegate != null) {
-            try {
-                parsedSchema = delegate.parseSchema(rawSchema, resolvedReferences);
-            } catch (Exception e) {
-                // Schema is not valid, will be cached as null
-            }
-        }
-
-        return parsedSchema;
-    }
-
-    boolean looksLikeProtobuf(byte[] rawSchema) {
-        try (Reader reader = new InputStreamReader(new ByteArrayInputStream(rawSchema))) {
-            int input;
-
-            while ((input = reader.read()) != -1) {
-                if (Character.isWhitespace(input)) {
-                    continue;
-                }
-                char[] buffer = new char[8];
-                buffer[0] = (char) input;
-
-                if (reader.read(buffer, 1, 7) == 7 && Arrays.equals(PROTOBUF_INTRO, buffer)) {
-                    return true;
-                }
-            }
-        } catch (IOException e) {
-            // Ignore
-        }
-        return false;
-    }
 }
diff --git a/api/src/main/java/com/github/streamshub/console/api/support/serdes/MultiformatSerializer.java b/api/src/main/java/com/github/streamshub/console/api/support/serdes/MultiformatSerializer.java
index c37c17655..618d80943 100644
--- a/api/src/main/java/com/github/streamshub/console/api/support/serdes/MultiformatSerializer.java
+++ b/api/src/main/java/com/github/streamshub/console/api/support/serdes/MultiformatSerializer.java
@@ -95,8 +95,8 @@ public void configure(Map<String, ?> configs, boolean isKey) {
         protobufSerializer.configure(protobufConfigs, isKey);
 
         parser = new MultiformatSchemaParser<>(Set.of(
-            avroSerializer.schemaParser(),
-            protobufSerializer.schemaParser()
+            cast(avroSerializer.schemaParser()),
+            cast(protobufSerializer.schemaParser())
         ));
 
         super.configure(new BaseKafkaSerDeConfig(serConfigs), isKey);
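The rewritten parseSchema above replaces the header sniffing and the removed 20-entry cache with a simpler strategy: offer the raw bytes to each delegate in turn and return the first successful parse, or null when every delegate rejects them. Note that Set.of(...) has no defined iteration order, so this approach relies on each delegate throwing for schemas of the other format. A standalone sketch of the pattern; Parser here is a stand-in interface, not the Apicurio API:

    import java.util.List;

    public class FirstSuccessfulParse {
        // Stand-in for the delegate schema parser type
        interface Parser {
            Object parse(byte[] rawSchema) throws Exception;
        }

        // Mirrors the try-each-delegate loop: first success wins
        static Object parse(List<Parser> delegates, byte[] rawSchema) {
            for (Parser delegate : delegates) {
                try {
                    return delegate.parse(rawSchema); // e.g. Avro, then Protobuf
                } catch (Exception e) {
                    // not valid for this delegate; try the next one
                }
            }
            return null; // no delegate could parse the schema
        }
    }
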
diff --git a/api/src/main/java/com/github/streamshub/console/api/support/serdes/RecordData.java b/api/src/main/java/com/github/streamshub/console/api/support/serdes/RecordData.java
index 05f62dbbb..22d8b6581 100644
--- a/api/src/main/java/com/github/streamshub/console/api/support/serdes/RecordData.java
+++ b/api/src/main/java/com/github/streamshub/console/api/support/serdes/RecordData.java
@@ -38,10 +38,28 @@ public com.github.streamshub.console.api.model.Error error() {
         return error;
     }
 
+    /**
+     * Convert this instance's {@link RecordData#data data} bytes to a string.
+     * Optionally, the length of the string will be limited to maxValueLength.
+     * When invalid characters are detected, the result is replaced with the
+     * value of {@link #BINARY_DATA_MESSAGE}.
+     *
+     * @param maxValueLength maximum length of the result string
+     * @return the record's data bytes as a string
+     */
     public String dataString(Integer maxValueLength) {
         return bytesToString(data, maxValueLength);
     }
 
+    /**
+     * Convert the given bytes to a string. Optionally, the length of the string
+     * will be limited to maxValueLength. When invalid characters are detected,
+     * the result is replaced with the value of {@link #BINARY_DATA_MESSAGE}.
+     *
+     * @param bytes          byte array to be converted to a string
+     * @param maxValueLength maximum length of the result string
+     * @return the given bytes as a string
+     */
     public static String bytesToString(byte[] bytes, Integer maxValueLength) {
         if (bytes == null) {
             return null;
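The patch ends before the body of bytesToString, so the following usage sketch assumes only the contract stated in the new javadocs; the values in the comments reflect that contract rather than verified output:

    import java.nio.charset.StandardCharsets;

    import com.github.streamshub.console.api.support.serdes.RecordData;

    public class RecordDataUsage {
        public static void main(String[] args) {
            byte[] utf8 = "hello, kafka".getBytes(StandardCharsets.UTF_8);

            String full = RecordData.bytesToString(utf8, null); // "hello, kafka" (no limit)
            String clipped = RecordData.bytesToString(utf8, 5); // "hello" (limited to maxValueLength)
            String absent = RecordData.bytesToString(null, 5);  // null

            // Bytes that are not valid character data should yield the
            // BINARY_DATA_MESSAGE placeholder per the javadoc
            byte[] notUtf8 = { (byte) 0xC3, (byte) 0x28 };
            String binary = RecordData.bytesToString(notUtf8, 100);

            System.out.printf("%s | %s | %s | %s%n", full, clipped, absent, binary);
        }
    }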