diff --git a/api/src/main/java/com/github/streamshub/console/api/ClientFactory.java b/api/src/main/java/com/github/streamshub/console/api/ClientFactory.java index cccbd5e54..b789ad453 100644 --- a/api/src/main/java/com/github/streamshub/console/api/ClientFactory.java +++ b/api/src/main/java/com/github/streamshub/console/api/ClientFactory.java @@ -8,6 +8,7 @@ import java.util.Collection; import java.util.Collections; import java.util.HashMap; +import java.util.LinkedHashSet; import java.util.Map; import java.util.Objects; import java.util.Optional; @@ -297,7 +298,7 @@ void putKafkaContext(Map contexts, KafkaClusterConfig clusterConfig, Optional kafkaResource, Function, Admin> adminBuilder, - boolean replace) { + boolean allowReplacement) { var adminConfigs = buildConfig(AdminClientConfig.configNames(), clusterConfig, @@ -330,38 +331,29 @@ void putKafkaContext(Map contexts, clientConfigs.put(Consumer.class, Collections.unmodifiableMap(consumerConfigs)); clientConfigs.put(Producer.class, Collections.unmodifiableMap(producerConfigs)); - Admin admin = null; - - if (establishGlobalConnection(adminConfigs)) { - admin = adminBuilder.apply(adminConfigs); - } - String clusterKey = clusterConfig.clusterKey(); String clusterId = clusterId(clusterConfig, kafkaResource); - if (!replace && contexts.containsKey(clusterId)) { + if (contexts.containsKey(clusterId) && !allowReplacement) { log.warnf(""" Ignoring duplicate Kafka cluster id: %s for cluster %s. Cluster id values in \ configuration must be unique and may not match id values of \ clusters discovered using Strimzi Kafka Kubernetes API resources.""", clusterId, clusterKey); - } else { - boolean truststoreNowRequired = truststoreRequired(adminConfigs); + } else if (validConfigs(kafkaResource, clusterConfig, clientConfigs)) { + Admin admin = null; - if (resourceStatusDroppedCertificates(contexts.get(clusterId), kafkaResource, truststoreNowRequired)) { - log.warnf(""" - Ignoring update to Kafka custom resource %s. Connection requires \ - trusted certificate which is no longer available.""", clusterKey); - } else { - if (truststoreNowRequired && kafkaResource.isPresent()) { - log.warnf(""" - Connection requires trusted certificate(s) which are not present \ - in the Kafka CR status of resource %s.""", clusterKey); - } + if (establishGlobalConnection(adminConfigs)) { + admin = adminBuilder.apply(adminConfigs); + } - KafkaContext ctx = new KafkaContext(clusterConfig, kafkaResource.orElse(null), clientConfigs, admin); - log.infof("%s KafkaContext for cluster %s, id=%s", replace ? "Replacing" : "Adding", clusterKey, clusterId); - KafkaContext previous = contexts.put(clusterId, ctx); - Optional.ofNullable(previous).ifPresent(KafkaContext::close); + KafkaContext ctx = new KafkaContext(clusterConfig, kafkaResource.orElse(null), clientConfigs, admin); + KafkaContext previous = contexts.put(clusterId, ctx); + + if (previous == null) { + log.infof("Added KafkaContext for cluster %s, id=%s", clusterKey, clusterId); + } else { + log.infof("Replaced KafkaContext for cluster %s, id=%s", clusterKey, clusterId); + previous.close(); } } } @@ -376,19 +368,6 @@ String clusterId(KafkaClusterConfig clusterConfig, Optional kafkaResource .orElseGet(clusterConfig::getName); } - /** - * Checks whether the previous KafkaContext contained TLS trusted certificates, but due to them being - * removed from the Strimzi Kafka CR being in a transient state, they are no longer present. We will ignore - * this update and keep the old KafkaContext. - */ - boolean resourceStatusDroppedCertificates(KafkaContext context, Optional kafkaResource, boolean truststoreNowRequired) { - if (!truststoreNowRequired || context == null || kafkaResource.isEmpty()) { - return false; - } - - return !truststoreRequired(context.configs(Admin.class)); - } - Optional cachedKafkaResource(KafkaClusterConfig clusterConfig) { return clusterConfig.hasNamespace() ? kafkaInformer.map(SharedIndexInformer::getStore) .map(store -> store.getByKey(clusterConfig.clusterKey())) @@ -405,6 +384,45 @@ Optional cachedKafkaResource(KafkaClusterConfig clusterConfig) { }) : Optional.empty(); } + boolean validConfigs(Optional kafkaResource, KafkaClusterConfig clusterConfig, Map, Map> clientConfigs) { + boolean createContext = true; + boolean missingListenerStatus = kafkaResource.isPresent() + && getListenerStatus(kafkaResource, clusterConfig.getListener()).isEmpty(); + StringBuilder clientsMessage = new StringBuilder(); + + if (missingListenerStatus) { + clientsMessage.append("; Kafka resource has no status for listener '%s', bootstrap servers and trusted certificates could not be derived" + .formatted(clusterConfig.getListener())); + } + + for (Map.Entry, Map> client : clientConfigs.entrySet()) { + Class clientType = client.getKey(); + Set missing = findMissingRequiredConfigs(client.getValue()); + + if (!missing.isEmpty()) { + clientsMessage.append("; %s client is missing required properties: %s" + .formatted(clientType.getSimpleName(), missing)); + createContext = false; + } + + if (truststoreRequired(client.getValue())) { + clientsMessage.append(""" + ; %s client is a secure/SSL connection, but no truststore configuration is available. \ + The connection may fail if the Kafka cluster is using an untrusted certificate""".formatted(clientType.getSimpleName())); + } + } + + if (createContext) { + clientsMessage.insert(0, "Some configuration may be missing for connection to cluster %s, connection attempts may fail".formatted(clusterConfig.clusterKey())); + log.warn(clientsMessage.toString().trim()); + } else { + clientsMessage.insert(0, "Missing configuration detected for connection to cluster %s, no connection will be setup".formatted(clusterConfig.clusterKey())); + log.error(clientsMessage.toString().trim()); + } + + return createContext; + } + Map requiredAdminConfig() { Map configs = new HashMap<>(); configs.put(AdminClientConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, 5000); @@ -450,6 +468,20 @@ static boolean establishGlobalConnection(Map configs) { return false; } + static Set findMissingRequiredConfigs(Map configs) { + Set missing = new LinkedHashSet<>(); + + if (!configs.containsKey(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG)) { + missing.add(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG); + } + + if (configs.containsKey(SaslConfigs.SASL_JAAS_CONFIG) && !configs.containsKey(SaslConfigs.SASL_MECHANISM)) { + missing.add(SaslConfigs.SASL_MECHANISM); + } + + return missing; + } + void disposeKafkaContexts(@Disposes Map contexts) { log.infof("Closing all known KafkaContexts"); @@ -579,12 +611,7 @@ Map buildConfig(Set configNames, .filter(listener -> listener.getName().equals(config.getListener())) .findFirst(); - var listenerStatus = cluster.map(Kafka::getStatus) - .map(KafkaStatus::getListeners) - .map(Collection::stream) - .orElseGet(Stream::empty) - .filter(listener -> listener.getName().equals(config.getListener())) - .findFirst(); + var listenerStatus = getListenerStatus(cluster, config.getListener()); listenerSpec.ifPresent(listener -> applyListenerConfiguration(cfg, listener)); @@ -600,15 +627,10 @@ Map buildConfig(Set configNames, cfg.putIfAbsent(SslConfigs.SSL_TRUSTSTORE_CERTIFICATES_CONFIG, certificates); }); - if (truststoreRequired(cfg)) { - if (cfg.containsKey(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG) && trustManager.isResolvable()) { - trustManager.get().trustClusterCertificate(cfg); - } else { - log.warnf(""" - Failed to set configuration for %s client to Kafka cluster %s. Connection \ - requires truststore which could not be obtained from the Kafka resource status.""" - .formatted(clientType, config.clusterKey())); - } + if (truststoreRequired(cfg) && trustManager.isResolvable() + && cfg.containsKey(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG)) { + // trustManager is only resolvable in 'dev' mode + trustManager.get().trustClusterCertificate(cfg); } cfg.putAll(overrideProperties); @@ -622,6 +644,15 @@ Map buildConfig(Set configNames, return cfg; } + Optional getListenerStatus(Optional cluster, String listenerName) { + return cluster.map(Kafka::getStatus) + .map(KafkaStatus::getListeners) + .map(Collection::stream) + .orElseGet(Stream::empty) + .filter(listener -> listener.getName().equals(listenerName)) + .findFirst(); + } + private void applyListenerConfiguration(Map cfg, GenericKafkaListener listener) { var authType = Optional.ofNullable(listener.getAuth()) .map(KafkaListenerAuthentication::getType) diff --git a/api/src/test/java/com/github/streamshub/console/api/KafkaClustersResourceIT.java b/api/src/test/java/com/github/streamshub/console/api/KafkaClustersResourceIT.java index 2cabacdc8..71d186103 100644 --- a/api/src/test/java/com/github/streamshub/console/api/KafkaClustersResourceIT.java +++ b/api/src/test/java/com/github/streamshub/console/api/KafkaClustersResourceIT.java @@ -2,7 +2,6 @@ import java.io.IOException; import java.net.URI; -import java.time.Duration; import java.util.Base64; import java.util.HashMap; import java.util.List; @@ -15,7 +14,7 @@ import java.util.stream.IntStream; import java.util.stream.Stream; -import jakarta.enterprise.util.AnnotationLiteral; +import jakarta.enterprise.inject.literal.NamedLiteral; import jakarta.enterprise.util.TypeLiteral; import jakarta.inject.Inject; import jakarta.json.Json; @@ -44,6 +43,7 @@ import com.github.streamshub.console.api.support.Holder; import com.github.streamshub.console.api.support.KafkaContext; import com.github.streamshub.console.config.ConsoleConfig; +import com.github.streamshub.console.config.KafkaClusterConfig; import com.github.streamshub.console.kafka.systemtest.TestPlainProfile; import com.github.streamshub.console.kafka.systemtest.deployment.DeploymentManager; import com.github.streamshub.console.test.AdminClientSpy; @@ -90,7 +90,11 @@ @TestProfile(TestPlainProfile.class) class KafkaClustersResourceIT { - private static final List STATIC_KAFKAS = List.of("test-kafka1", "test-kafka2"); + /** + * List of Kafka clusters that are always created. test-kafkaY is configured in {@link TestPlainProfile} + * but has no associated Strimzi Kafka CR. + */ + private static final List STATIC_KAFKAS = List.of("test-kafka1", "test-kafka2", "test-kafkaY"); @Inject Config config; @@ -160,9 +164,8 @@ void setup() throws IOException { .endStatus() .build()); - // Wait for the informer cache to be populated with all Kafka CRs - await().atMost(10, TimeUnit.SECONDS) - .until(() -> Objects.equals(kafkaInformer.get().getStore().list().size(), 2)); + // Wait for the context map to be populated with all Kafka configurations + await().atMost(10, TimeUnit.SECONDS).until(() -> configuredContexts.size() == STATIC_KAFKAS.size()); clusterId1 = consoleConfig.getKafka().getCluster("default/test-kafka1").get().getId(); clusterId2 = consoleConfig.getKafka().getCluster("default/test-kafka2").get().getId(); @@ -187,9 +190,9 @@ void testListClusters() { whenRequesting(req -> req.queryParam("fields[kafkas]", "name,status,nodePools,listeners").get()) .assertThat() .statusCode(is(Status.OK.getStatusCode())) - .body("data.size()", equalTo(2)) - .body("data.id", containsInAnyOrder(clusterId1, clusterId2)) - .body("data.attributes.name", containsInAnyOrder("test-kafka1", "test-kafka2")) + .body("data.size()", equalTo(STATIC_KAFKAS.size())) + .body("data.id", containsInAnyOrder(clusterId1, clusterId2, "test-kafkaY")) + .body("data.attributes.name", containsInAnyOrder("test-kafka1", "test-kafka2", "test-kafkaY")) .body("data.find { it.attributes.name == 'test-kafka1'}.attributes.status", is("Ready")) .body("data.find { it.attributes.name == 'test-kafka1'}.attributes.nodePools", contains("my-node-pool")) .body("data.find { it.attributes.name == 'test-kafka1'}.attributes.listeners", hasItem(allOf( @@ -198,7 +201,9 @@ void testListClusters() { .body("data.find { it.attributes.name == 'test-kafka2'}.attributes.status", is("NotReady")) .body("data.find { it.attributes.name == 'test-kafka2'}.attributes.listeners", hasItem(allOf( hasEntry(equalTo("bootstrapServers"), equalTo(k2Bootstrap)), - hasEntry(equalTo("authType"), nullValue(String.class))))); + hasEntry(equalTo("authType"), nullValue(String.class))))) + .body("data.find { it.attributes.name == 'test-kafkaY'}.attributes.status", is(nullValue())) + .body("data.find { it.attributes.name == 'test-kafkaY'}.attributes.listeners", is(nullValue())); } @Test @@ -209,16 +214,6 @@ void testListClustersWithInformerError() { private static final long serialVersionUID = 1L; }; - @SuppressWarnings("all") - class NamedLiteral extends AnnotationLiteral implements jakarta.inject.Named { - private static final long serialVersionUID = 1L; - - @Override - public String value() { - return "KafkaInformer"; - } - } - // Force an unhandled exception Mockito.when(informer.getStore()).thenThrow(new RuntimeException("EXPECTED TEST EXCEPTION") { private static final long serialVersionUID = 1L; @@ -229,7 +224,7 @@ public synchronized Throwable fillInStackTrace() { } }); - QuarkusMock.installMockForType(Holder.of(informer), informerType, new NamedLiteral()); + QuarkusMock.installMockForType(Holder.of(informer), informerType, NamedLiteral.of("KafkaInformer")); whenRequesting(req -> req.get()) .assertThat() @@ -241,14 +236,14 @@ public synchronized Throwable fillInStackTrace() { @ParameterizedTest @CsvSource({ - "'name' , 'test-kafka1,test-kafka2'", - "'-name', 'test-kafka2,test-kafka1'" + "'name' , 'test-kafka1,test-kafka2,test-kafkaY'", + "'-name', 'test-kafkaY,test-kafka2,test-kafka1'" }) void testListClustersSortedByName(String sortParam, String expectedNameList) { whenRequesting(req -> req.queryParam("sort", sortParam).get()) .assertThat() .statusCode(is(Status.OK.getStatusCode())) - .body("data.size()", is(2)) + .body("data.size()", is(STATIC_KAFKAS.size())) .body("data.attributes.name", contains(expectedNameList.split(","))); } @@ -257,8 +252,8 @@ void testListClustersContainsPageMetaData() { whenRequesting(req -> req.get()) .assertThat() .statusCode(is(Status.OK.getStatusCode())) - .body("meta.page.total", is(2)) - .body("data.size()", is(2)) + .body("meta.page.total", is(STATIC_KAFKAS.size())) + .body("data.size()", is(STATIC_KAFKAS.size())) .body("data.meta.page", everyItem(hasKey(equalTo("cursor")))); } @@ -275,15 +270,15 @@ void testListClustersWithRangePaginationTruncated() { .map(kafka -> kafka.getMetadata().getName())) .toList(); - // Wait for the informer cache to be populated with all Kafka CRs + // Wait for the informer cache to be populated with all Kafka CRs (subtract 1 to account for kafkaY w/o CR) await().atMost(10, TimeUnit.SECONDS) - .until(() -> Objects.equals(kafkaInformer.get().getStore().list().size(), allKafkaNames.size())); + .until(() -> Objects.equals(kafkaInformer.get().getStore().list().size(), allKafkaNames.size() - 1)); var fullResponse = whenRequesting(req -> req.queryParam("sort", "name").get()) .assertThat() .statusCode(is(Status.OK.getStatusCode())) - .body("meta.page.total", is(9)) - .body("data.size()", is(9)) + .body("meta.page.total", is(allKafkaNames.size())) + .body("data.size()", is(allKafkaNames.size())) .body("data.meta.page", everyItem(hasKey(equalTo("cursor")))) .extract() .asInputStream(); @@ -306,7 +301,7 @@ void testListClustersWithRangePaginationTruncated() { .get()) .assertThat() .statusCode(is(Status.OK.getStatusCode())) - .body("meta.page.total", is(9)) // total is the count for the full/unpaged result set + .body("meta.page.total", is(allKafkaNames.size())) // total is the count for the full/unpaged result set // requested range has 7 recs, but truncated to 6 to satisfy page[size] .body("meta.page.rangeTruncated", is(true)) .body("data.size()", is(6)) @@ -316,7 +311,7 @@ void testListClustersWithRangePaginationTruncated() { @ParameterizedTest @CsvSource({ "1, 8, , 6", // skip first two and last one (6 remain) - "1, , 10, 7", // skip first two (7 remain) + "1, , 10, 8", // skip first two (8 remain) " , 8, , 8", // skip last one (8 remain) " , 8, 5, 5", // skip last one (8 remain), limit to 5 on page }) @@ -330,17 +325,18 @@ void testListClustersWithPaginationCursors(Integer afterIndex, Integer beforeInd .map(name -> utils.buildKafkaResource(name, randomBootstrapServers)) .map(kafka -> utils.apply(client, kafka)) .map(kafka -> kafka.getMetadata().getName())) - .toList(); + .sorted() + .toList(); - // Wait for the informer cache to be populated with all Kafka CRs + // Wait for the informer cache to be populated with all Kafka CRs (subtract 1 to account for kafkaY w/o CR) await().atMost(10, TimeUnit.SECONDS) - .until(() -> Objects.equals(kafkaInformer.get().getStore().list().size(), allKafkaNames.size())); + .until(() -> Objects.equals(kafkaInformer.get().getStore().list().size(), allKafkaNames.size() - 1)); var fullResponse = whenRequesting(req -> req.queryParam("sort", "name").get()) .assertThat() .statusCode(is(Status.OK.getStatusCode())) - .body("meta.page.total", is(9)) - .body("data.size()", is(9)) + .body("meta.page.total", is(allKafkaNames.size())) + .body("data.size()", is(allKafkaNames.size())) .body("data.meta.page", everyItem(hasKey(equalTo("cursor")))) .extract() .asInputStream(); @@ -367,7 +363,7 @@ void testListClustersWithPaginationCursors(Integer afterIndex, Integer beforeInd .get()) .assertThat() .statusCode(is(Status.OK.getStatusCode())) - .body("meta.page.total", is(9)) // total is the count for the full/unpaged result set + .body("meta.page.total", is(allKafkaNames.size())) // total is the count for the full/unpaged result set .body("data.size()", is(expectedResultCount)) .body("data.meta.page", everyItem(hasKey(equalTo("cursor")))) .extract() @@ -546,8 +542,13 @@ void testDescribeClusterWithCertificates() { utils.apply(client, kafka); - await().atMost(Duration.ofSeconds(5)) - .until(() -> configuredContexts.containsKey(clusterId)); + // Wait for the added cluster to be configured in the context map + await().atMost(10, TimeUnit.SECONDS) + .until(() -> configuredContexts.values() + .stream() + .map(KafkaContext::clusterConfig) + .map(KafkaClusterConfig::clusterKey) + .anyMatch(Cache.metaNamespaceKeyFunc(kafka)::equals)); whenRequesting(req -> req.get("{clusterId}", clusterId)) .assertThat() @@ -582,17 +583,13 @@ void testDescribeClusterWithTLSMissingCertificates() { utils.apply(client, kafka); - await().atMost(Duration.ofSeconds(5)) - .until(() -> configuredContexts.containsKey(clusterId)); - - await().atMost(Duration.ofSeconds(5)) - .until(() -> kafkaInformer.get() - .getStore() - .list() + // Wait for the added cluster to be configured in the context map + await().atMost(10, TimeUnit.SECONDS) + .until(() -> configuredContexts.values() .stream() - .anyMatch(k -> Objects.equals( - Cache.metaNamespaceKeyFunc(kafka), - Cache.metaNamespaceKeyFunc(k)))); + .map(KafkaContext::clusterConfig) + .map(KafkaClusterConfig::clusterKey) + .anyMatch(Cache.metaNamespaceKeyFunc(kafka)::equals)); whenRequesting(req -> req.get("{clusterId}", clusterId)) .assertThat() @@ -670,8 +667,13 @@ void testDescribeClusterWithOAuthTokenUrl(boolean tls, String expectedProtocol) utils.apply(client, kafka); - await().atMost(Duration.ofSeconds(5)) - .until(() -> configuredContexts.containsKey(clusterId)); + // Wait for the added cluster to be configured in the context map + await().atMost(10, TimeUnit.SECONDS) + .until(() -> configuredContexts.values() + .stream() + .map(KafkaContext::clusterConfig) + .map(KafkaClusterConfig::clusterKey) + .anyMatch(Cache.metaNamespaceKeyFunc(kafka)::equals)); whenRequesting(req -> req .auth().oauth2("my-secure-token") @@ -715,8 +717,13 @@ void testDescribeClusterWithScram(boolean tls, String expectedProtocol) { utils.apply(client, kafka); - await().atMost(Duration.ofSeconds(5)) - .until(() -> configuredContexts.containsKey(clusterId)); + // Wait for the added cluster to be configured in the context map + await().atMost(10, TimeUnit.SECONDS) + .until(() -> configuredContexts.values() + .stream() + .map(KafkaContext::clusterConfig) + .map(KafkaClusterConfig::clusterKey) + .anyMatch(Cache.metaNamespaceKeyFunc(kafka)::equals)); whenRequesting(req -> req .auth().basic("u", "p") diff --git a/api/src/test/java/com/github/streamshub/console/kafka/systemtest/TestPlainProfile.java b/api/src/test/java/com/github/streamshub/console/kafka/systemtest/TestPlainProfile.java index 2f1b5e290..385e10ec6 100644 --- a/api/src/test/java/com/github/streamshub/console/kafka/systemtest/TestPlainProfile.java +++ b/api/src/test/java/com/github/streamshub/console/kafka/systemtest/TestPlainProfile.java @@ -43,15 +43,33 @@ public Map getConfigOverrides() { id: k1-id properties: bootstrap.servers: ${console.test.external-bootstrap} + - name: test-kafka2 namespace: default id: k2-id properties: bootstrap.servers: ${console.test.random-bootstrap} + - name: test-kafka3 namespace: default # listener is named and bootstrap.servers not set (will be retrieved from Kafka CR) listener: listener0 + + # missing required bootstrap.servers and sasl.mechanism + - name: test-kafkaX + properties: + sasl.jaas.config: something + + - name: test-kafkaY + properties: + bootstrap.servers: ${console.test.external-bootstrap} + + # duplicate test-kafkaY that will be ignored + - name: test-kafkaY + properties: + bootstrap.servers: ${console.test.external-bootstrap} + sasl.mechanism: SCRAM-SHA-512 + sasl.jaas.config: something """); return Map.of("console.config-path", configFile.getAbsolutePath()); diff --git a/api/src/test/java/com/github/streamshub/console/test/AdminClientSpy.java b/api/src/test/java/com/github/streamshub/console/test/AdminClientSpy.java index 9aea512f4..1713d7033 100644 --- a/api/src/test/java/com/github/streamshub/console/test/AdminClientSpy.java +++ b/api/src/test/java/com/github/streamshub/console/test/AdminClientSpy.java @@ -5,9 +5,8 @@ import java.util.function.Function; import java.util.function.UnaryOperator; -import jakarta.enterprise.util.AnnotationLiteral; +import jakarta.enterprise.inject.literal.NamedLiteral; import jakarta.enterprise.util.TypeLiteral; -import jakarta.inject.Named; import org.apache.kafka.clients.admin.Admin; import org.mockito.Mockito; @@ -34,22 +33,6 @@ public final class AdminClientSpy { private static final long serialVersionUID = 1L; }; - @SuppressWarnings("all") - static class NamedLiteral extends AnnotationLiteral implements Named { - private static final long serialVersionUID = 1L; - - final String value; - - public NamedLiteral(String value) { - this.value = value; - } - - @Override - public String value() { - return value; - } - } - static final String KAFKA_ADMIN_BUILDER = "kafkaAdminBuilder"; static final String KAFKA_ADMIN_FILTER = "kafkaAdminFilter"; @@ -66,7 +49,7 @@ public static void install(Consumer adminSetup) { return client; }; - QuarkusMock.installMockForType(filter, CLIENT_FILTER_TYPE_LITERAL, new NamedLiteral(KAFKA_ADMIN_FILTER)); + QuarkusMock.installMockForType(filter, CLIENT_FILTER_TYPE_LITERAL, NamedLiteral.of(KAFKA_ADMIN_FILTER)); } /** @@ -83,7 +66,7 @@ public static void install(UnaryOperator> configSetup, Consu return client; }; - QuarkusMock.installMockForType(builder, CLIENT_BUILDER_TYPE_LITERAL, new NamedLiteral(KAFKA_ADMIN_BUILDER)); + QuarkusMock.installMockForType(builder, CLIENT_BUILDER_TYPE_LITERAL, NamedLiteral.of(KAFKA_ADMIN_BUILDER)); } private AdminClientSpy() { diff --git a/operator/src/main/java/com/github/streamshub/console/dependents/ConsoleSecret.java b/operator/src/main/java/com/github/streamshub/console/dependents/ConsoleSecret.java index d9dc43ced..f90aa8415 100644 --- a/operator/src/main/java/com/github/streamshub/console/dependents/ConsoleSecret.java +++ b/operator/src/main/java/com/github/streamshub/console/dependents/ConsoleSecret.java @@ -6,6 +6,7 @@ import java.io.UncheckedIOException; import java.security.SecureRandom; import java.util.Base64; +import java.util.Collection; import java.util.Collections; import java.util.LinkedHashMap; import java.util.Map; @@ -13,12 +14,14 @@ import java.util.Optional; import java.util.Random; import java.util.function.Function; +import java.util.function.Predicate; import jakarta.enterprise.context.ApplicationScoped; import jakarta.inject.Inject; import org.apache.kafka.clients.CommonClientConfigs; import org.apache.kafka.common.config.SaslConfigs; +import org.apache.kafka.common.config.SslConfigs; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; @@ -132,26 +135,28 @@ private void addConfig(Console primary, Context context, ConsoleConfig config.getKubernetes().setEnabled(Objects.nonNull(namespace)); config.getKafka().getClusters().add(kcConfig); + setConfigVars(primary, context, kcConfig.getProperties(), kafkaRef.getProperties()); + setConfigVars(primary, context, kcConfig.getAdminProperties(), kafkaRef.getAdminProperties()); + setConfigVars(primary, context, kcConfig.getConsumerProperties(), kafkaRef.getConsumerProperties()); + setConfigVars(primary, context, kcConfig.getProducerProperties(), kafkaRef.getProducerProperties()); + if (namespace != null && listenerName != null) { - // TODO: add informer for Kafka CRs + // Changes in the Kafka resource picked up during periodic reconciliation Kafka kafka = getResource(context, Kafka.class, namespace, name); setListenerConfig(kcConfig.getProperties(), kafka, listenerName); } - Optional.ofNullable(kafkaRef.getCredentials()) - .map(Credentials::getKafkaUser) - .ifPresent(user -> { - String userNs = Optional.ofNullable(user.getNamespace()).orElse(namespace); - setKafkaUserConfig( - context, - getResource(context, KafkaUser.class, userNs, user.getName()), - kcConfig.getProperties()); - }); - - setConfigVars(primary, context, kcConfig.getProperties(), kafkaRef.getProperties()); - setConfigVars(primary, context, kcConfig.getAdminProperties(), kafkaRef.getAdminProperties()); - setConfigVars(primary, context, kcConfig.getConsumerProperties(), kafkaRef.getConsumerProperties()); - setConfigVars(primary, context, kcConfig.getProducerProperties(), kafkaRef.getProducerProperties()); + if (!kcConfig.getProperties().containsKey(SaslConfigs.SASL_JAAS_CONFIG)) { + Optional.ofNullable(kafkaRef.getCredentials()) + .map(Credentials::getKafkaUser) + .ifPresent(user -> { + String userNs = Optional.ofNullable(user.getNamespace()).orElse(namespace); + setKafkaUserConfig( + context, + getResource(context, KafkaUser.class, userNs, user.getName()), + kcConfig.getProperties()); + }); + } } void setListenerConfig(Map properties, Kafka kafka, String listenerName) { @@ -191,30 +196,45 @@ void setListenerConfig(Map properties, Kafka kafka, String liste protocol.append("PLAINTEXT"); } - properties.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, protocol.toString()); + properties.putIfAbsent(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, protocol.toString()); if (mechanism != null) { - properties.put(SaslConfigs.SASL_MECHANISM, mechanism); + properties.putIfAbsent(SaslConfigs.SASL_MECHANISM, mechanism); } - ListenerStatus listenerStatus = Optional.ofNullable(kafka.getStatus()) + Optional listenerStatus = Optional.ofNullable(kafka.getStatus()) .map(KafkaStatus::getListeners) .orElseGet(Collections::emptyList) .stream() .filter(l -> l.getName().equals(listenerName)) - .findFirst() - .orElse(null); - - Optional.ofNullable(listenerStatus) - .map(ListenerStatus::getBootstrapServers) - .or(() -> Optional.ofNullable(listenerSpec.getConfiguration()) - .map(GenericKafkaListenerConfiguration::getBootstrap) - .map(GenericKafkaListenerConfigurationBootstrap::getHost)) - .ifPresent(bootstrapServers -> properties.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers)); + .findFirst(); + + properties.computeIfAbsent( + CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, + key -> listenerStatus.map(ListenerStatus::getBootstrapServers) + .or(() -> Optional.ofNullable(listenerSpec.getConfiguration()) + .map(GenericKafkaListenerConfiguration::getBootstrap) + .map(GenericKafkaListenerConfigurationBootstrap::getHost)) + .orElseThrow(() -> new ReconciliationException(""" + Bootstrap servers could not be found for listener '%s' on Kafka %s/%s \ + and no configuration was given in the Console resource""" + .formatted(listenerName, kafka.getMetadata().getNamespace(), kafka.getMetadata().getName())))); + + if (!properties.containsKey(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG) + && !properties.containsKey(SslConfigs.SSL_TRUSTSTORE_CERTIFICATES_CONFIG)) { + listenerStatus.map(ListenerStatus::getCertificates) + .filter(Objects::nonNull) + .filter(Predicate.not(Collection::isEmpty)) + .map(certificates -> String.join("\n", certificates).trim()) + .ifPresent(certificates -> { + properties.put(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, "PEM"); + properties.put(SslConfigs.SSL_TRUSTSTORE_CERTIFICATES_CONFIG, certificates); + }); + } } void setKafkaUserConfig(Context context, KafkaUser user, Map properties) { - // TODO: add informer for KafkaUser CRs and the referenced Secret + // Changes in the KafkaUser resource and referenced Secret picked up during periodic reconciliation var secretName = Optional.ofNullable(user.getStatus()) .map(KafkaUserStatus::getSecret) .orElseThrow(() -> new ReconciliationException("KafkaUser %s/%s missing .status.secret"