JavaDocs and update examples
Signed-off-by: Michael Edgar <[email protected]>
MikeEdgar committed Oct 21, 2024
1 parent 9c28ed7 commit 3628c4b
Showing 15 changed files with 102 additions and 20 deletions.
@@ -9,6 +9,11 @@
import com.fasterxml.jackson.annotation.JsonAnySetter;
import com.fasterxml.jackson.annotation.JsonIgnore;

/**
* Base class for JSON API meta information attached to the document root, resources, or relationships.
*
* @see <a href="https://jsonapi.org/format/#document-meta">JSON API Document Structure, 7.5 Meta Information</a>
*/
@Schema(additionalProperties = Object.class)
public class JsonApiMeta {

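
As background for the new Javadoc on JsonApiMeta: classes like this typically capture arbitrary, free-form members. Below is a minimal sketch of that pattern with Jackson; the class and field names are illustrative assumptions, not the project's actual implementation.

```java
// Hypothetical sketch of free-form meta handling with Jackson; not the project's JsonApiMeta.
import java.util.LinkedHashMap;
import java.util.Map;

import com.fasterxml.jackson.annotation.JsonAnyGetter;
import com.fasterxml.jackson.annotation.JsonAnySetter;

public class MetaSketch {

    // Arbitrary members collected here are (de)serialized as top-level JSON properties
    private final Map<String, Object> values = new LinkedHashMap<>();

    @JsonAnySetter
    public void put(String name, Object value) {
        values.put(name, value);
    }

    @JsonAnyGetter
    public Map<String, Object> get() {
        return values;
    }
}
```
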
@@ -6,6 +6,12 @@
import com.fasterxml.jackson.annotation.JsonInclude.Include;
import com.fasterxml.jackson.annotation.JsonProperty;

/**
* Representation of a JSON API resource relationship that links one resource to
* another or provides meta information about the relationship.
*
* @see <a href="https://jsonapi.org/format/#document-resource-object-relationships">JSON API Document Structure, 7.2.2.2 Relationships</a>
*/
@JsonInclude(value = Include.NON_NULL)
public class JsonApiRelationship implements HasLinks<JsonApiRelationship>, HasMeta<JsonApiRelationship> {

@@ -85,6 +85,11 @@ public void close() {
if (admin != null) {
admin.close();
}
/*
* Do not close the registry context when the KafkaContext has client-provided
* credentials. I.e., only close when the context is global (with configured
* credentials).
*/
if (applicationScoped && schemaRegistryContext != null) {
try {
schemaRegistryContext.close();
@@ -154,6 +159,13 @@ public Optional<String> tokenUrl() {
.map(KafkaListenerAuthenticationOAuth::getTokenEndpointUri));
}

/**
* The SchemaRegistryContext holds a per-Kafka-cluster registry client and the
* key/value SerDes instances used for message browsing.
*
* The client and SerDes instances are kept open and reused until the parent
* KafkaContext is disposed of at application shutdown.
*/
public class SchemaRegistryContext implements Closeable {
private final RegistryClient registryClient;
private final MultiformatDeserializer keyDeserializer;
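
To illustrate the lifecycle the new SchemaRegistryContext Javadoc describes, here is a rough sketch of a per-cluster context that keeps its client and SerDes open across requests and releases them only when the owning KafkaContext is disposed. The types and fields are simplified stand-ins, not the actual class.

```java
// Simplified stand-in for SchemaRegistryContext; the real fields are a RegistryClient and
// the multi-format key/value SerDes shown in the diff above.
import java.io.Closeable;
import java.io.IOException;

class RegistryContextSketch implements Closeable {

    private final Closeable registryClient;     // stands in for the Apicurio RegistryClient
    private final Closeable keyDeserializer;    // reused for every message-browsing request
    private final Closeable valueDeserializer;

    RegistryContextSketch(Closeable registryClient, Closeable keyDeserializer, Closeable valueDeserializer) {
        this.registryClient = registryClient;
        this.keyDeserializer = keyDeserializer;
        this.valueDeserializer = valueDeserializer;
    }

    @Override
    public void close() {
        // Invoked from the parent context's close at application shutdown, not per request
        closeQuietly(keyDeserializer);
        closeQuietly(valueDeserializer);
        closeQuietly(registryClient);
    }

    private static void closeQuietly(Closeable resource) {
        try {
            resource.close();
        } catch (IOException e) {
            // log and continue so one failure does not prevent closing the rest
        }
    }
}
```
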
@@ -17,6 +17,10 @@

import io.apicurio.registry.resolver.strategy.ArtifactReference;

/**
* Support methods to serialize and deserialize instances of {@link ArtifactReference}
* to a string suitable for use as a reference in a URL.
*/
public class ArtifactReferences {

private static final Logger LOGGER = Logger.getLogger(ArtifactReferences.class);
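
Later in this diff, ArtifactReferences.toSchemaId(...) is called to attach schema metadata to records. The exact encoding is not shown in this commit, so the following is only a plausible sketch of a URL-safe round trip (JSON plus Base64-URL); the class and method signatures are hypothetical.

```java
// Hypothetical codec: the project's actual toSchemaId encoding may differ.
import java.io.IOException;
import java.util.Base64;

import com.fasterxml.jackson.databind.ObjectMapper;

public class SchemaIdCodec {

    private final ObjectMapper objectMapper = new ObjectMapper();

    // Serialize the reference to JSON, then Base64-URL encode it so it is safe in a URL path segment
    public String toSchemaId(Object artifactReference) throws IOException {
        byte[] json = objectMapper.writeValueAsBytes(artifactReference);
        return Base64.getUrlEncoder().withoutPadding().encodeToString(json);
    }

    // Reverse the encoding to recover the reference from a previously issued schema id
    public <T> T fromSchemaId(String schemaId, Class<T> referenceType) throws IOException {
        byte[] json = Base64.getUrlDecoder().decode(schemaId);
        return objectMapper.readValue(json, referenceType);
    }
}
```
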
@@ -18,6 +18,10 @@

import io.apicurio.registry.serde.avro.DefaultAvroDatumProvider;

/**
* Provides a reader and writer to convert JSON to and from Avro format using a provided
* Avro schema.
*/
public class AvroDatumProvider extends DefaultAvroDatumProvider<RecordData> {
@Override
public DatumWriter<RecordData> createDatumWriter(RecordData data, Schema schema) {
@@ -65,7 +69,7 @@ public RecordData read(RecordData reuse, Decoder in) throws IOException {
writer.write(datum, jsonEncoder);
jsonEncoder.flush();

return new RecordData(buffer.toByteArray(), null);
return new RecordData(buffer.toByteArray());
}

@Override
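
The JSON-to-Avro conversion that the AvroDatumProvider Javadoc describes can be illustrated with plain Avro APIs. This sketch is independent of the project's classes; the schema and record content are made up for the example.

```java
// Standalone illustration of JSON <-> Avro conversion with a provided schema;
// not the project's AvroDatumProvider, which plugs the same idea into Apicurio SerDes.
import java.io.ByteArrayOutputStream;
import java.io.IOException;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.EncoderFactory;
import org.apache.avro.io.JsonDecoder;
import org.apache.avro.io.JsonEncoder;

public class JsonAvroRoundTrip {

    public static void main(String[] args) throws IOException {
        String schemaJson = "{\"type\":\"record\",\"name\":\"Greeting\","
                + "\"fields\":[{\"name\":\"message\",\"type\":\"string\"}]}";
        Schema schema = new Schema.Parser().parse(schemaJson);

        // JSON text -> in-memory Avro record, using the schema to drive parsing
        JsonDecoder jsonDecoder = DecoderFactory.get().jsonDecoder(schema, "{\"message\":\"hello\"}");
        GenericRecord record = new GenericDatumReader<GenericRecord>(schema).read(null, jsonDecoder);

        // In-memory Avro record -> JSON text, the direction used when browsing messages
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        JsonEncoder jsonEncoder = EncoderFactory.get().jsonEncoder(schema, buffer);
        new GenericDatumWriter<GenericRecord>(schema).write(record, jsonEncoder);
        jsonEncoder.flush();

        System.out.println(buffer); // {"message":"hello"}
    }
}
```
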
@@ -9,6 +9,10 @@
import io.apicurio.registry.resolver.SchemaResolver;
import io.apicurio.registry.serde.avro.AvroKafkaDeserializer;

/**
* Simple subclass of {@link AvroKafkaDeserializer} to make the {@code readData}
* methods public.
*/
class AvroDeserializer extends AvroKafkaDeserializer<RecordData> {
AvroDeserializer(SchemaResolver<Schema, RecordData> schemaResolver) {
super();
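
The "simple subclass" approach mentioned in the AvroDeserializer Javadoc relies on Java allowing an override to widen a method's visibility. A toy example with hypothetical classes (not the Apicurio types):

```java
// Hypothetical classes illustrating why a simple subclass can expose a protected method.
class BaseDeserializer {
    protected String readData(byte[] bytes) {
        return new String(bytes, java.nio.charset.StandardCharsets.UTF_8);
    }
}

class PublicReadDeserializer extends BaseDeserializer {
    @Override
    public String readData(byte[] bytes) { // visibility widened from protected to public
        return super.readData(bytes);
    }
}
```
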
@@ -2,6 +2,12 @@

import java.io.IOException;

/**
* The de-/serializers in this package are reused between requests, so their
* {@code close} methods are no-ops. Each of them implements this interface so
* that the actual close operation can be performed when a
* {@link com.github.streamshub.console.api.support.KafkaContext} is disposed.
*/
public interface ForceCloseable {

void forceClose() throws IOException;
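
A rough sketch of how a reusable SerDes might implement this interface, assuming it sits in the same package as ForceCloseable. The class and its field are hypothetical, but the ForceCloseable contract is the one defined above.

```java
// Hypothetical reusable deserializer: close() is a no-op because the instance is shared
// across requests; forceClose() does the real cleanup when the KafkaContext is disposed.
import java.io.Closeable;
import java.io.IOException;

class ReusableDeserializerSketch implements Closeable, ForceCloseable {

    private final Closeable sharedResource; // e.g. a registry client held open between requests

    ReusableDeserializerSketch(Closeable sharedResource) {
        this.sharedResource = sharedResource;
    }

    @Override
    public void close() {
        // Intentionally empty: per-request callers may call close(), but the instance is reused
    }

    @Override
    public void forceClose() throws IOException {
        sharedResource.close(); // actual cleanup, invoked once at context disposal
    }
}
```
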
@@ -27,6 +27,20 @@
import io.apicurio.registry.types.ArtifactType;
import io.apicurio.registry.utils.protobuf.schema.ProtobufSchema;

/**
* Deserializer that supports reading Avro, Protobuf, and raw bytes.
*
* If an Apicurio Registry client is provided, the deserializer attempts to find
* a schema using an identifier found in the message or its headers. If both an
* identifier and a matching schema are found, deserialization is delegated to
* the Apicurio Avro or Protobuf deserializer, as appropriate.
*
* Otherwise, the data is returned as-is to the client of the Consumer using
* this deserializer. If a schema identifier is detected but the message must
* still be returned raw, warning information explaining the reason is attached
* to the result.
*/
public class MultiformatDeserializer extends AbstractKafkaDeserializer<Object, RecordData> implements ForceCloseable {

private static final Logger LOG = Logger.getLogger(MultiformatDeserializer.class);
@@ -127,12 +141,11 @@ private RecordData readAvroData(Headers headers, SchemaLookupResult<Object> sche
} else {
result = avroDeserializer.readData(cast(schema), buffer, start, length);
}
result.schema = schema;
result.meta.put("schema-type", ArtifactType.AVRO);
result.meta.put("schema-id", ArtifactReferences.toSchemaId(schemaResult.toArtifactReference(), objectMapper));
result.meta.put("schema-name", avroSchema.getFullName());
} catch (Exception e) {
result = new RecordData(null, schema);
result = new RecordData((byte[]) null);
result.error = com.github.streamshub.console.api.model.Error.forThrowable(e, "Error deserializing Avro data");
}

@@ -154,12 +167,12 @@ private RecordData readProtobufData(Headers headers, SchemaLookupResult<Object>
.omittingInsignificantWhitespace()
.print(msg)
.getBytes();
result = new RecordData(data, schema);
result = new RecordData(data);
result.meta.put("schema-type", ArtifactType.PROTOBUF);
result.meta.put("schema-id", ArtifactReferences.toSchemaId(schemaResult.toArtifactReference(), objectMapper));
result.meta.put("schema-name", msg.getDescriptorForType().getFullName());
} catch (Exception e) {
result = new RecordData(null, schema);
result = new RecordData((byte[]) null);
result.error = com.github.streamshub.console.api.model.Error.forThrowable(e, "Error deserializing Protobuf data");
}

Expand All @@ -169,7 +182,7 @@ private RecordData readProtobufData(Headers headers, SchemaLookupResult<Object>
private RecordData readRawData(SchemaLookupResult<Object> schemaResult, ByteBuffer buffer, int start, int length) {
byte[] bytes = new byte[length];
System.arraycopy(buffer.array(), start, bytes, 0, length);
RecordData result = new RecordData(bytes, null);
RecordData result = new RecordData(bytes);

if (schemaResult == RESOLVER_MISSING) {
result.error = new com.github.streamshub.console.api.model.Error(
@@ -15,6 +15,14 @@
import io.apicurio.registry.resolver.data.Record;
import io.apicurio.registry.types.ArtifactType;

/**
* Schema parser that delegates to either the Avro or Protobuf schema parser.
* This class attempts to sniff the schema's type using a primitive check for a
* Protobuf message header.
*
* This class can likely be improved with a more advanced/accurate detection
* process.
*/
public class MultiformatSchemaParser<D> implements SchemaParser<Object, D> {

private static final int CACHE_LIMIT = 20;
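
The MultiformatSchemaParser Javadoc says the parser sniffs the schema type with a primitive check for a Protobuf message header; that check is not shown in this diff. As a stand-in illustration of type sniffing only (explicitly not the project's logic), one simple heuristic is that Avro schemas are JSON documents while Protobuf schemas are not:

```java
// Stand-in heuristic only; the real MultiformatSchemaParser uses a different, header-based check.
import java.io.IOException;

import com.fasterxml.jackson.databind.ObjectMapper;

class SchemaTypeSniffer {

    private static final ObjectMapper MAPPER = new ObjectMapper();

    static String sniff(byte[] rawSchema) {
        try {
            MAPPER.readTree(rawSchema); // Avro schemas parse as JSON ("record", "enum", ...)
            return "AVRO";
        } catch (IOException e) {
            return "PROTOBUF";          // not JSON; assume Protobuf for this sketch
        }
    }
}
```
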
@@ -37,6 +37,15 @@
import io.apicurio.registry.types.ArtifactType;
import io.apicurio.registry.utils.protobuf.schema.ProtobufSchema;

/**
* Serializer that supports writing Avro, Protobuf, and raw bytes.
*
* This serializer requires that the input data provide a GAV
* (groupId/artifactId/version) for the target schema; otherwise it passes the
* input through untouched. If the provided GAV can be found in the Apicurio
* Registry, the schema is used to serialize to either Avro or Protobuf,
* depending on the schema type.
*/
public class MultiformatSerializer extends AbstractKafkaSerializer<Object, RecordData> implements ArtifactReferenceResolverStrategy<Object, RecordData>, ForceCloseable {

private static final SchemaLookupResult<Object> EMPTY_RESULT = SchemaLookupResult.builder().build();
@@ -11,6 +11,10 @@
import io.apicurio.registry.serde.protobuf.ProtobufKafkaDeserializer;
import io.apicurio.registry.utils.protobuf.schema.ProtobufSchema;

/**
* Simple subclass of {@link ProtobufKafkaDeserializer} to make the
* {@code readData} methods public.
*/
class ProtobufDeserializer extends ProtobufKafkaDeserializer<Message> {
ProtobufDeserializer(SchemaResolver<ProtobufSchema, Message> schemaResolver) {
super();
@@ -10,9 +10,17 @@

import io.apicurio.registry.resolver.SchemaLookupResult;
import io.apicurio.registry.resolver.SchemaResolver;
import io.apicurio.registry.serde.AbstractKafkaSerializer;
import io.apicurio.registry.serde.protobuf.ProtobufKafkaSerializer;
import io.apicurio.registry.utils.protobuf.schema.ProtobufSchema;

/**
* This serializer exists to provide a {@code serialize} method that accepts a
* schema directly, for use instead of
* {@link AbstractKafkaSerializer#serialize(String, Headers, Object)}, which
* resolves the schema itself. The schema has already been resolved by the time
* this serializer is invoked, in order to determine that the message type
* should be Protobuf.
*/
class ProtobufSerializer extends ProtobufKafkaSerializer<Message> {
ProtobufSerializer(SchemaResolver<ProtobufSchema, Message> schemaResolver) {
super();
@@ -8,38 +8,32 @@
import java.util.LinkedHashMap;
import java.util.Map;

import io.apicurio.registry.resolver.ParsedSchema;

/**
* The multi-format serializer and deserializer use this type for both keys and
* values in order to provide a bi-directional flow of information. Meta
* information about the associated schema is passed between the
* Producer/Consumer clients and the serializer/deserializer, and error
* information may also be conveyed back to the client without throwing an
* exception and disrupting processing of the topic.
*/
public class RecordData {

public static final String BINARY_DATA_MESSAGE = "Binary or non-UTF-8 encoded data cannot be displayed";
static final int REPLACEMENT_CHARACTER = '\uFFFD';

public final Map<String, String> meta = new LinkedHashMap<>(1);
byte[] data;
ParsedSchema<?> schema;
com.github.streamshub.console.api.model.Error error;

public RecordData(byte[] data, ParsedSchema<?> schema) {
super();
this.data = data;
this.schema = schema;
}

public RecordData(byte[] data) {
super();
this.data = data;
this.schema = null;
}

public RecordData(String data) {
this(data != null ? data.getBytes(StandardCharsets.UTF_8) : null);
}

public ParsedSchema<?> schema() {
return schema;
}

public com.github.streamshub.console.api.model.Error error() {
return error;
}
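
To show the bi-directional flow from the consuming side, here is a hypothetical usage sketch. It assumes the project's classes are on the classpath and that the sketch lives in the same package as RecordData; the meta key "schema-name" is one of the keys populated by the MultiformatDeserializer above.

```java
// Hypothetical client-side inspection of a RecordData produced by the deserializer.
public class RecordDataUsageSketch {

    public static void main(String[] args) {
        // Normally the deserializer builds this; constructed directly here for illustration
        RecordData value = new RecordData("{\"greeting\":\"hello\"}");

        com.github.streamshub.console.api.model.Error error = value.error();

        if (error != null) {
            // Deserialization failed, but the failure travels with the record instead of an exception
            System.out.println("Deserialization problem: " + error);
        } else {
            // Schema metadata attached by the deserializer, e.g. for Avro or Protobuf records
            System.out.println("Schema used (if any): " + value.meta.get("schema-name"));
        }
    }
}
```
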
3 changes: 3 additions & 0 deletions console-config-example.yaml
@@ -9,6 +9,9 @@ kafka:
namespace: my-namespace1 # namespace of the Strimzi Kafka CR (optional)
id: my-kafka1-id # value to be used as an identifier for the cluster. Must be specified when namespace is not.
listener: "secure" # name of the listener to use for connections from the console
# This object may contain a `url` string property with the Apicurio Registry API path to be used to resolve
# Avro or Protobuf schemas for topic message browsing
schemaRegistry: { }
# `properties` contains keys/values to use for any Kafka connection
properties:
security.protocol: SASL_SSL
2 changes: 2 additions & 0 deletions examples/console/010-Console-example.yaml
@@ -13,6 +13,8 @@ spec:
- name: console-kafka # Name of the `Kafka` CR representing the cluster
namespace: ${KAFKA_NAMESPACE} # Namespace of the `Kafka` CR representing the cluster
listener: secure # Listener on the `Kafka` CR to connect from the console
schemaRegistry: null # Configuration for a connection to an Apicurio Registry instance
# E.g.: { "url": "http://example.com/apis/registry/v2" }
properties:
values: [] # Array of name/value for properties to be used for connections
# made to this cluster
