From 34e1195e23a2291d876c717f47b119242e85987c Mon Sep 17 00:00:00 2001 From: Michael Edgar Date: Thu, 7 Nov 2024 15:40:47 -0500 Subject: [PATCH] Rebalance OIDC tests, security config validation Signed-off-by: Michael Edgar --- api/pom.xml | 4 +- .../ConsoleAuthenticationMechanism.java | 2 +- .../api/security/ConsolePermission.java | 30 +- .../api/service/TopicDescribeService.java | 6 +- .../console/api/BrokersResourceIT.java | 2 + .../console/api/ConsumerGroupsResourceIT.java | 2 + .../console/api/KafkaClustersResourceIT.java | 3 +- .../api/KafkaClustersResourceNoK8sIT.java | 1 + .../api/KafkaClustersResourceOidcIT.java | 9 +- .../api/KafkaRebalancesResourceIT.java | 8 +- .../api/KafkaRebalancesResourceOidcIT.java | 294 ++++++++++++++++++ .../console/api/RecordsResourceIT.java | 10 + .../console/api/TopicsResourceIT.java | 1 + .../console/config/ConsoleConfig.java | 23 ++ .../console/config/KafkaClusterConfig.java | 2 + .../console/config/KafkaConfig.java | 7 + .../config/security/GlobalSecurityConfig.java | 3 + .../console/config/security/RoleConfig.java | 8 + .../console/config/security/RuleConfig.java | 11 +- .../config/security/SecurityConfig.java | 5 + .../config/security/SubjectConfig.java | 11 +- operator/pom.xml | 4 +- 22 files changed, 395 insertions(+), 51 deletions(-) create mode 100644 api/src/test/java/com/github/streamshub/console/api/KafkaRebalancesResourceOidcIT.java diff --git a/api/pom.xml b/api/pom.xml index 45ac363de..a5ecc6316 100644 --- a/api/pom.xml +++ b/api/pom.xml @@ -320,7 +320,7 @@ com/github/streamshub/console/api/support/TrustAllCertificateManager.class - *QuarkusClassLoader + ${project.build.directory}/jacoco-quarkus.exec true @@ -334,7 +334,7 @@ com/github/streamshub/console/api/support/TrustAllCertificateManager.class - *QuarkusClassLoader + ${project.build.directory}/jacoco-quarkus.exec true diff --git a/api/src/main/java/com/github/streamshub/console/api/security/ConsoleAuthenticationMechanism.java b/api/src/main/java/com/github/streamshub/console/api/security/ConsoleAuthenticationMechanism.java index 5a8c3d409..a6217495e 100644 --- a/api/src/main/java/com/github/streamshub/console/api/security/ConsoleAuthenticationMechanism.java +++ b/api/src/main/java/com/github/streamshub/console/api/security/ConsoleAuthenticationMechanism.java @@ -340,7 +340,7 @@ private void addRoleChecker(KafkaContext ctx, QuarkusSecurityIdentity.Builder bu Stream globalPermissions = getPermissions(globalSecurity, roleNames, ""); Stream clusterPermissions = clusterSecurity - .map(cs -> getPermissions(cs, roleNames, "kafkas/" + ctx.clusterId() + '/')) + .map(cs -> getPermissions(cs, roleNames, "kafkas/" + ctx.clusterConfig().getName() + '/')) .orElseGet(Stream::empty); List possessedPermissions = Stream.concat(globalPermissions, clusterPermissions).toList(); diff --git a/api/src/main/java/com/github/streamshub/console/api/security/ConsolePermission.java b/api/src/main/java/com/github/streamshub/console/api/security/ConsolePermission.java index f6d80c743..88257b7ee 100644 --- a/api/src/main/java/com/github/streamshub/console/api/security/ConsolePermission.java +++ b/api/src/main/java/com/github/streamshub/console/api/security/ConsolePermission.java @@ -36,19 +36,19 @@ public ConsolePermission(String resource, Collection resourceNames, Priv } private static Set checkActions(Privilege[] actions) { - Set validActions = new HashSet<>(actions.length, 1); - for (Privilege action : actions) { - validActions.add(validateAndTrim(action, "Action")); + Objects.requireNonNull(actions); + + if (actions.length == 0) { + throw new IllegalArgumentException("actions must not be zero length"); } - return Collections.unmodifiableSet(validActions); - } - private static Privilege validateAndTrim(Privilege action, String paramName) { - if (action == null) { - throw new IllegalArgumentException(String.format("%s must not be null", paramName)); + Set validActions = new HashSet<>(actions.length, 1); + + for (Privilege action : actions) { + validActions.add(Objects.requireNonNull(action)); } - return action; + return Collections.unmodifiableSet(validActions); } public ConsolePermission resourceName(String resourceName) { @@ -74,18 +74,6 @@ boolean implies(ConsolePermission requiredPermission) { return false; } - // actions are optional, however if at least one action was specified, - // an intersection of compared sets must not be empty - if (requiredPermission.actions.isEmpty()) { - // no required actions - return true; - } - - if (actions.isEmpty()) { - // no possessed actions - return false; - } - if (actions.contains(Privilege.ALL)) { // all actions possessed return true; diff --git a/api/src/main/java/com/github/streamshub/console/api/service/TopicDescribeService.java b/api/src/main/java/com/github/streamshub/console/api/service/TopicDescribeService.java index c18573212..966355a8b 100644 --- a/api/src/main/java/com/github/streamshub/console/api/service/TopicDescribeService.java +++ b/api/src/main/java/com/github/streamshub/console/api/service/TopicDescribeService.java @@ -284,7 +284,7 @@ private CompletableFuture maybeFetchConsumerGroups(Map topics return consumerGroupService .listConsumerGroupMembership(searchTopics) - .thenAccept(consumerGroups -> { + .thenAccept(consumerGroups -> topics.forEach((topicId, topic) -> { String idString = topicId.toString(); @@ -294,8 +294,8 @@ private CompletableFuture maybeFetchConsumerGroups(Map topics topic.consumerGroups().data().addAll(identifiers); topic.consumerGroups().addMeta("count", identifiers.size()); } - }); - }).toCompletableFuture(); + })) + .toCompletableFuture(); } /* package */ CompletionStage>> describeTopics( diff --git a/api/src/test/java/com/github/streamshub/console/api/BrokersResourceIT.java b/api/src/test/java/com/github/streamshub/console/api/BrokersResourceIT.java index b778af6a4..3ffbb3e05 100644 --- a/api/src/test/java/com/github/streamshub/console/api/BrokersResourceIT.java +++ b/api/src/test/java/com/github/streamshub/console/api/BrokersResourceIT.java @@ -63,6 +63,8 @@ void setup() { utils = new TestHelper(bootstrapServers, config, null); client.resources(Kafka.class).inAnyNamespace().delete(); + consoleConfig.clearSecurity(); + utils.apply(client, new KafkaBuilder() .withNewMetadata() .withName("test-kafka1") diff --git a/api/src/test/java/com/github/streamshub/console/api/ConsumerGroupsResourceIT.java b/api/src/test/java/com/github/streamshub/console/api/ConsumerGroupsResourceIT.java index 4dbd2ceac..ec0a574dc 100644 --- a/api/src/test/java/com/github/streamshub/console/api/ConsumerGroupsResourceIT.java +++ b/api/src/test/java/com/github/streamshub/console/api/ConsumerGroupsResourceIT.java @@ -125,6 +125,8 @@ void setup() throws IOException { utils = new TestHelper(bootstrapServers, config, null); client.resources(Kafka.class).inAnyNamespace().delete(); + consoleConfig.clearSecurity(); + utils.apply(client, utils.buildKafkaResource("test-kafka1", utils.getClusterId(), bootstrapServers)); // Wait for the informer cache to be populated with all Kafka CRs diff --git a/api/src/test/java/com/github/streamshub/console/api/KafkaClustersResourceIT.java b/api/src/test/java/com/github/streamshub/console/api/KafkaClustersResourceIT.java index 52bebf182..39cac2501 100644 --- a/api/src/test/java/com/github/streamshub/console/api/KafkaClustersResourceIT.java +++ b/api/src/test/java/com/github/streamshub/console/api/KafkaClustersResourceIT.java @@ -44,7 +44,6 @@ import com.github.streamshub.console.api.support.KafkaContext; import com.github.streamshub.console.config.ConsoleConfig; import com.github.streamshub.console.config.KafkaClusterConfig; -import com.github.streamshub.console.config.security.GlobalSecurityConfig; import com.github.streamshub.console.config.security.GlobalSecurityConfigBuilder; import com.github.streamshub.console.config.security.Privilege; import com.github.streamshub.console.kafka.systemtest.TestPlainProfile; @@ -136,10 +135,10 @@ void setup() throws IOException { .map(k -> k.getProperties().get("bootstrap.servers")) .orElseThrow()); - consoleConfig.setSecurity(new GlobalSecurityConfig()); utils = new TestHelper(bootstrapServers, config, null); client.resources(Kafka.class).inAnyNamespace().delete(); + consoleConfig.clearSecurity(); utils.apply(client, new KafkaBuilder(utils.buildKafkaResource("test-kafka1", utils.getClusterId(), bootstrapServers, new KafkaListenerAuthenticationCustomBuilder() diff --git a/api/src/test/java/com/github/streamshub/console/api/KafkaClustersResourceNoK8sIT.java b/api/src/test/java/com/github/streamshub/console/api/KafkaClustersResourceNoK8sIT.java index 4dc3047d5..33f188254 100644 --- a/api/src/test/java/com/github/streamshub/console/api/KafkaClustersResourceNoK8sIT.java +++ b/api/src/test/java/com/github/streamshub/console/api/KafkaClustersResourceNoK8sIT.java @@ -66,6 +66,7 @@ void setup() throws IOException { .map(k -> k.getProperties().get("bootstrap.servers")) .orElseThrow()); + consoleConfig.clearSecurity(); utils = new TestHelper(bootstrapServers, config, null); clusterId1 = consoleConfig.getKafka().getCluster("test-kafka1").get().getId(); diff --git a/api/src/test/java/com/github/streamshub/console/api/KafkaClustersResourceOidcIT.java b/api/src/test/java/com/github/streamshub/console/api/KafkaClustersResourceOidcIT.java index 23136ee49..100196817 100644 --- a/api/src/test/java/com/github/streamshub/console/api/KafkaClustersResourceOidcIT.java +++ b/api/src/test/java/com/github/streamshub/console/api/KafkaClustersResourceOidcIT.java @@ -15,7 +15,6 @@ import org.apache.kafka.clients.CommonClientConfigs; import org.apache.kafka.common.security.auth.SecurityProtocol; import org.eclipse.microprofile.config.Config; -import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; @@ -25,7 +24,6 @@ import com.github.streamshub.console.api.support.KafkaContext; import com.github.streamshub.console.config.ConsoleConfig; import com.github.streamshub.console.config.KafkaClusterConfig; -import com.github.streamshub.console.config.security.GlobalSecurityConfig; import com.github.streamshub.console.config.security.GlobalSecurityConfigBuilder; import com.github.streamshub.console.config.security.Privilege; import com.github.streamshub.console.kafka.systemtest.TestPlainProfile; @@ -86,6 +84,7 @@ void setup() throws IOException { tokens = new TokenUtils(config); client.resources(Kafka.class).inAnyNamespace().delete(); + consoleConfig.clearSecurity(); Kafka kafka1 = new KafkaBuilder(utils.buildKafkaResource("test-kafka1", utils.getClusterId(), bootstrapServers)) .editOrNewStatus() @@ -127,12 +126,6 @@ void setup() throws IOException { clusterId1 = consoleConfig.getKafka().getCluster("default/test-kafka1").get().getId(); } - @AfterEach - void teardown() throws IOException { - client.resources(Kafka.class).inAnyNamespace().delete(); - consoleConfig.setSecurity(new GlobalSecurityConfig()); - } - @Test void testListClustersWithNoRolesDefined() { consoleConfig.setSecurity(oidcSecurity().build()); diff --git a/api/src/test/java/com/github/streamshub/console/api/KafkaRebalancesResourceIT.java b/api/src/test/java/com/github/streamshub/console/api/KafkaRebalancesResourceIT.java index d2835be2f..aa321b747 100644 --- a/api/src/test/java/com/github/streamshub/console/api/KafkaRebalancesResourceIT.java +++ b/api/src/test/java/com/github/streamshub/console/api/KafkaRebalancesResourceIT.java @@ -14,7 +14,6 @@ import jakarta.ws.rs.core.Response.Status; import org.eclipse.microprofile.config.Config; -import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; @@ -121,6 +120,7 @@ void setup() { client.resources(Kafka.class).inAnyNamespace().delete(); client.resources(KafkaRebalance.class).inAnyNamespace().delete(); + consoleConfig.clearSecurity(); utils.apply(client, new KafkaBuilder(utils.buildKafkaResource("test-kafka1", utils.getClusterId(), bootstrapServers)) .editSpec() @@ -160,12 +160,6 @@ void setup() { clusterId2 = consoleConfig.getKafka().getCluster("default/test-kafka2").get().getId(); } - @AfterEach - void tearDown() { - client.resources(Kafka.class).inAnyNamespace().delete(); - client.resources(KafkaRebalance.class).inAnyNamespace().delete(); - } - @Test void testListRebalancesIncludesAllowedActions() { whenRequesting(req -> req.get("", clusterId1)) diff --git a/api/src/test/java/com/github/streamshub/console/api/KafkaRebalancesResourceOidcIT.java b/api/src/test/java/com/github/streamshub/console/api/KafkaRebalancesResourceOidcIT.java new file mode 100644 index 000000000..deaa2f460 --- /dev/null +++ b/api/src/test/java/com/github/streamshub/console/api/KafkaRebalancesResourceOidcIT.java @@ -0,0 +1,294 @@ +package com.github.streamshub.console.api; + +import java.net.URI; +import java.time.Instant; +import java.util.Arrays; +import java.util.UUID; + +import jakarta.inject.Inject; +import jakarta.ws.rs.core.Response.Status; + +import org.eclipse.microprofile.config.Config; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.CsvSource; + +import com.github.streamshub.console.config.ConsoleConfig; +import com.github.streamshub.console.config.security.GlobalSecurityConfigBuilder; +import com.github.streamshub.console.config.security.Privilege; +import com.github.streamshub.console.config.security.SecurityConfigBuilder; +import com.github.streamshub.console.kafka.systemtest.TestPlainProfile; +import com.github.streamshub.console.kafka.systemtest.deployment.DeploymentManager; +import com.github.streamshub.console.kafka.systemtest.utils.TokenUtils; +import com.github.streamshub.console.test.TestHelper; + +import io.fabric8.kubernetes.client.KubernetesClient; +import io.quarkus.test.common.http.TestHTTPEndpoint; +import io.quarkus.test.junit.QuarkusTest; +import io.quarkus.test.junit.TestProfile; +import io.strimzi.api.ResourceLabels; +import io.strimzi.api.kafka.model.kafka.Kafka; +import io.strimzi.api.kafka.model.kafka.KafkaBuilder; +import io.strimzi.api.kafka.model.rebalance.KafkaRebalance; +import io.strimzi.api.kafka.model.rebalance.KafkaRebalanceBuilder; +import io.strimzi.api.kafka.model.rebalance.KafkaRebalanceMode; +import io.strimzi.api.kafka.model.rebalance.KafkaRebalanceState; +import io.strimzi.test.container.StrimziKafkaContainer; + +import static com.github.streamshub.console.test.TestHelper.whenRequesting; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.is; + +@QuarkusTest +@TestHTTPEndpoint(KafkaRebalancesResource.class) +@TestProfile(TestPlainProfile.class) +class KafkaRebalancesResourceOidcIT { + + @Inject + Config config; + + @Inject + KubernetesClient client; + + @Inject + ConsoleConfig consoleConfig; + + @DeploymentManager.InjectDeploymentManager + DeploymentManager deployments; + + TestHelper utils; + TokenUtils tokens; + + StrimziKafkaContainer kafkaContainer; + String clusterId1; + String clusterId2; + URI bootstrapServers; + URI randomBootstrapServers; + + static KafkaRebalance buildRebalance(int sequence, String clusterName, KafkaRebalanceMode mode, KafkaRebalanceState state) { + var builder = new KafkaRebalanceBuilder() + .withNewMetadata() + .withName("rebalance-" + sequence) + .withNamespace("default") + .endMetadata() + .withNewSpec() + .withMode(mode) + .endSpec(); + + if (clusterName != null) { + builder.editMetadata() + .addToLabels(ResourceLabels.STRIMZI_CLUSTER_LABEL, clusterName) + .endMetadata(); + } + + if (state != null) { + builder = builder + .withNewStatus() + .addNewCondition() + .withType(state.name()) + .withStatus("True") + .withLastTransitionTime(Instant.now().toString()) + .endCondition() + .addToOptimizationResult("intraBrokerDataToMoveMB", "0") + .endStatus(); + } + + return builder.build(); + } + + @BeforeEach + void setup() { + kafkaContainer = deployments.getKafkaContainer(); + bootstrapServers = URI.create(kafkaContainer.getBootstrapServers()); + randomBootstrapServers = URI.create(consoleConfig.getKafka() + .getCluster("default/test-kafka2") + .map(k -> k.getProperties().get("bootstrap.servers")) + .orElseThrow()); + + utils = new TestHelper(bootstrapServers, config, null); + tokens = new TokenUtils(config); + + client.resources(Kafka.class).inAnyNamespace().delete(); + client.resources(KafkaRebalance.class).inAnyNamespace().delete(); + consoleConfig.clearSecurity(); + + utils.apply(client, new KafkaBuilder(utils.buildKafkaResource("test-kafka1", utils.getClusterId(), bootstrapServers)) + .editSpec() + .withNewCruiseControl() + // empty + .endCruiseControl() + .endSpec() + .build()); + + // Second cluster is offline/non-existent + utils.apply(client, new KafkaBuilder(utils.buildKafkaResource("test-kafka2", UUID.randomUUID().toString(), randomBootstrapServers)) + .editOrNewStatus() + .addNewCondition() + .withType("NotReady") + .withStatus("True") + .endCondition() + .endStatus() + .build()); + + int r = 0; + + // No cluster name - MUST BE FIRST for "Not found" test + utils.apply(client, buildRebalance(r++, null, KafkaRebalanceMode.FULL, null)); + + for (String clusterName : Arrays.asList("test-kafka1", "test-kafka2", "test-kafka3")) { + for (KafkaRebalanceMode mode : KafkaRebalanceMode.values()) { + // No status + utils.apply(client, buildRebalance(r++, clusterName, mode, null)); + + for (KafkaRebalanceState state : KafkaRebalanceState.values()) { + utils.apply(client, buildRebalance(r++, clusterName, mode, state)); + } + } + } + + clusterId1 = consoleConfig.getKafka().getCluster("default/test-kafka1").get().getId(); + clusterId2 = consoleConfig.getKafka().getCluster("default/test-kafka2").get().getId(); + } + + @ParameterizedTest + @CsvSource({ + "alice, a", + // bob is on both teams, not used for this test + "susan, b", + }) + void testListRebalancesWithPerTeamKafkaClusterAccess(String username, String team) { + int total = KafkaRebalanceMode.values().length * (KafkaRebalanceState.values().length + 1); + + consoleConfig.setSecurity(oidcSecurity() + .addNewSubject() + .withClaim("groups") + .withInclude("team-a") + .withRoleNames("dev-a") + .endSubject() + .addNewSubject() + .withClaim("groups") + .withInclude("team-b") + .withRoleNames("dev-b") + .endSubject() + .build()); + + consoleConfig.getKafka().getClusterById(clusterId1).ifPresent(cfg -> { + cfg.setSecurity(new SecurityConfigBuilder() + .addNewRole() + .withName("dev-a") + .addNewRule() + .withResources("rebalances") + .withPrivileges(Privilege.LIST) + .endRule() + .endRole() + .build()); + }); + + consoleConfig.getKafka().getClusterById(clusterId2).ifPresent(cfg -> { + cfg.setSecurity(new SecurityConfigBuilder() + .addNewRole() + .withName("dev-b") + .addNewRule() + .withResources("rebalances") + .withPrivileges(Privilege.LIST) + .endRule() + .endRole() + .build()); + }); + + String allowedId = "a".equals(team) ? clusterId1 : clusterId2; + String forbiddenId = "a".equals(team) ? clusterId2 : clusterId1; + + whenRequesting(req -> req + .auth() + .oauth2(tokens.getToken(username)) + .param("page[size]", total) + .get("", allowedId)) + .assertThat() + .statusCode(is(Status.OK.getStatusCode())) + .body("data.size()", equalTo(total)); + + whenRequesting(req -> req + .auth() + .oauth2(tokens.getToken(username)) + .get("", forbiddenId)) + .assertThat() + .statusCode(is(Status.FORBIDDEN.getStatusCode())); + } + + @Test + void testListRebalancesWithUnrelatedRoleAccess() { + // alice is granted access to topics, but not rebalances + + consoleConfig.setSecurity(oidcSecurity() + .addNewSubject() + .withInclude("alice") + .withRoleNames("developer") + .endSubject() + .build()); + + consoleConfig.getKafka().getClusterById(clusterId1).ifPresent(cfg -> { + cfg.setSecurity(new SecurityConfigBuilder() + .addNewRole() + .withName("developer") + .addNewRule() + .withResources("topics") + .withPrivileges(Privilege.ALL) + .endRule() + .endRole() + .build()); + }); + + whenRequesting(req -> req + .auth() + .oauth2(tokens.getToken("alice")) + .get("", clusterId1)) + .assertThat() + .statusCode(is(Status.FORBIDDEN.getStatusCode())); + } + + @Test + void testListRebalancesWithMissingPrivilege() { + // alice can get and update rebalances, but she may not list them + + consoleConfig.setSecurity(oidcSecurity() + .addNewSubject() + .withInclude("alice") + .withRoleNames("developer") + .endSubject() + .build()); + + consoleConfig.getKafka().getClusterById(clusterId1).ifPresent(cfg -> { + cfg.setSecurity(new SecurityConfigBuilder() + .addNewRole() + .withName("developer") + .addNewRule() + .withResources("rebalances") + .withPrivileges(Privilege.GET, Privilege.UPDATE) + .endRule() + .endRole() + .build()); + }); + + whenRequesting(req -> req + .auth() + .oauth2(tokens.getToken("alice")) + .get("", clusterId1)) + .assertThat() + .statusCode(is(Status.FORBIDDEN.getStatusCode())); + } + + // Helper methods + + GlobalSecurityConfigBuilder oidcSecurity() { + return new GlobalSecurityConfigBuilder() + .withNewOidc() + .withClientId("console-client") + .withClientSecret("console-client-secret") + .withAuthServerUrl(config.getValue("console.test.oidc-url", String.class)) + .withIssuer(config.getValue("console.test.oidc-issuer", String.class)) + .endOidc(); + } + +} diff --git a/api/src/test/java/com/github/streamshub/console/api/RecordsResourceIT.java b/api/src/test/java/com/github/streamshub/console/api/RecordsResourceIT.java index 3b5e31f81..342c3e1a8 100644 --- a/api/src/test/java/com/github/streamshub/console/api/RecordsResourceIT.java +++ b/api/src/test/java/com/github/streamshub/console/api/RecordsResourceIT.java @@ -41,6 +41,7 @@ import com.github.streamshub.console.api.support.KafkaContext; import com.github.streamshub.console.api.support.serdes.RecordData; import com.github.streamshub.console.config.ConsoleConfig; +import com.github.streamshub.console.config.KafkaClusterConfig; import com.github.streamshub.console.kafka.systemtest.TestPlainProfile; import com.github.streamshub.console.kafka.systemtest.deployment.DeploymentManager; import com.github.streamshub.console.test.RecordHelper; @@ -110,11 +111,20 @@ void setup() throws IOException { recordUtils = new RecordHelper(bootstrapServers, config, null); client.resources(Kafka.class).inAnyNamespace().delete(); + consoleConfig.clearSecurity(); utils.apply(client, utils.buildKafkaResource("test-kafka1", utils.getClusterId(), bootstrapServers)); // Second cluster is offline/non-existent utils.apply(client, utils.buildKafkaResource("test-kafka2", UUID.randomUUID().toString(), randomBootstrapServers)); + // Wait for the context map to be populated with all Kafka configurations + await().atMost(10, TimeUnit.SECONDS).until(() -> kafkaContexts.values() + .stream() + .map(KafkaContext::clusterConfig) + .map(KafkaClusterConfig::getName) + .toList() + .containsAll(List.of("test-kafka1", "test-kafka2"))); + clusterId1 = consoleConfig.getKafka().getCluster("default/test-kafka1").get().getId(); clusterId2 = consoleConfig.getKafka().getCluster("default/test-kafka2").get().getId(); } diff --git a/api/src/test/java/com/github/streamshub/console/api/TopicsResourceIT.java b/api/src/test/java/com/github/streamshub/console/api/TopicsResourceIT.java index 1a9aa6f3b..3349ce824 100644 --- a/api/src/test/java/com/github/streamshub/console/api/TopicsResourceIT.java +++ b/api/src/test/java/com/github/streamshub/console/api/TopicsResourceIT.java @@ -158,6 +158,7 @@ void setup() throws IOException { client.resources(Kafka.class).inAnyNamespace().delete(); client.resources(KafkaTopic.class).inAnyNamespace().delete(); + consoleConfig.clearSecurity(); utils.apply(client, utils.buildKafkaResource(clusterName1, utils.getClusterId(), bootstrapServers1)); // Second cluster is offline/non-existent diff --git a/common/src/main/java/com/github/streamshub/console/config/ConsoleConfig.java b/common/src/main/java/com/github/streamshub/console/config/ConsoleConfig.java index f2bf0b280..81c194650 100644 --- a/common/src/main/java/com/github/streamshub/console/config/ConsoleConfig.java +++ b/common/src/main/java/com/github/streamshub/console/config/ConsoleConfig.java @@ -8,6 +8,7 @@ import com.fasterxml.jackson.annotation.JsonIgnore; import com.github.streamshub.console.config.security.GlobalSecurityConfig; +import com.github.streamshub.console.config.security.SecurityConfig; import io.sundr.builder.annotations.Buildable; import io.xlate.validation.constraints.Expression; @@ -28,6 +29,7 @@ public class ConsoleConfig { KubernetesConfig kubernetes = new KubernetesConfig(); + @Valid GlobalSecurityConfig security = new GlobalSecurityConfig(); @Valid @@ -42,6 +44,27 @@ public boolean hasUniqueRegistryNames() { return schemaRegistries.stream().map(SchemaRegistryConfig::getName).distinct().count() == schemaRegistries.size(); } + /** + * Specifying security subjects local to a Kafka cluster is not allowed when global OIDC + * security is enabled. + */ + @JsonIgnore + @AssertTrue(message = "Security subjects must not be specified for Kafka clusters when OIDC security is used") + public boolean kafkaClusterSubjectsEmptyWithOIDC() { + if (security.getOidc() == null) { + return true; + } + + return kafka.getClusters().stream().allMatch(k -> k.getSecurity().getSubjects().isEmpty()); + } + + // testing + @JsonIgnore + public void clearSecurity() { + security = new GlobalSecurityConfig(); + kafka.getClusters().forEach(k -> k.setSecurity(new SecurityConfig())); + } + public KubernetesConfig getKubernetes() { return kubernetes; } diff --git a/common/src/main/java/com/github/streamshub/console/config/KafkaClusterConfig.java b/common/src/main/java/com/github/streamshub/console/config/KafkaClusterConfig.java index c08d606a2..dcaf96365 100644 --- a/common/src/main/java/com/github/streamshub/console/config/KafkaClusterConfig.java +++ b/common/src/main/java/com/github/streamshub/console/config/KafkaClusterConfig.java @@ -3,6 +3,7 @@ import java.util.LinkedHashMap; import java.util.Map; +import jakarta.validation.Valid; import jakarta.validation.constraints.NotBlank; import com.fasterxml.jackson.annotation.JsonIgnore; @@ -18,6 +19,7 @@ public class KafkaClusterConfig { private String name; private String namespace; private String listener; + @Valid private SecurityConfig security = new SecurityConfig(); /** * Name of a configured schema registry that will be used to ser/des configurations diff --git a/common/src/main/java/com/github/streamshub/console/config/KafkaConfig.java b/common/src/main/java/com/github/streamshub/console/config/KafkaConfig.java index ee0635670..d87c6fe2b 100644 --- a/common/src/main/java/com/github/streamshub/console/config/KafkaConfig.java +++ b/common/src/main/java/com/github/streamshub/console/config/KafkaConfig.java @@ -30,6 +30,13 @@ public Optional getCluster(String clusterKey) { .findFirst(); } + @JsonIgnore + public Optional getClusterById(String clusterId) { + return clusters.stream() + .filter(k -> clusterId.equals(k.getId())) + .findFirst(); + } + public List getClusters() { return clusters; } diff --git a/common/src/main/java/com/github/streamshub/console/config/security/GlobalSecurityConfig.java b/common/src/main/java/com/github/streamshub/console/config/security/GlobalSecurityConfig.java index deafbfe2f..a2602deb4 100644 --- a/common/src/main/java/com/github/streamshub/console/config/security/GlobalSecurityConfig.java +++ b/common/src/main/java/com/github/streamshub/console/config/security/GlobalSecurityConfig.java @@ -1,10 +1,13 @@ package com.github.streamshub.console.config.security; +import jakarta.validation.Valid; + import io.sundr.builder.annotations.Buildable; @Buildable(editableEnabled = false) public class GlobalSecurityConfig extends SecurityConfig { + @Valid private OidcConfig oidc; public OidcConfig getOidc() { diff --git a/common/src/main/java/com/github/streamshub/console/config/security/RoleConfig.java b/common/src/main/java/com/github/streamshub/console/config/security/RoleConfig.java index 6065bbcac..03a6bf6ad 100644 --- a/common/src/main/java/com/github/streamshub/console/config/security/RoleConfig.java +++ b/common/src/main/java/com/github/streamshub/console/config/security/RoleConfig.java @@ -3,12 +3,20 @@ import java.util.ArrayList; import java.util.List; +import jakarta.validation.Valid; +import jakarta.validation.constraints.NotBlank; +import jakarta.validation.constraints.NotEmpty; + import io.sundr.builder.annotations.Buildable; @Buildable(editableEnabled = false) public class RoleConfig { + @NotBlank private String name; + + @Valid + @NotEmpty private List rules = new ArrayList<>(); public String getName() { diff --git a/common/src/main/java/com/github/streamshub/console/config/security/RuleConfig.java b/common/src/main/java/com/github/streamshub/console/config/security/RuleConfig.java index f1b0fa5d4..357c61815 100644 --- a/common/src/main/java/com/github/streamshub/console/config/security/RuleConfig.java +++ b/common/src/main/java/com/github/streamshub/console/config/security/RuleConfig.java @@ -3,6 +3,9 @@ import java.util.ArrayList; import java.util.List; +import jakarta.validation.constraints.NotEmpty; +import jakarta.validation.constraints.NotNull; + import io.sundr.builder.annotations.Buildable; @Buildable(editableEnabled = false) @@ -11,17 +14,19 @@ public class RuleConfig { /** * Resources to which this rule applies (required) */ - List resources = new ArrayList<>(); + @NotEmpty + List<@NotNull String> resources = new ArrayList<>(); /** * Specific resource names to which this rule applies (optional) */ - List resourceNames = new ArrayList<>(); + List<@NotNull String> resourceNames = new ArrayList<>(); /** * Privileges/actions that may be performed for subjects having this rule */ - List privileges = new ArrayList<>(); + @NotEmpty + List<@NotNull Privilege> privileges = new ArrayList<>(); public List getResources() { return resources; diff --git a/common/src/main/java/com/github/streamshub/console/config/security/SecurityConfig.java b/common/src/main/java/com/github/streamshub/console/config/security/SecurityConfig.java index 4929e4eea..3038ac598 100644 --- a/common/src/main/java/com/github/streamshub/console/config/security/SecurityConfig.java +++ b/common/src/main/java/com/github/streamshub/console/config/security/SecurityConfig.java @@ -3,12 +3,17 @@ import java.util.ArrayList; import java.util.List; +import jakarta.validation.Valid; + import io.sundr.builder.annotations.Buildable; @Buildable(editableEnabled = false) public class SecurityConfig { + @Valid private List subjects = new ArrayList<>(); + + @Valid private List roles = new ArrayList<>(); public List getSubjects() { diff --git a/common/src/main/java/com/github/streamshub/console/config/security/SubjectConfig.java b/common/src/main/java/com/github/streamshub/console/config/security/SubjectConfig.java index bd6891ff6..3aee5ae2f 100644 --- a/common/src/main/java/com/github/streamshub/console/config/security/SubjectConfig.java +++ b/common/src/main/java/com/github/streamshub/console/config/security/SubjectConfig.java @@ -3,14 +3,21 @@ import java.util.ArrayList; import java.util.List; +import jakarta.validation.constraints.NotEmpty; +import jakarta.validation.constraints.NotNull; + import io.sundr.builder.annotations.Buildable; @Buildable(editableEnabled = false) public class SubjectConfig { private String claim; - private List include = new ArrayList<>(); - private List roleNames = new ArrayList<>(); + + @NotEmpty + private List<@NotNull String> include = new ArrayList<>(); + + @NotEmpty + private List<@NotNull String> roleNames = new ArrayList<>(); public String getClaim() { return claim; diff --git a/operator/pom.xml b/operator/pom.xml index 3820eebaf..f0de15bf1 100644 --- a/operator/pom.xml +++ b/operator/pom.xml @@ -128,7 +128,7 @@ prepare-agent - *QuarkusClassLoader + ${project.build.directory}/jacoco-quarkus.exec true @@ -139,7 +139,7 @@ prepare-agent-integration - *QuarkusClassLoader + ${project.build.directory}/jacoco-quarkus.exec true