resourceNames = new ArrayList<>(1);
+
+ setResource(resource, resourceNames);
+
+ var requiredPermission = new ConsolePermission(resource.toString(), resourceNames, authz.value());
+
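+ // Delegate the decision to the permission checkers registered on the SecurityIdentity
+ // by ConsoleAuthenticationMechanism#addRoleChecker.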
+ boolean allow = securityIdentity.checkPermission(requiredPermission)
+ .subscribeAsCompletionStage()
+ .get();
+
+ if (!allow) {
+ throw new ForbiddenException("Access denied");
+ }
+
+ return context.proceed();
+ }
+
+ /**
+ * Pull the resource type and resource name from the request URI path to be used
+ * to determine authorization. The path is transformed as follows.
+ *
+ * Given a resource path `/api/kafkas/xyz/topics/abc/records`:
+ *
+ * - Skip the leading `/api` segment
+ * - Append segments `kafkas/xyz/topics` to the resource type
+ * - Use segment `abc` as the resource name
+ * - Append segment `/records` to the resource type
+ *
+ * For a principal to be authorized to access the resource, they must be a member
+ * of a role with access to `kafkas` `xyz` (named or all `kafkas`), and further
+ * with access to resource `topics/records` `abc` (named or all `topics/records`).
+ *
+ * @param resource target resource type builder
+ * @param resourceNames collection to hold the resource name
+ */
+ private void setResource(StringBuilder resource, List<String> resourceNames) {
+ var segments = requestUri.getPathSegments();
+ var segmentCount = segments.size();
+
+ // skip the first segment `/api`
+ String kafkas = segments.get(1).getPath();
+ resource.append(kafkas);
+
+ if (segmentCount > 2) {
+ String kafkaId = segments.get(2).getPath();
+ String kafkaName = Optional.ofNullable(contexts.get(kafkaId))
+ .map(KafkaContext::clusterConfig)
+ .map(KafkaClusterConfig::getName)
+ .orElseThrow(() -> ClientFactory.NO_SUCH_KAFKA.apply(kafkaId));
+
+ /*
+ * For URLs like `/api/kafkas/123`, the Kafka cluster name (resolved from the ID)
+ * is the resource name and is configured at the top-level `security` key in the
+ * console's configuration. Otherwise, the cluster name is appended to the resource
+ * path and the configuration originates from the Kafka-level `security` key,
+ * scoped to the Kafka cluster under which it is specified.
+ */
+
+ if (segmentCount > 3) {
+ resource.append('/');
+ resource.append(kafkaName);
+ } else {
+ resourceNames.add(kafkaName);
+ }
+ }
+
+ setKafkaResource(resource, resourceNames, segments);
+ }
+
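+ /**
+ * Append the remaining path segments (from index 3 onward) to the resource type,
+ * treating segment 4 as the resource name. For `topics` and `rebalances`, the raw
+ * identifier in segment 4 is first converted to the corresponding name, since the
+ * security configuration refers to those resources by name rather than by ID.
+ */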
+ private void setKafkaResource(StringBuilder resource, List<String> resourceNames, List<PathSegment> segments) {
+ int segmentCount = segments.size();
+ UnaryOperator<String> converter = UnaryOperator.identity();
+
+ for (int s = 3; s < segmentCount; s++) {
+ String segment = segments.get(s).getPath();
+
+ if (s == 4) {
+ resourceNames.add(converter.apply(segment));
+ } else {
+ if (s == 3) {
+ if (ResourceTypes.Kafka.TOPICS.value().equals(segment)) {
+ converter = this::topicName;
+ } else if (ResourceTypes.Kafka.REBALANCES.value().equals(segment)) {
+ converter = this::rebalanceName;
+ }
+ }
+ resource.append('/');
+ resource.append(segment);
+ }
+ }
+ }
+
+ /**
+ * Attempt to cross-reference the topic ID to the topic name, which is used to
+ * configure topic-level authorization.
+ */
+ private String topicName(String topicId) {
+ return topicDescribe.topicNameForId(topicId).toCompletableFuture().join()
+ .orElseThrow(() -> new UnknownTopicIdException("No such topic: " + topicId));
+ }
+
+ /**
+ * Extract the Kafka Rebalance name from the encoded rebalanceId.
+ */
+ private String rebalanceName(String rebalanceId) {
+ String decodedId = new String(Base64.getUrlDecoder().decode(rebalanceId));
+ String[] idElements = decodedId.split("/");
+
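+ // The decoded ID is expected to contain exactly two '/'-separated elements;
+ // the second element is the rebalance name used for authorization.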
+ if (idElements.length != 2) {
+ throw new NotFoundException("No such rebalance: " + rebalanceId);
+ }
+
+ return idElements[1];
+ }
+}
diff --git a/api/src/main/java/com/github/streamshub/console/api/security/Authorized.java b/api/src/main/java/com/github/streamshub/console/api/security/Authorized.java
new file mode 100644
index 000000000..4fc482335
--- /dev/null
+++ b/api/src/main/java/com/github/streamshub/console/api/security/Authorized.java
@@ -0,0 +1,18 @@
+package com.github.streamshub.console.api.security;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+import jakarta.interceptor.InterceptorBinding;
+
+/**
+ * Binding annotation to mark methods that should be intercepted by the
+ * {@link AuthorizationInterceptor}.
+ */
+@InterceptorBinding
+@Retention(RetentionPolicy.RUNTIME)
+@Target({ ElementType.TYPE, ElementType.METHOD })
+public @interface Authorized {
+}
diff --git a/api/src/main/java/com/github/streamshub/console/api/security/ConsoleAuthenticationMechanism.java b/api/src/main/java/com/github/streamshub/console/api/security/ConsoleAuthenticationMechanism.java
new file mode 100644
index 000000000..853c2b76b
--- /dev/null
+++ b/api/src/main/java/com/github/streamshub/console/api/security/ConsoleAuthenticationMechanism.java
@@ -0,0 +1,489 @@
+package com.github.streamshub.console.api.security;
+
+import java.io.IOException;
+import java.security.Permission;
+import java.security.Principal;
+import java.util.ArrayList;
+import java.util.Base64;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.function.BiFunction;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import jakarta.annotation.Priority;
+import jakarta.enterprise.context.ApplicationScoped;
+import jakarta.enterprise.inject.Alternative;
+import jakarta.inject.Inject;
+import jakarta.ws.rs.core.HttpHeaders;
+import jakarta.ws.rs.core.MediaType;
+
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule;
+import org.eclipse.microprofile.jwt.JsonWebToken;
+import org.jboss.logging.Logger;
+import org.jose4j.jwt.JwtClaims;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.github.streamshub.console.api.ClientFactory;
+import com.github.streamshub.console.api.model.Error;
+import com.github.streamshub.console.api.model.ErrorResponse;
+import com.github.streamshub.console.api.support.ErrorCategory;
+import com.github.streamshub.console.api.support.KafkaContext;
+import com.github.streamshub.console.config.ConsoleConfig;
+import com.github.streamshub.console.config.security.Audit;
+import com.github.streamshub.console.config.security.AuditConfig;
+import com.github.streamshub.console.config.security.Privilege;
+import com.github.streamshub.console.config.security.SecurityConfig;
+import com.github.streamshub.console.config.security.SubjectConfig;
+
+import io.quarkus.oidc.runtime.OidcAuthenticationMechanism;
+import io.quarkus.oidc.runtime.OidcJwtCallerPrincipal;
+import io.quarkus.security.AuthenticationFailedException;
+import io.quarkus.security.credential.Credential;
+import io.quarkus.security.identity.IdentityProviderManager;
+import io.quarkus.security.identity.SecurityIdentity;
+import io.quarkus.security.identity.request.AnonymousAuthenticationRequest;
+import io.quarkus.security.identity.request.AuthenticationRequest;
+import io.quarkus.security.identity.request.TokenAuthenticationRequest;
+import io.quarkus.security.identity.request.UsernamePasswordAuthenticationRequest;
+import io.quarkus.security.runtime.QuarkusPrincipal;
+import io.quarkus.security.runtime.QuarkusSecurityIdentity;
+import io.quarkus.vertx.http.runtime.security.ChallengeData;
+import io.quarkus.vertx.http.runtime.security.HttpAuthenticationMechanism;
+import io.smallrye.mutiny.Uni;
+import io.vertx.core.MultiMap;
+import io.vertx.ext.web.RoutingContext;
+
+@Alternative
+@Priority(1)
+@ApplicationScoped
+public class ConsoleAuthenticationMechanism implements HttpAuthenticationMechanism {
+
+ public static final String OAUTHBEARER = OAuthBearerLoginModule.OAUTHBEARER_MECHANISM;
+ public static final String PLAIN = "PLAIN";
+ public static final String SCRAM_SHA256 = "SCRAM-SHA-256";
+ public static final String SCRAM_SHA512 = "SCRAM-SHA-512";
+
+ private static final String BEARER = "Bearer ";
+ private static final String BASIC = "Basic ";
+
+ private static final SecurityIdentity ANONYMOUS = QuarkusSecurityIdentity.builder()
+ .setAnonymous(true)
+ .setPrincipal(new QuarkusPrincipal("ANONYMOUS"))
+ .build();
+
+ @Inject
+ Logger log;
+
+ @Inject
+ ObjectMapper mapper;
+
+ @Inject
+ ConsoleConfig consoleConfig;
+
+ @Inject
+ Map<String, KafkaContext> contexts;
+
+ @Inject
+ OidcAuthenticationMechanism oidc;
+
+ boolean oidcEnabled() {
+ return Objects.nonNull(consoleConfig.getSecurity().getOidc());
+ }
+
+ @Override
+ public Uni<SecurityIdentity> authenticate(RoutingContext context, IdentityProviderManager identityProviderManager) {
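+ // When OIDC is configured, authentication is delegated to the Quarkus OIDC mechanism and the
+ // resulting identity is augmented with the console's permission checkers. Otherwise, an identity
+ // is derived from the per-cluster SASL credentials in the request, falling back to an anonymous
+ // identity when no Kafka context applies or no SASL authentication is required.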
+ if (oidcEnabled()) {
+ return oidc.authenticate(context, identityProviderManager)
+ .map(identity -> augmentIdentity(context, identity))
+ .onFailure().invoke(this::maybeLogAuthenticationFailure);
+ }
+
+ String clusterId = getClusterId(context);
+
+ if (clusterId == null) {
+ return Uni.createFrom().item(createAnonymousIdentity(null));
+ }
+
+ var ctx = contexts.get(clusterId);
+
+ if (ctx == null) {
+ // No Kafka context to establish identity, become anonymous
+ return Uni.createFrom().item(createAnonymousIdentity(null));
+ }
+
+ String saslMechanism = ctx.saslMechanism(Admin.class);
+
+ if (ctx.admin() != null || saslMechanism.isEmpty()) {
+ // Admin credentials already given or there is no SASL authentication needed
+ return Uni.createFrom().item(createAnonymousIdentity(ctx));
+ }
+
+ var identity = createIdentity(ctx, context.request().headers(), saslMechanism);
+
+ if (identity != null) {
+ return Uni.createFrom().item(identity);
+ }
+
+ return Uni.createFrom().failure(new AuthenticationFailedException());
+ }
+
+ @Override
+ public Uni<Boolean> sendChallenge(RoutingContext context) {
+ return getChallenge(context).map(challengeData -> {
+ if (challengeData == null) {
+ return false;
+ }
+
+ var response = context.response();
+ response.setStatusCode(challengeData.status);
+
+ if (challengeData.headerName != null) {
+ response.headers().set(challengeData.headerName, challengeData.headerContent);
+ }
+
+ response.headers().set(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON);
+
+ try {
+ response.send(mapper.writeValueAsString(((PayloadChallengeData) challengeData).payload));
+ } catch (IOException e) {
+ log.warnf(e, "Failed to serialize challenge response body: %s", e.getMessage());
+ }
+
+ return true;
+ });
+ }
+
+ @Override
+ public Uni<ChallengeData> getChallenge(RoutingContext context) {
+ if (oidcEnabled()) {
+ return oidc.getChallenge(context)
+ .map(data -> {
+ var category = ErrorCategory.get(ErrorCategory.NotAuthenticated.class);
+ Error error = category.createError("Authentication credentials missing or invalid", null, null);
+ var responseBody = new ErrorResponse(List.of(error));
+ return new PayloadChallengeData(data, responseBody);
+ });
+ }
+
+ String clusterId = getClusterId(context);
+
+ if (clusterId == null) {
+ return Uni.createFrom().nullItem();
+ }
+
+ var ctx = contexts.get(clusterId);
+
+ if (ctx == null) {
+ return Uni.createFrom().nullItem();
+ }
+
+ String saslMechanism = ctx.saslMechanism(Admin.class);
+ String scheme = getAuthorizationScheme(saslMechanism);
+ ChallengeData challenge;
+
+ if (scheme != null) {
+ var category = ErrorCategory.get(ErrorCategory.NotAuthenticated.class);
+ Error error = category.createError("Authentication credentials missing or invalid", null, null);
+ var responseBody = new ErrorResponse(List.of(error));
+ challenge = new PayloadChallengeData(401, "WWW-Authenticate", scheme, responseBody);
+ } else {
+ log.warnf("Access not permitted to cluster %s with unknown SASL mechanism '%s'",
+ clusterId, saslMechanism);
+ var category = ErrorCategory.get(ErrorCategory.ResourceNotFound.class);
+ Error error = category.createError(ClientFactory.NO_SUCH_KAFKA_MESSAGE.formatted(clusterId), null, null);
+ var responseBody = new ErrorResponse(List.of(error));
+ challenge = new PayloadChallengeData(404, null, null, responseBody);
+ }
+
+ return Uni.createFrom().item(challenge);
+ }
+
+ @Override
+ public Set<Class<? extends AuthenticationRequest>> getCredentialTypes() {
+ if (oidcEnabled()) {
+ return oidc.getCredentialTypes();
+ }
+
+ return Set.of(
+ AnonymousAuthenticationRequest.class,
+ TokenAuthenticationRequest.class,
+ UsernamePasswordAuthenticationRequest.class
+ );
+ }
+
+ private String getClusterId(RoutingContext context) {
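+ // Extract the cluster ID path parameter from request paths of the form
+ // /api/kafkas/{clusterId}/...; returns null for any other path.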
+ Pattern p = Pattern.compile("/api/kafkas/([^/]+)(?:/.*)?");
+ Matcher m = p.matcher(context.normalizedPath());
+ if (m.matches()) {
+ return m.group(1);
+ }
+ return null;
+ }
+
+ private String getAuthorizationScheme(String saslMechanism) {
+ switch (saslMechanism) {
+ case OAUTHBEARER:
+ return BEARER.trim();
+ case PLAIN, SCRAM_SHA256, SCRAM_SHA512:
+ return BASIC.trim();
+ default:
+ return null;
+ }
+ }
+
+ private SecurityIdentity createAnonymousIdentity(KafkaContext ctx) {
+ return createIdentity(ctx, ANONYMOUS);
+ }
+
+ private SecurityIdentity augmentIdentity(RoutingContext context, SecurityIdentity identity) {
+ if (identity != null) {
+ String clusterId = getClusterId(context);
+ var ctx = clusterId != null ? contexts.get(clusterId) : null;
+ return createIdentity(ctx, identity);
+ }
+ throw new AuthenticationFailedException();
+ }
+
+ private SecurityIdentity createIdentity(KafkaContext ctx, SecurityIdentity source) {
+ var builder = QuarkusSecurityIdentity.builder(source);
+ addRoleChecker(ctx, builder, source.getPrincipal());
+ return builder.build();
+ }
+
+ private SecurityIdentity createIdentity(KafkaContext ctx, MultiMap headers, String saslMechanism) {
+ switch (saslMechanism) {
+ case OAUTHBEARER:
+ return createOAuthIdentity(ctx, headers);
+ case PLAIN:
+ return createBasicIdentity(ctx, headers, SaslJaasConfigCredential::forPlainLogin);
+ case SCRAM_SHA256, SCRAM_SHA512:
+ return createBasicIdentity(ctx, headers, SaslJaasConfigCredential::forScramLogin);
+ default:
+ return null;
+ }
+ }
+
+ private SecurityIdentity createOAuthIdentity(KafkaContext ctx, MultiMap headers) {
+ return getAuthorization(headers, BEARER)
+ .map(accessToken -> {
+ var builder = QuarkusSecurityIdentity.builder();
+ builder.addCredential(SaslJaasConfigCredential.forOAuthLogin(accessToken));
+ Principal principal;
+
+ try {
+ var claims = JwtClaims.parse(accessToken);
+ principal = new OidcJwtCallerPrincipal(claims, null);
+ } catch (Exception e) {
+ log.infof("JWT access token could not be parsed: %s", e.getMessage());
+ principal = new QuarkusPrincipal("UNKNOWN");
+ }
+
+ builder.setPrincipal(principal);
+ addRoleChecker(ctx, builder, principal);
+ return builder.build();
+ })
+ .orElse(null);
+ }
+
+ private SecurityIdentity createBasicIdentity(KafkaContext ctx, MultiMap headers, BiFunction<String, String, SaslJaasConfigCredential> credentialBuilder) {
+ return getBasicAuthentication(headers)
+ .map(userpass -> {
+ var builder = QuarkusSecurityIdentity.builder();
+ var principal = new QuarkusPrincipal(userpass[0]);
+ builder.addCredential(credentialBuilder.apply(userpass[0], userpass[1]));
+ builder.setPrincipal(principal);
+ addRoleChecker(ctx, builder, principal);
+ return builder.build();
+ })
+ .orElse(null);
+ }
+
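+ /*
+ * Registers a permission checker built from the merged global `security` configuration and,
+ * when a cluster context applies, the Kafka cluster's `security` configuration. Subjects map
+ * principals (by name or by token claim) to role names, and roles grant privileges on resources.
+ * Illustrative configuration shape (key names shown here are examples inferred from the
+ * accessors used below, not authoritative):
+ *
+ *   security:
+ *     subjects:
+ *       - claim: groups
+ *         include: [ team-a ]
+ *         roleNames: [ developer ]
+ *     roles:
+ *       - name: developer
+ *         rules:
+ *           - resources: [ topics ]
+ *             resourceNames: [ orders-* ]
+ *             privileges: [ GET, LIST ]
+ */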
+ private void addRoleChecker(KafkaContext ctx, QuarkusSecurityIdentity.Builder builder, Principal principal) {
+ var globalSecurity = consoleConfig.getSecurity();
+ Optional<SecurityConfig> clusterSecurity = ctx != null
+ ? Optional.of(ctx.clusterConfig().getSecurity())
+ : Optional.empty();
+
+ var auditRules = mergeAuditRules(
+ getAuditRules(globalSecurity.getAudit(), ""),
+ clusterSecurity.map(c -> getAuditRules(c.getAudit(), "kafkas/" + ctx.clusterConfig().getName() + '/'))
+ .orElseGet(Collections::emptyMap)
+ );
+
+ if (globalSecurity.getRoles().isEmpty()
+ && clusterSecurity.map(cs -> cs.getRoles().isEmpty()).orElse(true)) {
+ // No roles are defined - allow everything
+ builder.addPermissionChecker(requiredPermission -> {
+ auditLog(principal, requiredPermission, true, auditRules.get(requiredPermission));
+ return Uni.createFrom().item(true);
+ });
+
+ return;
+ }
+
+ Stream<SubjectConfig> globalSubjects = globalSecurity.getSubjects().stream();
+ Stream<SubjectConfig> clusterSubjects = clusterSecurity.map(cs -> cs.getSubjects().stream())
+ .orElseGet(Stream::empty);
+
+ List<String> roleNames = Stream.concat(clusterSubjects, globalSubjects)
+ .filter(sub -> matchesPrincipal(sub, principal))
+ .flatMap(sub -> sub.getRoleNames().stream())
+ .distinct()
+ .toList();
+
+ Stream<ConsolePermission> globalPermissions = getPermissions(globalSecurity, roleNames, "");
+ Stream<ConsolePermission> clusterPermissions = clusterSecurity
+ .map(cs -> getPermissions(cs, roleNames, "kafkas/" + ctx.clusterConfig().getName() + '/'))
+ .orElseGet(Stream::empty);
+
+ List<ConsolePermission> possessedPermissions = Stream.concat(globalPermissions, clusterPermissions).toList();
+
+ builder.addPermissionChecker(requiredPermission -> {
+ boolean allowed = possessedPermissions
+ .stream()
+ .anyMatch(possessed -> possessed.implies(requiredPermission));
+
+ auditLog(principal, requiredPermission, allowed, auditRules.get(requiredPermission));
+ return Uni.createFrom().item(allowed);
+ });
+ }
+
+ private void auditLog(Principal principal, Permission required, boolean allowed, Audit audit) {
+ if (audit != null && audit.logResult(allowed)) {
+ log.infof("%s %s %s", principal.getName(), allowed ? "allowed" : "denied", required);
+ } else {
+ log.tracef("%s %s %s", principal.getName(), allowed ? "allowed" : "denied", required);
+ }
+ }
+
+ private void maybeLogAuthenticationFailure(Throwable t) {
+ if (t.getCause() instanceof org.jose4j.jwt.consumer.InvalidJwtException ije) {
+ log.debugf("Invalid JWT: %s", ije.getErrorDetails());
+ }
+ }
+
+ private boolean matchesPrincipal(SubjectConfig subjectConfig, Principal principal) {
+ String claimName = subjectConfig.getClaim();
+ List<String> include = subjectConfig.getInclude();
+
+ if (claimName == null) {
+ return include.contains(principal.getName());
+ } else if (principal instanceof JsonWebToken jwt) {
+ Object claim = jwt.getClaim(claimName);
+
+ if (claim instanceof String) {
+ return include.contains(claim);
+ }
+
+ // array claim, like set/list of groups
+ if (claim instanceof Collection<?> values) {
+ for (Object value : values) {
+ if (include.contains(value)) {
+ return true;
+ }
+ }
+ }
+ }
+
+ return false;
+ }
+
+ private Stream<ConsolePermission> getPermissions(SecurityConfig security, Collection<String> roleNames, String resourcePrefix) {
+ return security.getRoles()
+ .stream()
+ .filter(role -> roleNames.contains(role.getName()))
+ .flatMap(role -> role.getRules().stream())
+ .flatMap(rule -> {
+ List<ConsolePermission> rulePermissions = new ArrayList<>();
+ Privilege[] actions = rule.getPrivileges().toArray(Privilege[]::new);
+
+ for (var resource : rule.getResources()) {
+ rulePermissions.add(new ConsolePermission(
+ resourcePrefix + resource,
+ rule.getResourceNames(),
+ actions
+ ));
+ }
+
+ return rulePermissions.stream();
+ });
+ }
+
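+ // Audit rules are keyed by permission; cluster-scoped rule keys carry the
+ // "kafkas/<cluster name>/" resource prefix applied by the caller.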
+ private Map<ConsolePermission, Audit> mergeAuditRules(Map<ConsolePermission, Audit> global, Map<ConsolePermission, Audit> cluster) {
+ return Stream.concat(global.entrySet().stream(), cluster.entrySet().stream())
+ .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+ }
+
+ private Map<ConsolePermission, Audit> getAuditRules(List<AuditConfig> audits, String resourcePrefix) {
+ return audits.stream().flatMap(rule -> {
+ Map<ConsolePermission, Audit> auditRules = new HashMap<>();
+ Set<Privilege> actions = rule.getPrivileges().stream().flatMap(p -> p.expand().stream()).collect(Collectors.toSet());
+
+ for (var action : actions) {
+ for (var resource : rule.getResources()) {
+ if (rule.getResourceNames().isEmpty()) {
+ auditRules.put(
+ new ConsolePermission(
+ resourcePrefix + resource,
+ Collections.emptySet(),
+ action),
+ rule.getDecision());
+ } else {
+ for (String name : rule.getResourceNames()) {
+ auditRules.put(
+ new ConsolePermission(
+ resourcePrefix + resource,
+ Collections.singleton(name),
+ action),
+ rule.getDecision());
+ }
+ }
+ }
+ }
+
+ return auditRules.entrySet().stream();
+ }).collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+ }
+
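+ // Decode the `Authorization: Basic <base64>` header into a [ username, password ] pair,
+ // rejecting values without a ':' separator or with an empty username or password.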
+ private Optional<String[]> getBasicAuthentication(MultiMap headers) {
+ return getAuthorization(headers, BASIC)
+ .map(Base64.getDecoder()::decode)
+ .map(String::new)
+ .filter(authn -> authn.indexOf(':') >= 0)
+ .map(authn -> new String[] {
+ authn.substring(0, authn.indexOf(':')),
+ authn.substring(authn.indexOf(':') + 1)
+ })
+ .filter(userPass -> !userPass[0].isEmpty() && !userPass[1].isEmpty());
+ }
+
+ private Optional<String> getAuthorization(MultiMap headers, String scheme) {
+ return Optional.ofNullable(headers.get(HttpHeaders.AUTHORIZATION))
+ .filter(header -> header.regionMatches(true, 0, scheme, 0, scheme.length()))
+ .map(header -> header.substring(scheme.length()));
+ }
+
+ private static class PayloadChallengeData extends ChallengeData {
+ public final Object payload;
+
+ public PayloadChallengeData(int status, CharSequence headerName, String headerContent, Object payload) {
+ super(status, headerName, headerContent);
+ this.payload = payload;
+ }
+
+ public PayloadChallengeData(ChallengeData data, Object payload) {
+ super(data.status, data.headerName, data.headerContent);
+ this.payload = payload;
+ }
+ }
+}
diff --git a/api/src/main/java/com/github/streamshub/console/api/security/ConsolePermission.java b/api/src/main/java/com/github/streamshub/console/api/security/ConsolePermission.java
new file mode 100644
index 000000000..3bbc9aae4
--- /dev/null
+++ b/api/src/main/java/com/github/streamshub/console/api/security/ConsolePermission.java
@@ -0,0 +1,169 @@
+package com.github.streamshub.console.api.security;
+
+import java.security.Permission;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Objects;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import com.github.streamshub.console.config.security.Privilege;
+
+import static java.util.function.Predicate.not;
+
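+ /**
+ * Permission used for console authorization decisions. A possessed permission implies a
+ * required permission when the resource matches, the required resource names are covered
+ * by the configured names (or no names are configured), and at least one required privilege
+ * is granted (or the possessed permission grants {@code ALL}). For example, a possessed
+ * permission for resource `topics` with resource name `orders-*` and privilege `GET` implies
+ * a required permission for `topics`, name `orders-eu`, privilege `GET`.
+ */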
+public class ConsolePermission extends Permission {
+
+ private static final long serialVersionUID = 1L;
+ public static final String ACTIONS_SEPARATOR = ",";
+
+ private String resource;
+ private Collection<String> resourceNames;
+ private final Set<Privilege> actions;
+
+ public ConsolePermission(String resource, Privilege... actions) {
+ super("console");
+ this.resource = resource;
+ this.resourceNames = Collections.emptySet();
+ this.actions = checkActions(actions);
+ }
+
+ public ConsolePermission(String resource, Collection<String> resourceNames, Privilege... actions) {
+ super("console");
+ this.resource = resource;
+ this.resourceNames = resourceNames;
+ this.actions = checkActions(actions);
+ }
+
+ private static Set<Privilege> checkActions(Privilege[] actions) {
+ Objects.requireNonNull(actions);
+
+ if (actions.length == 0) {
+ throw new IllegalArgumentException("actions must not be zero length");
+ }
+
+ Set<Privilege> validActions = new HashSet<>(actions.length, 1);
+
+ for (Privilege action : actions) {
+ validActions.add(Objects.requireNonNull(action));
+ }
+
+ return Collections.unmodifiableSet(validActions);
+ }
+
+ ConsolePermission resourceName(String resourceName) {
+ this.resourceNames = Collections.singleton(resourceName);
+ return this;
+ }
+
+ @Override
+ public boolean implies(Permission other) {
+ if (other instanceof ConsolePermission requiredPermission) {
+ if (!getName().equals(requiredPermission.getName())) {
+ return false;
+ }
+
+ return implies(requiredPermission);
+ } else {
+ return false;
+ }
+ }
+
+ boolean implies(ConsolePermission requiredPermission) {
+ if (resourceDenied(requiredPermission)) {
+ return false;
+ }
+
+ if (actions.contains(Privilege.ALL)) {
+ // all actions possessed
+ return true;
+ }
+
+ for (Privilege action : requiredPermission.actions) {
+ if (actions.contains(action)) {
+ // has at least one of required actions
+ return true;
+ }
+ }
+
+ return false;
+ }
+
+ boolean resourceDenied(ConsolePermission requiredPermission) {
+ /*
+ * The action requires a permission unrelated to this configured
+ * permission.
+ * E.g. consumerGroups versus topics
+ */
+ if (!requiredPermission.resource.equals(resource)) {
+ return true;
+ }
+
+ if (resourceNames.isEmpty()) {
+ /*
+ * Configuration does not specify any resource names, so
+ * access to any is allowed.
+ */
+ return false;
+ }
+
+ if (requiredPermission.resourceNames.isEmpty()) {
+ /*
+ * Configuration specifies named resources, but this request
+ * has no resource name. I.e., the request is for an index/list
+ * end point. The permission is granted here, but individual
+ * resources in the list response may be filtered later.
+ */
+ return false;
+ }
+
+ /*
+ * Deny when any of the required names are not given in configuration.
+ */
+ return requiredPermission.resourceNames.stream().anyMatch(not(this::matchesResourceName));
+ }
+
+ boolean matchesResourceName(String requiredName) {
+ if (resourceNames.contains(requiredName)) {
+ return true;
+ }
+
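+ // A configured name with a trailing '*' acts as a prefix wildcard,
+ // e.g. `orders-*` matches a required name of `orders-eu`.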
+ return resourceNames.stream()
+ .filter(n -> n.endsWith("*"))
+ .map(n -> n.substring(0, n.length() - 1))
+ .anyMatch(requiredName::startsWith);
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj) {
+ return true;
+ }
+
+ if (!(obj instanceof ConsolePermission other)) {
+ return false;
+ }
+
+ return getName().equals(other.getName())
+ && resource.equals(other.resource)
+ && actions.equals(other.actions);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(getName(), resource, actions);
+ }
+
+ @Override
+ public String toString() {
+ return getName() + ":" + resource + ":" + resourceNames + ":" + actions;
+ }
+
+ /**
+ * @return null if no actions were specified, or actions joined together with the {@link #ACTIONS_SEPARATOR}
+ */
+ @Override
+ public String getActions() {
+ return actions.isEmpty() ? null : actions.stream().map(Enum::name).collect(Collectors.joining(ACTIONS_SEPARATOR));
+ }
+}
diff --git a/api/src/main/java/com/github/streamshub/console/api/security/OidcTenantConfigResolver.java b/api/src/main/java/com/github/streamshub/console/api/security/OidcTenantConfigResolver.java
new file mode 100644
index 000000000..f91189fa0
--- /dev/null
+++ b/api/src/main/java/com/github/streamshub/console/api/security/OidcTenantConfigResolver.java
@@ -0,0 +1,51 @@
+package com.github.streamshub.console.api.security;
+
+import java.util.List;
+
+import jakarta.annotation.PostConstruct;
+import jakarta.enterprise.context.ApplicationScoped;
+import jakarta.inject.Inject;
+
+import com.github.streamshub.console.config.ConsoleConfig;
+
+import io.quarkus.oidc.OidcRequestContext;
+import io.quarkus.oidc.OidcTenantConfig;
+import io.quarkus.oidc.TenantConfigResolver;
+import io.smallrye.mutiny.Uni;
+import io.vertx.ext.web.RoutingContext;
+
+/**
+ * This class is discovered and used by the Quarkus OIDC framework. The purpose
+ * is to create an OIDC tenant from the ConsoleConfig (sourced from YAML) that
+ * is provided to the console by the user directly or via the operator.
+ */
+@ApplicationScoped
+public class OidcTenantConfigResolver implements TenantConfigResolver {
+
+ @Inject
+ ConsoleConfig consoleConfig;
+
+ OidcTenantConfig oidcConfig;
+
+ @PostConstruct
+ void initialize() {
+ oidcConfig = new OidcTenantConfig();
+ var oidc = consoleConfig.getSecurity().getOidc();
+
+ oidcConfig.setTenantId(oidc.getTenantId());
+ oidcConfig.setDiscoveryEnabled(true);
+ oidcConfig.setAuthServerUrl(oidc.getAuthServerUrl());
+ oidcConfig.setRoles(OidcTenantConfig.Roles.fromClaimPath(List.of("groups")));
+
+ if (oidc.getIssuer() != null) {
+ oidcConfig.getToken().setIssuer(oidc.getIssuer());
+ }
+ }
+
+ @Override
+ public Uni<OidcTenantConfig> resolve(RoutingContext routingContext,
+ OidcRequestContext<OidcTenantConfig> requestContext) {
+ return Uni.createFrom().item(oidcConfig);
+ }
+
+}
diff --git a/api/src/main/java/com/github/streamshub/console/api/security/PermissionService.java b/api/src/main/java/com/github/streamshub/console/api/security/PermissionService.java
new file mode 100644
index 000000000..711044777
--- /dev/null
+++ b/api/src/main/java/com/github/streamshub/console/api/security/PermissionService.java
@@ -0,0 +1,69 @@
+package com.github.streamshub.console.api.security;
+
+import java.util.List;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.function.Predicate;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import jakarta.enterprise.context.RequestScoped;
+import jakarta.inject.Inject;
+import jakarta.ws.rs.ForbiddenException;
+
+import com.github.streamshub.console.api.support.KafkaContext;
+import com.github.streamshub.console.config.security.Privilege;
+import com.github.streamshub.console.config.security.ResourceTypes;
+
+import io.quarkus.security.identity.SecurityIdentity;
+
+@RequestScoped
+public class PermissionService {
+
+ private static final Set<String> KAFKA_SUBRESOURCES = Stream.of(ResourceTypes.Kafka.values())
+ .map(v -> v.value())
+ .collect(Collectors.toSet());
+
+ @Inject
+ SecurityIdentity securityIdentity;
+
+ @Inject
+ KafkaContext kafkaContext;
+
+ private String resolveResource(String resource) {
+ if (KAFKA_SUBRESOURCES.contains(resource)) {
+ resource = "kafkas/" + kafkaContext.clusterConfig().getName() + '/' + resource;
+ }
+ return resource;
+ }
+
+ private boolean checkPermission(ConsolePermission required) {
+ return securityIdentity.checkPermission(required)
+ .subscribeAsCompletionStage()
+ .join();
+ }
+
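+ /**
+ * Build a predicate for filtering listings by permission, e.g.
+ * {@code groups.filter(permissionService.permitted(ConsumerGroup.API_TYPE, Privilege.LIST, ConsumerGroupListing::groupId))}
+ * as used by ConsumerGroupService elsewhere in this change.
+ */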
+ public <T> Predicate<T> permitted(String resource, Privilege privilege, Function<T, String> name) {
+ ConsolePermission required = new ConsolePermission(resolveResource(resource), privilege);
+
+ return (T item) -> {
+ required.resourceName(name.apply(item));
+ return checkPermission(required);
+ };
+ }
+
+ public boolean permitted(String resource, Privilege privilege, String name) {
+ return checkPermission(new ConsolePermission(resolveResource(resource), List.of(name), privilege));
+ }
+
+ public void assertPermitted(String resource, Privilege privilege, String name) {
+ if (!permitted(resource, privilege, name)) {
+ throw forbidden(resource, privilege, name);
+ }
+ }
+
+ public ForbiddenException forbidden(String resource, Privilege privilege, String name) {
+ return new ForbiddenException("Access denied: resource={%s} privilege:{%s}, resourceName:{%s}"
+ .formatted(resource, privilege, name));
+ }
+}
diff --git a/api/src/main/java/com/github/streamshub/console/api/security/ResourcePrivilege.java b/api/src/main/java/com/github/streamshub/console/api/security/ResourcePrivilege.java
new file mode 100644
index 000000000..da592bb32
--- /dev/null
+++ b/api/src/main/java/com/github/streamshub/console/api/security/ResourcePrivilege.java
@@ -0,0 +1,20 @@
+package com.github.streamshub.console.api.security;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+import com.github.streamshub.console.config.security.Privilege;
+
+/**
+ * Method annotation used by the {@link AuthorizationInterceptor} to declare
+ * the privilege a principal must be granted to execute the annotated method.
+ */
+@Retention(RetentionPolicy.RUNTIME)
+@Target(ElementType.METHOD)
+public @interface ResourcePrivilege {
+
+ Privilege value() default Privilege.ALL;
+
+}
diff --git a/api/src/main/java/com/github/streamshub/console/api/security/SaslJaasConfigCredential.java b/api/src/main/java/com/github/streamshub/console/api/security/SaslJaasConfigCredential.java
new file mode 100644
index 000000000..be87105b5
--- /dev/null
+++ b/api/src/main/java/com/github/streamshub/console/api/security/SaslJaasConfigCredential.java
@@ -0,0 +1,40 @@
+package com.github.streamshub.console.api.security;
+
+import org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule;
+import org.apache.kafka.common.security.plain.PlainLoginModule;
+import org.apache.kafka.common.security.scram.ScramLoginModule;
+
+import io.quarkus.security.credential.Credential;
+
+public class SaslJaasConfigCredential implements Credential {
+
+ private static final String SASL_OAUTH_CONFIG_TEMPLATE = OAuthBearerLoginModule.class.getName()
+ + " required"
+ + " oauth.access.token=\"%s\" ;";
+
+ private static final String BASIC_TEMPLATE = "%s required username=\"%%s\" password=\"%%s\" ;";
+ private static final String SASL_PLAIN_CONFIG_TEMPLATE = BASIC_TEMPLATE.formatted(PlainLoginModule.class.getName());
+ private static final String SASL_SCRAM_CONFIG_TEMPLATE = BASIC_TEMPLATE.formatted(ScramLoginModule.class.getName());
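+ // For example (hypothetical credentials), forPlainLogin("alice", "alice-secret") produces:
+ // org.apache.kafka.common.security.plain.PlainLoginModule required username="alice" password="alice-secret" ;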
+
+ public static SaslJaasConfigCredential forOAuthLogin(String accessToken) {
+ return new SaslJaasConfigCredential(SASL_OAUTH_CONFIG_TEMPLATE.formatted(accessToken));
+ }
+
+ public static SaslJaasConfigCredential forPlainLogin(String username, String password) {
+ return new SaslJaasConfigCredential(SASL_PLAIN_CONFIG_TEMPLATE.formatted(username, password));
+ }
+
+ public static SaslJaasConfigCredential forScramLogin(String username, String password) {
+ return new SaslJaasConfigCredential(SASL_SCRAM_CONFIG_TEMPLATE.formatted(username, password));
+ }
+
+ private final String value;
+
+ private SaslJaasConfigCredential(String value) {
+ this.value = value;
+ }
+
+ public String value() {
+ return value;
+ }
+}
diff --git a/api/src/main/java/com/github/streamshub/console/api/service/ConsumerGroupService.java b/api/src/main/java/com/github/streamshub/console/api/service/ConsumerGroupService.java
index 075c81e10..2b068d76b 100644
--- a/api/src/main/java/com/github/streamshub/console/api/service/ConsumerGroupService.java
+++ b/api/src/main/java/com/github/streamshub/console/api/service/ConsumerGroupService.java
@@ -51,6 +51,7 @@
import com.github.streamshub.console.api.model.PartitionId;
import com.github.streamshub.console.api.model.PartitionInfo;
import com.github.streamshub.console.api.model.Topic;
+import com.github.streamshub.console.api.security.PermissionService;
import com.github.streamshub.console.api.support.ConsumerGroupValidation;
import com.github.streamshub.console.api.support.FetchFilterPredicate;
import com.github.streamshub.console.api.support.KafkaContext;
@@ -58,6 +59,7 @@
import com.github.streamshub.console.api.support.ListRequestContext;
import com.github.streamshub.console.api.support.UnknownTopicIdPatch;
import com.github.streamshub.console.api.support.ValidationProxy;
+import com.github.streamshub.console.config.security.Privilege;
@ApplicationScoped
public class ConsumerGroupService {
@@ -88,7 +90,10 @@ public class ConsumerGroupService {
KafkaContext kafkaContext;
@Inject
- TopicService topicService;
+ PermissionService permissionService;
+
+ @Inject
+ TopicDescribeService topicService;
@Inject
ValidationProxy validationService;
@@ -111,7 +116,10 @@ public CompletionStage> listConsumerGroups(String topicId, L
.exceptionally(error -> {
throw (RuntimeException) UnknownTopicIdPatch.apply(error, CompletionException::new);
})
- .thenComposeAsync(unused -> listConsumerGroupMembership(List.of(topicId)), asyncExec)
+ .thenComposeAsync(topic -> {
+ permissionService.assertPermitted(Topic.API_TYPE, Privilege.GET, topic.name());
+ return listConsumerGroupMembership(List.of(topicId));
+ }, asyncExec)
.thenComposeAsync(topicGroups -> {
if (topicGroups.containsKey(topicId)) {
return listConsumerGroups(topicGroups.get(topicId), includes, listSupport);
@@ -120,7 +128,9 @@ public CompletionStage> listConsumerGroups(String topicId, L
}, asyncExec);
}
- CompletionStage<List<ConsumerGroup>> listConsumerGroups(List<String> groupIds, List<String> includes, ListRequestContext<ConsumerGroup> listSupport) {
+ private CompletionStage<List<ConsumerGroup>> listConsumerGroups(List<String> groupIds,
+ List<String> includes, ListRequestContext<ConsumerGroup> listSupport) {
+
Admin adminClient = kafkaContext.admin();
Set<ConsumerGroupState> states = listSupport.filters()
@@ -142,11 +152,12 @@ CompletionStage> listConsumerGroups(List groupIds, L
.inStates(states))
.valid()
.toCompletionStage()
- .thenApply(groups -> groups.stream()
+ .thenApplyAsync(groups -> groups.stream()
.filter(group -> groupIds.isEmpty() || groupIds.contains(group.groupId()))
- .map(ConsumerGroup::fromKafkaModel)
- .toList())
- .thenApply(list -> list.stream()
+ .filter(permissionService.permitted(ConsumerGroup.API_TYPE, Privilege.LIST, ConsumerGroupListing::groupId))
+ .map(ConsumerGroup::fromKafkaModel),
+ threadContext.currentContextExecutor())
+ .thenApply(groups -> groups
.filter(listSupport)
.map(listSupport::tally)
.filter(listSupport::betweenCursors)
@@ -154,7 +165,9 @@ CompletionStage> listConsumerGroups(List groupIds, L
.dropWhile(listSupport::beforePageBegin)
.takeWhile(listSupport::pageCapacityAvailable)
.toList())
- .thenCompose(groups -> augmentList(adminClient, groups, includes));
+ .thenComposeAsync(
+ groups -> augmentList(adminClient, groups, includes),
+ threadContext.currentContextExecutor());
}
public CompletionStage<ConsumerGroup> describeConsumerGroup(String requestGroupId, List<String> includes) {
@@ -162,7 +175,9 @@ public CompletionStage describeConsumerGroup(String requestGroupI
String groupId = preprocessGroupId(requestGroupId);
return assertConsumerGroupExists(adminClient, groupId)
- .thenCompose(nothing -> describeConsumerGroups(adminClient, List.of(groupId), includes))
+ .thenComposeAsync(
+ nothing -> describeConsumerGroups(adminClient, List.of(groupId), includes),
+ threadContext.currentContextExecutor())
.thenApply(groups -> groups.get(groupId))
.thenApply(result -> result.getOrThrow(CompletionException::new));
}
@@ -174,13 +189,18 @@ public CompletionStage