resourceNames = new ArrayList<>(1);
+
+ setResource(resource, resourceNames);
+
+ var requiredPermission = new ConsolePermission(resource.toString(), resourceNames, authz.value());
+
+ boolean allow = securityIdentity.checkPermission(requiredPermission)
+ .subscribeAsCompletionStage()
+ .get();
+
+ if (!allow) {
+ throw new ForbiddenException("Access denied");
+ }
+
+ return context.proceed();
+ }
+
+ /**
+ * Pull the resource type and resource name from the request URI path to be used
+ * to determine authorization. The path is transformed as follows.
+ *
+ *
+ * Given a resource path `/api/kafkas/xyz/topics/abc/records`:
+ *
+ *
+ * - Skip the leading `/api` segment
+ *
- Append segments `kafkas/xyz/topics` to the resource type
+ *
- Use segment `abc` as the resource name
+ *
- Append segment `/records` to the resource type
+ *
+ *
+ *
+ * For a principal to be authorized to access the resource, they must be a member
+ * of a role with access to `kafkas` `xyz` (named or all `kafkas`), and further
+ * with access to resource `topics/records` `abc` (named or all `topics/records`).
+ *
+ * @param resource target resource type builder
+ * @param resourceNames collection to hold the resource name
+ */
+ private void setResource(StringBuilder resource, List resourceNames) {
+ var segments = requestUri.getPathSegments();
+ var segmentCount = segments.size();
+
+ // skip the first segment `/api`
+ String kafkas = segments.get(1).getPath();
+ resource.append(kafkas);
+
+ if (segmentCount > 2) {
+ String kafkaId = segments.get(2).getPath();
+
+ /*
+ * For URLs like `/api/kafkas/123`, the Kafka ID is the resource name
+ * and is configured at the top-level `security` key in the console's
+ * configuration. Otherwise, the Kafka ID is appended to the resource
+ * path and the configuration originates from the Kafka-level `security`
+ * key, scoped to the Kafka cluster under which it is specified.
+ */
+
+ if (segmentCount > 3) {
+ resource.append('/');
+ resource.append(kafkaId);
+ } else {
+ resourceNames.add(kafkaId);
+ }
+ }
+
+ boolean isTopics = false;
+
+ for (int s = 3; s < segmentCount; s++) {
+ String segment = segments.get(s).getPath();
+
+ if (s == 4) {
+ if (isTopics) {
+ /*
+ * Attempt to cross-reference the topic ID to the topic name
+ * which is used to configure topic-level authorization.
+ */
+ segment = topicDescribe.topicNameForId(segment).toCompletableFuture().join()
+ .orElse(segment);
+ }
+
+ resourceNames.add(segment);
+ } else {
+ if (s == 3) {
+ isTopics = Topic.API_TYPE.equals(segment);
+ }
+ resource.append('/');
+ resource.append(segment);
+ }
+ }
+ }
+}
diff --git a/api/src/main/java/com/github/streamshub/console/api/security/Authorized.java b/api/src/main/java/com/github/streamshub/console/api/security/Authorized.java
new file mode 100644
index 000000000..4fc482335
--- /dev/null
+++ b/api/src/main/java/com/github/streamshub/console/api/security/Authorized.java
@@ -0,0 +1,18 @@
+package com.github.streamshub.console.api.security;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+import jakarta.interceptor.InterceptorBinding;
+
+/**
+ * Binding annotation to mark methods that should be intercepted by the
+ * {@link AuthorizationInterceptor}.
+ */
+@InterceptorBinding
+@Retention(RetentionPolicy.RUNTIME)
+@Target({ ElementType.TYPE, ElementType.METHOD })
+public @interface Authorized {
+}
diff --git a/api/src/main/java/com/github/streamshub/console/api/security/ConsoleAuthenticationMechanism.java b/api/src/main/java/com/github/streamshub/console/api/security/ConsoleAuthenticationMechanism.java
new file mode 100644
index 000000000..b23023cb3
--- /dev/null
+++ b/api/src/main/java/com/github/streamshub/console/api/security/ConsoleAuthenticationMechanism.java
@@ -0,0 +1,415 @@
+package com.github.streamshub.console.api.security;
+
+import java.io.IOException;
+import java.security.Permission;
+import java.security.Principal;
+import java.util.ArrayList;
+import java.util.Base64;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.function.BiFunction;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.stream.Stream;
+
+import jakarta.annotation.Priority;
+import jakarta.enterprise.context.ApplicationScoped;
+import jakarta.enterprise.inject.Alternative;
+import jakarta.inject.Inject;
+import jakarta.ws.rs.core.HttpHeaders;
+
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule;
+import org.jboss.logging.Logger;
+import org.jose4j.jwt.JwtClaims;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.github.streamshub.console.api.ClientFactory;
+import com.github.streamshub.console.api.model.Error;
+import com.github.streamshub.console.api.model.ErrorResponse;
+import com.github.streamshub.console.api.support.ErrorCategory;
+import com.github.streamshub.console.api.support.KafkaContext;
+import com.github.streamshub.console.config.ConsoleConfig;
+import com.github.streamshub.console.config.security.Privilege;
+import com.github.streamshub.console.config.security.SecurityConfig;
+import com.github.streamshub.console.config.security.SubjectConfig;
+
+import io.quarkus.oidc.runtime.OidcAuthenticationMechanism;
+import io.quarkus.oidc.runtime.OidcJwtCallerPrincipal;
+import io.quarkus.security.AuthenticationFailedException;
+import io.quarkus.security.credential.Credential;
+import io.quarkus.security.identity.IdentityProviderManager;
+import io.quarkus.security.identity.SecurityIdentity;
+import io.quarkus.security.identity.request.AnonymousAuthenticationRequest;
+import io.quarkus.security.identity.request.AuthenticationRequest;
+import io.quarkus.security.identity.request.TokenAuthenticationRequest;
+import io.quarkus.security.identity.request.UsernamePasswordAuthenticationRequest;
+import io.quarkus.security.runtime.QuarkusPrincipal;
+import io.quarkus.security.runtime.QuarkusSecurityIdentity;
+import io.quarkus.vertx.http.runtime.security.ChallengeData;
+import io.quarkus.vertx.http.runtime.security.HttpAuthenticationMechanism;
+import io.smallrye.mutiny.Uni;
+import io.vertx.core.MultiMap;
+import io.vertx.ext.web.RoutingContext;
+
+@Alternative
+@Priority(1)
+@ApplicationScoped
+public class ConsoleAuthenticationMechanism implements HttpAuthenticationMechanism {
+
+ public static final String OAUTHBEARER = OAuthBearerLoginModule.OAUTHBEARER_MECHANISM;
+ public static final String PLAIN = "PLAIN";
+ public static final String SCRAM_SHA256 = "SCRAM-SHA-256";
+ public static final String SCRAM_SHA512 = "SCRAM-SHA-512";
+
+ private static final String BEARER = "Bearer ";
+ private static final String BASIC = "Basic ";
+
+ private static class Audit extends java.util.logging.Level {
+ private static final long serialVersionUID = 1L;
+
+ Audit() {
+ super("AUDIT", java.util.logging.Level.INFO.intValue() - 1);
+ }
+ }
+
+ private static final java.util.logging.Level AUDIT = new Audit();
+
+ private static final SecurityIdentity ANONYMOUS = QuarkusSecurityIdentity.builder()
+ .setAnonymous(true)
+ .setPrincipal(new QuarkusPrincipal("ANONYMOUS"))
+ .build();
+
+ @Inject
+ Logger log;
+
+ @Inject
+ ObjectMapper mapper;
+
+ @Inject
+ ConsoleConfig consoleConfig;
+
+ @Inject
+ Map contexts;
+
+ @Inject
+ OidcAuthenticationMechanism oidc;
+
+ boolean oidcEnabled() {
+ return Objects.nonNull(consoleConfig.getSecurity().getOidc());
+ }
+
+ @Override
+ public Uni authenticate(RoutingContext context, IdentityProviderManager identityProviderManager) {
+ if (oidcEnabled()) {
+ return oidc.authenticate(context, identityProviderManager)
+ .map(identity -> {
+ if (identity != null) {
+ String clusterId = getClusterId(context);
+ var ctx = clusterId != null ? contexts.get(clusterId) : null;
+ return createIdentity(ctx, identity);
+ }
+ throw new AuthenticationFailedException();
+ });
+ }
+
+ String clusterId = getClusterId(context);
+
+ if (clusterId == null) {
+ return Uni.createFrom().item(createAnonymousIdentity(null));
+ }
+
+ var ctx = contexts.get(clusterId);
+
+ if (ctx == null) {
+ // No Kafka context to establish identity, become anonymous
+ return Uni.createFrom().item(createAnonymousIdentity(null));
+ }
+
+ String saslMechanism = ctx.saslMechanism(Admin.class);
+
+ if (ctx.admin() != null || saslMechanism.isEmpty()) {
+ // Admin credentials already given or there is no SASL authentication needed
+ return Uni.createFrom().item(createAnonymousIdentity(ctx));
+ }
+
+ var identity = createIdentity(ctx, context.request().headers(), saslMechanism);
+
+ if (identity != null) {
+ return Uni.createFrom().item(identity);
+ }
+
+ return Uni.createFrom().failure(new AuthenticationFailedException());
+ }
+
+ @Override
+ public Uni sendChallenge(RoutingContext context) {
+ return getChallenge(context).map(challengeData -> {
+ if (challengeData == null) {
+ return false;
+ }
+
+ var response = context.response();
+ response.setStatusCode(challengeData.status);
+
+ if (challengeData.headerName != null) {
+ response.headers().set(challengeData.headerName, challengeData.headerContent);
+ }
+
+ try {
+ response.send(mapper.writeValueAsString(((PayloadChallengeData) challengeData).payload));
+ } catch (IOException e) {
+ log.warnf(e, "Failed to serialize challenge response body: %s", e.getMessage());
+ }
+
+ return true;
+ });
+ }
+
+ @Override
+ public Uni getChallenge(RoutingContext context) {
+ if (oidcEnabled()) {
+ return oidc.getChallenge(context)
+ .map(data -> {
+ var category = ErrorCategory.get(ErrorCategory.NotAuthenticated.class);
+ Error error = category.createError("Authentication credentials missing or invalid", null, null);
+ var responseBody = new ErrorResponse(List.of(error));
+ return new PayloadChallengeData(data, responseBody);
+ });
+ }
+
+ String clusterId = getClusterId(context);
+
+ if (clusterId == null) {
+ return Uni.createFrom().nullItem();
+ }
+
+ var ctx = contexts.get(clusterId);
+
+ if (ctx == null) {
+ return Uni.createFrom().nullItem();
+ }
+
+ String saslMechanism = ctx.saslMechanism(Admin.class);
+ String scheme = getAuthorizationScheme(saslMechanism);
+ ChallengeData challenge;
+
+ if (scheme != null) {
+ var category = ErrorCategory.get(ErrorCategory.NotAuthenticated.class);
+ Error error = category.createError("Authentication credentials missing or invalid", null, null);
+ var responseBody = new ErrorResponse(List.of(error));
+ challenge = new PayloadChallengeData(401, "WWW-Authenticate", scheme, responseBody);
+ } else {
+ log.warnf("Access not permitted to cluster %s with unknown SASL mechanism '%s'",
+ clusterId, saslMechanism);
+ var category = ErrorCategory.get(ErrorCategory.ResourceNotFound.class);
+ Error error = category.createError(ClientFactory.NO_SUCH_KAFKA_MESSAGE.formatted(clusterId), null, null);
+ var responseBody = new ErrorResponse(List.of(error));
+ challenge = new PayloadChallengeData(404, null, null, responseBody);
+ }
+
+ return Uni.createFrom().item(challenge);
+ }
+
+ @Override
+ public Set> getCredentialTypes() {
+ if (oidcEnabled()) {
+ return oidc.getCredentialTypes();
+ }
+
+ return Set.of(
+ AnonymousAuthenticationRequest.class,
+ TokenAuthenticationRequest.class,
+ UsernamePasswordAuthenticationRequest.class
+ );
+ }
+
+ String getClusterId(RoutingContext context) {
+ Pattern p = Pattern.compile("/api/kafkas/([^/]+)(?:/.*)?");
+ Matcher m = p.matcher(context.normalizedPath());
+ if (m.matches()) {
+ return m.group(1);
+ }
+ return null;
+ }
+
+ String getAuthorizationScheme(String saslMechanism) {
+ switch (saslMechanism) {
+ case OAUTHBEARER:
+ return BEARER.trim();
+ case PLAIN, SCRAM_SHA256, SCRAM_SHA512:
+ return BASIC.trim();
+ default:
+ return null;
+ }
+ }
+
+ SecurityIdentity createAnonymousIdentity(KafkaContext ctx) {
+ return createIdentity(ctx, ANONYMOUS);
+ }
+
+ SecurityIdentity createIdentity(KafkaContext ctx, SecurityIdentity source) {
+ var builder = QuarkusSecurityIdentity.builder(source);
+ addRoleChecker(ctx, builder, source.getPrincipal());
+ return builder.build();
+ }
+
+ SecurityIdentity createIdentity(KafkaContext ctx, MultiMap headers, String saslMechanism) {
+ switch (saslMechanism) {
+ case OAUTHBEARER:
+ return createOAuthIdentity(ctx, headers);
+ case PLAIN:
+ return createBasicIdentity(ctx, headers, SaslJaasConfigCredential::forPlainLogin);
+ case SCRAM_SHA256, SCRAM_SHA512:
+ return createBasicIdentity(ctx, headers, SaslJaasConfigCredential::forScramLogin);
+ default:
+ return null;
+ }
+ }
+
+ SecurityIdentity createOAuthIdentity(KafkaContext ctx, MultiMap headers) {
+ return getAuthorization(headers, BEARER)
+ .map(accessToken -> {
+ var builder = QuarkusSecurityIdentity.builder();
+ builder.addCredential(SaslJaasConfigCredential.forOAuthLogin(accessToken));
+ Principal principal;
+
+ try {
+ var claims = JwtClaims.parse(accessToken);
+ principal = new OidcJwtCallerPrincipal(claims, null);
+ } catch (Exception e) {
+ log.infof("JWT access token could not be parsed: %s", e.getMessage());
+ principal = new QuarkusPrincipal("UNKNOWN");
+ }
+
+ builder.setPrincipal(principal);
+ addRoleChecker(ctx, builder, principal);
+ return builder.build();
+ })
+ .orElse(null);
+ }
+
+ SecurityIdentity createBasicIdentity(KafkaContext ctx, MultiMap headers, BiFunction credentialBuilder) {
+ return getBasicAuthentication(headers)
+ .map(userpass -> {
+ var builder = QuarkusSecurityIdentity.builder();
+ var principal = new QuarkusPrincipal(userpass[0]);
+ builder.addCredential(credentialBuilder.apply(userpass[0], userpass[1]));
+ builder.setPrincipal(principal);
+ addRoleChecker(ctx, builder, principal);
+ return builder.build();
+ })
+ .orElse(null);
+ }
+
+ void addRoleChecker(KafkaContext ctx, QuarkusSecurityIdentity.Builder builder, Principal principal) {
+ var globalSecurity = consoleConfig.getSecurity();
+ Optional clusterSecurity = ctx != null
+ ? Optional.of(ctx.clusterConfig().getSecurity())
+ : Optional.empty();
+
+ if (globalSecurity.getRoles().isEmpty()
+ && clusterSecurity.map(cs -> cs.getRoles().isEmpty()).orElse(true)) {
+ // No roles are defined - allow everything
+ builder.addPermissionChecker(requiredPermission -> {
+ boolean allowed = true;
+ String category = getClass().getPackageName() + (allowed ? ".ALLOW" : ".DENY");
+ java.util.logging.Logger.getLogger(category).log(AUDIT, () -> {
+ return String.format("Principal %s %s access to %s", principal.getName(), allowed ? "allowed" : "denied", requiredPermission);
+ });
+
+ return Uni.createFrom().item(allowed);
+ });
+ }
+
+ Stream globalSubjects = globalSecurity.getSubjects().stream();
+ Stream clusterSubjects = clusterSecurity.map(cs -> cs.getSubjects().stream())
+ .orElseGet(Stream::empty);
+
+ List roleNames = Stream.concat(clusterSubjects, globalSubjects)
+ .filter(sub -> Objects.isNull(sub.getIssuer()) /* or issuer matches `iss` claim */)
+ .filter(sub -> Objects.isNull(sub.getClaim()) /* only without OIDC */)
+ .filter(sub -> sub.getInclude().contains(principal.getName()))
+ .flatMap(sub -> sub.getRoleNames().stream())
+ .distinct()
+ .toList();
+
+ Stream globalPermissions = getPermissions(globalSecurity, roleNames, "");
+ Stream clusterPermissions = clusterSecurity
+ .map(cs -> getPermissions(cs, roleNames, "kafkas/" + ctx.clusterId() + '/'))
+ .orElseGet(Stream::empty);
+
+ List possessedPermissions = Stream.concat(globalPermissions, clusterPermissions).toList();
+
+ builder.addPermissionChecker(requiredPermission -> {
+ boolean allowed = possessedPermissions
+ .stream()
+ .anyMatch(possessed -> possessed.implies(requiredPermission));
+
+ String category = getClass().getPackageName() + (allowed ? ".ALLOW" : ".DENY");
+
+ java.util.logging.Logger.getLogger(category).log(AUDIT, () -> {
+ return String.format("Principal %s %s access to %s", principal.getName(), allowed ? "allowed" : "denied", requiredPermission);
+ });
+
+ return Uni.createFrom().item(allowed);
+ });
+ }
+
+ Stream getPermissions(SecurityConfig security, Collection roleNames, String resourcePrefix) {
+ return security.getRoles()
+ .stream()
+ .filter(role -> roleNames.contains(role.getName()))
+ .flatMap(role -> role.getRules().stream())
+ .flatMap(rule -> {
+ List rulePermissions = new ArrayList<>();
+ Privilege[] actions = rule.getPrivileges().toArray(Privilege[]::new);
+
+ for (var resource : rule.getResources()) {
+ rulePermissions.add(new ConsolePermission(
+ resourcePrefix + resource,
+ rule.getResourceNames(),
+ actions
+ ));
+ }
+
+ return rulePermissions.stream();
+ });
+ }
+
+ Optional getBasicAuthentication(MultiMap headers) {
+ return getAuthorization(headers, BASIC)
+ .map(Base64.getDecoder()::decode)
+ .map(String::new)
+ .filter(authn -> authn.indexOf(':') >= 0)
+ .map(authn -> new String[] {
+ authn.substring(0, authn.indexOf(':')),
+ authn.substring(authn.indexOf(':') + 1)
+ })
+ .filter(userPass -> !userPass[0].isEmpty() && !userPass[1].isEmpty());
+ }
+
+ Optional getAuthorization(MultiMap headers, String scheme) {
+ return Optional.ofNullable(headers.get(HttpHeaders.AUTHORIZATION))
+ .filter(header -> header.regionMatches(true, 0, scheme, 0, scheme.length()))
+ .map(header -> header.substring(scheme.length()));
+ }
+
+ static class PayloadChallengeData extends ChallengeData {
+ public final Object payload;
+
+ public PayloadChallengeData(int status, CharSequence headerName, String headerContent, Object payload) {
+ super(status, headerName, headerContent);
+ this.payload = payload;
+ }
+
+ public PayloadChallengeData(ChallengeData data, Object payload) {
+ super(data.status, data.headerName, data.headerContent);
+ this.payload = payload;
+ }
+ }
+}
diff --git a/api/src/main/java/com/github/streamshub/console/api/security/ConsolePermission.java b/api/src/main/java/com/github/streamshub/console/api/security/ConsolePermission.java
new file mode 100644
index 000000000..f870f775c
--- /dev/null
+++ b/api/src/main/java/com/github/streamshub/console/api/security/ConsolePermission.java
@@ -0,0 +1,190 @@
+package com.github.streamshub.console.api.security;
+
+import java.security.Permission;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Objects;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import com.github.streamshub.console.config.security.Privilege;
+
+import static java.util.function.Predicate.not;
+
+public class ConsolePermission extends Permission {
+
+ private static final long serialVersionUID = 1L;
+ public static final String ACTIONS_SEPARATOR = ",";
+
+ private String resource;
+ private Collection resourceNames;
+ private final Set actions;
+
+ public ConsolePermission(String resource, Privilege... actions) {
+ super("console");
+ this.resource = resource;
+ this.resourceNames = Collections.emptySet();
+ this.actions = checkActions(actions);
+ }
+
+ public ConsolePermission(String resource, Collection resourceNames, Privilege... actions) {
+ super("console");
+ this.resource = resource;
+ this.resourceNames = resourceNames;
+ this.actions = checkActions(actions);
+ }
+
+ private static Set checkActions(Privilege[] actions) {
+ Set validActions = new HashSet<>(actions.length, 1);
+ for (Privilege action : actions) {
+ validActions.add(validateAndTrim(action, "Action"));
+ }
+ return Collections.unmodifiableSet(validActions);
+ }
+
+ private static Privilege validateAndTrim(Privilege action, String paramName) {
+ if (action == null) {
+ throw new IllegalArgumentException(String.format("%s must not be null", paramName));
+ }
+
+ return action;
+ }
+
+ public String resource() {
+ return resource;
+ }
+
+ public ConsolePermission resource(String resource) {
+ this.resource = resource;
+ return this;
+ }
+
+ public ConsolePermission resourceName(String resourceName) {
+ this.resourceNames = Collections.singleton(resourceName);
+ return this;
+ }
+
+ @Override
+ public boolean implies(Permission other) {
+ if (other instanceof ConsolePermission requiredPermission) {
+ if (!getName().equals(requiredPermission.getName())) {
+ return false;
+ }
+
+ return implies(requiredPermission);
+ } else {
+ return false;
+ }
+ }
+
+ boolean implies(ConsolePermission requiredPermission) {
+ if (resourceDenied(requiredPermission)) {
+ return false;
+ }
+
+ // actions are optional, however if at least one action was specified,
+ // an intersection of compared sets must not be empty
+ if (requiredPermission.actions.isEmpty()) {
+ // no required actions
+ return true;
+ }
+
+ if (actions.isEmpty()) {
+ // no possessed actions
+ return false;
+ }
+
+ if (actions.contains(Privilege.ALL)) {
+ // all actions possessed
+ return true;
+ }
+
+ for (Privilege action : requiredPermission.actions) {
+ if (actions.contains(action)) {
+ // has at least one of required actions
+ return true;
+ }
+ }
+
+ return false;
+ }
+
+ boolean resourceDenied(ConsolePermission requiredPermission) {
+ /*
+ * The action requires a permission unrelated to this configured
+ * permission.
+ * E.g. consumerGroups versus topics
+ */
+ if (!requiredPermission.resource.equals(resource)) {
+ return true;
+ }
+
+ if (resourceNames.isEmpty()) {
+ /*
+ * Configuration does not specify any resource names, so
+ * access to any is allowed.
+ */
+ return false;
+ }
+
+ if (requiredPermission.resourceNames.isEmpty()) {
+ /*
+ * Configuration specifies named resources, but this request
+ * has no resource name. I.e., the request is for an index/list
+ * end point. The permission is granted here, but individual
+ * resources in the list response may be filtered later.
+ */
+ return false;
+ }
+
+ /*
+ * Deny when any of the required names are not given in configuration.
+ */
+ return requiredPermission.resourceNames.stream().anyMatch(not(this::matchesResourceName));
+ }
+
+ boolean matchesResourceName(String requiredName) {
+ if (resourceNames.contains(requiredName)) {
+ return true;
+ }
+
+ return resourceNames.stream()
+ .filter(n -> n.endsWith("*"))
+ .map(n -> n.substring(0, n.length() - 1))
+ .anyMatch(requiredName::startsWith);
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj) {
+ return true;
+ }
+
+ if (!(obj instanceof ConsolePermission other)) {
+ return false;
+ }
+
+ return getName().equals(other.getName())
+ && resource.equals(other.resource)
+ && actions.equals(other.actions);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(getName(), resource, actions);
+ }
+
+ @Override
+ public String toString() {
+ return getName() + ":" + resource() + ":" + resourceNames + ":" + actions;
+ }
+
+ /**
+ * @return null if no actions were specified, or actions joined together with the {@link #ACTIONS_SEPARATOR}
+ */
+ @Override
+ public String getActions() {
+ return actions.isEmpty() ? null : actions.stream().map(Enum::name).collect(Collectors.joining(ACTIONS_SEPARATOR));
+ }
+}
diff --git a/api/src/main/java/com/github/streamshub/console/api/security/OidcTenantConfigResolver.java b/api/src/main/java/com/github/streamshub/console/api/security/OidcTenantConfigResolver.java
new file mode 100644
index 000000000..8c9542baf
--- /dev/null
+++ b/api/src/main/java/com/github/streamshub/console/api/security/OidcTenantConfigResolver.java
@@ -0,0 +1,49 @@
+package com.github.streamshub.console.api.security;
+
+import java.time.Duration;
+import java.util.List;
+
+import jakarta.annotation.PostConstruct;
+import jakarta.enterprise.context.ApplicationScoped;
+import jakarta.inject.Inject;
+
+import com.github.streamshub.console.config.ConsoleConfig;
+
+import io.quarkus.oidc.OidcRequestContext;
+import io.quarkus.oidc.OidcTenantConfig;
+import io.quarkus.oidc.TenantConfigResolver;
+import io.smallrye.mutiny.Uni;
+import io.vertx.ext.web.RoutingContext;
+
+/**
+ * This class is discovered and used by the Quarkus OIDC framework. The purpose
+ * is to create an OIDC tenant from the ConsoleConfig (sourced from YAML) that
+ * is provided to the console by the user directly or via the operator.
+ */
+@ApplicationScoped
+public class OidcTenantConfigResolver implements TenantConfigResolver {
+
+ @Inject
+ ConsoleConfig consoleConfig;
+
+ OidcTenantConfig oidcConfig;
+
+ @PostConstruct
+ void initialize() {
+ oidcConfig = new OidcTenantConfig();
+ var oidc = consoleConfig.getSecurity().getOidc();
+
+ oidcConfig.setTenantId(oidc.getTenantId());
+ oidcConfig.setDiscoveryEnabled(true);
+ oidcConfig.setAuthServerUrl(oidc.getAuthServerUrl());
+ oidcConfig.setRoles(OidcTenantConfig.Roles.fromClaimPath(List.of("groups")));
+ oidcConfig.getToken().setForcedJwkRefreshInterval(Duration.ofSeconds(5));
+ }
+
+ @Override
+ public Uni resolve(RoutingContext routingContext,
+ OidcRequestContext requestContext) {
+ return Uni.createFrom().item(oidcConfig);
+ }
+
+}
diff --git a/api/src/main/java/com/github/streamshub/console/api/security/PermissionService.java b/api/src/main/java/com/github/streamshub/console/api/security/PermissionService.java
new file mode 100644
index 000000000..17e93289d
--- /dev/null
+++ b/api/src/main/java/com/github/streamshub/console/api/security/PermissionService.java
@@ -0,0 +1,73 @@
+package com.github.streamshub.console.api.security;
+
+import java.util.List;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.function.Predicate;
+
+import jakarta.enterprise.context.RequestScoped;
+import jakarta.inject.Inject;
+import jakarta.ws.rs.ForbiddenException;
+
+import com.github.streamshub.console.api.model.ConsumerGroup;
+import com.github.streamshub.console.api.model.KafkaRebalance;
+import com.github.streamshub.console.api.model.KafkaRecord;
+import com.github.streamshub.console.api.model.Topic;
+import com.github.streamshub.console.api.support.KafkaContext;
+import com.github.streamshub.console.config.security.Privilege;
+
+import io.quarkus.security.identity.SecurityIdentity;
+
+@RequestScoped
+public class PermissionService {
+
+ private static final Set KAFKA_SUBRESOURCES = Set.of(
+ ConsumerGroup.API_TYPE,
+ KafkaRebalance.API_TYPE,
+ // Records are a sub-resource of topics
+ Topic.API_TYPE + '/' + KafkaRecord.API_TYPE,
+ Topic.API_TYPE);
+
+ @Inject
+ SecurityIdentity securityIdentity;
+
+ @Inject
+ KafkaContext kafkaContext;
+
+ private String resolveResource(String resource) {
+ if (KAFKA_SUBRESOURCES.contains(resource)) {
+ resource = "kafkas/" + kafkaContext.clusterId() + '/' + resource;
+ }
+ return resource;
+ }
+
+ private boolean checkPermission(ConsolePermission required) {
+ return securityIdentity.checkPermission(required)
+ .subscribeAsCompletionStage()
+ .join();
+ }
+
+ public Predicate permitted(String resource, Privilege privilege, Function name) {
+ ConsolePermission required = new ConsolePermission(resolveResource(resource), privilege);
+
+ return (T item) -> {
+ required.resourceName(name.apply(item));
+ return checkPermission(required);
+ };
+ }
+
+ public boolean permitted(String resource, Privilege privilege, String name) {
+ return checkPermission(new ConsolePermission(resolveResource(resource), List.of(name), privilege));
+ }
+
+ public void assertPermitted(String resource, Privilege privilege, String name) {
+ if (!permitted(resource, privilege, name)) {
+ throw forbidden(resource, privilege, name);
+ }
+ }
+
+ public ForbiddenException forbidden(String resource, Privilege privilege, String name) {
+ return new ForbiddenException("Access denied: resource={%s} privilege:{%s}, resourceName:{%s}"
+ .formatted(resource, privilege, name));
+ }
+}
diff --git a/api/src/main/java/com/github/streamshub/console/api/security/ResourcePrivilege.java b/api/src/main/java/com/github/streamshub/console/api/security/ResourcePrivilege.java
new file mode 100644
index 000000000..da592bb32
--- /dev/null
+++ b/api/src/main/java/com/github/streamshub/console/api/security/ResourcePrivilege.java
@@ -0,0 +1,20 @@
+package com.github.streamshub.console.api.security;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+import com.github.streamshub.console.config.security.Privilege;
+
+/**
+ * Method annotation used by the {@link AuthorizationInterceptor} to declare
+ * the privilege a principal must be granted to execute the annotated method.
+ */
+@Retention(RetentionPolicy.RUNTIME)
+@Target(ElementType.METHOD)
+public @interface ResourcePrivilege {
+
+ Privilege value() default Privilege.ALL;
+
+}
diff --git a/api/src/main/java/com/github/streamshub/console/api/security/SaslJaasConfigCredential.java b/api/src/main/java/com/github/streamshub/console/api/security/SaslJaasConfigCredential.java
new file mode 100644
index 000000000..be87105b5
--- /dev/null
+++ b/api/src/main/java/com/github/streamshub/console/api/security/SaslJaasConfigCredential.java
@@ -0,0 +1,40 @@
+package com.github.streamshub.console.api.security;
+
+import org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule;
+import org.apache.kafka.common.security.plain.PlainLoginModule;
+import org.apache.kafka.common.security.scram.ScramLoginModule;
+
+import io.quarkus.security.credential.Credential;
+
+public class SaslJaasConfigCredential implements Credential {
+
+ private static final String SASL_OAUTH_CONFIG_TEMPLATE = OAuthBearerLoginModule.class.getName()
+ + " required"
+ + " oauth.access.token=\"%s\" ;";
+
+ private static final String BASIC_TEMPLATE = "%s required username=\"%%s\" password=\"%%s\" ;";
+ private static final String SASL_PLAIN_CONFIG_TEMPLATE = BASIC_TEMPLATE.formatted(PlainLoginModule.class.getName());
+ private static final String SASL_SCRAM_CONFIG_TEMPLATE = BASIC_TEMPLATE.formatted(ScramLoginModule.class.getName());
+
+ public static SaslJaasConfigCredential forOAuthLogin(String accessToken) {
+ return new SaslJaasConfigCredential(SASL_OAUTH_CONFIG_TEMPLATE.formatted(accessToken));
+ }
+
+ public static SaslJaasConfigCredential forPlainLogin(String username, String password) {
+ return new SaslJaasConfigCredential(SASL_PLAIN_CONFIG_TEMPLATE.formatted(username, password));
+ }
+
+ public static SaslJaasConfigCredential forScramLogin(String username, String password) {
+ return new SaslJaasConfigCredential(SASL_SCRAM_CONFIG_TEMPLATE.formatted(username, password));
+ }
+
+ private final String value;
+
+ private SaslJaasConfigCredential(String value) {
+ this.value = value;
+ }
+
+ public String value() {
+ return value;
+ }
+}
diff --git a/api/src/main/java/com/github/streamshub/console/api/service/ConsumerGroupService.java b/api/src/main/java/com/github/streamshub/console/api/service/ConsumerGroupService.java
index 075c81e10..63b8f51ec 100644
--- a/api/src/main/java/com/github/streamshub/console/api/service/ConsumerGroupService.java
+++ b/api/src/main/java/com/github/streamshub/console/api/service/ConsumerGroupService.java
@@ -51,6 +51,7 @@
import com.github.streamshub.console.api.model.PartitionId;
import com.github.streamshub.console.api.model.PartitionInfo;
import com.github.streamshub.console.api.model.Topic;
+import com.github.streamshub.console.api.security.PermissionService;
import com.github.streamshub.console.api.support.ConsumerGroupValidation;
import com.github.streamshub.console.api.support.FetchFilterPredicate;
import com.github.streamshub.console.api.support.KafkaContext;
@@ -58,6 +59,7 @@
import com.github.streamshub.console.api.support.ListRequestContext;
import com.github.streamshub.console.api.support.UnknownTopicIdPatch;
import com.github.streamshub.console.api.support.ValidationProxy;
+import com.github.streamshub.console.config.security.Privilege;
@ApplicationScoped
public class ConsumerGroupService {
@@ -88,7 +90,10 @@ public class ConsumerGroupService {
KafkaContext kafkaContext;
@Inject
- TopicService topicService;
+ PermissionService permissionService;
+
+ @Inject
+ TopicDescribeService topicService;
@Inject
ValidationProxy validationService;
@@ -111,7 +116,10 @@ public CompletionStage> listConsumerGroups(String topicId, L
.exceptionally(error -> {
throw (RuntimeException) UnknownTopicIdPatch.apply(error, CompletionException::new);
})
- .thenComposeAsync(unused -> listConsumerGroupMembership(List.of(topicId)), asyncExec)
+ .thenComposeAsync(topic -> {
+ permissionService.assertPermitted(Topic.API_TYPE, Privilege.GET, topic.name());
+ return listConsumerGroupMembership(List.of(topicId));
+ }, asyncExec)
.thenComposeAsync(topicGroups -> {
if (topicGroups.containsKey(topicId)) {
return listConsumerGroups(topicGroups.get(topicId), includes, listSupport);
@@ -120,7 +128,9 @@ public CompletionStage> listConsumerGroups(String topicId, L
}, asyncExec);
}
- CompletionStage> listConsumerGroups(List groupIds, List includes, ListRequestContext listSupport) {
+ private CompletionStage> listConsumerGroups(List groupIds,
+ List includes, ListRequestContext listSupport) {
+
Admin adminClient = kafkaContext.admin();
Set states = listSupport.filters()
@@ -142,11 +152,12 @@ CompletionStage> listConsumerGroups(List groupIds, L
.inStates(states))
.valid()
.toCompletionStage()
- .thenApply(groups -> groups.stream()
+ .thenApplyAsync(groups -> groups.stream()
.filter(group -> groupIds.isEmpty() || groupIds.contains(group.groupId()))
- .map(ConsumerGroup::fromKafkaModel)
- .toList())
- .thenApply(list -> list.stream()
+ .filter(permissionService.permitted(ConsumerGroup.API_TYPE, Privilege.LIST, ConsumerGroupListing::groupId))
+ .map(ConsumerGroup::fromKafkaModel),
+ threadContext.currentContextExecutor())
+ .thenApply(groups -> groups
.filter(listSupport)
.map(listSupport::tally)
.filter(listSupport::betweenCursors)
@@ -154,7 +165,9 @@ CompletionStage> listConsumerGroups(List groupIds, L
.dropWhile(listSupport::beforePageBegin)
.takeWhile(listSupport::pageCapacityAvailable)
.toList())
- .thenCompose(groups -> augmentList(adminClient, groups, includes));
+ .thenComposeAsync(
+ groups -> augmentList(adminClient, groups, includes),
+ threadContext.currentContextExecutor());
}
public CompletionStage describeConsumerGroup(String requestGroupId, List includes) {
@@ -162,7 +175,9 @@ public CompletionStage describeConsumerGroup(String requestGroupI
String groupId = preprocessGroupId(requestGroupId);
return assertConsumerGroupExists(adminClient, groupId)
- .thenCompose(nothing -> describeConsumerGroups(adminClient, List.of(groupId), includes))
+ .thenComposeAsync(
+ nothing -> describeConsumerGroups(adminClient, List.of(groupId), includes),
+ threadContext.currentContextExecutor())
.thenApply(groups -> groups.get(groupId))
.thenApply(result -> result.getOrThrow(CompletionException::new));
}
@@ -174,13 +189,18 @@ public CompletionStage