From ac5c347b908004ce9129603ee7765fbdac96ab0b Mon Sep 17 00:00:00 2001 From: Michael Edgar Date: Thu, 24 Oct 2024 10:23:02 -0400 Subject: [PATCH] Re-factor registry config to top level, reference from Kafka config Signed-off-by: Michael Edgar --- .../streamshub/console/api/ClientFactory.java | 104 +++-------------- .../console/api/SchemasResource.java | 48 ++++---- .../console/api/service/RecordService.java | 8 +- .../console/api/support/KafkaContext.java | 21 +++- .../factories/ConsoleConfigFactory.java | 110 ++++++++++++++++++ .../kafka/systemtest/TestPlainProfile.java | 16 ++- .../console/config/ConsoleConfig.java | 22 ++++ .../console/config/KafkaClusterConfig.java | 10 +- .../console/config/SchemaRegistryConfig.java | 11 ++ console-config-example.yaml | 10 +- .../api/v1alpha1/spec/ConsoleSpec.java | 10 ++ .../api/v1alpha1/spec/KafkaCluster.java | 11 +- .../api/v1alpha1/spec/SchemaRegistry.java | 16 +++ .../console/dependents/ConsoleSecret.java | 21 +++- .../console/ConsoleReconcilerTest.java | 18 ++- 15 files changed, 284 insertions(+), 152 deletions(-) create mode 100644 api/src/main/java/com/github/streamshub/console/api/support/factories/ConsoleConfigFactory.java diff --git a/api/src/main/java/com/github/streamshub/console/api/ClientFactory.java b/api/src/main/java/com/github/streamshub/console/api/ClientFactory.java index 8db059f41..87505bfaf 100644 --- a/api/src/main/java/com/github/streamshub/console/api/ClientFactory.java +++ b/api/src/main/java/com/github/streamshub/console/api/ClientFactory.java @@ -1,9 +1,5 @@ package com.github.streamshub.console.api; -import java.io.IOException; -import java.io.InputStream; -import java.io.UncheckedIOException; -import java.nio.file.Path; import java.util.Base64; import java.util.Collection; import java.util.Collections; @@ -55,21 +51,17 @@ import org.apache.kafka.common.serialization.ByteArrayDeserializer; import org.apache.kafka.common.serialization.StringSerializer; import org.eclipse.microprofile.config.Config; -import org.eclipse.microprofile.config.inject.ConfigProperty; import org.jboss.logging.Logger; import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.dataformat.yaml.YAMLFactory; import com.github.streamshub.console.api.support.Holder; import com.github.streamshub.console.api.support.KafkaContext; import com.github.streamshub.console.api.support.TrustAllCertificateManager; -import com.github.streamshub.console.api.support.ValidationProxy; import com.github.streamshub.console.api.support.serdes.RecordData; import com.github.streamshub.console.config.ConsoleConfig; import com.github.streamshub.console.config.KafkaClusterConfig; +import com.github.streamshub.console.config.SchemaRegistryConfig; -import io.apicurio.registry.rest.client.RegistryClient; -import io.apicurio.registry.rest.client.RegistryClientFactory; import io.apicurio.registry.serde.SerdeConfig; import io.fabric8.kubernetes.client.informers.ResourceEventHandler; import io.fabric8.kubernetes.client.informers.SharedIndexInformer; @@ -128,15 +120,11 @@ public class ClientFactory { ObjectMapper mapper; @Inject - @ConfigProperty(name = "console.config-path") - Optional configPath; + ConsoleConfig consoleConfig; @Inject Holder> kafkaInformer; - @Inject - ValidationProxy validationService; - @Inject Instance trustManager; @@ -168,57 +156,12 @@ public class ClientFactory { @Produces @ApplicationScoped - public ConsoleConfig produceConsoleConfig() { - return configPath.map(Path::of) - .map(Path::toUri) - .map(uri -> { - try { - return uri.toURL(); - } catch (IOException e) { - throw new UncheckedIOException(e); - } - }) - .filter(Objects::nonNull) - .map(url -> { - log.infof("Loading console configuration from %s", url); - ObjectMapper yamlMapper = mapper.copyWith(new YAMLFactory()); - - try (InputStream stream = url.openStream()) { - return yamlMapper.readValue(stream, ConsoleConfig.class); - } catch (IOException e) { - throw new UncheckedIOException(e); - } - }) - .map(consoleConfig -> { - consoleConfig.getKafka().getClusters().stream().forEach(cluster -> { - if (cluster.getSchemaRegistry() != null) { - var registryConfig = cluster.getSchemaRegistry(); - registryConfig.setUrl(resolveValue(registryConfig.getUrl())); - } - resolveValues(cluster.getProperties()); - resolveValues(cluster.getAdminProperties()); - resolveValues(cluster.getProducerProperties()); - resolveValues(cluster.getConsumerProperties()); - }); - - return consoleConfig; - }) - .map(validationService::validate) - .orElseGet(() -> { - log.warn("Console configuration has not been specified using `console.config-path` property"); - return new ConsoleConfig(); - }); - } - - @Produces - @ApplicationScoped - Map produceKafkaContexts(ConsoleConfig consoleConfig, - Function, Admin> adminBuilder) { + Map produceKafkaContexts(Function, Admin> adminBuilder) { final Map contexts = new ConcurrentHashMap<>(); if (kafkaInformer.isPresent()) { - addKafkaEventHandler(contexts, consoleConfig, adminBuilder); + addKafkaEventHandler(contexts, adminBuilder); } // Configure clusters that will not be configured by events @@ -236,7 +179,6 @@ Map produceKafkaContexts(ConsoleConfig consoleConfig, } void addKafkaEventHandler(Map contexts, - ConsoleConfig consoleConfig, Function, Admin> adminBuilder) { kafkaInformer.get().addEventHandlerWithResyncPeriod(new ResourceEventHandler() { @@ -348,15 +290,18 @@ void putKafkaContext(Map contexts, admin = adminBuilder.apply(adminConfigs); } - RegistryClient schemaRegistryClient = null; + SchemaRegistryConfig registryConfig = null; if (clusterConfig.getSchemaRegistry() != null) { - String registryUrl = clusterConfig.getSchemaRegistry().getUrl(); - schemaRegistryClient = RegistryClientFactory.create(registryUrl); // NOSONAR - closed elsewhere + registryConfig = consoleConfig.getSchemaRegistries() + .stream() + .filter(registry -> registry.getName().equals(clusterConfig.getSchemaRegistry())) + .findFirst() + .orElseThrow(); } KafkaContext ctx = new KafkaContext(clusterConfig, kafkaResource.orElse(null), clientConfigs, admin); - ctx.schemaRegistryClient(schemaRegistryClient, mapper); + ctx.schemaRegistryClient(registryConfig, mapper); KafkaContext previous = contexts.put(clusterId, ctx); @@ -560,7 +505,7 @@ public void disposeKafkaContext(@Disposes KafkaContext context, Map consumerSupplier(ConsoleConfig consoleConfig, KafkaContext context) { + public Consumer consumerSupplier(KafkaContext context) { var configs = maybeAuthenticate(context, Consumer.class); return new KafkaConsumer<>( @@ -575,7 +520,7 @@ public void disposeConsumer(@Disposes Consumer consumer) @Produces @RequestScoped - public Producer producerSupplier(ConsoleConfig consoleConfig, KafkaContext context) { + public Producer producerSupplier(KafkaContext context) { var configs = maybeAuthenticate(context, Producer.class); return new KafkaProducer<>( configs, @@ -708,29 +653,6 @@ private void applyListenerConfiguration(Map cfg, GenericKafkaLis cfg.putIfAbsent(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, protocol.toString()); } - private void resolveValues(Map properties) { - properties.entrySet().forEach(entry -> - entry.setValue(resolveValue(entry.getValue()))); - } - - /** - * If the given value is an expression referencing a configuration value, - * replace it with the target property value. - * - * @param value configuration value that may be a reference to another - * configuration property - * @return replacement property or the same value if the given string is not a - * reference. - */ - private String resolveValue(String value) { - if (value.startsWith("${") && value.endsWith("}")) { - String replacement = value.substring(2, value.length() - 1); - return config.getOptionalValue(replacement, String.class).orElse(value); - } - - return value; - } - Optional getDefaultConfig(String clientType, String configName) { String clientSpecificKey = "console.kafka.%s.%s".formatted(clientType, configName); String generalKey = "console.kafka.%s".formatted(configName); diff --git a/api/src/main/java/com/github/streamshub/console/api/SchemasResource.java b/api/src/main/java/com/github/streamshub/console/api/SchemasResource.java index 0bb4a9c3b..6b28cdff0 100644 --- a/api/src/main/java/com/github/streamshub/console/api/SchemasResource.java +++ b/api/src/main/java/com/github/streamshub/console/api/SchemasResource.java @@ -1,8 +1,6 @@ package com.github.streamshub.console.api; -import java.util.Base64; import java.util.Collections; -import java.util.Map; import java.util.Optional; import jakarta.inject.Inject; @@ -21,15 +19,17 @@ import org.jboss.logging.Logger; import com.fasterxml.jackson.databind.ObjectMapper; -import com.github.streamshub.console.api.support.KafkaContext; import com.github.streamshub.console.api.support.serdes.ArtifactReferences; import com.github.streamshub.console.api.support.serdes.MultiformatSchemaParser; +import com.github.streamshub.console.config.ConsoleConfig; +import com.github.streamshub.console.config.SchemaRegistryConfig; import io.apicurio.registry.resolver.DefaultSchemaResolver; import io.apicurio.registry.resolver.SchemaResolver; import io.apicurio.registry.rest.client.RegistryClient; +import io.apicurio.registry.rest.client.RegistryClientFactory; -@Path("/api/schemas/{schemaId}") +@Path("/api/registries/{registryId}/schemas/{schemaId}") @Tag(name = "Schema Registry Resources") public class SchemasResource { @@ -40,8 +40,16 @@ public class SchemasResource { ObjectMapper objectMapper; @Inject - Map kafkaContexts; - + ConsoleConfig consoleConfig; + + /** + * Retrieve the schema content from the identified/named registry. + * + *

+ * Although opaque to the client, the schemaId is a base-64 encoded, JSON-ified + * {@link io.apicurio.registry.resolver.strategy.ArtifactReference ArtifactReference} + * which will be parsed with {@link ArtifactReferences#fromSchemaId}. + */ @GET @Produces(MediaType.APPLICATION_JSON) @APIResponse(responseCode = "200", ref = "Configurations", content = @Content()) @@ -49,35 +57,27 @@ public class SchemasResource { @APIResponse(responseCode = "500", ref = "ServerError") @APIResponse(responseCode = "504", ref = "ServerTimeout") public Response getSchemaContent( + @Parameter(description = "Schema registry identifier (name)") + @PathParam("registryId") + String registryId, + @Parameter(description = "Schema identifier") @PathParam("schemaId") String schemaId) { - int clusterTerm = schemaId.indexOf('.'); - - if (clusterTerm < 0) { - throw new NotFoundException("No such schema"); - } - - String clusterId = new String(Base64.getUrlDecoder().decode(schemaId.substring(0, clusterTerm))); + SchemaRegistryConfig registryConfig = consoleConfig.getSchemaRegistries() + .stream() + .filter(config -> config.getName().equals(registryId)) + .findFirst() + .orElseThrow(() -> new NotFoundException("Unknown registry")); - if (!kafkaContexts.containsKey(clusterId)) { - throw new NotFoundException("No such schema"); - } - - RegistryClient registryClient = kafkaContexts.get(clusterId).schemaRegistryContext().registryClient(); - - if (registryClient == null) { - throw new NotFoundException("Schema not found, no registry is configured"); - } + RegistryClient registryClient = RegistryClientFactory.create(registryConfig.getUrl()); @SuppressWarnings("resource") SchemaResolver schemaResolver = new DefaultSchemaResolver<>(); schemaResolver.setClient(registryClient); schemaResolver.configure(Collections.emptyMap(), new MultiformatSchemaParser<>(Collections.emptySet())); - schemaId = schemaId.substring(clusterTerm + 1); - var reference = ArtifactReferences.fromSchemaId(schemaId, objectMapper); var schema = schemaResolver.resolveSchemaByArtifactReference(reference); diff --git a/api/src/main/java/com/github/streamshub/console/api/service/RecordService.java b/api/src/main/java/com/github/streamshub/console/api/service/RecordService.java index 09a2298f1..45f4dceb5 100644 --- a/api/src/main/java/com/github/streamshub/console/api/service/RecordService.java +++ b/api/src/main/java/com/github/streamshub/console/api/service/RecordService.java @@ -4,7 +4,6 @@ import java.time.Instant; import java.util.ArrayList; import java.util.Arrays; -import java.util.Base64; import java.util.Collection; import java.util.Collections; import java.util.Comparator; @@ -333,15 +332,16 @@ Optional schemaRelationship(RecordData data) { .filter(recordMeta -> recordMeta.containsKey("schema-id")) .map(recordMeta -> { String artifactType = recordMeta.get("schema-type"); - String clusterIdEnc = Base64.getUrlEncoder().encodeToString(kafkaContext.clusterId().getBytes()); - String schemaId = clusterIdEnc + '.' + recordMeta.get("schema-id"); + // schema-id is present, it is null-safe to retrieve the name from configuration + String registryId = kafkaContext.schemaRegistryContext().getConfig().getName(); + String schemaId = recordMeta.get("schema-id"); String name = recordMeta.get("schema-name"); var relationship = new JsonApiRelationship(); relationship.addMeta("artifactType", artifactType); relationship.addMeta("name", name); relationship.data(new Identifier("schemas", schemaId)); - relationship.addLink("content", "/api/schemas/" + schemaId); + relationship.addLink("content", "/api/registries/%s/schemas/%s".formatted(registryId, schemaId)); schemaError(data).ifPresent(error -> relationship.addMeta("errors", List.of(error))); diff --git a/api/src/main/java/com/github/streamshub/console/api/support/KafkaContext.java b/api/src/main/java/com/github/streamshub/console/api/support/KafkaContext.java index 1a8498cec..b577b5e99 100644 --- a/api/src/main/java/com/github/streamshub/console/api/support/KafkaContext.java +++ b/api/src/main/java/com/github/streamshub/console/api/support/KafkaContext.java @@ -20,8 +20,10 @@ import com.github.streamshub.console.api.support.serdes.MultiformatDeserializer; import com.github.streamshub.console.api.support.serdes.MultiformatSerializer; import com.github.streamshub.console.config.KafkaClusterConfig; +import com.github.streamshub.console.config.SchemaRegistryConfig; import io.apicurio.registry.rest.client.RegistryClient; +import io.apicurio.registry.rest.client.RegistryClientFactory; import io.strimzi.api.kafka.model.kafka.Kafka; import io.strimzi.api.kafka.model.kafka.KafkaClusterSpec; import io.strimzi.api.kafka.model.kafka.KafkaSpec; @@ -127,8 +129,8 @@ public boolean applicationScoped() { return applicationScoped; } - public void schemaRegistryClient(RegistryClient schemaRegistryClient, ObjectMapper objectMapper) { - schemaRegistryContext = new SchemaRegistryContext(schemaRegistryClient, objectMapper); + public void schemaRegistryClient(SchemaRegistryConfig config, ObjectMapper objectMapper) { + schemaRegistryContext = new SchemaRegistryContext(config, objectMapper); } public SchemaRegistryContext schemaRegistryContext() { @@ -167,14 +169,21 @@ public Optional tokenUrl() { * the parent KafkaContext is disposed of at application shutdown. */ public class SchemaRegistryContext implements Closeable { + private final SchemaRegistryConfig config; private final RegistryClient registryClient; private final MultiformatDeserializer keyDeserializer; private final MultiformatDeserializer valueDeserializer; private final MultiformatSerializer keySerializer; private final MultiformatSerializer valueSerializer; - SchemaRegistryContext(RegistryClient schemaRegistryClient, ObjectMapper objectMapper) { - this.registryClient = schemaRegistryClient; + SchemaRegistryContext(SchemaRegistryConfig config, ObjectMapper objectMapper) { + this.config = config; + + if (config != null) { + registryClient = RegistryClientFactory.create(config.getUrl()); + } else { + registryClient = null; + } keyDeserializer = new MultiformatDeserializer(registryClient, objectMapper); keyDeserializer.configure(configs(Consumer.class), true); @@ -189,6 +198,10 @@ public class SchemaRegistryContext implements Closeable { valueSerializer.configure(configs(Producer.class), false); } + public SchemaRegistryConfig getConfig() { + return config; + } + public RegistryClient registryClient() { return registryClient; } diff --git a/api/src/main/java/com/github/streamshub/console/api/support/factories/ConsoleConfigFactory.java b/api/src/main/java/com/github/streamshub/console/api/support/factories/ConsoleConfigFactory.java new file mode 100644 index 000000000..9bd4aa5e9 --- /dev/null +++ b/api/src/main/java/com/github/streamshub/console/api/support/factories/ConsoleConfigFactory.java @@ -0,0 +1,110 @@ +package com.github.streamshub.console.api.support.factories; + +import java.io.IOException; +import java.io.InputStream; +import java.io.UncheckedIOException; +import java.nio.file.Path; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; + +import jakarta.enterprise.context.ApplicationScoped; +import jakarta.enterprise.inject.Produces; +import jakarta.inject.Inject; +import jakarta.inject.Singleton; + +import org.eclipse.microprofile.config.Config; +import org.eclipse.microprofile.config.inject.ConfigProperty; +import org.jboss.logging.Logger; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.dataformat.yaml.YAMLFactory; +import com.github.streamshub.console.api.support.ValidationProxy; +import com.github.streamshub.console.config.ConsoleConfig; + +@Singleton +public class ConsoleConfigFactory { + + @Inject + @ConfigProperty(name = "console.config-path") + Optional configPath; + + @Inject + Logger log; + + @Inject + Config config; + + @Inject + ObjectMapper mapper; + + @Inject + ValidationProxy validationService; + + @Produces + @ApplicationScoped + public ConsoleConfig produceConsoleConfig() { + return configPath.map(Path::of) + .map(Path::toUri) + .map(uri -> { + try { + return uri.toURL(); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + }) + .filter(Objects::nonNull) + .map(url -> { + log.infof("Loading console configuration from %s", url); + ObjectMapper yamlMapper = mapper.copyWith(new YAMLFactory()); + + try (InputStream stream = url.openStream()) { + return yamlMapper.readValue(stream, ConsoleConfig.class); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + }) + .map(consoleConfig -> { + consoleConfig.getSchemaRegistries().forEach(registry -> { + registry.setUrl(resolveValue(registry.getUrl())); + }); + + consoleConfig.getKafka().getClusters().forEach(cluster -> { + resolveValues(cluster.getProperties()); + resolveValues(cluster.getAdminProperties()); + resolveValues(cluster.getProducerProperties()); + resolveValues(cluster.getConsumerProperties()); + }); + + return consoleConfig; + }) + .map(validationService::validate) + .orElseGet(() -> { + log.warn("Console configuration has not been specified using `console.config-path` property"); + return new ConsoleConfig(); + }); + } + + private void resolveValues(Map properties) { + properties.entrySet().forEach(entry -> + entry.setValue(resolveValue(entry.getValue()))); + } + + /** + * If the given value is an expression referencing a configuration value, + * replace it with the target property value. + * + * @param value configuration value that may be a reference to another + * configuration property + * @return replacement property or the same value if the given string is not a + * reference. + */ + private String resolveValue(String value) { + if (value.startsWith("${") && value.endsWith("}")) { + String replacement = value.substring(2, value.length() - 1); + return config.getOptionalValue(replacement, String.class).orElse(value); + } + + return value; + } +} diff --git a/api/src/test/java/com/github/streamshub/console/kafka/systemtest/TestPlainProfile.java b/api/src/test/java/com/github/streamshub/console/kafka/systemtest/TestPlainProfile.java index 5add0145e..75c0656c9 100644 --- a/api/src/test/java/com/github/streamshub/console/kafka/systemtest/TestPlainProfile.java +++ b/api/src/test/java/com/github/streamshub/console/kafka/systemtest/TestPlainProfile.java @@ -36,17 +36,21 @@ public Map getConfigOverrides() { var configFile = writeConfiguration(""" kubernetes: enabled: true + + schemaRegistries: + - name: test-registry + ### + # This is the property used by Dev Services for Apicurio Registry + # https://quarkus.io/guides/apicurio-registry-dev-services + ### + url: ${mp.messaging.connector.smallrye-kafka.apicurio.registry.url} + kafka: clusters: - name: test-kafka1 namespace: default id: k1-id - schemaRegistry: - ### - # This is the property used by Dev Services for Apicurio Registry - # https://quarkus.io/guides/apicurio-registry-dev-services - ### - url: ${mp.messaging.connector.smallrye-kafka.apicurio.registry.url} + schemaRegistry: test-registry properties: bootstrap.servers: ${console.test.external-bootstrap} diff --git a/common/src/main/java/com/github/streamshub/console/config/ConsoleConfig.java b/common/src/main/java/com/github/streamshub/console/config/ConsoleConfig.java index 691eabd41..5c3c0f794 100644 --- a/common/src/main/java/com/github/streamshub/console/config/ConsoleConfig.java +++ b/common/src/main/java/com/github/streamshub/console/config/ConsoleConfig.java @@ -1,10 +1,24 @@ package com.github.streamshub.console.config; +import java.util.ArrayList; +import java.util.List; + +import jakarta.validation.constraints.AssertTrue; + +import com.fasterxml.jackson.annotation.JsonIgnore; + public class ConsoleConfig { KubernetesConfig kubernetes = new KubernetesConfig(); + List schemaRegistries = new ArrayList<>(); KafkaConfig kafka = new KafkaConfig(); + @JsonIgnore + @AssertTrue(message = "Schema registry names must be unique") + public boolean hasUniqueRegistryNames() { + return schemaRegistries.stream().map(SchemaRegistryConfig::getName).distinct().count() == schemaRegistries.size(); + } + public KubernetesConfig getKubernetes() { return kubernetes; } @@ -13,6 +27,14 @@ public void setKubernetes(KubernetesConfig kubernetes) { this.kubernetes = kubernetes; } + public List getSchemaRegistries() { + return schemaRegistries; + } + + public void setSchemaRegistries(List schemaRegistries) { + this.schemaRegistries = schemaRegistries; + } + public KafkaConfig getKafka() { return kafka; } diff --git a/common/src/main/java/com/github/streamshub/console/config/KafkaClusterConfig.java b/common/src/main/java/com/github/streamshub/console/config/KafkaClusterConfig.java index ca78ce5a8..0d27c0ad8 100644 --- a/common/src/main/java/com/github/streamshub/console/config/KafkaClusterConfig.java +++ b/common/src/main/java/com/github/streamshub/console/config/KafkaClusterConfig.java @@ -14,7 +14,11 @@ public class KafkaClusterConfig { private String name; private String namespace; private String listener; - private SchemaRegistryConfig schemaRegistry; + /** + * Name of a configured schema registry that will be used to ser/des configurations + * with this Kafka cluster. + */ + private String schemaRegistry; private Map properties = new LinkedHashMap<>(); private Map adminProperties = new LinkedHashMap<>(); private Map consumerProperties = new LinkedHashMap<>(); @@ -62,11 +66,11 @@ public void setListener(String listener) { this.listener = listener; } - public SchemaRegistryConfig getSchemaRegistry() { + public String getSchemaRegistry() { return schemaRegistry; } - public void setSchemaRegistry(SchemaRegistryConfig schemaRegistry) { + public void setSchemaRegistry(String schemaRegistry) { this.schemaRegistry = schemaRegistry; } diff --git a/common/src/main/java/com/github/streamshub/console/config/SchemaRegistryConfig.java b/common/src/main/java/com/github/streamshub/console/config/SchemaRegistryConfig.java index 0bcff0b4c..0d4dc18ec 100644 --- a/common/src/main/java/com/github/streamshub/console/config/SchemaRegistryConfig.java +++ b/common/src/main/java/com/github/streamshub/console/config/SchemaRegistryConfig.java @@ -4,9 +4,20 @@ public class SchemaRegistryConfig { + @NotBlank(message = "Schema registry `name` is required") + String name; + @NotBlank(message = "Schema registry `url` is required") String url; + public String getName() { + return name; + } + + public void setName(String name) { + this.name = name; + } + public String getUrl() { return url; } diff --git a/console-config-example.yaml b/console-config-example.yaml index be0800351..61487fcb8 100644 --- a/console-config-example.yaml +++ b/console-config-example.yaml @@ -3,15 +3,19 @@ kubernetes: # Kafka and KafkaTopic custom resources. Enabled by default enabled: true +schemaRegistries: + # Array of Apicurio Registries that my be referenced by Kafka cluster configurations + # to resolve Avro or Protobuf schemas for topic message browsing + - name: "my-apicurio-registry" + url: "http://registry.exampl.com/apis/registry/v2/" + kafka: clusters: - name: my-kafka1 # name of the Strimzi Kafka CR namespace: my-namespace1 # namespace of the Strimzi Kafka CR (optional) id: my-kafka1-id # value to be used as an identifier for the cluster. Must be specified when namespace is not. listener: "secure" # name of the listener to use for connections from the console - # This object may contain a `url` string property with the Apicurio Registry API path to be used to resolve - # Avro or Protobuf schemas for topic message browsing - schemaRegistry: { } + schemaRegistry: "my-apicurio-registry" # name of the schema registry to use with this Kafka (optional) # `properties` contains keys/values to use for any Kafka connection properties: security.protocol: SASL_SSL diff --git a/operator/src/main/java/com/github/streamshub/console/api/v1alpha1/spec/ConsoleSpec.java b/operator/src/main/java/com/github/streamshub/console/api/v1alpha1/spec/ConsoleSpec.java index 0194ea1cf..a09f1fd16 100644 --- a/operator/src/main/java/com/github/streamshub/console/api/v1alpha1/spec/ConsoleSpec.java +++ b/operator/src/main/java/com/github/streamshub/console/api/v1alpha1/spec/ConsoleSpec.java @@ -18,6 +18,8 @@ public class ConsoleSpec { Images images = new Images(); + List schemaRegistries; + List kafkaClusters = new ArrayList<>(); // TODO: copy EnvVar into console's API to avoid unexpected changes @@ -39,6 +41,14 @@ public void setImages(Images images) { this.images = images; } + public List getSchemaRegistries() { + return schemaRegistries; + } + + public void setSchemaRegistries(List schemaRegistries) { + this.schemaRegistries = schemaRegistries; + } + public List getKafkaClusters() { return kafkaClusters; } diff --git a/operator/src/main/java/com/github/streamshub/console/api/v1alpha1/spec/KafkaCluster.java b/operator/src/main/java/com/github/streamshub/console/api/v1alpha1/spec/KafkaCluster.java index e2cdfffb5..1f8f13745 100644 --- a/operator/src/main/java/com/github/streamshub/console/api/v1alpha1/spec/KafkaCluster.java +++ b/operator/src/main/java/com/github/streamshub/console/api/v1alpha1/spec/KafkaCluster.java @@ -52,11 +52,10 @@ public class KafkaCluster { private Credentials credentials; @JsonPropertyDescription(""" - Configuration for a connection to an Apicurio Registry instance \ - to use for serializing and de-serializing records written to or read \ - from this Kafka cluster. + Name of a configured Apicurio Registry instance to use for serializing \ + and de-serializing records written to or read from this Kafka cluster. """) - private SchemaRegistry schemaRegistry; + private String schemaRegistry; private ConfigVars properties = new ConfigVars(); @@ -106,11 +105,11 @@ public void setCredentials(Credentials credentials) { this.credentials = credentials; } - public SchemaRegistry getSchemaRegistry() { + public String getSchemaRegistry() { return schemaRegistry; } - public void setSchemaRegistry(SchemaRegistry schemaRegistry) { + public void setSchemaRegistry(String schemaRegistry) { this.schemaRegistry = schemaRegistry; } diff --git a/operator/src/main/java/com/github/streamshub/console/api/v1alpha1/spec/SchemaRegistry.java b/operator/src/main/java/com/github/streamshub/console/api/v1alpha1/spec/SchemaRegistry.java index 01b627bb5..ee79587a6 100644 --- a/operator/src/main/java/com/github/streamshub/console/api/v1alpha1/spec/SchemaRegistry.java +++ b/operator/src/main/java/com/github/streamshub/console/api/v1alpha1/spec/SchemaRegistry.java @@ -10,10 +10,26 @@ @JsonInclude(JsonInclude.Include.NON_NULL) public class SchemaRegistry { + @Required + @JsonPropertyDescription(""" + Name of the Apicurio Registry. The name may be referenced by Kafka clusters \ + configured in the console to indicate that a particular registry is to be \ + used for message deserialization when browsing topics within that cluster. + """) + private String name; + @Required @JsonPropertyDescription("URL of the Apicurio Registry server API.") private String url; + public String getName() { + return name; + } + + public void setName(String name) { + this.name = name; + } + public String getUrl() { return url; } diff --git a/operator/src/main/java/com/github/streamshub/console/dependents/ConsoleSecret.java b/operator/src/main/java/com/github/streamshub/console/dependents/ConsoleSecret.java index 4b95f6879..fbe2ba3b1 100644 --- a/operator/src/main/java/com/github/streamshub/console/dependents/ConsoleSecret.java +++ b/operator/src/main/java/com/github/streamshub/console/dependents/ConsoleSecret.java @@ -9,12 +9,14 @@ import java.util.Collection; import java.util.Collections; import java.util.LinkedHashMap; +import java.util.List; import java.util.Map; import java.util.Objects; import java.util.Optional; import java.util.Random; import java.util.function.Function; import java.util.function.Predicate; +import java.util.function.Supplier; import jakarta.enterprise.context.ApplicationScoped; import jakarta.inject.Inject; @@ -30,6 +32,7 @@ import com.github.streamshub.console.api.v1alpha1.spec.ConfigVars; import com.github.streamshub.console.api.v1alpha1.spec.Credentials; import com.github.streamshub.console.api.v1alpha1.spec.KafkaCluster; +import com.github.streamshub.console.api.v1alpha1.spec.SchemaRegistry; import com.github.streamshub.console.config.ConsoleConfig; import com.github.streamshub.console.config.KafkaClusterConfig; import com.github.streamshub.console.config.SchemaRegistryConfig; @@ -112,9 +115,20 @@ private static String base64String(int length) { return new String(buffer.toByteArray()).substring(0, length); } + private static List coalesce(List value, Supplier> defaultValue) { + return value != null ? value : defaultValue.get(); + } + private ConsoleConfig buildConfig(Console primary, Context context) { ConsoleConfig config = new ConsoleConfig(); + for (SchemaRegistry registry : coalesce(primary.getSpec().getSchemaRegistries(), Collections::emptyList)) { + var registryConfig = new SchemaRegistryConfig(); + registryConfig.setName(registry.getName()); + registryConfig.setUrl(registry.getUrl()); + config.getSchemaRegistries().add(registryConfig); + } + for (var kafkaRef : primary.getSpec().getKafkaClusters()) { addConfig(primary, context, config, kafkaRef); } @@ -132,12 +146,7 @@ private void addConfig(Console primary, Context context, ConsoleConfig kcConfig.setNamespace(namespace); kcConfig.setName(name); kcConfig.setListener(listenerName); - - if (kafkaRef.getSchemaRegistry() != null) { - SchemaRegistryConfig registry = new SchemaRegistryConfig(); - registry.setUrl(kafkaRef.getSchemaRegistry().getUrl()); - kcConfig.setSchemaRegistry(registry); - } + kcConfig.setSchemaRegistry(kafkaRef.getSchemaRegistry()); config.getKubernetes().setEnabled(Objects.nonNull(namespace)); config.getKafka().getClusters().add(kcConfig); diff --git a/operator/src/test/java/com/github/streamshub/console/ConsoleReconcilerTest.java b/operator/src/test/java/com/github/streamshub/console/ConsoleReconcilerTest.java index 9e41ae26a..ce720f337 100644 --- a/operator/src/test/java/com/github/streamshub/console/ConsoleReconcilerTest.java +++ b/operator/src/test/java/com/github/streamshub/console/ConsoleReconcilerTest.java @@ -607,13 +607,15 @@ void testConsoleReconciliationWithSchemaRegistryUrl() { .build()) .withNewSpec() .withHostname("example.com") + .addNewSchemaRegistry() + .withName("example-registry") + .withUrl("http://example.com/apis/registry/v2") + .endSchemaRegistry() .addNewKafkaCluster() .withName(kafkaCR.getMetadata().getName()) .withNamespace(kafkaCR.getMetadata().getNamespace()) .withListener(kafkaCR.getSpec().getKafka().getListeners().get(0).getName()) - .withNewSchemaRegistry() - .withUrl("http://example.com/apis/registry/v2") - .endSchemaRegistry() + .withSchemaRegistry("example-registry") .endKafkaCluster() .endSpec() .build(); @@ -635,10 +637,16 @@ void testConsoleReconciliationWithSchemaRegistryUrl() { assertNotNull(consoleSecret); String configEncoded = consoleSecret.getData().get("console-config.yaml"); byte[] configDecoded = Base64.getDecoder().decode(configEncoded); - ConsoleConfig consoleConfig = new ObjectMapper().readValue(configDecoded, ConsoleConfig.class); - String registryUrl = consoleConfig.getKafka().getClusters().get(0).getSchemaRegistry().getUrl(); Logger.getLogger(getClass()).infof("config YAML: %s", new String(configDecoded)); + ConsoleConfig consoleConfig = new ObjectMapper().readValue(configDecoded, ConsoleConfig.class); + + String registryName = consoleConfig.getSchemaRegistries().get(0).getName(); + assertEquals("example-registry", registryName); + String registryUrl = consoleConfig.getSchemaRegistries().get(0).getUrl(); assertEquals("http://example.com/apis/registry/v2", registryUrl); + + String registryNameRef = consoleConfig.getKafka().getClusters().get(0).getSchemaRegistry(); + assertEquals("example-registry", registryNameRef); }); }