diff --git a/operator/src/main/java/com/github/streamshub/console/api/v1alpha1/spec/KafkaCluster.java b/operator/src/main/java/com/github/streamshub/console/api/v1alpha1/spec/KafkaCluster.java index 1d3323971..e2cdfffb5 100644 --- a/operator/src/main/java/com/github/streamshub/console/api/v1alpha1/spec/KafkaCluster.java +++ b/operator/src/main/java/com/github/streamshub/console/api/v1alpha1/spec/KafkaCluster.java @@ -51,6 +51,13 @@ public class KafkaCluster { private Credentials credentials; + @JsonPropertyDescription(""" + Configuration for a connection to an Apicurio Registry instance \ + to use for serializing and de-serializing records written to or read \ + from this Kafka cluster. + """) + private SchemaRegistry schemaRegistry; + private ConfigVars properties = new ConfigVars(); private ConfigVars adminProperties = new ConfigVars(); @@ -99,6 +106,14 @@ public void setCredentials(Credentials credentials) { this.credentials = credentials; } + public SchemaRegistry getSchemaRegistry() { + return schemaRegistry; + } + + public void setSchemaRegistry(SchemaRegistry schemaRegistry) { + this.schemaRegistry = schemaRegistry; + } + public ConfigVars getProperties() { return properties; } diff --git a/operator/src/main/java/com/github/streamshub/console/api/v1alpha1/spec/SchemaRegistry.java b/operator/src/main/java/com/github/streamshub/console/api/v1alpha1/spec/SchemaRegistry.java new file mode 100644 index 000000000..01b627bb5 --- /dev/null +++ b/operator/src/main/java/com/github/streamshub/console/api/v1alpha1/spec/SchemaRegistry.java @@ -0,0 +1,24 @@ +package com.github.streamshub.console.api.v1alpha1.spec; + +import com.fasterxml.jackson.annotation.JsonInclude; +import com.fasterxml.jackson.annotation.JsonPropertyDescription; + +import io.fabric8.generator.annotation.Required; +import io.sundr.builder.annotations.Buildable; + +@Buildable(builderPackage = "io.fabric8.kubernetes.api.builder") +@JsonInclude(JsonInclude.Include.NON_NULL) +public class SchemaRegistry { + + @Required + @JsonPropertyDescription("URL of the Apicurio Registry server API.") + private String url; + + public String getUrl() { + return url; + } + + public void setUrl(String url) { + this.url = url; + } +} diff --git a/operator/src/main/java/com/github/streamshub/console/dependents/ConsoleSecret.java b/operator/src/main/java/com/github/streamshub/console/dependents/ConsoleSecret.java index d9dc43ced..ea492ff3e 100644 --- a/operator/src/main/java/com/github/streamshub/console/dependents/ConsoleSecret.java +++ b/operator/src/main/java/com/github/streamshub/console/dependents/ConsoleSecret.java @@ -29,6 +29,7 @@ import com.github.streamshub.console.api.v1alpha1.spec.KafkaCluster; import com.github.streamshub.console.config.ConsoleConfig; import com.github.streamshub.console.config.KafkaClusterConfig; +import com.github.streamshub.console.config.SchemaRegistryConfig; import io.fabric8.kubernetes.api.model.ConfigMap; import io.fabric8.kubernetes.api.model.HasMetadata; @@ -129,6 +130,12 @@ private void addConfig(Console primary, Context context, ConsoleConfig kcConfig.setName(name); kcConfig.setListener(listenerName); + if (kafkaRef.getSchemaRegistry() != null) { + SchemaRegistryConfig registry = new SchemaRegistryConfig(); + registry.setUrl(kafkaRef.getSchemaRegistry().getUrl()); + kcConfig.setSchemaRegistry(registry); + } + config.getKubernetes().setEnabled(Objects.nonNull(namespace)); config.getKafka().getClusters().add(kcConfig); diff --git a/operator/src/test/java/com/github/streamshub/console/ConsoleReconcilerTest.java b/operator/src/test/java/com/github/streamshub/console/ConsoleReconcilerTest.java index 95893d005..123e25b4f 100644 --- a/operator/src/test/java/com/github/streamshub/console/ConsoleReconcilerTest.java +++ b/operator/src/test/java/com/github/streamshub/console/ConsoleReconcilerTest.java @@ -71,12 +71,14 @@ void setUp() throws Exception { var allConsoles = client.resources(Console.class).inAnyNamespace(); var allKafkas = client.resources(Kafka.class).inAnyNamespace(); var allKafkaUsers = client.resources(KafkaUser.class).inAnyNamespace(); + var allDeployments = client.resources(Deployment.class).inAnyNamespace().withLabels(ConsoleResource.MANAGEMENT_LABEL); var allConfigMaps = client.resources(ConfigMap.class).inAnyNamespace().withLabels(ConsoleResource.MANAGEMENT_LABEL); var allSecrets = client.resources(Secret.class).inAnyNamespace().withLabels(ConsoleResource.MANAGEMENT_LABEL); allConsoles.delete(); allKafkas.delete(); allKafkaUsers.delete(); + allDeployments.delete(); allConfigMaps.delete(); allSecrets.delete(); @@ -84,6 +86,7 @@ void setUp() throws Exception { assertTrue(allConsoles.list().getItems().isEmpty()); assertTrue(allKafkas.list().getItems().isEmpty()); assertTrue(allKafkaUsers.list().getItems().isEmpty()); + assertTrue(allDeployments.list().getItems().isEmpty()); assertTrue(allConfigMaps.list().getItems().isEmpty()); assertTrue(allSecrets.list().getItems().isEmpty()); }); @@ -499,7 +502,6 @@ void testConsoleReconciliationWithValidKafkaUser() { }); } - @Test void testConsoleReconciliationWithKafkaProperties() { client.resource(new ConfigMapBuilder() @@ -579,6 +581,50 @@ void testConsoleReconciliationWithKafkaProperties() { }); } + @Test + void testConsoleReconciliationWithSchemaRegistryUrl() { + Console consoleCR = new ConsoleBuilder() + .withMetadata(new ObjectMetaBuilder() + .withName("console-1") + .withNamespace("ns2") + .build()) + .withNewSpec() + .withHostname("example.com") + .addNewKafkaCluster() + .withName(kafkaCR.getMetadata().getName()) + .withNamespace(kafkaCR.getMetadata().getNamespace()) + .withListener(kafkaCR.getSpec().getKafka().getListeners().get(0).getName()) + .withNewSchemaRegistry() + .withUrl("http://example.com/apis/registry/v2") + .endSchemaRegistry() + .endKafkaCluster() + .endSpec() + .build(); + + client.resource(consoleCR).create(); + + await().ignoreException(NullPointerException.class).atMost(LIMIT).untilAsserted(() -> { + var console = client.resources(Console.class) + .inNamespace(consoleCR.getMetadata().getNamespace()) + .withName(consoleCR.getMetadata().getName()) + .get(); + assertEquals(1, console.getStatus().getConditions().size()); + var ready = console.getStatus().getConditions().get(0); + assertEquals("Ready", ready.getType()); + assertEquals("False", ready.getStatus()); + assertEquals("DependentsNotReady", ready.getReason()); + + var consoleSecret = client.secrets().inNamespace("ns2").withName("console-1-" + ConsoleSecret.NAME).get(); + assertNotNull(consoleSecret); + String configEncoded = consoleSecret.getData().get("console-config.yaml"); + byte[] configDecoded = Base64.getDecoder().decode(configEncoded); + ConsoleConfig consoleConfig = new ObjectMapper().readValue(configDecoded, ConsoleConfig.class); + String registryUrl = consoleConfig.getKafka().getClusters().get(0).getSchemaRegistry().getUrl(); + Logger.getLogger(getClass()).infof("config YAML: %s", new String(configDecoded)); + assertEquals("http://example.com/apis/registry/v2", registryUrl); + }); + } + // Utility private Deployment setReady(Deployment deployment) {