diff --git a/api/src/main/java/com/github/streamshub/console/api/model/JsonApiMeta.java b/api/src/main/java/com/github/streamshub/console/api/model/JsonApiMeta.java
index d936aba34..b2afff22c 100644
--- a/api/src/main/java/com/github/streamshub/console/api/model/JsonApiMeta.java
+++ b/api/src/main/java/com/github/streamshub/console/api/model/JsonApiMeta.java
@@ -9,6 +9,11 @@
 
 import com.fasterxml.jackson.annotation.JsonAnySetter;
 import com.fasterxml.jackson.annotation.JsonIgnore;
 
+/**
+ * Base class for JSON API metadata attached to the document root, resources, or relationships.
+ *
+ * @see JSON API Document Structure, 7.5 Meta Information
+ */
 @Schema(additionalProperties = Object.class)
 public class JsonApiMeta {
diff --git a/api/src/main/java/com/github/streamshub/console/api/model/JsonApiRelationship.java b/api/src/main/java/com/github/streamshub/console/api/model/JsonApiRelationship.java
index 49ccf44df..ffa38d432 100644
--- a/api/src/main/java/com/github/streamshub/console/api/model/JsonApiRelationship.java
+++ b/api/src/main/java/com/github/streamshub/console/api/model/JsonApiRelationship.java
@@ -6,6 +6,12 @@
 
 import com.fasterxml.jackson.annotation.JsonInclude.Include;
 import com.fasterxml.jackson.annotation.JsonProperty;
 
+/**
+ * Representation of a JSON API resource relationship that links one resource to another
+ * or provides meta information about the relationship.
+ *
+ * @see JSON API Document Structure, 7.2.2.2 Relationships
+ */
 @JsonInclude(value = Include.NON_NULL)
 public class JsonApiRelationship implements HasLinks, HasMeta {
diff --git a/api/src/main/java/com/github/streamshub/console/api/support/KafkaContext.java b/api/src/main/java/com/github/streamshub/console/api/support/KafkaContext.java
index 51dec27ac..b5dae1e55 100644
--- a/api/src/main/java/com/github/streamshub/console/api/support/KafkaContext.java
+++ b/api/src/main/java/com/github/streamshub/console/api/support/KafkaContext.java
@@ -85,6 +85,11 @@ public void close() {
         if (admin != null) {
             admin.close();
         }
+        /*
+         * Do not close the registry context when the KafkaContext holds client-provided
+         * credentials. That is, only close when the context is global, i.e. created
+         * with configured credentials.
+         */
         if (applicationScoped && schemaRegistryContext != null) {
             try {
                 schemaRegistryContext.close();
@@ -154,6 +159,13 @@ public Optional<String> tokenUrl() {
                 .map(KafkaListenerAuthenticationOAuth::getTokenEndpointUri));
     }
 
+    /**
+     * The SchemaRegistryContext contains a per-Kafka registry client
+     * and the key/value SerDes classes used for message browsing.
+     *
+     * The client and SerDes instances are kept open and reused until
+     * the parent KafkaContext is disposed of at application shutdown.
+     */
     public class SchemaRegistryContext implements Closeable {
         private final RegistryClient registryClient;
         private final MultiformatDeserializer keyDeserializer;
diff --git a/api/src/main/java/com/github/streamshub/console/api/support/serdes/ArtifactReferences.java b/api/src/main/java/com/github/streamshub/console/api/support/serdes/ArtifactReferences.java
index 74cff29c4..eaa7b213b 100644
--- a/api/src/main/java/com/github/streamshub/console/api/support/serdes/ArtifactReferences.java
+++ b/api/src/main/java/com/github/streamshub/console/api/support/serdes/ArtifactReferences.java
@@ -17,6 +17,10 @@
 
 import io.apicurio.registry.resolver.strategy.ArtifactReference;
 
+/**
+ * Support methods to serialize and deserialize instances of {@link ArtifactReference}
+ * to and from a string suitable for use as a reference in a URL.
+ */
 public class ArtifactReferences {
 
     private static final Logger LOGGER = Logger.getLogger(ArtifactReferences.class);
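For illustration only, a URL-safe encoding of the kind the `toSchemaId` helper suggests could serialize the reference to JSON and then Base64-URL encode it. This is a sketch of one plausible approach, not the PR's actual implementation; the class name, helper name, and error handling below are hypothetical:

    import java.io.UncheckedIOException;
    import java.util.Base64;

    import com.fasterxml.jackson.core.JsonProcessingException;
    import com.fasterxml.jackson.databind.ObjectMapper;

    import io.apicurio.registry.resolver.strategy.ArtifactReference;

    class ArtifactReferenceCodecSketch {
        // Hypothetical: serialize the reference to JSON, then Base64-URL encode
        // it so the result is safe to embed in a URL path or query parameter.
        static String toUrlSafeId(ArtifactReference reference, ObjectMapper objectMapper) {
            try {
                byte[] json = objectMapper.writeValueAsBytes(reference);
                return Base64.getUrlEncoder().withoutPadding().encodeToString(json);
            } catch (JsonProcessingException e) {
                throw new UncheckedIOException(e);
            }
        }
    }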
diff --git a/api/src/main/java/com/github/streamshub/console/api/support/serdes/AvroDatumProvider.java b/api/src/main/java/com/github/streamshub/console/api/support/serdes/AvroDatumProvider.java
index b983ef7ab..e418512e2 100644
--- a/api/src/main/java/com/github/streamshub/console/api/support/serdes/AvroDatumProvider.java
+++ b/api/src/main/java/com/github/streamshub/console/api/support/serdes/AvroDatumProvider.java
@@ -18,6 +18,10 @@
 
 import io.apicurio.registry.serde.avro.DefaultAvroDatumProvider;
 
+/**
+ * Provides a reader and writer to convert JSON to and from Avro format using a provided
+ * Avro schema.
+ */
 public class AvroDatumProvider extends DefaultAvroDatumProvider<RecordData> {
     @Override
     public DatumWriter<RecordData> createDatumWriter(RecordData data, Schema schema) {
@@ -65,7 +69,7 @@ public RecordData read(RecordData reuse, Decoder in) throws IOException {
                 writer.write(datum, jsonEncoder);
                 jsonEncoder.flush();
 
-                return new RecordData(buffer.toByteArray(), null);
+                return new RecordData(buffer.toByteArray());
             }
 
             @Override
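The read path above decodes an Avro datum and then re-encodes it as JSON for display. As a standalone sketch of that conversion using only Avro's own API (the class and method names here are illustrative, not from the PR):

    import java.io.ByteArrayOutputStream;
    import java.io.IOException;

    import org.apache.avro.Schema;
    import org.apache.avro.generic.GenericDatumWriter;
    import org.apache.avro.generic.GenericRecord;
    import org.apache.avro.io.EncoderFactory;
    import org.apache.avro.io.JsonEncoder;

    class AvroToJsonSketch {
        // Pair a datum writer with Avro's JsonEncoder, mirroring the
        // writer/jsonEncoder usage in the hunk above.
        static byte[] toJson(GenericRecord datum, Schema schema) throws IOException {
            ByteArrayOutputStream buffer = new ByteArrayOutputStream();
            JsonEncoder jsonEncoder = EncoderFactory.get().jsonEncoder(schema, buffer);
            new GenericDatumWriter<GenericRecord>(schema).write(datum, jsonEncoder);
            jsonEncoder.flush();
            return buffer.toByteArray();
        }
    }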
diff --git a/api/src/main/java/com/github/streamshub/console/api/support/serdes/AvroDeserializer.java b/api/src/main/java/com/github/streamshub/console/api/support/serdes/AvroDeserializer.java
index 906c997b8..80827337b 100644
--- a/api/src/main/java/com/github/streamshub/console/api/support/serdes/AvroDeserializer.java
+++ b/api/src/main/java/com/github/streamshub/console/api/support/serdes/AvroDeserializer.java
@@ -9,6 +9,10 @@
 
 import io.apicurio.registry.resolver.SchemaResolver;
 import io.apicurio.registry.serde.avro.AvroKafkaDeserializer;
 
+/**
+ * Simple subclass of {@link AvroKafkaDeserializer} to make the {@code readData}
+ * methods public.
+ */
 class AvroDeserializer extends AvroKafkaDeserializer<RecordData> {
 
     AvroDeserializer(SchemaResolver schemaResolver) {
         super();
diff --git a/api/src/main/java/com/github/streamshub/console/api/support/serdes/ForceCloseable.java b/api/src/main/java/com/github/streamshub/console/api/support/serdes/ForceCloseable.java
index 905590d1a..d7009b9f9 100644
--- a/api/src/main/java/com/github/streamshub/console/api/support/serdes/ForceCloseable.java
+++ b/api/src/main/java/com/github/streamshub/console/api/support/serdes/ForceCloseable.java
@@ -2,6 +2,12 @@
 
 import java.io.IOException;
 
+/**
+ * The serializers and deserializers in this package are reused between requests, so
+ * their {@code close} methods are no-ops. Each implements this interface to perform
+ * the actual close operations when a {@link com.github.streamshub.console.api.support.KafkaContext}
+ * is disposed.
+ */
 public interface ForceCloseable {
 
     void forceClose() throws IOException;
diff --git a/api/src/main/java/com/github/streamshub/console/api/support/serdes/MultiformatDeserializer.java b/api/src/main/java/com/github/streamshub/console/api/support/serdes/MultiformatDeserializer.java
index 902b8bee3..49173ce9a 100644
--- a/api/src/main/java/com/github/streamshub/console/api/support/serdes/MultiformatDeserializer.java
+++ b/api/src/main/java/com/github/streamshub/console/api/support/serdes/MultiformatDeserializer.java
@@ -27,6 +27,20 @@
 
 import io.apicurio.registry.types.ArtifactType;
 import io.apicurio.registry.utils.protobuf.schema.ProtobufSchema;
 
+/**
+ * Deserializer that supports reading Avro, Protobuf, and raw bytes.
+ *
+ * If an Apicurio Registry client is provided, the deserializer attempts to find
+ * a schema using an identifier found in the message or its headers. If both an
+ * identifier and a matching schema are found, either Avro or Protobuf
+ * deserialization takes place by delegating to the Apicurio deserializer for
+ * that type.
+ *
+ * Otherwise, the data is returned as-is to the client of the Consumer using
+ * this deserializer. When a schema identifier is detected but a raw message
+ * must still be returned, warning information explaining the reason is
+ * attached.
+ */
 public class MultiformatDeserializer extends AbstractKafkaDeserializer implements ForceCloseable {
 
     private static final Logger LOG = Logger.getLogger(MultiformatDeserializer.class);
@@ -127,12 +141,11 @@ private RecordData readAvroData(Headers headers, SchemaLookupResult sche
             } else {
                 result = avroDeserializer.readData(cast(schema), buffer, start, length);
             }
-            result.schema = schema;
             result.meta.put("schema-type", ArtifactType.AVRO);
             result.meta.put("schema-id", ArtifactReferences.toSchemaId(schemaResult.toArtifactReference(), objectMapper));
             result.meta.put("schema-name", avroSchema.getFullName());
         } catch (Exception e) {
-            result = new RecordData(null, schema);
+            result = new RecordData((byte[]) null);
             result.error = com.github.streamshub.console.api.model.Error.forThrowable(e, "Error deserializing Avro data");
         }
 
@@ -154,12 +167,12 @@ private RecordData readProtobufData(Headers headers, SchemaLookupResult
                     .omittingInsignificantWhitespace()
                     .print(msg)
                     .getBytes();
-            result = new RecordData(data, schema);
+            result = new RecordData(data);
             result.meta.put("schema-type", ArtifactType.PROTOBUF);
             result.meta.put("schema-id", ArtifactReferences.toSchemaId(schemaResult.toArtifactReference(), objectMapper));
             result.meta.put("schema-name", msg.getDescriptorForType().getFullName());
         } catch (Exception e) {
-            result = new RecordData(null, schema);
+            result = new RecordData((byte[]) null);
             result.error = com.github.streamshub.console.api.model.Error.forThrowable(e, "Error deserializing Protobuf data");
         }
 
@@ -169,7 +182,7 @@ private RecordData readProtobufData(Headers headers, SchemaLookupResult
     private RecordData readRawData(SchemaLookupResult schemaResult, ByteBuffer buffer, int start, int length) {
         byte[] bytes = new byte[length];
         System.arraycopy(buffer.array(), start, bytes, 0, length);
-        RecordData result = new RecordData(bytes, null);
+        RecordData result = new RecordData(bytes);
 
         if (schemaResult == RESOLVER_MISSING) {
             result.error = new com.github.streamshub.console.api.model.Error(
diff --git a/api/src/main/java/com/github/streamshub/console/api/support/serdes/MultiformatSchemaParser.java b/api/src/main/java/com/github/streamshub/console/api/support/serdes/MultiformatSchemaParser.java
index a7ccf57bf..2de9cbf2e 100644
--- a/api/src/main/java/com/github/streamshub/console/api/support/serdes/MultiformatSchemaParser.java
+++ b/api/src/main/java/com/github/streamshub/console/api/support/serdes/MultiformatSchemaParser.java
@@ -15,6 +15,14 @@
 
 import io.apicurio.registry.resolver.data.Record;
 import io.apicurio.registry.types.ArtifactType;
 
+/**
+ * Schema parser that delegates to either the Avro or Protobuf schema parser.
+ * This class attempts to sniff the schema's type using a primitive check for a
+ * Protobuf message header.
+ *
+ * This class could likely be improved with a more advanced/accurate detection
+ * process.
+ */
 public class MultiformatSchemaParser implements SchemaParser {
 
     private static final int CACHE_LIMIT = 20;
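Since Avro schemas are JSON documents, one possible refinement of the sniffing mentioned above is to test whether the raw schema parses as JSON. This is an assumed alternative heuristic, not the check the PR performs:

    import java.io.IOException;

    import com.fasterxml.jackson.databind.ObjectMapper;

    class SchemaTypeSniffSketch {
        private static final ObjectMapper MAPPER = new ObjectMapper();

        // Hypothetical heuristic: a schema payload that parses as JSON is
        // likely an Avro schema; anything else is assumed to be Protobuf.
        static boolean looksLikeAvro(byte[] rawSchema) {
            try {
                MAPPER.readTree(rawSchema);
                return true;
            } catch (IOException e) {
                return false;
            }
        }
    }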
diff --git a/api/src/main/java/com/github/streamshub/console/api/support/serdes/MultiformatSerializer.java b/api/src/main/java/com/github/streamshub/console/api/support/serdes/MultiformatSerializer.java
index a44c651fc..c37c17655 100644
--- a/api/src/main/java/com/github/streamshub/console/api/support/serdes/MultiformatSerializer.java
+++ b/api/src/main/java/com/github/streamshub/console/api/support/serdes/MultiformatSerializer.java
@@ -37,6 +37,15 @@
 
 import io.apicurio.registry.types.ArtifactType;
 import io.apicurio.registry.utils.protobuf.schema.ProtobufSchema;
 
+/**
+ * Serializer that supports writing Avro, Protobuf, and raw bytes.
+ *
+ * This serializer expects the input data to provide a GAV
+ * (groupId/artifactId/version) identifying the target schema; otherwise, the
+ * input is passed through untouched. If the provided GAV can be found in the
+ * Apicurio Registry, the schema is used to serialize to either Avro or
+ * Protobuf, depending on the schema type.
+ */
 public class MultiformatSerializer extends AbstractKafkaSerializer implements ArtifactReferenceResolverStrategy, ForceCloseable {
 
     private static final SchemaLookupResult EMPTY_RESULT = SchemaLookupResult.builder().build();
diff --git a/api/src/main/java/com/github/streamshub/console/api/support/serdes/ProtobufDeserializer.java b/api/src/main/java/com/github/streamshub/console/api/support/serdes/ProtobufDeserializer.java
index 1fe3f5d55..9cde31239 100644
--- a/api/src/main/java/com/github/streamshub/console/api/support/serdes/ProtobufDeserializer.java
+++ b/api/src/main/java/com/github/streamshub/console/api/support/serdes/ProtobufDeserializer.java
@@ -11,6 +11,10 @@
 
 import io.apicurio.registry.serde.protobuf.ProtobufKafkaDeserializer;
 import io.apicurio.registry.utils.protobuf.schema.ProtobufSchema;
 
+/**
+ * Simple subclass of {@link ProtobufKafkaDeserializer} to make the
+ * {@code readData} methods public.
+ */
 class ProtobufDeserializer extends ProtobufKafkaDeserializer {
 
     ProtobufDeserializer(SchemaResolver schemaResolver) {
         super();
diff --git a/api/src/main/java/com/github/streamshub/console/api/support/serdes/ProtobufSerializer.java b/api/src/main/java/com/github/streamshub/console/api/support/serdes/ProtobufSerializer.java
index c82e793d1..c32a8d62e 100644
--- a/api/src/main/java/com/github/streamshub/console/api/support/serdes/ProtobufSerializer.java
+++ b/api/src/main/java/com/github/streamshub/console/api/support/serdes/ProtobufSerializer.java
@@ -10,9 +10,17 @@
 
 import io.apicurio.registry.resolver.SchemaLookupResult;
 import io.apicurio.registry.resolver.SchemaResolver;
+import io.apicurio.registry.serde.AbstractKafkaSerializer;
 import io.apicurio.registry.serde.protobuf.ProtobufKafkaSerializer;
 import io.apicurio.registry.utils.protobuf.schema.ProtobufSchema;
 
+/**
+ * This serializer exists to provide a {@code serialize} method that accepts the
+ * schema directly, to be used instead of
+ * {@link AbstractKafkaSerializer#serialize(String, Headers, Object)}, which
+ * resolves it. By the time this serializer is invoked, the schema has already
+ * been resolved in order to determine that the message type should be Protobuf.
+ */
 class ProtobufSerializer extends ProtobufKafkaSerializer {
 
     ProtobufSerializer(SchemaResolver schemaResolver) {
         super();
diff --git a/api/src/main/java/com/github/streamshub/console/api/support/serdes/RecordData.java b/api/src/main/java/com/github/streamshub/console/api/support/serdes/RecordData.java
index 81f0e6ac8..05f62dbbb 100644
--- a/api/src/main/java/com/github/streamshub/console/api/support/serdes/RecordData.java
+++ b/api/src/main/java/com/github/streamshub/console/api/support/serdes/RecordData.java
@@ -8,8 +8,14 @@
 
 import java.util.LinkedHashMap;
 import java.util.Map;
 
-import io.apicurio.registry.resolver.ParsedSchema;
-
+/**
+ * The multi-format serializer and deserializer use this type as both the key and
+ * value in order to provide a bi-directional flow of information. Meta information
+ * about the associated schema is passed between clients of the Producer/Consumer
+ * and the serializer/deserializer, and error information may also be conveyed back
+ * to the client without throwing an exception and disrupting the processing of the
+ * topic.
+ */
 public class RecordData {
 
     public static final String BINARY_DATA_MESSAGE = "Binary or non-UTF-8 encoded data cannot be displayed";
@@ -17,29 +23,17 @@ public class RecordData {
     public final Map<String, Object> meta = new LinkedHashMap<>(1);
 
     byte[] data;
-    ParsedSchema schema;
     com.github.streamshub.console.api.model.Error error;
 
-    public RecordData(byte[] data, ParsedSchema schema) {
-        super();
-        this.data = data;
-        this.schema = schema;
-    }
-
     public RecordData(byte[] data) {
         super();
        this.data = data;
-        this.schema = null;
     }
 
     public RecordData(String data) {
         this(data != null ? data.getBytes(StandardCharsets.UTF_8) : null);
     }
 
-    public ParsedSchema schema() {
-        return schema;
-    }
-
     public com.github.streamshub.console.api.model.Error error() {
         return error;
     }
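To illustrate the bi-directional flow described in the Javadoc above, a hypothetical caller on the consumer side could inspect a deserialized RecordData as follows; only the members visible in this diff (`meta` and `error()`) are used:

    // Hypothetical caller-side inspection of a RecordData produced by
    // MultiformatDeserializer.
    void inspect(RecordData value) {
        Object schemaType = value.meta.get("schema-type"); // e.g. AVRO or PROTOBUF
        Object schemaName = value.meta.get("schema-name");

        if (value.error() != null) {
            // Deserialization failed, but the failure rides along with the
            // record instead of interrupting processing of the topic.
            System.err.printf("Could not decode record (%s/%s): %s%n",
                    schemaType, schemaName, value.error());
        }
    }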
diff --git a/console-config-example.yaml b/console-config-example.yaml
index 3b55847b7..be0800351 100644
--- a/console-config-example.yaml
+++ b/console-config-example.yaml
@@ -9,6 +9,9 @@ kafka:
       namespace: my-namespace1 # namespace of the Strimzi Kafka CR (optional)
       id: my-kafka1-id # value to be used as an identifier for the cluster. Must be specified when namespace is not.
       listener: "secure" # name of the listener to use for connections from the console
+      # This object may contain a `url` string property giving the Apicurio Registry API path to be used to resolve
+      # Avro or Protobuf schemas for topic message browsing
+      schemaRegistry: { }
       # `properties` contains keys/values to use for any Kafka connection
       properties:
         security.protocol: SASL_SSL
diff --git a/examples/console/010-Console-example.yaml b/examples/console/010-Console-example.yaml
index b597bcba2..793fd1b77 100644
--- a/examples/console/010-Console-example.yaml
+++ b/examples/console/010-Console-example.yaml
@@ -13,6 +13,8 @@ spec:
     - name: console-kafka # Name of the `Kafka` CR representing the cluster
       namespace: ${KAFKA_NAMESPACE} # Namespace of the `Kafka` CR representing the cluster
       listener: secure # Listener on the `Kafka` CR to connect from the console
+      schemaRegistry: null # Configuration for a connection to an Apicurio Registry instance
+                           # E.g.: { "url": "http://example.com/apis/registry/v2" }
       properties:
         values: [] # Array of name/value for properties to be used for connections
                    # made to this cluster
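Combining the two YAML changes above, a cluster entry with the registry connection filled in might look like the following. The values are illustrative; the URL is taken from the example comment in the diff:

    kafka:
      clusters:
        - name: my-kafka1
          namespace: my-namespace1
          listener: "secure"
          schemaRegistry:
            url: http://example.com/apis/registry/v2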