Skip to content

Commit

Permalink
Add registry configuration to CRD, map to console YAML secret
Browse files Browse the repository at this point in the history
Signed-off-by: Michael Edgar <[email protected]>
  • Loading branch information
MikeEdgar committed Oct 15, 2024
1 parent 80d19e8 commit 1285d5f
Show file tree
Hide file tree
Showing 4 changed files with 93 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,13 @@ public class KafkaCluster {

private Credentials credentials;

@JsonPropertyDescription("""
Configuration for a connection to an Apicurio Registry instance \
to use for serializing and de-serializing records written to or read \
from this Kafka cluster.
""")
private SchemaRegistry schemaRegistry;

private ConfigVars properties = new ConfigVars();

private ConfigVars adminProperties = new ConfigVars();
Expand Down Expand Up @@ -99,6 +106,14 @@ public void setCredentials(Credentials credentials) {
this.credentials = credentials;
}

public SchemaRegistry getSchemaRegistry() {
return schemaRegistry;
}

public void setSchemaRegistry(SchemaRegistry schemaRegistry) {
this.schemaRegistry = schemaRegistry;
}

public ConfigVars getProperties() {
return properties;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package com.github.streamshub.console.api.v1alpha1.spec;

import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonPropertyDescription;

import io.fabric8.generator.annotation.Required;
import io.sundr.builder.annotations.Buildable;

@Buildable(builderPackage = "io.fabric8.kubernetes.api.builder")
@JsonInclude(JsonInclude.Include.NON_NULL)
public class SchemaRegistry {

@Required
@JsonPropertyDescription("URL of the Apicurio Registry server API.")
private String url;

public String getUrl() {
return url;
}

public void setUrl(String url) {
this.url = url;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import com.github.streamshub.console.api.v1alpha1.spec.KafkaCluster;
import com.github.streamshub.console.config.ConsoleConfig;
import com.github.streamshub.console.config.KafkaClusterConfig;
import com.github.streamshub.console.config.SchemaRegistryConfig;

import io.fabric8.kubernetes.api.model.ConfigMap;
import io.fabric8.kubernetes.api.model.HasMetadata;
Expand Down Expand Up @@ -129,6 +130,12 @@ private void addConfig(Console primary, Context<Console> context, ConsoleConfig
kcConfig.setName(name);
kcConfig.setListener(listenerName);

if (kafkaRef.getSchemaRegistry() != null) {
SchemaRegistryConfig registry = new SchemaRegistryConfig();
registry.setUrl(kafkaRef.getSchemaRegistry().getUrl());
kcConfig.setSchemaRegistry(registry);
}

config.getKubernetes().setEnabled(Objects.nonNull(namespace));
config.getKafka().getClusters().add(kcConfig);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,19 +71,22 @@ void setUp() throws Exception {
var allConsoles = client.resources(Console.class).inAnyNamespace();
var allKafkas = client.resources(Kafka.class).inAnyNamespace();
var allKafkaUsers = client.resources(KafkaUser.class).inAnyNamespace();
var allDeployments = client.resources(Deployment.class).inAnyNamespace().withLabels(ConsoleResource.MANAGEMENT_LABEL);
var allConfigMaps = client.resources(ConfigMap.class).inAnyNamespace().withLabels(ConsoleResource.MANAGEMENT_LABEL);
var allSecrets = client.resources(Secret.class).inAnyNamespace().withLabels(ConsoleResource.MANAGEMENT_LABEL);

allConsoles.delete();
allKafkas.delete();
allKafkaUsers.delete();
allDeployments.delete();
allConfigMaps.delete();
allSecrets.delete();

await().atMost(LIMIT).untilAsserted(() -> {
assertTrue(allConsoles.list().getItems().isEmpty());
assertTrue(allKafkas.list().getItems().isEmpty());
assertTrue(allKafkaUsers.list().getItems().isEmpty());
assertTrue(allDeployments.list().getItems().isEmpty());
assertTrue(allConfigMaps.list().getItems().isEmpty());
assertTrue(allSecrets.list().getItems().isEmpty());
});
Expand Down Expand Up @@ -499,7 +502,6 @@ void testConsoleReconciliationWithValidKafkaUser() {
});
}


@Test
void testConsoleReconciliationWithKafkaProperties() {
client.resource(new ConfigMapBuilder()
Expand Down Expand Up @@ -579,6 +581,50 @@ void testConsoleReconciliationWithKafkaProperties() {
});
}

@Test
void testConsoleReconciliationWithSchemaRegistryUrl() {
Console consoleCR = new ConsoleBuilder()
.withMetadata(new ObjectMetaBuilder()
.withName("console-1")
.withNamespace("ns2")
.build())
.withNewSpec()
.withHostname("example.com")
.addNewKafkaCluster()
.withName(kafkaCR.getMetadata().getName())
.withNamespace(kafkaCR.getMetadata().getNamespace())
.withListener(kafkaCR.getSpec().getKafka().getListeners().get(0).getName())
.withNewSchemaRegistry()
.withUrl("http://example.com/apis/registry/v2")
.endSchemaRegistry()
.endKafkaCluster()
.endSpec()
.build();

client.resource(consoleCR).create();

await().ignoreException(NullPointerException.class).atMost(LIMIT).untilAsserted(() -> {
var console = client.resources(Console.class)
.inNamespace(consoleCR.getMetadata().getNamespace())
.withName(consoleCR.getMetadata().getName())
.get();
assertEquals(1, console.getStatus().getConditions().size());
var ready = console.getStatus().getConditions().get(0);
assertEquals("Ready", ready.getType());
assertEquals("False", ready.getStatus());
assertEquals("DependentsNotReady", ready.getReason());

var consoleSecret = client.secrets().inNamespace("ns2").withName("console-1-" + ConsoleSecret.NAME).get();
assertNotNull(consoleSecret);
String configEncoded = consoleSecret.getData().get("console-config.yaml");
byte[] configDecoded = Base64.getDecoder().decode(configEncoded);
ConsoleConfig consoleConfig = new ObjectMapper().readValue(configDecoded, ConsoleConfig.class);
String registryUrl = consoleConfig.getKafka().getClusters().get(0).getSchemaRegistry().getUrl();
Logger.getLogger(getClass()).infof("config YAML: %s", new String(configDecoded));
assertEquals("http://example.com/apis/registry/v2", registryUrl);
});
}

// Utility

private Deployment setReady(Deployment deployment) {
Expand Down

0 comments on commit 1285d5f

Please sign in to comment.