
Commit 6dc57da

RecordData javadocs, app-scoped ObjectMapper, simplify schema parsing
Signed-off-by: Michael Edgar <[email protected]>
MikeEdgar committed Oct 24, 2024
1 parent 180c8c8 commit 6dc57da
Showing 6 changed files with 38 additions and 94 deletions.
File: ClientFactory.java
@@ -15,7 +15,6 @@
 import java.util.Set;
 import java.util.TreeMap;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Function;
 import java.util.function.Predicate;
@@ -61,7 +60,6 @@
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
-import com.github.streamshub.console.api.service.KafkaClusterService;
 import com.github.streamshub.console.api.support.Holder;
 import com.github.streamshub.console.api.support.KafkaContext;
 import com.github.streamshub.console.api.support.TrustAllCertificateManager;
@@ -127,7 +125,7 @@ public class ClientFactory {
 Config config;
 
 @Inject
-ScheduledExecutorService scheduler;
+ObjectMapper mapper;
 
 @Inject
 @ConfigProperty(name = "console.config-path")
@@ -136,9 +134,6 @@
 @Inject
 Holder<SharedIndexInformer<Kafka>> kafkaInformer;
 
-@Inject
-KafkaClusterService kafkaClusterService;
-
 @Inject
 ValidationProxy validationService;
 
@@ -186,11 +181,10 @@ public ConsoleConfig produceConsoleConfig() {
 .filter(Objects::nonNull)
 .map(url -> {
 log.infof("Loading console configuration from %s", url);
-
-ObjectMapper mapper = new ObjectMapper(new YAMLFactory());
+ObjectMapper yamlMapper = mapper.copyWith(new YAMLFactory());
 
 try (InputStream stream = url.openStream()) {
-return mapper.readValue(stream, ConsoleConfig.class);
+return yamlMapper.readValue(stream, ConsoleConfig.class);
 } catch (IOException e) {
 throw new UncheckedIOException(e);
 }
@@ -362,7 +356,7 @@ void putKafkaContext(Map<String, KafkaContext> contexts,
 }
 
 KafkaContext ctx = new KafkaContext(clusterConfig, kafkaResource.orElse(null), clientConfigs, admin);
-ctx.schemaRegistryClient(schemaRegistryClient);
+ctx.schemaRegistryClient(schemaRegistryClient, mapper);
 
 KafkaContext previous = contexts.put(clusterId, ctx);
 
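Note on the change above: instead of constructing a fresh ObjectMapper for reading the YAML console configuration, ClientFactory now injects the application-scoped mapper and copies it with a YAML factory. A minimal, self-contained sketch of the copyWith pattern (assuming Jackson 2.14+ and jackson-dataformat-yaml on the classpath; the mapper below is created locally only to stand in for the injected bean):

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;

public class YamlMapperSketch {
    public static void main(String[] args) throws Exception {
        // Stand-in for the application-scoped, injected ObjectMapper
        ObjectMapper mapper = new ObjectMapper();

        // The copy keeps the base mapper's configuration but reads/writes YAML
        ObjectMapper yamlMapper = mapper.copyWith(new YAMLFactory());

        String yaml = "name: console\nreplicas: 3\n";
        var asMap = yamlMapper.readValue(yaml, java.util.Map.class);
        System.out.println(asMap); // {name=console, replicas=3}
    }
}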
File: KafkaContext.java
@@ -127,8 +127,8 @@ public boolean applicationScoped() {
 return applicationScoped;
 }
 
-public void schemaRegistryClient(RegistryClient schemaRegistryClient) {
-schemaRegistryContext = new SchemaRegistryContext(schemaRegistryClient);
+public void schemaRegistryClient(RegistryClient schemaRegistryClient, ObjectMapper objectMapper) {
+schemaRegistryContext = new SchemaRegistryContext(schemaRegistryClient, objectMapper);
 }
 
 public SchemaRegistryContext schemaRegistryContext() {
@@ -173,11 +173,9 @@ public class SchemaRegistryContext implements Closeable {
 private final MultiformatSerializer keySerializer;
 private final MultiformatSerializer valueSerializer;
 
-SchemaRegistryContext(RegistryClient schemaRegistryClient) {
+SchemaRegistryContext(RegistryClient schemaRegistryClient, ObjectMapper objectMapper) {
 this.registryClient = schemaRegistryClient;
 
-ObjectMapper objectMapper = new ObjectMapper();
-
 keyDeserializer = new MultiformatDeserializer(registryClient, objectMapper);
 keyDeserializer.configure(configs(Consumer.class), true);
 
File: MultiformatDeserializer.java
@@ -89,8 +89,8 @@ public void configure(Map<String, ?> configs, boolean isKey) {
 protobufDeserializer.configure(protobufConfigs, isKey);
 
 parser = new MultiformatSchemaParser<>(Set.of(
-avroDeserializer.schemaParser(),
-protobufDeserializer.schemaParser()
+cast(avroDeserializer.schemaParser()),
+cast(protobufDeserializer.schemaParser())
 ));
 
 super.configure(new BaseKafkaSerDeConfig(configs), isKey);
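The cast(...) calls above adapt the Avro and Protobuf delegate parsers to the SchemaParser<Object, ?> element type now required by MultiformatSchemaParser's constructor. The helper itself is not part of this diff; presumably it is a small unchecked-cast utility along these lines (an assumption, not the actual implementation):

// Hypothetical helper: narrows a wildcard-typed delegate parser to the
// SchemaParser<Object, ?> shape expected by MultiformatSchemaParser.
@SuppressWarnings("unchecked")
static SchemaParser<Object, ?> cast(SchemaParser<?, ?> parser) {
    return (SchemaParser<Object, ?>) parser;
}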
File: MultiformatSchemaParser.java
@@ -1,37 +1,21 @@
 package com.github.streamshub.console.api.support.serdes;
 
-import java.io.ByteArrayInputStream;
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.io.Reader;
-import java.util.Arrays;
-import java.util.LinkedHashMap;
 import java.util.Map;
 import java.util.Set;
 import java.util.stream.Collectors;
 
 import io.apicurio.registry.resolver.ParsedSchema;
 import io.apicurio.registry.resolver.SchemaParser;
 import io.apicurio.registry.resolver.data.Record;
-import io.apicurio.registry.types.ArtifactType;
 
 /**
 * Schema parser that delegates to either the Avro or Protobuf schema parser.
-* This class attempts to sniff the schema's type using a primitive check for a
-* Protobuf message header.
-*
-* This class can likely be improved with more a advanced/accurate detection
-* process.
 */
 public class MultiformatSchemaParser<D> implements SchemaParser<Object, D> {
 
-private static final int CACHE_LIMIT = 20;
-private static final char[] PROTOBUF_INTRO = "message ".toCharArray();
-
-private final Map<String, SchemaParser<?, ?>> delegates;
-private final Map<byte[], Object> schemaCache = new LinkedHashMap<>(CACHE_LIMIT);
+private final Map<String, SchemaParser<Object, ?>> delegates;
 
-public MultiformatSchemaParser(Set<SchemaParser<?, ?>> delegates) {
+public MultiformatSchemaParser(Set<SchemaParser<Object, ?>> delegates) {
 this.delegates = delegates.stream()
 .map(p -> Map.entry(p.artifactType(), p))
 .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
@@ -44,22 +28,15 @@ public String artifactType() {
 
 @Override
 public Object parseSchema(byte[] rawSchema, Map<String, ParsedSchema<Object>> resolvedReferences) {
-if (schemaCache.containsKey(rawSchema)) {
-return schemaCache.get(rawSchema);
-}
-
-Object parsedSchema = loadSchema(rawSchema, resolvedReferences);
-
-if (schemaCache.size() == CACHE_LIMIT) {
-// Remove the oldest entry
-var iter = schemaCache.entrySet().iterator();
-iter.next();
-iter.remove();
+for (SchemaParser<Object, ?> delegate : delegates.values()) {
+try {
+return delegate.parseSchema(rawSchema, resolvedReferences);
+} catch (Exception e) {
+// Schema is not valid for the delegate parser
+}
 }
 
-schemaCache.put(rawSchema, parsedSchema);
-
-return parsedSchema;
+return null;
 }
 
 @Override
@@ -76,47 +53,4 @@ public ParsedSchema<Object> getSchemaFromData(Record<D> data) {
 public ParsedSchema<Object> getSchemaFromData(Record<D> data, boolean dereference) {
 throw new UnsupportedOperationException("MultiformatSchemaParser#getSchemaFromData(Record,boolean)");
 }
-
-@SuppressWarnings("unchecked")
-Object loadSchema(byte[] rawSchema, Map<String, ParsedSchema<Object>> resolvedReferences) {
-Object parsedSchema = null;
-SchemaParser<Object, RecordData> delegate = null;
-
-if (delegates.containsKey(ArtifactType.PROTOBUF) && looksLikeProtobuf(rawSchema)) {
-delegate = (SchemaParser<Object, RecordData>) delegates.get(ArtifactType.PROTOBUF);
-} else if (delegates.containsKey(ArtifactType.AVRO)) {
-delegate = (SchemaParser<Object, RecordData>) delegates.get(ArtifactType.AVRO);
-}
-
-if (delegate != null) {
-try {
-parsedSchema = delegate.parseSchema(rawSchema, resolvedReferences);
-} catch (Exception e) {
-// Schema is not valid, will be cached as null
-}
-}
-
-return parsedSchema;
-}
-
-boolean looksLikeProtobuf(byte[] rawSchema) {
-try (Reader reader = new InputStreamReader(new ByteArrayInputStream(rawSchema))) {
-int input;
-
-while ((input = reader.read()) != -1) {
-if (Character.isWhitespace(input)) {
-continue;
-}
-char[] buffer = new char[8];
-buffer[0] = (char) input;
-
-if (reader.read(buffer, 1, 7) == 7 && Arrays.equals(PROTOBUF_INTRO, buffer)) {
-return true;
-}
-}
-} catch (IOException e) {
-// Ignore
-}
-return false;
-}
 }
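Reconstructed from the hunks above for readability (indentation added, content unchanged): the simplified parseSchema no longer sniffs the schema type or maintains its own cache; it offers the raw schema to each delegate parser in turn and returns the first successful result, or null when no delegate can parse it.

@Override
public Object parseSchema(byte[] rawSchema, Map<String, ParsedSchema<Object>> resolvedReferences) {
    for (SchemaParser<Object, ?> delegate : delegates.values()) {
        try {
            return delegate.parseSchema(rawSchema, resolvedReferences);
        } catch (Exception e) {
            // Schema is not valid for this delegate parser; try the next one
        }
    }

    return null;
}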
File: MultiformatSerializer.java
@@ -95,8 +95,8 @@ public void configure(Map<String, ?> configs, boolean isKey) {
 protobufSerializer.configure(protobufConfigs, isKey);
 
 parser = new MultiformatSchemaParser<>(Set.of(
-avroSerializer.schemaParser(),
-protobufSerializer.schemaParser()
+cast(avroSerializer.schemaParser()),
+cast(protobufSerializer.schemaParser())
 ));
 
 super.configure(new BaseKafkaSerDeConfig(serConfigs), isKey);
File: RecordData.java
@@ -38,10 +38,28 @@ public com.github.streamshub.console.api.model.Error error() {
 return error;
 }
 
+/**
+ * Convert this instance's {@link RecordData#data data} bytes to a string.
+ * Optionally, the length of the string will be limited to maxValueLength. When
+ * invalid characters are detected, the result is replaced with the value of
+ * {@link #BINARY_DATA_MESSAGE}.
+ *
+ * @param maxValueLength maximum length of the result string
+ * @return the record's data bytes as a string
+ */
 public String dataString(Integer maxValueLength) {
 return bytesToString(data, maxValueLength);
 }
 
+/**
+ * Convert the given bytes to a string. Optionally, the length of the string
+ * will be limited to maxValueLength. When invalid characters are detected, the
+ * result is replaced with the value of {@link #BINARY_DATA_MESSAGE}.
+ *
+ * @param bytes byte array to be converted to a string
+ * @param maxValueLength maximum length of the result string
+ * @return the given bytes as a string
+ */
 public static String bytesToString(byte[] bytes, Integer maxValueLength) {
 if (bytes == null) {
 return null;
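A brief usage sketch of the newly documented helpers (illustrative only; the exact truncation rule and the BINARY_DATA_MESSAGE placeholder text are defined by RecordData and not shown in this diff):

byte[] utf8 = "hello, kafka".getBytes(java.nio.charset.StandardCharsets.UTF_8);

// Returned in full when the decoded string fits within maxValueLength
String full = RecordData.bytesToString(utf8, 100);

// Limited to maxValueLength characters (assumed truncation behavior)
String shortened = RecordData.bytesToString(utf8, 5);

// Invalid character data yields the BINARY_DATA_MESSAGE placeholder
String binary = RecordData.bytesToString(new byte[] { (byte) 0xC3, 0x28 }, 100);

// Null input returns null, per the guard shown in the diff
String none = RecordData.bytesToString(null, 100);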

