From 522fc0e8a888b92c40da893cfe912ee8d7a12c7f Mon Sep 17 00:00:00 2001
From: Michael Edgar
Date: Wed, 31 Jul 2024 17:35:08 -0400
Subject: [PATCH] Return auth type in Kafka meta, treat PLAIN like SCRAM-SHA

Signed-off-by: Michael Edgar
---
 api/README.md                                     |   5 +-
 .../streamshub/console/api/ClientFactory.java     |  75 ++++---
 .../console/api/model/KafkaCluster.java           | 203 +++++++++---------
 .../console/api/model/RelatableResource.java      |  46 +---
 .../console/api/model/Resource.java               |  12 +-
 .../console/api/service/BrokerService.java        |   2 +-
 .../api/service/KafkaClusterService.java          |  56 +++--
 .../console/api/support/KafkaContext.java         |  29 +++
 .../console/api/support/OASModelFilter.java       |   2 +-
 api/src/main/resources/application.properties     |   3 -
 .../api/KafkaClustersResourceNoK8sIT.java         |   6 +-
 .../systemtest/TestPlainNoK8sProfile.java         |   6 +-
 common/pom.xml                                    |   8 +-
 .../console/config/KafkaClusterConfig.java        |   3 +
 .../console/config/KafkaConfig.java               |   8 +
 console-config-example.yaml                       |  10 +-
 .../api/v1alpha1/spec/KafkaCluster.java           |  11 +-
 17 files changed, 253 insertions(+), 232 deletions(-)

diff --git a/api/README.md b/api/README.md
index b28e76746..140151e7b 100644
--- a/api/README.md
+++ b/api/README.md
@@ -40,10 +40,11 @@ create instances of both Prometheus and Kafka.
 
 ### Start Console API in Development Mode
 
-Start the API in development mode from the repository root directory.
+Start the API in development mode from the repository root directory. Ensure that the config-path given points to a
+valid `console-config.yaml`. See [console-config-example.yaml](../console-config-example.yaml) for an example.
 
 ```shell
-mvn -am -pl api quarkus:dev
+mvn -am -pl api quarkus:dev -Dconsole.config-path=$(pwd)/console-config.yaml
 ```
 
 ### Using the Instance API
diff --git a/api/src/main/java/com/github/streamshub/console/api/ClientFactory.java b/api/src/main/java/com/github/streamshub/console/api/ClientFactory.java
index e365c7c28..251f75d21 100644
--- a/api/src/main/java/com/github/streamshub/console/api/ClientFactory.java
+++ b/api/src/main/java/com/github/streamshub/console/api/ClientFactory.java
@@ -49,6 +49,7 @@
 import org.apache.kafka.common.config.SslConfigs;
 import org.apache.kafka.common.security.auth.SecurityProtocol;
 import org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule;
+import org.apache.kafka.common.security.plain.PlainLoginModule;
 import org.apache.kafka.common.security.scram.ScramLoginModule;
 import org.apache.kafka.common.serialization.ByteArrayDeserializer;
 import org.apache.kafka.common.serialization.StringSerializer;
@@ -62,6 +63,7 @@
 import com.github.streamshub.console.api.support.Holder;
 import com.github.streamshub.console.api.support.KafkaContext;
 import com.github.streamshub.console.api.support.TrustAllCertificateManager;
+import com.github.streamshub.console.api.support.ValidationProxy;
 import com.github.streamshub.console.config.ConsoleConfig;
 import com.github.streamshub.console.config.KafkaClusterConfig;
 
@@ -92,19 +94,21 @@
 @ApplicationScoped
 public class ClientFactory {
 
+    public static final String OAUTHBEARER = OAuthBearerLoginModule.OAUTHBEARER_MECHANISM;
+    public static final String PLAIN = "PLAIN";
+    public static final String SCRAM_SHA256 = "SCRAM-SHA-256";
+    public static final String SCRAM_SHA512 = "SCRAM-SHA-512";
+
     private static final String BEARER = "Bearer ";
-    private static final String OAUTHBEARER = OAuthBearerLoginModule.OAUTHBEARER_MECHANISM;
     private static final String STRIMZI_OAUTH_CALLBACK = "io.strimzi.kafka.oauth.client.JaasClientOauthLoginCallbackHandler";
"io.strimzi.kafka.oauth.client.JaasClientOauthLoginCallbackHandler"; private static final String SASL_OAUTH_CONFIG_TEMPLATE = OAuthBearerLoginModule.class.getName() + " required" + " oauth.access.token=\"%s\" ;"; private static final String BASIC = "Basic "; - private static final String SCRAM_SHA512 = "SCRAM-SHA-512"; - private static final String SASL_SCRAM_CONFIG_TEMPLATE = ScramLoginModule.class.getName() - + " required" - + " username=\"%s\"" - + " password=\"%s\" ;"; + private static final String BASIC_TEMPLATE = "%s required username=\"%%s\" password=\"%%s\" ;"; + private static final String SASL_PLAIN_CONFIG_TEMPLATE = BASIC_TEMPLATE.formatted(PlainLoginModule.class.getName()); + private static final String SASL_SCRAM_CONFIG_TEMPLATE = BASIC_TEMPLATE.formatted(ScramLoginModule.class.getName()); static final String NO_SUCH_KAFKA_MESSAGE = "Requested Kafka cluster %s does not exist or is not configured"; private final Function noSuchKafka = @@ -129,6 +133,9 @@ public class ClientFactory { @Inject KafkaClusterService kafkaClusterService; + @Inject + ValidationProxy validationService; + @Inject Instance trustManager; @@ -192,8 +199,9 @@ public ConsoleConfig produceConsoleConfig() { return consoleConfig; }) + .map(validationService::validate) .orElseGet(() -> { - log.infof("Console configuration not specified"); + log.warn("Console configuration has not been specified using `console.config-path` property"); return new ConsoleConfig(); }); } @@ -213,6 +221,7 @@ Map produceKafkaContexts(ConsoleConfig consoleConfig, consoleConfig.getKafka().getClusters() .stream() .filter(c -> cachedKafkaResource(c).isEmpty()) + .filter(Predicate.not(KafkaClusterConfig::hasNamespace)) .forEach(clusterConfig -> putKafkaContext(contexts, clusterConfig, Optional.empty(), @@ -309,19 +318,14 @@ void putKafkaContext(Map contexts, String clusterKey = clusterConfig.clusterKey(); String clusterId = Optional.ofNullable(clusterConfig.getId()) .or(() -> kafkaResource.map(Kafka::getStatus).map(KafkaStatus::getClusterId)) - .orElse(null); + .orElseGet(clusterConfig::getName); - if (clusterId == null) { - log.warnf(""" - Ignoring Kafka cluster %s. Cluster id value missing in \ - configuration and no Strimzi Kafka resources found with matching \ - name and namespace.""", clusterKey); - } else if (!replace && contexts.containsKey(clusterId)) { + if (!replace && contexts.containsKey(clusterId)) { log.warnf(""" Ignoring duplicate Kafka cluster id: %s for cluster %s. Cluster id values in \ configuration must be unique and may not match id values of \ clusters discovered using Strimzi Kafka Kubernetes API resources.""", clusterId, clusterKey); - } else if (truststoreRequired(adminConfigs)) { + } else if (kafkaResource.isPresent() && truststoreRequired(adminConfigs)) { if (contexts.containsKey(clusterId) && !truststoreRequired(contexts.get(clusterId).configs(Admin.class))) { log.warnf(""" Ignoring update to Kafka custom resource %s. Connection requires \ @@ -337,13 +341,17 @@ void putKafkaContext(Map contexts, Optional cachedKafkaResource(KafkaClusterConfig clusterConfig) { return clusterConfig.hasNamespace() ? 
                 kafkaInformer.map(SharedIndexInformer::getStore)
-                .map(store -> {
+                .map(store -> store.getByKey(clusterConfig.clusterKey()))
+                .or(() -> {
                     String key = clusterConfig.clusterKey();
-                    Kafka resource = store.getByKey(key);
-                    if (resource == null) {
-                        log.warnf("Configuration references Kafka resource %s, but it was not found in cache", key);
+
+                    if (kafkaInformer.isPresent()) {
+                        log.warnf("Configuration references Kubernetes Kafka resource %s, but it was not found", key);
+                    } else {
+                        log.warnf("Configuration references Kubernetes Kafka resource %s, but Kubernetes access is disabled", key);
                     }
-                    return resource;
+
+                    return Optional.empty();
                 })
                 : Optional.empty();
     }
@@ -438,7 +446,7 @@ public KafkaContext produceKafkaContext(Map<String, KafkaContext> contexts,
                  * configuration. The user must provide the login secrets
                  * in the request in that case.
                  */
-                var adminConfigs = maybeAuthenticate(ctx.configs(Admin.class));
+                var adminConfigs = maybeAuthenticate(ctx, Admin.class);
                 var admin = adminBuilder.apply(adminConfigs);
                 return new KafkaContext(ctx, filter.apply(admin));
             }
@@ -463,7 +471,7 @@ public void disposeKafkaContext(@Disposes KafkaContext context, Map<String, KafkaContext> contexts) {
     @Produces
     @RequestScoped
     public Supplier<Consumer<byte[], byte[]>> consumerSupplier(ConsoleConfig consoleConfig, KafkaContext context) {
-        var configs = maybeAuthenticate(context.configs(Consumer.class));
+        var configs = maybeAuthenticate(context, Consumer.class);
         Consumer<byte[], byte[]> client = new KafkaConsumer<>(configs); // NOSONAR / closed in consumerDisposer
         return () -> client;
     }
@@ -475,7 +483,7 @@ public void consumerDisposer(@Disposes Supplier<Consumer<byte[], byte[]>> consumer) {
     @Produces
     @RequestScoped
     public Supplier<Producer<String, String>> producerSupplier(ConsoleConfig consoleConfig, KafkaContext context) {
-        var configs = maybeAuthenticate(context.configs(Producer.class));
+        var configs = maybeAuthenticate(context, Producer.class);
         Producer<String, String> client = new KafkaProducer<>(configs); // NOSONAR / closed in producerDisposer
         return () -> client;
     }
@@ -484,11 +492,13 @@ public void producerDisposer(@Disposes Supplier<Producer<String, String>> producer) {
         producer.get().close();
     }
 
-    Map<String, Object> maybeAuthenticate(Map<String, Object> configs) {
+    Map<String, Object> maybeAuthenticate(KafkaContext context, Class<?> clientType) {
+        Map<String, Object> configs = context.configs(clientType);
+
         if (configs.containsKey(SaslConfigs.SASL_MECHANISM)
                 && !configs.containsKey(SaslConfigs.SASL_JAAS_CONFIG)) {
             configs = new HashMap<>(configs);
-            configureAuthentication(configs);
+            configureAuthentication(context.saslMechanism(clientType), configs);
         }
 
         return configs;
@@ -665,13 +675,16 @@ void logConfig(String clientType, Map<String, Object> config) {
         }
     }
 
-    void configureAuthentication(Map<String, Object> configs) {
-        switch (configs.get(SaslConfigs.SASL_MECHANISM).toString()) {
+    void configureAuthentication(String saslMechanism, Map<String, Object> configs) {
+        switch (saslMechanism) {
             case OAUTHBEARER:
                 configureOAuthBearer(configs);
                 break;
-            case SCRAM_SHA512:
-                configureScram(configs);
+            case PLAIN:
+                configureBasic(configs, SASL_PLAIN_CONFIG_TEMPLATE);
+                break;
+            case SCRAM_SHA256, SCRAM_SHA512:
+                configureBasic(configs, SASL_SCRAM_CONFIG_TEMPLATE);
                 break;
             default:
                 throw new NotAuthorizedException("Unknown");
@@ -693,11 +706,11 @@ void configureOAuthBearer(Map<String, Object> configs) {
         configs.put(SaslConfigs.SASL_JAAS_CONFIG, jaasConfig);
     }
 
-    void configureScram(Map<String, Object> configs) {
+    void configureBasic(Map<String, Object> configs, String template) {
         log.trace("SASL/SCRAM enabled");
 
         String jaasConfig = getBasicAuthentication()
-                .map(SASL_SCRAM_CONFIG_TEMPLATE::formatted)
+                .map(template::formatted)
                 .orElseThrow(() -> new NotAuthorizedException(BASIC.trim()));
 
         configs.put(SaslConfigs.SASL_JAAS_CONFIG, jaasConfig);
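
Editor's note on the template refactor above: `BASIC_TEMPLATE` uses a two-stage expansion that is easy to misread. The first `formatted` call fills in the login module class name, while `%%s` escapes the username and password slots so they survive as `%s` for the per-request expansion in `configureBasic`. A minimal standalone sketch of that behavior (the credentials are invented):

```java
public class JaasTemplateSketch {

    // Same template string as BASIC_TEMPLATE in the patch: %s is the login
    // module class; %%s escapes the credential slots for a second pass.
    static final String BASIC_TEMPLATE = "%s required username=\"%%s\" password=\"%%s\" ;";

    public static void main(String[] args) {
        // First pass, done once when ClientFactory is loaded.
        String plain = BASIC_TEMPLATE.formatted("org.apache.kafka.common.security.plain.PlainLoginModule");
        String scram = BASIC_TEMPLATE.formatted("org.apache.kafka.common.security.scram.ScramLoginModule");

        // Second pass, done per request from the HTTP Basic credentials.
        System.out.println(plain.formatted("alice", "s3cr3t"));
        // -> org.apache.kafka.common.security.plain.PlainLoginModule required username="alice" password="s3cr3t" ;
        System.out.println(scram.formatted("alice", "s3cr3t"));
        // -> org.apache.kafka.common.security.scram.ScramLoginModule required username="alice" password="s3cr3t" ;
    }
}
```

The same `configureBasic` path now serves PLAIN, SCRAM-SHA-256, and SCRAM-SHA-512; only the login module class differs.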
diff --git a/api/src/main/java/com/github/streamshub/console/api/model/KafkaCluster.java b/api/src/main/java/com/github/streamshub/console/api/model/KafkaCluster.java
index 46da46f1a..e0e3ef9e5 100644
--- a/api/src/main/java/com/github/streamshub/console/api/model/KafkaCluster.java
+++ b/api/src/main/java/com/github/streamshub/console/api/model/KafkaCluster.java
@@ -16,15 +16,15 @@
 
 import com.fasterxml.jackson.annotation.JsonFilter;
 import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
 import com.github.streamshub.console.api.support.ComparatorBuilder;
 import com.github.streamshub.console.api.support.ListRequestContext;
 
 import static java.util.Comparator.comparing;
 import static java.util.Comparator.nullsLast;
 
-@Schema(name = "KafkaClusterAttributes")
-@JsonFilter("fieldFilter")
-public class KafkaCluster {
+@Schema(name = "KafkaCluster")
+public class KafkaCluster extends Resource<KafkaCluster.Attributes> {
 
     public static class Fields {
         public static final String NAME = "name";
@@ -46,9 +46,9 @@ public static class Fields {
         static final Map<String, Map<Boolean, Comparator<KafkaCluster>>> COMPARATORS =
                 ComparatorBuilder.bidirectional(
                         Map.of("id", ID_COMPARATOR,
-                                NAME, comparing(KafkaCluster::getName),
-                                NAMESPACE, comparing(KafkaCluster::getNamespace),
-                                CREATION_TIMESTAMP, comparing(KafkaCluster::getCreationTimestamp)));
+                                NAME, comparing(KafkaCluster::name),
+                                NAMESPACE, comparing(KafkaCluster::namespace),
+                                CREATION_TIMESTAMP, comparing(KafkaCluster::creationTimestamp)));
 
         public static final ComparatorBuilder<KafkaCluster> COMPARATOR_BUILDER =
                 new ComparatorBuilder<>(KafkaCluster.Fields::comparator, KafkaCluster.Fields.defaultComparator());
@@ -84,15 +84,12 @@ public static Comparator<KafkaCluster> comparator(String fieldName, boolean desc) {
     }
 
     @Schema(name = "KafkaClusterListResponse")
-    public static final class ListResponse extends DataList<KafkaClusterResource> {
+    public static final class ListResponse extends DataList<KafkaCluster> {
         public ListResponse(List<KafkaCluster> data, ListRequestContext<KafkaCluster> listSupport) {
             super(data.stream()
                     .map(entry -> {
-                        var rsrc = new KafkaClusterResource(entry);
-                        rsrc.addMeta("page", listSupport.buildPageMeta(entry::toCursor));
-                        rsrc.addMeta("configured", entry.isConfigured());
-                        rsrc.addMeta("managed", entry.isManaged());
-                        return rsrc;
+                        entry.addMeta("page", listSupport.buildPageMeta(entry::toCursor));
+                        return entry;
                     })
                     .toList());
             addMeta("page", listSupport.buildPageMeta());
@@ -101,48 +98,56 @@ public ListResponse(List<KafkaCluster> data, ListRequestContext<KafkaCluster> listSupport) {
     }
 
     @Schema(name = "KafkaClusterResponse")
-    public static final class SingleResponse extends DataSingleton<KafkaClusterResource> {
+    public static final class SingleResponse extends DataSingleton<KafkaCluster> {
         public SingleResponse(KafkaCluster data) {
-            super(new KafkaClusterResource(data));
+            super(data);
         }
     }
 
-    @Schema(name = "KafkaCluster")
-    public static final class KafkaClusterResource extends Resource<KafkaCluster> {
-        public KafkaClusterResource(KafkaCluster data) {
-            super(data.id, "kafkas", data);
-            addMeta("configured", data.isConfigured());
-            addMeta("managed", data.isManaged());
+    @JsonFilter("fieldFilter")
+    static class Attributes {
+        @JsonProperty
+        String name; // Strimzi Kafka CR only
+
+        @JsonProperty
+        String namespace; // Strimzi Kafka CR only
+
+        @JsonProperty
+        String creationTimestamp; // Strimzi Kafka CR only
+
+        @JsonProperty
+        final List<Node> nodes;
+
+        @JsonProperty
+        final Node controller;
+
+        @JsonProperty
+        final List<String> authorizedOperations;
+
+        @JsonProperty
+        List<KafkaListener> listeners; // Strimzi Kafka CR only
+
+        @JsonProperty
+        String kafkaVersion;
+
+        @JsonProperty
+        String status;
+
+        @JsonProperty
+        List<Condition> conditions;
+
+        @JsonProperty
+        List<String> nodePools;
+
+        Attributes(List<Node> nodes, Node controller, List<String> authorizedOperations) {
+            this.nodes = nodes;
+            this.controller = controller;
+            this.authorizedOperations = authorizedOperations;
         }
     }
 
-    String name; // Strimzi Kafka CR only
-    String namespace; // Strimzi Kafka CR only
-    String creationTimestamp; // Strimzi Kafka CR only
-    @JsonIgnore
-    String id; // non-final, may be overridden by configuration
-    final List<Node> nodes;
-    final Node controller;
-    final List<String> authorizedOperations;
-    List<KafkaListener> listeners; // Strimzi Kafka CR only
-    @Schema(readOnly = true, description = """
-            Contains the set of metrics optionally retrieved only in a describe operation.
-            """)
-    String kafkaVersion;
-    String status;
-    List<Condition> conditions;
-    @JsonIgnore
-    boolean configured;
-    List<String> nodePools;
-    @JsonIgnore
-    boolean managed;
-
     public KafkaCluster(String id, List<Node> nodes, Node controller, List<String> authorizedOperations) {
-        super();
-        this.id = id;
-        this.nodes = nodes;
-        this.controller = controller;
-        this.authorizedOperations = authorizedOperations;
+        super(id, "kafkas", new Attributes(nodes, controller, authorizedOperations));
     }
 
     /**
@@ -156,9 +161,9 @@ public static KafkaCluster fromCursor(JsonObject cursor) {
         KafkaCluster cluster = new KafkaCluster(cursor.getString("id"), null, null, null);
 
         JsonObject attr = cursor.getJsonObject("attributes");
-        cluster.setName(attr.getString(Fields.NAME, null));
-        cluster.setNamespace(attr.getString(Fields.NAMESPACE, null));
-        cluster.setCreationTimestamp(attr.getString(Fields.CREATION_TIMESTAMP, null));
+        cluster.name(attr.getString(Fields.NAME, null));
+        cluster.namespace(attr.getString(Fields.NAMESPACE, null));
+        cluster.creationTimestamp(attr.getString(Fields.CREATION_TIMESTAMP, null));
 
         return cluster;
     }
@@ -168,9 +173,9 @@ public String toCursor(List<String> sortFields) {
                 .add("id", id == null ?
                         Json.createValue("") : Json.createValue(id));
 
         JsonObjectBuilder attrBuilder = Json.createObjectBuilder();
-        maybeAddAttribute(attrBuilder, sortFields, Fields.NAME, name);
-        maybeAddAttribute(attrBuilder, sortFields, Fields.NAMESPACE, namespace);
-        maybeAddAttribute(attrBuilder, sortFields, Fields.CREATION_TIMESTAMP, creationTimestamp);
+        maybeAddAttribute(attrBuilder, sortFields, Fields.NAME, attributes.name);
+        maybeAddAttribute(attrBuilder, sortFields, Fields.NAMESPACE, attributes.namespace);
+        maybeAddAttribute(attrBuilder, sortFields, Fields.CREATION_TIMESTAMP, attributes.creationTimestamp);
         cursor.add("attributes", attrBuilder.build());
 
         return Base64.getUrlEncoder().encodeToString(cursor.build().toString().getBytes(StandardCharsets.UTF_8));
@@ -182,103 +187,93 @@ static void maybeAddAttribute(JsonObjectBuilder attrBuilder, List<String> sortFields, String key, String value) {
         }
     }
 
-    public String getName() {
-        return name;
-    }
-
-    public void setName(String name) {
-        this.name = name;
+    public String name() {
+        return attributes.name;
     }
 
-    public String getNamespace() {
-        return namespace;
+    public void name(String name) {
+        attributes.name = name;
     }
 
-    public void setNamespace(String namespace) {
-        this.namespace = namespace;
+    public String namespace() {
+        return attributes.namespace;
     }
 
-    public String getCreationTimestamp() {
-        return creationTimestamp;
+    public void namespace(String namespace) {
+        attributes.namespace = namespace;
     }
 
-    public void setCreationTimestamp(String creationTimestamp) {
-        this.creationTimestamp = creationTimestamp;
+    public String creationTimestamp() {
+        return attributes.creationTimestamp;
    }
 
-    public String getId() {
-        return id;
+    public void creationTimestamp(String creationTimestamp) {
+        attributes.creationTimestamp = creationTimestamp;
     }
 
     public void setId(String id) {
         this.id = id;
     }
 
-    public List<Node> getNodes() {
-        return nodes;
+    public List<Node> nodes() {
+        return attributes.nodes;
     }
 
-    public Node getController() {
-        return controller;
+    public Node controller() {
+        return attributes.controller;
     }
 
-    public List<String> getAuthorizedOperations() {
-        return authorizedOperations;
+    public List<String> authorizedOperations() {
+        return attributes.authorizedOperations;
     }
 
-    public List<KafkaListener> getListeners() {
-        return listeners;
+    public List<KafkaListener> listeners() {
+        return attributes.listeners;
     }
 
-    public void setListeners(List<KafkaListener> listeners) {
-        this.listeners = listeners;
+    public void listeners(List<KafkaListener> listeners) {
+        attributes.listeners = listeners;
     }
 
-    public String getKafkaVersion() {
-        return kafkaVersion;
+    public String kafkaVersion() {
+        return attributes.kafkaVersion;
     }
 
-    public void setKafkaVersion(String kafkaVersion) {
-        this.kafkaVersion = kafkaVersion;
+    public void kafkaVersion(String kafkaVersion) {
+        attributes.kafkaVersion = kafkaVersion;
     }
 
-    public String getStatus() {
-        return status;
+    public String status() {
+        return attributes.status;
     }
 
-    public void setStatus(String status) {
-        this.status = status;
+    public void status(String status) {
+        attributes.status = status;
     }
 
-    public List<Condition> getConditions() {
-        return conditions;
+    public List<Condition> conditions() {
+        return attributes.conditions;
    }
 
-    public void setConditions(List<Condition> conditions) {
-        this.conditions = conditions;
+    public void conditions(List<Condition> conditions) {
+        attributes.conditions = conditions;
     }
 
+    @JsonIgnore
     public boolean isConfigured() {
-        return configured;
+        return Boolean.TRUE.equals(getMeta("configured"));
     }
 
+    @JsonIgnore
     public void setConfigured(boolean configured) {
-        this.configured = configured;
-    }
-
-    public List<String> getNodePools() {
-        return nodePools;
-    }
-
-    public void setNodePools(List<String> nodePools) {
-        this.nodePools = nodePools;
+        addMeta("configured", configured);
     }
 
-    public void setManaged(boolean managed) {
-        this.managed = managed;
+    public List<String> nodePools() {
+        return attributes.nodePools;
     }
 
-    public boolean isManaged() {
-        return managed;
+    public void nodePools(List<String> nodePools) {
+        attributes.nodePools = nodePools;
     }
 }
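
Editor's sketch of what this model refactor means on the wire: `KafkaCluster` itself now serializes as the JSON:API resource object, with the moved fields living under `attributes` and the former `configured`/`managed` booleans surfaced through `meta` instead of a wrapping `KafkaClusterResource`. The field values below are invented for illustration, and the page meta is omitted:

```json
{
  "data": {
    "id": "k1-id",
    "type": "kafkas",
    "meta": {
      "configured": true,
      "managed": false
    },
    "attributes": {
      "name": "test-kafka1",
      "kafkaVersion": "3.7.0",
      "status": "Ready"
    }
  }
}
```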
diff --git a/api/src/main/java/com/github/streamshub/console/api/model/RelatableResource.java b/api/src/main/java/com/github/streamshub/console/api/model/RelatableResource.java
index 304e59e63..de3fcde72 100644
--- a/api/src/main/java/com/github/streamshub/console/api/model/RelatableResource.java
+++ b/api/src/main/java/com/github/streamshub/console/api/model/RelatableResource.java
@@ -1,14 +1,7 @@
 package com.github.streamshub.console.api.model;
 
-import java.util.LinkedHashMap;
-import java.util.Map;
-
-import jakarta.validation.Valid;
-import jakarta.validation.constraints.NotNull;
-
 import com.fasterxml.jackson.annotation.JsonInclude;
 import com.fasterxml.jackson.annotation.JsonInclude.Include;
-import com.github.streamshub.console.api.support.ErrorCategory;
 
 /**
  * A "resource object", as described by JSON:API.
  *
  * @param <A> the type of the attributes model
  * @param <R> the type of the relationships model
  */
 @JsonInclude(Include.NON_NULL)
-public abstract class RelatableResource<A, R> {
-
-    protected final String id;
-    @NotNull(payload = ErrorCategory.InvalidResource.class)
-    protected final String type;
-    protected Map<String, Object> meta;
-
-    @Valid
-    @NotNull(payload = ErrorCategory.InvalidResource.class)
-    protected final A attributes;
+public abstract class RelatableResource<A, R> extends Resource<A> {
 
     protected final R relationships;
 
     protected RelatableResource(String id, String type, A attributes, R relationships) {
-        this.id = id;
-        this.type = type;
-        this.attributes = attributes;
+        super(id, type, attributes);
         this.relationships = relationships;
     }
 
-    public String getId() {
-        return id;
-    }
-
-    public String getType() {
-        return type;
-    }
-
-    public A getAttributes() {
-        return attributes;
-    }
-
     public R getRelationships() {
         return relationships;
     }
-
-    public Map<String, Object> getMeta() {
-        return meta;
-    }
-
-    public RelatableResource<A, R> addMeta(String key, Object value) {
-        if (meta == null) {
-            meta = new LinkedHashMap<>();
-        }
-        meta.put(key, value);
-        return this;
-    }
 }
diff --git a/api/src/main/java/com/github/streamshub/console/api/model/Resource.java b/api/src/main/java/com/github/streamshub/console/api/model/Resource.java
index 452afab23..65a812694 100644
--- a/api/src/main/java/com/github/streamshub/console/api/model/Resource.java
+++ b/api/src/main/java/com/github/streamshub/console/api/model/Resource.java
@@ -20,13 +20,13 @@
 @JsonInclude(Include.NON_NULL)
 public abstract class Resource<T> {
 
-    private final String id;
+    protected String id;
     @NotNull(payload = ErrorCategory.InvalidResource.class)
-    private final String type;
-    private Map<String, Object> meta;
+    protected final String type;
+    protected Map<String, Object> meta;
 
     @Valid
     @NotNull(payload = ErrorCategory.InvalidResource.class)
-    private final T attributes;
+    protected final T attributes;
 
     protected Resource(String id, String type, T attributes) {
         this.id = id;
@@ -50,6 +50,10 @@ public Map<String, Object> getMeta() {
         return meta;
     }
 
+    public Object getMeta(String key) {
+        return meta != null ?
+                meta.get(key) : null;
+    }
+
     public Resource<T> addMeta(String key, Object value) {
         if (meta == null) {
             meta = new LinkedHashMap<>();
diff --git a/api/src/main/java/com/github/streamshub/console/api/service/BrokerService.java b/api/src/main/java/com/github/streamshub/console/api/service/BrokerService.java
index 7c432d062..d546742f7 100644
--- a/api/src/main/java/com/github/streamshub/console/api/service/BrokerService.java
+++ b/api/src/main/java/com/github/streamshub/console/api/service/BrokerService.java
@@ -29,7 +29,7 @@ public class BrokerService {
     public CompletionStage<Map<String, ConfigEntry>> describeConfigs(String nodeId) {
         return clusterService.describeCluster(Collections.emptyList())
                 .thenApply(cluster -> {
-                    if (cluster.getNodes().stream().mapToInt(Node::id).mapToObj(String::valueOf).noneMatch(nodeId::equals)) {
+                    if (cluster.nodes().stream().mapToInt(Node::id).mapToObj(String::valueOf).noneMatch(nodeId::equals)) {
                         throw new NotFoundException("No such node: " + nodeId);
                     }
                     return cluster;
diff --git a/api/src/main/java/com/github/streamshub/console/api/service/KafkaClusterService.java b/api/src/main/java/com/github/streamshub/console/api/service/KafkaClusterService.java
index e51aa1f8e..674b365e7 100644
--- a/api/src/main/java/com/github/streamshub/console/api/service/KafkaClusterService.java
+++ b/api/src/main/java/com/github/streamshub/console/api/service/KafkaClusterService.java
@@ -2,6 +2,7 @@
 
 import java.util.Collection;
 import java.util.Comparator;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
@@ -23,6 +24,7 @@
 import org.jboss.logging.Logger;
 
 import com.github.streamshub.console.api.Annotations;
+import com.github.streamshub.console.api.ClientFactory;
 import com.github.streamshub.console.api.model.Condition;
 import com.github.streamshub.console.api.model.KafkaCluster;
 import com.github.streamshub.console.api.model.KafkaListener;
@@ -95,17 +97,11 @@ public List<KafkaCluster> listClusters(ListRequestContext<KafkaCluster> listSupport) {
                     var config = ctx.getValue().clusterConfig();
 
                     return kafkaResources.stream()
-                            .filter(k -> Objects.equals(k.getName(), config.getName()))
-                            .filter(k -> Objects.equals(k.getNamespace(), config.getNamespace()))
+                            .filter(k -> Objects.equals(k.name(), config.getName()))
+                            .filter(k -> Objects.equals(k.namespace(), config.getNamespace()))
                             .map(k -> addKafkaContextData(k, ctx.getValue()))
                             .findFirst()
-                            .orElseGet(() -> {
-                                var k = new KafkaCluster(id, null, null, null);
-                                k.setConfigured(true);
-                                k.setName(config.getName());
-                                k.setNamespace(config.getNamespace());
-                                return k;
-                            });
+                            .orElseGet(() -> addKafkaContextData(new KafkaCluster(id, null, null, null), ctx.getValue()));
                 })
                 .collect(Collectors.toMap(KafkaCluster::getId, Function.identity()));
 
@@ -159,12 +155,28 @@ KafkaCluster toKafkaCluster(Kafka kafka) {
     KafkaCluster addKafkaContextData(KafkaCluster cluster, KafkaContext kafkaContext) {
         var config = kafkaContext.clusterConfig();
         cluster.setConfigured(true);
+        cluster.name(config.getName());
+        cluster.namespace(config.getNamespace());
+
         if (config.getId() != null) {
             // configuration has overridden the id
             cluster.setId(config.getId());
         }
-        cluster.setName(config.getName());
-        cluster.setNamespace(config.getNamespace());
+
+        switch (kafkaContext.saslMechanism(Admin.class)) {
+            case ClientFactory.OAUTHBEARER:
+                Map<String, String> authMeta = new HashMap<>(2);
+                authMeta.put("method", "oauth");
+                authMeta.put("tokenUrl", kafkaContext.tokenUrl().orElse(null));
+                cluster.addMeta("authentication", authMeta);
+                break;
+            case ClientFactory.PLAIN, ClientFactory.SCRAM_SHA256,
+                 ClientFactory.SCRAM_SHA512:
+                cluster.addMeta("authentication", Map.of("method", "basic"));
+                break;
+            default:
+                break;
+        }
+
         return cluster;
     }
 
@@ -178,9 +190,9 @@ KafkaCluster addKafkaResourceData(KafkaCluster cluster) {
     }
 
     void setKafkaClusterProperties(KafkaCluster cluster, Kafka kafka) {
-        cluster.setName(kafka.getMetadata().getName());
-        cluster.setNamespace(kafka.getMetadata().getNamespace());
-        cluster.setCreationTimestamp(kafka.getMetadata().getCreationTimestamp());
+        cluster.name(kafka.getMetadata().getName());
+        cluster.namespace(kafka.getMetadata().getNamespace());
+        cluster.creationTimestamp(kafka.getMetadata().getCreationTimestamp());
 
         var comparator = Comparator
                 .comparingInt((GenericKafkaListener listener) ->
@@ -205,39 +217,39 @@ void setKafkaClusterProperties(KafkaCluster cluster, Kafka kafka) {
                         getAuthType(listener).orElse(null)))
                 .toList();
 
-        cluster.setListeners(listeners);
+        cluster.listeners(listeners);
         setKafkaClusterStatus(cluster, kafka);
     }
 
     void setKafkaClusterStatus(KafkaCluster cluster, Kafka kafka) {
         Optional.ofNullable(kafka.getStatus())
             .ifPresent(status -> {
-                cluster.setKafkaVersion(status.getKafkaVersion());
+                cluster.kafkaVersion(status.getKafkaVersion());
                 Optional.ofNullable(status.getConditions())
                     .ifPresent(conditions -> {
-                        cluster.setConditions(conditions.stream().map(Condition::new).toList());
+                        cluster.conditions(conditions.stream().map(Condition::new).toList());
                         conditions.stream()
                             .filter(c -> "NotReady".equals(c.getType()) && "True".equals(c.getStatus()))
                             .findFirst()
                             .ifPresentOrElse(
-                                    c -> cluster.setStatus("NotReady"),
-                                    () -> cluster.setStatus("Ready"));
+                                    c -> cluster.status("NotReady"),
+                                    () -> cluster.status("Ready"));
                     });
                 Optional.ofNullable(status.getKafkaNodePools())
-                    .ifPresent(pools -> cluster.setNodePools(pools.stream().map(pool -> pool.getName()).toList()));
+                    .ifPresent(pools -> cluster.nodePools(pools.stream().map(pool -> pool.getName()).toList()));
             });
     }
 
     KafkaCluster setManaged(KafkaCluster cluster) {
-        cluster.setManaged(findCluster(cluster)
+        cluster.addMeta("managed", findCluster(cluster)
                 .map(kafkaTopic -> Boolean.TRUE)
                 .orElse(Boolean.FALSE));
         return cluster;
     }
 
     private Optional<Kafka> findCluster(KafkaCluster cluster) {
-        return findCluster(Cache.namespaceKeyFunc(cluster.getNamespace(), cluster.getName()));
+        return findCluster(Cache.namespaceKeyFunc(cluster.namespace(), cluster.name()));
     }
 
     private Optional<Kafka> findCluster(String clusterKey) {
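
This service change is the "Return auth type in Kafka meta" half of the commit: based on the Admin client's SASL mechanism, each cluster resource now advertises how callers should authenticate. A sketch of the resulting `meta.authentication` fragments (the token URL is invented; it is resolved from `ClientConfig.OAUTH_TOKEN_ENDPOINT_URI` or the Strimzi listener's `tokenEndpointUri`, per the `KafkaContext` changes that follow):

```
// SASL mechanism OAUTHBEARER
"authentication": {
  "method": "oauth",
  "tokenUrl": "https://auth.example.com/token"
}

// SASL mechanisms PLAIN, SCRAM-SHA-256, SCRAM-SHA-512 (all treated alike)
"authentication": {
  "method": "basic"
}

// any other (or no) SASL mechanism: no authentication meta is added
```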
diff --git a/api/src/main/java/com/github/streamshub/console/api/support/KafkaContext.java b/api/src/main/java/com/github/streamshub/console/api/support/KafkaContext.java
index 48601ee9c..89cd34efe 100644
--- a/api/src/main/java/com/github/streamshub/console/api/support/KafkaContext.java
+++ b/api/src/main/java/com/github/streamshub/console/api/support/KafkaContext.java
@@ -1,15 +1,24 @@
 package com.github.streamshub.console.api.support;
 
 import java.io.Closeable;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Optional;
+import java.util.stream.Stream;
 
 import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.common.config.SaslConfigs;
 
 import com.github.streamshub.console.config.KafkaClusterConfig;
 
 import io.strimzi.api.kafka.model.kafka.Kafka;
+import io.strimzi.api.kafka.model.kafka.KafkaClusterSpec;
+import io.strimzi.api.kafka.model.kafka.KafkaSpec;
+import io.strimzi.api.kafka.model.kafka.listener.GenericKafkaListener;
+import io.strimzi.api.kafka.model.kafka.listener.KafkaListenerAuthenticationOAuth;
+import io.strimzi.kafka.oauth.client.ClientConfig;
 
 public class KafkaContext implements Closeable {
 
@@ -82,4 +91,24 @@ public Admin admin() {
     public boolean applicationScoped() {
         return applicationScoped;
     }
+
+    public String saslMechanism(Class<?> clientType) {
+        return configs(clientType).get(SaslConfigs.SASL_MECHANISM) instanceof String auth ? auth : "";
+    }
+
+    public Optional<String> tokenUrl() {
+        return Optional.ofNullable(clusterConfig.getProperties().get(ClientConfig.OAUTH_TOKEN_ENDPOINT_URI))
+                .or(() -> Optional.ofNullable(resource())
+                    .map(Kafka::getSpec)
+                    .map(KafkaSpec::getKafka)
+                    .map(KafkaClusterSpec::getListeners)
+                    .map(Collection::stream)
+                    .orElseGet(Stream::empty)
+                    .filter(listener -> listener.getName().equals(clusterConfig.getListener()))
+                    .findFirst()
+                    .map(GenericKafkaListener::getAuth)
+                    .filter(KafkaListenerAuthenticationOAuth.class::isInstance)
+                    .map(KafkaListenerAuthenticationOAuth.class::cast)
+                    .map(KafkaListenerAuthenticationOAuth::getTokenEndpointUri));
+    }
 }
diff --git a/api/src/main/java/com/github/streamshub/console/api/support/OASModelFilter.java b/api/src/main/java/com/github/streamshub/console/api/support/OASModelFilter.java
index 97a2d823c..5b2bf4769 100644
--- a/api/src/main/java/com/github/streamshub/console/api/support/OASModelFilter.java
+++ b/api/src/main/java/com/github/streamshub/console/api/support/OASModelFilter.java
@@ -54,7 +54,7 @@ public RequestBody filterRequestBody(RequestBody requestBody) {
             .filter(example -> Objects.nonNull(example.getExternalValue()))
             .forEach(example -> {
                 try (InputStream stream = getClass().getResourceAsStream(example.getExternalValue())) {
-                    LOGGER.infof("Loading Example externalValue: %s", example.getExternalValue());
+                    LOGGER.debugf("Loading Example externalValue: %s", example.getExternalValue());
                     example.setValue(objectMapper.get().readTree(stream));
                     example.setExternalValue(null);
                 } catch (IOException e) {
diff --git a/api/src/main/resources/application.properties b/api/src/main/resources/application.properties
index c70fec245..703261c30 100644
--- a/api/src/main/resources/application.properties
+++ b/api/src/main/resources/application.properties
@@ -66,14 +66,11 @@ console.kafka.admin.default.api.timeout.ms=10000
 %dev.quarkus.kubernetes-client.trust-certs=true
 %dev.quarkus.log.category."io.vertx.core.impl.BlockedThreadChecker".level=OFF
 %dev.quarkus.log.category."com.github.streamshub.console".level=DEBUG
-#%dev.quarkus.log.category."io.vertx.core.http".level=DEBUG
-#%dev.quarkus.log.category."io.netty".level=DEBUG
 
 ########
 %testplain.quarkus.devservices.enabled=true
 %testplain.quarkus.kubernetes-client.devservices.enabled=true
 %testplain.quarkus.kubernetes-client.devservices.override-kubeconfig=true
-%testplain.quarkus.log.category."io.fabric8.kubernetes".level=DEBUG
 
 #%testplain.quarkus.http.auth.proactive=false
 #%testplain.quarkus.http.auth.permission."oidc".policy=permit
diff --git a/api/src/test/java/com/github/streamshub/console/api/KafkaClustersResourceNoK8sIT.java b/api/src/test/java/com/github/streamshub/console/api/KafkaClustersResourceNoK8sIT.java
index 8d34928b8..4dc3047d5 100644
--- a/api/src/test/java/com/github/streamshub/console/api/KafkaClustersResourceNoK8sIT.java
+++ b/api/src/test/java/com/github/streamshub/console/api/KafkaClustersResourceNoK8sIT.java
@@ -62,14 +62,14 @@ void setup() throws IOException {
         kafkaContainer = deployments.getKafkaContainer();
         bootstrapServers = URI.create(kafkaContainer.getBootstrapServers());
         randomBootstrapServers = URI.create(consoleConfig.getKafka()
.getCluster("default/test-kafka2") + .getCluster("test-kafka2") .map(k -> k.getProperties().get("bootstrap.servers")) .orElseThrow()); utils = new TestHelper(bootstrapServers, config, null); - clusterId1 = consoleConfig.getKafka().getCluster("default/test-kafka1").get().getId(); - clusterId2 = consoleConfig.getKafka().getCluster("default/test-kafka2").get().getId(); + clusterId1 = consoleConfig.getKafka().getCluster("test-kafka1").get().getId(); + clusterId2 = consoleConfig.getKafka().getCluster("test-kafka2").get().getId(); kafkaClusterService.setListUnconfigured(false); } diff --git a/api/src/test/java/com/github/streamshub/console/kafka/systemtest/TestPlainNoK8sProfile.java b/api/src/test/java/com/github/streamshub/console/kafka/systemtest/TestPlainNoK8sProfile.java index b892be8d2..8d479bb04 100644 --- a/api/src/test/java/com/github/streamshub/console/kafka/systemtest/TestPlainNoK8sProfile.java +++ b/api/src/test/java/com/github/streamshub/console/kafka/systemtest/TestPlainNoK8sProfile.java @@ -28,15 +28,17 @@ public Map getConfigOverrides() { kafka: clusters: - name: test-kafka1 - namespace: default id: k1-id properties: bootstrap.servers: ${console.test.external-bootstrap} - name: test-kafka2 - namespace: default id: k2-id properties: bootstrap.servers: ${console.test.random-bootstrap} + - name: test-kafka3 + namespace: default + id: k3-id + listener: listener0 """); return Map.of( diff --git a/common/pom.xml b/common/pom.xml index 4173178a4..920b0f14a 100644 --- a/common/pom.xml +++ b/common/pom.xml @@ -28,6 +28,10 @@ com.fasterxml.jackson.core jackson-annotations + + jakarta.validation + jakarta.validation-api + provided + - - \ No newline at end of file + diff --git a/common/src/main/java/com/github/streamshub/console/config/KafkaClusterConfig.java b/common/src/main/java/com/github/streamshub/console/config/KafkaClusterConfig.java index 1f7f058fd..b5a8334a9 100644 --- a/common/src/main/java/com/github/streamshub/console/config/KafkaClusterConfig.java +++ b/common/src/main/java/com/github/streamshub/console/config/KafkaClusterConfig.java @@ -3,11 +3,14 @@ import java.util.LinkedHashMap; import java.util.Map; +import jakarta.validation.constraints.NotBlank; + import com.fasterxml.jackson.annotation.JsonIgnore; public class KafkaClusterConfig { private String id; + @NotBlank private String name; private String namespace; private String listener; diff --git a/common/src/main/java/com/github/streamshub/console/config/KafkaConfig.java b/common/src/main/java/com/github/streamshub/console/config/KafkaConfig.java index f42a54ed4..9c68d1622 100644 --- a/common/src/main/java/com/github/streamshub/console/config/KafkaConfig.java +++ b/common/src/main/java/com/github/streamshub/console/config/KafkaConfig.java @@ -4,12 +4,20 @@ import java.util.List; import java.util.Optional; +import jakarta.validation.constraints.AssertTrue; + import com.fasterxml.jackson.annotation.JsonIgnore; public class KafkaConfig { List clusters = new ArrayList<>(); + @JsonIgnore + @AssertTrue(message = "Kafka cluster names must be unique") + public boolean hasUniqueClusterNames() { + return clusters.stream().map(KafkaClusterConfig::getName).distinct().count() == clusters.size(); + } + @JsonIgnore public Optional getCluster(String clusterKey) { return clusters.stream() diff --git a/console-config-example.yaml b/console-config-example.yaml index 44f539fde..3b55847b7 100644 --- a/console-config-example.yaml +++ b/console-config-example.yaml @@ -1,9 +1,9 @@ -kafka: - kubernetes: - # enable/disable use of Kubernetes to 
-    # Kafka and KafkaTopic custom resources. Enabled by default
-    enabled: true
+kubernetes:
+  # enable/disable use of Kubernetes to obtain additional information from Strimzi
+  # Kafka and KafkaTopic custom resources. Enabled by default
+  enabled: true
 
+kafka:
   clusters:
     - name: my-kafka1 # name of the Strimzi Kafka CR
       namespace: my-namespace1 # namespace of the Strimzi Kafka CR (optional)
diff --git a/operator/src/main/java/com/github/streamshub/console/api/v1alpha1/spec/KafkaCluster.java b/operator/src/main/java/com/github/streamshub/console/api/v1alpha1/spec/KafkaCluster.java
index 3a14c05b2..1d3323971 100644
--- a/operator/src/main/java/com/github/streamshub/console/api/v1alpha1/spec/KafkaCluster.java
+++ b/operator/src/main/java/com/github/streamshub/console/api/v1alpha1/spec/KafkaCluster.java
@@ -9,11 +9,6 @@
 
 @Buildable(builderPackage = "io.fabric8.kubernetes.api.builder")
 @JsonInclude(JsonInclude.Include.NON_NULL)
-@ValidationRule(
-        // The `namespace` property must be wrapped in double underscore to escape it
-        // due to it being a "reserved" word.
-        value = "has(self.id) || has(self.__namespace__)",
-        message = "One of `id` or `namespace` is required")
 @ValidationRule(
         // The `namespace` property must be wrapped in double underscore to escape it
         // due to it being a "reserved" word.
@@ -27,10 +22,10 @@ public class KafkaCluster {
             resource may be discovered using the name and namespace properties, \
             this property is optional. Otherwise, the Kafka cluster identifier \
             published in the Kafka resource's status will be used. If namespace \
-            is not given or the console or Kubernetes is not in use, this property \
-            is required.
+            is not given or the console or Kubernetes is not in use, and this \
+            property is not provided, the ID will default to the name.
 
-            When provided, this property will override the Kafka cluster id available \
+            When provided, this property will override the Kafka cluster ID available \
             in the Kafka resource's status.""")
     private String id;
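
Pulling the configuration-facing changes together, a minimal non-Kubernetes cluster entry now looks something like the following sketch (names and addresses are invented; `id` may be omitted and falls back to `name`, and cluster names must be unique and non-blank under the new validation):

```yaml
kubernetes:
  enabled: false                    # no Strimzi Kafka CR lookup

kafka:
  clusters:
    - name: my-plain-kafka          # @NotBlank; must be unique across clusters
      # id omitted: defaults to "my-plain-kafka" via orElseGet(clusterConfig::getName)
      properties:
        bootstrap.servers: kafka.example.com:9092
        security.protocol: SASL_PLAINTEXT
        sasl.mechanism: PLAIN       # now handled like SCRAM-SHA-*: the JAAS config
                                    # is built per request from the HTTP Basic header
```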