From 07d39eaf7f59a6b3ba8d45fc4f9ccfc966b56c88 Mon Sep 17 00:00:00 2001 From: Kai Wang Date: Thu, 6 Jul 2023 14:16:42 +0800 Subject: [PATCH] [branch-2.10.4][improve] Pass group ID to authorizer when using OAuth (#1926) (#1945) ### Motivation Currently, the KoP only supports checking the topic permission when doing the consume authorization, but Kafka support checking the topic and group ID permission. Before introducing this change, let's understand why KoP can't check group ID permission. When Kafka does the consume authorization check, it will first check the group ID permission in `handleJoinGroupRequest` method, then it will check the topic permission in the `handleFetchRequest` method. However, Pulsar is using another way to check consume permission. See the `org.apache.pulsar.broker.authorization.AuthorizationService#canConsumeAsync` method, it requires passing the topic name and subscription to check both permissions in the same place, and the topic name is required param. ``` public CompletableFuture canConsumeAsync(TopicName topicName, String role, AuthenticationDataSource authenticationData,String subscription) ``` If we follow the Kafka way to check the consumer permission, we can't get the topic name when joining a group since the join group request does not contain the topic name. So we have to authorize permission in the fetch request. However, to authorization consume permission in the fetch request, we can only get the topic name in this request. In this case, we can't authorize the group ID. ### Modifications Since we can't get group ID when handling fetch requests, we need to find a way to pass through group ID when doing the authentication. In OAuth, we have a `credentials_file.json` file that needs to be config, for example: ``` { "client_id": "my-id", "client_secret": "my-secret", "tenant": "my-tenant" } ``` Here we can add a new parameter into the config: ``` { "client_id": "my-id", "client_secret": "my-secret", "tenant": "my-tenant", "group_id": "my-group-id" } ``` Then we can add these parameters to `SaslExtensions`, and send it to the broker. (cherry picked from commit e108e44621decef8221b44f37322c59f8b674be5) --- .../handlers/kop/KafkaRequestHandler.java | 122 +++++++---- .../kop/KafkaServiceConfiguration.java | 7 + .../handlers/kop/security/KafkaPrincipal.java | 4 +- .../kop/security/PlainSaslServer.java | 4 + .../kop/security/SaslAuthenticator.java | 3 + .../kop/security/auth/Authorizer.java | 1 + .../kop/security/auth/ResourceType.java | 4 + .../security/auth/SimpleAclAuthorizer.java | 32 ++- .../oauth/KopOAuthBearerSaslServer.java | 83 +++++-- .../oauth/OauthValidatorCallbackHandler.java | 11 +- .../kop/security/oauth/ClientConfig.java | 20 +- .../security/oauth/ClientCredentialsFlow.java | 28 +-- .../kop/security/oauth/ClientInfo.java | 45 ++++ .../oauth/OauthLoginCallbackHandler.java | 30 +++ .../kop/security/oauth/ClientConfigTest.java | 25 ++- .../oauth/ClientCredentialsFlowTest.java | 11 +- .../resources/private_key_with_group_id.json | 6 + .../pulsar/handlers/kop/HydraOAuthUtils.java | 27 ++- .../kop/SaslOAuthKopHandlersTest.java | 2 +- .../SaslOAuthKopHandlersWithGroupIdTest.java | 203 ++++++++++++++++++ .../auth/SimpleAclAuthorizerTest.java | 52 ++--- 21 files changed, 586 insertions(+), 134 deletions(-) create mode 100644 oauth-client/src/main/java/io/streamnative/pulsar/handlers/kop/security/oauth/ClientInfo.java create mode 100644 oauth-client/src/test/resources/private_key_with_group_id.json create mode 100644 tests/src/test/java/io/streamnative/pulsar/handlers/kop/SaslOAuthKopHandlersWithGroupIdTest.java diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandler.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandler.java index 291a8dd2cc..c90424eafd 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandler.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandler.java @@ -313,7 +313,7 @@ public KafkaRequestHandler(PulsarService pulsarService, : null; final boolean authorizationEnabled = pulsarService.getBrokerService().isAuthorizationEnabled(); this.authorizer = authorizationEnabled && authenticationEnabled - ? new SimpleAclAuthorizer(pulsarService) + ? new SimpleAclAuthorizer(pulsarService, kafkaConfig) : null; this.adminManager = adminManager; this.producePurgatory = producePurgatory; @@ -933,59 +933,85 @@ protected void handleFindCoordinatorRequest(KafkaHeaderAndRequest findCoordinato checkArgument(findCoordinator.getRequest() instanceof FindCoordinatorRequest); FindCoordinatorRequest request = (FindCoordinatorRequest) findCoordinator.getRequest(); - String pulsarTopicName; - int partition; - CompletableFuture storeGroupIdFuture; if (request.coordinatorType() == FindCoordinatorRequest.CoordinatorType.TRANSACTION) { TransactionCoordinator transactionCoordinator = getTransactionCoordinator(); - partition = transactionCoordinator.partitionFor(request.coordinatorKey()); - pulsarTopicName = transactionCoordinator.getTopicPartitionName(partition); - storeGroupIdFuture = CompletableFuture.completedFuture(null); - } else if (request.coordinatorType() == FindCoordinatorRequest.CoordinatorType.GROUP) { - partition = getGroupCoordinator().partitionFor(request.coordinatorKey()); - pulsarTopicName = getGroupCoordinator().getTopicPartitionName(partition); - if (kafkaConfig.isKopEnableGroupLevelConsumerMetrics()) { - String groupId = request.coordinatorKey(); - String groupIdPath = GroupIdUtils.groupIdPathFormat(findCoordinator.getClientHost(), - findCoordinator.getHeader().clientId()); - currentConnectedClientId.add(findCoordinator.getHeader().clientId()); + int partition = transactionCoordinator.partitionFor(request.coordinatorKey()); + String pulsarTopicName = transactionCoordinator.getTopicPartitionName(partition); + findBroker(TopicName.get(pulsarTopicName)) + .whenComplete((node, throwable) -> { + if (node.error() != Errors.NONE || throwable != null) { + log.error("[{}] Request {}: Error while find coordinator.", + ctx.channel(), findCoordinator.getHeader(), throwable); + + resultFuture.complete(KafkaResponseUtils + .newFindCoordinator(Errors.LEADER_NOT_AVAILABLE)); + return; + } - // Store group name to metadata store for current client, use to collect consumer metrics. - storeGroupIdFuture = storeGroupId(groupId, groupIdPath); - } else { - storeGroupIdFuture = CompletableFuture.completedFuture(null); - } + if (log.isDebugEnabled()) { + log.debug("[{}] Found node {} as coordinator for key {} partition {}.", + ctx.channel(), node.leader(), request.coordinatorKey(), partition); + } + resultFuture.complete(KafkaResponseUtils.newFindCoordinator(node.leader())); + }); + } else if (request.coordinatorType() == FindCoordinatorRequest.CoordinatorType.GROUP) { + authorize(AclOperation.DESCRIBE, Resource.of(ResourceType.GROUP, request.coordinatorKey())) + .whenComplete((isAuthorized, ex) -> { + if (ex != null) { + log.error("Describe group authorize failed, group - {}. {}", + request.coordinatorKey(), ex.getMessage()); + resultFuture.complete(KafkaResponseUtils + .newFindCoordinator(Errors.GROUP_AUTHORIZATION_FAILED)); + return; + } + if (!isAuthorized) { + resultFuture.complete( + KafkaResponseUtils + .newFindCoordinator(Errors.GROUP_AUTHORIZATION_FAILED)); + return; + } + CompletableFuture storeGroupIdFuture; + int partition = getGroupCoordinator().partitionFor(request.coordinatorKey()); + String pulsarTopicName = getGroupCoordinator().getTopicPartitionName(partition); + if (kafkaConfig.isKopEnableGroupLevelConsumerMetrics()) { + String groupId = request.coordinatorKey(); + String groupIdPath = GroupIdUtils.groupIdPathFormat(findCoordinator.getClientHost(), + findCoordinator.getHeader().clientId()); + currentConnectedClientId.add(findCoordinator.getHeader().clientId()); + + // Store group name to metadata store for current client, use to collect consumer metrics. + storeGroupIdFuture = storeGroupId(groupId, groupIdPath); + } else { + storeGroupIdFuture = CompletableFuture.completedFuture(null); + } + // Store group name to metadata store for current client, use to collect consumer metrics. + storeGroupIdFuture.whenComplete((__, e) -> { + if (e != null) { + log.warn("Store groupId failed, the groupId might already stored.", e); + } + findBroker(TopicName.get(pulsarTopicName)) + .whenComplete((node, throwable) -> { + if (node.error() != Errors.NONE || throwable != null) { + log.error("[{}] Request {}: Error while find coordinator.", + ctx.channel(), findCoordinator.getHeader(), throwable); + + resultFuture.complete(KafkaResponseUtils + .newFindCoordinator(Errors.LEADER_NOT_AVAILABLE)); + return; + } + if (log.isDebugEnabled()) { + log.debug("[{}] Found node {} as coordinator for key {} partition {}.", + ctx.channel(), node.leader(), request.coordinatorKey(), partition); + } + resultFuture.complete(KafkaResponseUtils.newFindCoordinator(node.leader())); + }); + }); + }); } else { throw new NotImplementedException("FindCoordinatorRequest not support unknown type " - + request.coordinatorType()); + + request.coordinatorType()); } - - // Store group name to metadata store for current client, use to collect consumer metrics. - storeGroupIdFuture - .whenComplete((__, ex) -> { - if (ex != null) { - log.warn("Store groupId failed, the groupId might already stored.", ex); - } - findBroker(TopicName.get(pulsarTopicName)) - .whenComplete((node, throwable) -> { - if (node.error() != Errors.NONE || throwable != null) { - log.error("[{}] Request {}: Error while find coordinator.", - ctx.channel(), findCoordinator.getHeader(), throwable); - - resultFuture.complete(KafkaResponseUtils - .newFindCoordinator(Errors.LEADER_NOT_AVAILABLE)); - return; - } - - if (log.isDebugEnabled()) { - log.debug("[{}] Found node {} as coordinator for key {} partition {}.", - ctx.channel(), node.leader(), request.coordinatorKey(), partition); - } - - resultFuture.complete(KafkaResponseUtils.newFindCoordinator(node.leader())); - }); - }); } @VisibleForTesting @@ -2625,6 +2651,8 @@ protected CompletableFuture authorize(AclOperation operation, Resource isAuthorizedFuture = authorizer.canLookupAsync(session.getPrincipal(), resource); } else if (resource.getResourceType() == ResourceType.NAMESPACE) { isAuthorizedFuture = authorizer.canGetTopicList(session.getPrincipal(), resource); + } else if (resource.getResourceType() == ResourceType.GROUP) { + isAuthorizedFuture = authorizer.canDescribeConsumerGroup(session.getPrincipal(), resource); } break; case CREATE: diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaServiceConfiguration.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaServiceConfiguration.java index 716aea996f..166f1694ec 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaServiceConfiguration.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaServiceConfiguration.java @@ -113,6 +113,13 @@ public class KafkaServiceConfiguration extends ServiceConfiguration { ) private boolean kafkaEnableMultiTenantMetadata = true; + @FieldContext( + category = CATEGORY_KOP, + required = true, + doc = "Use to enable/disable Kafka authorization force groupId check." + ) + private boolean kafkaEnableAuthorizationForceGroupIdCheck = false; + @FieldContext( category = CATEGORY_KOP, required = true, diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/security/KafkaPrincipal.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/security/KafkaPrincipal.java index 379f7b1b56..dcc9250362 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/security/KafkaPrincipal.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/security/KafkaPrincipal.java @@ -44,5 +44,7 @@ public class KafkaPrincipal implements Principal { */ private final String tenantSpec; + private final String groupId; + private final AuthenticationDataSource authenticationData; -} \ No newline at end of file +} diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/security/PlainSaslServer.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/security/PlainSaslServer.java index 316aaf9faf..899b6cb65d 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/security/PlainSaslServer.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/security/PlainSaslServer.java @@ -14,6 +14,7 @@ package io.streamnative.pulsar.handlers.kop.security; import static io.streamnative.pulsar.handlers.kop.security.SaslAuthenticator.AUTH_DATA_SOURCE_PROP; +import static io.streamnative.pulsar.handlers.kop.security.SaslAuthenticator.GROUP_ID_PROP; import static io.streamnative.pulsar.handlers.kop.security.SaslAuthenticator.USER_NAME_PROP; import io.streamnative.pulsar.handlers.kop.SaslAuth; @@ -150,6 +151,9 @@ public Object getNegotiatedProperty(String propName) { if (USER_NAME_PROP.equals(propName)) { return username; } + if (GROUP_ID_PROP.equals(propName)) { + return ""; + } if (AUTH_DATA_SOURCE_PROP.equals(propName)) { return authDataSource; } diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/security/SaslAuthenticator.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/security/SaslAuthenticator.java index 4a2992454e..461118da3b 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/security/SaslAuthenticator.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/security/SaslAuthenticator.java @@ -70,6 +70,7 @@ public class SaslAuthenticator { public static final String USER_NAME_PROP = "username"; + public static final String GROUP_ID_PROP = "groupId"; public static final String AUTH_DATA_SOURCE_PROP = "authDataSource"; public static final String AUTHENTICATION_SERVER_OBJ = "authenticationServerObj"; @@ -439,6 +440,7 @@ private void handleSaslToken(ChannelHandlerContext ctx, newSession = new Session( new KafkaPrincipal(KafkaPrincipal.USER_TYPE, saslServer.getAuthorizationID(), safeGetProperty(saslServer, USER_NAME_PROP), + safeGetProperty(saslServer, GROUP_ID_PROP), safeGetProperty(saslServer, AUTH_DATA_SOURCE_PROP)), "old-clientId"); if (!tenantAccessValidationFunction.apply(newSession)) { @@ -499,6 +501,7 @@ private void handleSaslToken(ChannelHandlerContext ctx, this.session = new Session( new KafkaPrincipal(KafkaPrincipal.USER_TYPE, pulsarRole, safeGetProperty(saslServer, USER_NAME_PROP), + safeGetProperty(saslServer, GROUP_ID_PROP), safeGetProperty(saslServer, AUTH_DATA_SOURCE_PROP)), header.clientId()); if (log.isDebugEnabled()) { diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/security/auth/Authorizer.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/security/auth/Authorizer.java index d375fcd580..0e7b131bff 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/security/auth/Authorizer.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/security/auth/Authorizer.java @@ -114,4 +114,5 @@ public interface Authorizer { */ CompletableFuture canConsumeAsync(KafkaPrincipal principal, Resource resource); + CompletableFuture canDescribeConsumerGroup(KafkaPrincipal principal, Resource resource); } diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/security/auth/ResourceType.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/security/auth/ResourceType.java index f8f0809c22..9ae09d11a3 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/security/auth/ResourceType.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/security/auth/ResourceType.java @@ -46,6 +46,10 @@ public enum ResourceType { */ TENANT((byte) 3), + /** + * A consumer group. + */ + GROUP((byte) 4), ; diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/security/auth/SimpleAclAuthorizer.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/security/auth/SimpleAclAuthorizer.java index f09ff281af..f10d38b410 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/security/auth/SimpleAclAuthorizer.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/security/auth/SimpleAclAuthorizer.java @@ -13,9 +13,12 @@ */ package io.streamnative.pulsar.handlers.kop.security.auth; +import io.streamnative.pulsar.handlers.kop.KafkaServiceConfiguration; import io.streamnative.pulsar.handlers.kop.security.KafkaPrincipal; +import java.util.Objects; import java.util.concurrent.CompletableFuture; import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.StringUtils; import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.broker.authorization.AuthorizationService; import org.apache.pulsar.common.naming.NamespaceName; @@ -35,9 +38,12 @@ public class SimpleAclAuthorizer implements Authorizer { private final AuthorizationService authorizationService; - public SimpleAclAuthorizer(PulsarService pulsarService) { + private final boolean forceCheckGroupId; + + public SimpleAclAuthorizer(PulsarService pulsarService, KafkaServiceConfiguration config) { this.pulsarService = pulsarService; this.authorizationService = pulsarService.getBrokerService().getAuthorizationService(); + this.forceCheckGroupId = config.isKafkaEnableAuthorizationForceGroupIdCheck(); } protected PulsarService getPulsarService() { @@ -152,8 +158,27 @@ public CompletableFuture canProduceAsync(KafkaPrincipal principal, Reso public CompletableFuture canConsumeAsync(KafkaPrincipal principal, Resource resource) { checkResourceType(resource, ResourceType.TOPIC); TopicName topicName = TopicName.get(resource.getName()); + if (forceCheckGroupId && StringUtils.isBlank(principal.getGroupId())) { + return CompletableFuture.completedFuture(false); + } return authorizationService.canConsumeAsync( - topicName, principal.getName(), principal.getAuthenticationData(), ""); + topicName, principal.getName(), principal.getAuthenticationData(), principal.getGroupId()); + } + + @Override + public CompletableFuture canDescribeConsumerGroup(KafkaPrincipal principal, Resource resource) { + if (!forceCheckGroupId) { + return CompletableFuture.completedFuture(true); + } + if (StringUtils.isBlank(principal.getGroupId())) { + return CompletableFuture.completedFuture(false); + } + boolean isSameGroup = Objects.equals(principal.getGroupId(), resource.getName()); + if (log.isDebugEnabled()) { + log.debug("Principal [{}] for resource [{}] isSameGroup [{}]", principal, resource, isSameGroup); + } + return CompletableFuture.completedFuture(isSameGroup); + } private void checkResourceType(Resource actual, ResourceType expected) { @@ -164,4 +189,5 @@ private void checkResourceType(Resource actual, ResourceType expected) { } } -} \ No newline at end of file +} + diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/security/oauth/KopOAuthBearerSaslServer.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/security/oauth/KopOAuthBearerSaslServer.java index 3bf7b31c55..a1af174d0c 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/security/oauth/KopOAuthBearerSaslServer.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/security/oauth/KopOAuthBearerSaslServer.java @@ -14,11 +14,13 @@ package io.streamnative.pulsar.handlers.kop.security.oauth; import static io.streamnative.pulsar.handlers.kop.security.SaslAuthenticator.AUTH_DATA_SOURCE_PROP; +import static io.streamnative.pulsar.handlers.kop.security.SaslAuthenticator.GROUP_ID_PROP; import static io.streamnative.pulsar.handlers.kop.security.SaslAuthenticator.USER_NAME_PROP; import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.Arrays; +import java.util.Map; import java.util.Objects; import javax.security.auth.callback.Callback; import javax.security.auth.callback.CallbackHandler; @@ -28,9 +30,16 @@ import lombok.extern.slf4j.Slf4j; import org.apache.kafka.common.errors.SaslAuthenticationException; import org.apache.kafka.common.security.auth.AuthenticateCallbackHandler; +import org.apache.kafka.common.security.auth.SaslExtensions; +import org.apache.kafka.common.security.oauthbearer.OAuthBearerExtensionsValidatorCallback; import org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule; +import org.apache.kafka.common.security.oauthbearer.OAuthBearerToken; import org.apache.kafka.common.security.oauthbearer.internals.OAuthBearerClientInitialResponse; +import org.apache.kafka.common.utils.Utils; +/** + * Migrate from {@link org.apache.kafka.common.security.oauthbearer.internals.OAuthBearerSaslServer}. + */ @Slf4j public class KopOAuthBearerSaslServer implements SaslServer { @@ -46,6 +55,7 @@ public class KopOAuthBearerSaslServer implements SaslServer { private boolean complete; private KopOAuthBearerToken tokenForNegotiatedProperty = null; private String errorMessage = null; + private SaslExtensions extensions; public KopOAuthBearerSaslServer(CallbackHandler callbackHandler, String defaultKafkaMetadataTenant) { if (!(Objects.requireNonNull(callbackHandler) instanceof AuthenticateCallbackHandler)) { @@ -72,7 +82,7 @@ public KopOAuthBearerSaslServer(CallbackHandler callbackHandler, String defaultK @Override public byte[] evaluateResponse(byte[] response) throws SaslException, SaslAuthenticationException { if (response.length == 1 && response[0] == BYTE_CONTROL_A && errorMessage != null) { - if (log.isDebugEnabled()){ + if (log.isDebugEnabled()) { log.debug("Received %x01 response from client after it received our error"); } throw new SaslAuthenticationException(errorMessage); @@ -85,7 +95,7 @@ public byte[] evaluateResponse(byte[] response) throws SaslException, SaslAuthen log.debug(e.getMessage()); throw e; } - return process(clientResponse.tokenValue(), clientResponse.authorizationId()); + return process(clientResponse.tokenValue(), clientResponse.authorizationId(), clientResponse.extensions()); } @Override @@ -107,6 +117,9 @@ public Object getNegotiatedProperty(String propName) { throw new IllegalStateException("Authentication exchange has not completed"); } + if (NEGOTIATED_PROPERTY_KEY_TOKEN.equals(propName)) { + return tokenForNegotiatedProperty; + } if (AUTH_DATA_SOURCE_PROP.equals(propName)) { return tokenForNegotiatedProperty.authDataSource(); } @@ -114,9 +127,20 @@ public Object getNegotiatedProperty(String propName) { if (tokenForNegotiatedProperty.tenant() != null) { return tokenForNegotiatedProperty.tenant(); } + String tenant = extensions.map().get(propName); + if (tenant != null) { + return tenant; + } return defaultKafkaMetadataTenant; } - return NEGOTIATED_PROPERTY_KEY_TOKEN.equals(propName) ? tokenForNegotiatedProperty : null; + if (GROUP_ID_PROP.equals(propName)) { + String groupId = extensions.map().get(propName); + if (groupId != null) { + return groupId; + } + return ""; + } + return extensions.map().get(propName); } @Override @@ -125,7 +149,7 @@ public boolean isComplete() { } @Override - public byte[] unwrap(byte[] incoming, int offset, int len) throws SaslException { + public byte[] unwrap(byte[] incoming, int offset, int len) { if (!complete) { throw new IllegalStateException("Authentication exchange has not completed"); } @@ -133,7 +157,7 @@ public byte[] unwrap(byte[] incoming, int offset, int len) throws SaslException } @Override - public byte[] wrap(byte[] outgoing, int offset, int len) throws SaslException { + public byte[] wrap(byte[] outgoing, int offset, int len) { if (!complete) { throw new IllegalStateException("Authentication exchange has not completed"); } @@ -141,21 +165,19 @@ public byte[] wrap(byte[] outgoing, int offset, int len) throws SaslException { } @Override - public void dispose() throws SaslException { + public void dispose() { complete = false; tokenForNegotiatedProperty = null; + extensions = null; } - private byte[] process(String tokenValue, String authorizationId) throws SaslException { + private byte[] process(String tokenValue, String authorizationId, SaslExtensions extensions) + throws SaslException { KopOAuthBearerValidatorCallback callback = new KopOAuthBearerValidatorCallback(tokenValue); try { - callbackHandler.handle(new Callback[]{callback}); + callbackHandler.handle(new Callback[] {callback}); } catch (IOException | UnsupportedCallbackException e) { - String msg = String.format("%s: %s", INTERNAL_ERROR_ON_SERVER, e.getMessage()); - if (log.isDebugEnabled()) { - log.debug(msg, e); - } - throw new SaslException(msg); + handleCallbackError(e); } KopOAuthBearerToken token = callback.token(); if (token == null) { @@ -176,8 +198,12 @@ private byte[] process(String tokenValue, String authorizationId) throws SaslExc + "that is different from the token's principal name (%s)", authorizationId, token.principalName())); } + Map validExtensions = processExtensions(token, extensions); tokenForNegotiatedProperty = token; + log.info("Successfully authenticate User={}, validExtensions={}", + token.principalName(), validExtensions); + this.extensions = new SaslExtensions(validExtensions); complete = true; if (log.isDebugEnabled()) { log.debug("Successfully authenticate User={}", token.principalName()); @@ -185,6 +211,30 @@ private byte[] process(String tokenValue, String authorizationId) throws SaslExc return new byte[0]; } + private Map processExtensions(OAuthBearerToken token, SaslExtensions extensions) + throws SaslException { + OAuthBearerExtensionsValidatorCallback extensionsCallback = + new OAuthBearerExtensionsValidatorCallback(token, extensions); + try { + callbackHandler.handle(new Callback[] {extensionsCallback}); + } catch (UnsupportedCallbackException e) { + // backwards compatibility - no extensions will be added + } catch (IOException e) { + handleCallbackError(e); + } + if (!extensionsCallback.invalidExtensions().isEmpty()) { + String errorMessage = String.format("Authentication failed: %d extensions are invalid! They are: %s", + extensionsCallback.invalidExtensions().size(), + Utils.mkString(extensionsCallback.invalidExtensions(), "", "", ": ", "; ")); + if (log.isDebugEnabled()) { + log.debug(errorMessage); + } + throw new SaslAuthenticationException(errorMessage); + } + + return extensionsCallback.validatedExtensions(); + } + private static String jsonErrorResponse(String errorStatus, String errorScope, String errorOpenIDConfiguration) { String jsonErrorResponse = String.format("{\"status\":\"%s\"", errorStatus); if (errorScope != null) { @@ -198,4 +248,11 @@ private static String jsonErrorResponse(String errorStatus, String errorScope, S return jsonErrorResponse; } + private void handleCallbackError(Exception e) throws SaslException { + String msg = String.format("%s: %s", INTERNAL_ERROR_ON_SERVER, e.getMessage()); + if (log.isDebugEnabled()) { + log.debug(msg, e); + } + throw new SaslException(msg); + } } diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/security/oauth/OauthValidatorCallbackHandler.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/security/oauth/OauthValidatorCallbackHandler.java index 6162405de4..673c3951f3 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/security/oauth/OauthValidatorCallbackHandler.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/security/oauth/OauthValidatorCallbackHandler.java @@ -28,6 +28,7 @@ import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.tuple.Pair; import org.apache.kafka.common.security.auth.AuthenticateCallbackHandler; +import org.apache.kafka.common.security.oauthbearer.OAuthBearerExtensionsValidatorCallback; import org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule; import org.apache.kafka.common.security.oauthbearer.internals.unsecured.OAuthBearerIllegalTokenException; import org.apache.kafka.common.security.oauthbearer.internals.unsecured.OAuthBearerValidationResult; @@ -43,8 +44,6 @@ @Slf4j public class OauthValidatorCallbackHandler implements AuthenticateCallbackHandler { - private static final String DELIMITER = "__with_tenant_"; - private ServerConfig config = null; private AuthenticationService authenticationService; @@ -98,12 +97,20 @@ public void handle(Callback[] callbacks) throws IOException, UnsupportedCallback validatorCallback.error(failureScope != null ? "insufficient_scope" : "invalid_token", failureScope, failureReason.failureOpenIdConfig()); } + } else if (callback instanceof OAuthBearerExtensionsValidatorCallback) { + handleExtensionsValidatorCallback((OAuthBearerExtensionsValidatorCallback) callback); } else { throw new UnsupportedCallbackException(callback); } } } + private void handleExtensionsValidatorCallback( + OAuthBearerExtensionsValidatorCallback extensionsValidatorCallback) { + extensionsValidatorCallback.inputExtensions().map() + .forEach((extensionName, v) -> extensionsValidatorCallback.valid(extensionName)); + } + @VisibleForTesting protected void handleCallback(KopOAuthBearerValidatorCallback callback) { if (callback.tokenValue() == null) { diff --git a/oauth-client/src/main/java/io/streamnative/pulsar/handlers/kop/security/oauth/ClientConfig.java b/oauth-client/src/main/java/io/streamnative/pulsar/handlers/kop/security/oauth/ClientConfig.java index 1f51442d29..d139c16364 100644 --- a/oauth-client/src/main/java/io/streamnative/pulsar/handlers/kop/security/oauth/ClientConfig.java +++ b/oauth-client/src/main/java/io/streamnative/pulsar/handlers/kop/security/oauth/ClientConfig.java @@ -13,8 +13,13 @@ */ package io.streamnative.pulsar.handlers.kop.security.oauth; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.ObjectReader; +import java.io.IOException; +import java.io.InputStream; import java.net.MalformedURLException; import java.net.URL; +import java.net.URLConnection; import java.util.Map; import lombok.Getter; @@ -26,6 +31,10 @@ @Getter public class ClientConfig { + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + + private static final ObjectReader CLIENT_INFO_READER = OBJECT_MAPPER.readerFor(ClientInfo.class); + public static final String OAUTH_ISSUER_URL = "oauth.issuer.url"; public static final String OAUTH_CREDENTIALS_URL = "oauth.credentials.url"; public static final String OAUTH_AUDIENCE = "oauth.audience"; @@ -35,6 +44,7 @@ public class ClientConfig { private final URL credentialsUrl; private final String audience; private final String scope; + private final ClientInfo clientInfo; public ClientConfig(Map configs) { final String issuerUrlString = configs.get(OAUTH_ISSUER_URL); @@ -58,7 +68,15 @@ public ClientConfig(Map configs) { throw new IllegalArgumentException(String.format( "invalid %s \"%s\": %s", OAUTH_CREDENTIALS_URL, credentialsUrlString, e.getMessage())); } - + try { + final URLConnection connection = getCredentialsUrl().openConnection(); + try (InputStream inputStream = connection.getInputStream()) { + this.clientInfo = CLIENT_INFO_READER.readValue(inputStream); + } + } catch (IOException e) { + throw new IllegalArgumentException(String.format( + "failed to load client credentials from %s: %s", credentialsUrlString, e.getMessage())); + } this.audience = configs.getOrDefault(OAUTH_AUDIENCE, null); this.scope = configs.getOrDefault(OAUTH_SCOPE, null); } diff --git a/oauth-client/src/main/java/io/streamnative/pulsar/handlers/kop/security/oauth/ClientCredentialsFlow.java b/oauth-client/src/main/java/io/streamnative/pulsar/handlers/kop/security/oauth/ClientCredentialsFlow.java index 889df6c754..51ca56fb4c 100644 --- a/oauth-client/src/main/java/io/streamnative/pulsar/handlers/kop/security/oauth/ClientCredentialsFlow.java +++ b/oauth-client/src/main/java/io/streamnative/pulsar/handlers/kop/security/oauth/ClientCredentialsFlow.java @@ -15,6 +15,7 @@ import com.fasterxml.jackson.annotation.JsonIgnoreProperties; import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectReader; import com.google.common.annotations.VisibleForTesting; @@ -45,7 +46,6 @@ public class ClientCredentialsFlow implements Closeable { private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); private static final ObjectReader METADATA_READER = OBJECT_MAPPER.readerFor(Metadata.class); - private static final ObjectReader CLIENT_INFO_READER = OBJECT_MAPPER.readerFor(ClientInfo.class); private static final ObjectReader TOKEN_RESULT_READER = OBJECT_MAPPER.readerFor(OAuthBearerTokenImpl.class); private static final ObjectReader TOKEN_ERROR_READER = OBJECT_MAPPER.readerFor(TokenError.class); @@ -71,7 +71,7 @@ protected ClientCredentialsFlow(ClientConfig clientConfig, AsyncHttpClient httpC public OAuthBearerTokenImpl authenticate() throws IOException { final String tokenEndPoint = findAuthorizationServer().getTokenEndPoint(); - final ClientInfo clientInfo = loadPrivateKey(); + final ClientInfo clientInfo = clientConfig.getClientInfo(); try { final String body = buildClientCredentialsBody(clientInfo); final Response response = httpClient.preparePost(tokenEndPoint) @@ -97,7 +97,8 @@ public OAuthBearerTokenImpl authenticate() throws IOException { throw new IOException("Failed to perform HTTP request: " + response.getStatusCode() + " " + response.getStatusText()); } - } catch (UnsupportedEncodingException | InterruptedException | ExecutionException e) { + } catch (UnsupportedEncodingException | InterruptedException + | ExecutionException | JsonProcessingException e) { throw new IOException(e); } } @@ -122,13 +123,6 @@ Metadata findAuthorizationServer() throws IOException { } } - @VisibleForTesting - ClientInfo loadPrivateKey() throws IOException { - final URLConnection connection = clientConfig.getCredentialsUrl().openConnection(); - try (InputStream inputStream = connection.getInputStream()) { - return CLIENT_INFO_READER.readValue(inputStream); - } - } private static String encode(String s) throws UnsupportedEncodingException { return URLEncoder.encode(s, StandardCharsets.UTF_8.name()); @@ -156,20 +150,6 @@ public static class Metadata { private String tokenEndPoint; } - @Getter - @JsonIgnoreProperties(ignoreUnknown = true) - public static class ClientInfo { - - @JsonProperty("client_id") - private String id; - - @JsonProperty("client_secret") - private String secret; - - @JsonProperty("tenant") - private String tenant; - } - @Getter @JsonIgnoreProperties(ignoreUnknown = true) public static class TokenError { diff --git a/oauth-client/src/main/java/io/streamnative/pulsar/handlers/kop/security/oauth/ClientInfo.java b/oauth-client/src/main/java/io/streamnative/pulsar/handlers/kop/security/oauth/ClientInfo.java new file mode 100644 index 0000000000..5ffad5f675 --- /dev/null +++ b/oauth-client/src/main/java/io/streamnative/pulsar/handlers/kop/security/oauth/ClientInfo.java @@ -0,0 +1,45 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.streamnative.pulsar.handlers.kop.security.oauth; + +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import com.fasterxml.jackson.annotation.JsonInclude; +import com.fasterxml.jackson.annotation.JsonProperty; +import lombok.AllArgsConstructor; +import lombok.EqualsAndHashCode; +import lombok.Getter; +import lombok.NoArgsConstructor; +import lombok.ToString; + +@Getter +@ToString +@EqualsAndHashCode +@NoArgsConstructor +@AllArgsConstructor +@JsonIgnoreProperties(ignoreUnknown = true) +@JsonInclude(JsonInclude.Include.NON_NULL) +public class ClientInfo { + + @JsonProperty("client_id") + private String id; + + @JsonProperty("client_secret") + private String secret; + + @JsonProperty("tenant") + private String tenant; + + @JsonProperty("group_id") + private String groupId; +} diff --git a/oauth-client/src/main/java/io/streamnative/pulsar/handlers/kop/security/oauth/OauthLoginCallbackHandler.java b/oauth-client/src/main/java/io/streamnative/pulsar/handlers/kop/security/oauth/OauthLoginCallbackHandler.java index b16763815d..2c1d74696a 100644 --- a/oauth-client/src/main/java/io/streamnative/pulsar/handlers/kop/security/oauth/OauthLoginCallbackHandler.java +++ b/oauth-client/src/main/java/io/streamnative/pulsar/handlers/kop/security/oauth/OauthLoginCallbackHandler.java @@ -14,16 +14,22 @@ package io.streamnative.pulsar.handlers.kop.security.oauth; import java.io.IOException; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Objects; import javax.security.auth.callback.Callback; import javax.security.auth.callback.UnsupportedCallbackException; import javax.security.auth.login.AppConfigurationEntry; +import javax.security.sasl.SaslException; import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.config.ConfigException; import org.apache.kafka.common.security.auth.AuthenticateCallbackHandler; +import org.apache.kafka.common.security.auth.SaslExtensions; +import org.apache.kafka.common.security.auth.SaslExtensionsCallback; import org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule; import org.apache.kafka.common.security.oauthbearer.OAuthBearerTokenCallback; +import org.apache.kafka.common.security.oauthbearer.internals.OAuthBearerClientInitialResponse; /** * OAuth 2.0 login callback handler. @@ -68,6 +74,8 @@ public void handle(Callback[] callbacks) throws IOException, UnsupportedCallback } catch (KafkaException e) { throw new IOException(e.getMessage(), e); } + } else if (callback instanceof SaslExtensionsCallback) { + handleExtensionsCallback((SaslExtensionsCallback) callback); } else { throw new UnsupportedCallbackException(callback); } @@ -82,4 +90,26 @@ private void handleCallback(OAuthBearerTokenCallback callback) throws IOExceptio callback.token(flow.authenticate()); } } + + private void handleExtensionsCallback(SaslExtensionsCallback callback) { + + Map extensions = new HashMap<>(); + ClientInfo clientInfo = clientConfig.getClientInfo(); + + if (clientInfo.getTenant() != null) { + extensions.put("tenant", clientInfo.getTenant()); + } + if (clientInfo.getGroupId() != null) { + extensions.put("groupId", clientInfo.getGroupId()); + } + SaslExtensions saslExtensions = new SaslExtensions(extensions); + + try { + OAuthBearerClientInitialResponse.validateExtensions(saslExtensions); + } catch (SaslException e) { + throw new ConfigException(e.getMessage()); + } + + callback.extensions(saslExtensions); + } } diff --git a/oauth-client/src/test/java/io/streamnative/pulsar/handlers/kop/security/oauth/ClientConfigTest.java b/oauth-client/src/test/java/io/streamnative/pulsar/handlers/kop/security/oauth/ClientConfigTest.java index 71d351c06f..0ebd3d4ad9 100644 --- a/oauth-client/src/test/java/io/streamnative/pulsar/handlers/kop/security/oauth/ClientConfigTest.java +++ b/oauth-client/src/test/java/io/streamnative/pulsar/handlers/kop/security/oauth/ClientConfigTest.java @@ -15,6 +15,7 @@ import java.util.HashMap; import java.util.Map; +import java.util.Objects; import org.testng.Assert; import org.testng.annotations.Test; @@ -27,14 +28,34 @@ public class ClientConfigTest { @Test public void testValidConfig() { + String credentialsUrl = Objects.requireNonNull( + getClass().getClassLoader().getResource("private_key.json")).toString(); final ClientConfig clientConfig = ClientConfigHelper.create( "https://issuer-url.com", - "file:///etc/config/credentials.json", + credentialsUrl, "audience" ); Assert.assertEquals(clientConfig.getIssuerUrl().toString(), "https://issuer-url.com"); - Assert.assertEquals(clientConfig.getCredentialsUrl().toString(), "file:/etc/config/credentials.json"); + Assert.assertEquals(clientConfig.getCredentialsUrl().toString(), credentialsUrl); Assert.assertEquals(clientConfig.getAudience(), "audience"); + Assert.assertEquals(clientConfig.getClientInfo(), + new ClientInfo("my-id", "my-secret", "my-tenant", null)); + } + + @Test + public void testValidConfigWithGroupId() { + String credentialsUrl = Objects.requireNonNull( + getClass().getClassLoader().getResource("private_key_with_group_id.json")).toString(); + final ClientConfig clientConfig = ClientConfigHelper.create( + "https://issuer-url.com", + credentialsUrl, + "audience" + ); + Assert.assertEquals(clientConfig.getIssuerUrl().toString(), "https://issuer-url.com"); + Assert.assertEquals(clientConfig.getCredentialsUrl().toString(), credentialsUrl); + Assert.assertEquals(clientConfig.getAudience(), "audience"); + Assert.assertEquals(clientConfig.getClientInfo(), + new ClientInfo("my-id", "my-secret", "my-tenant", "my-group-id")); } @Test diff --git a/oauth-client/src/test/java/io/streamnative/pulsar/handlers/kop/security/oauth/ClientCredentialsFlowTest.java b/oauth-client/src/test/java/io/streamnative/pulsar/handlers/kop/security/oauth/ClientCredentialsFlowTest.java index 0efeae3003..16ce2aeb4f 100644 --- a/oauth-client/src/test/java/io/streamnative/pulsar/handlers/kop/security/oauth/ClientCredentialsFlowTest.java +++ b/oauth-client/src/test/java/io/streamnative/pulsar/handlers/kop/security/oauth/ClientCredentialsFlowTest.java @@ -35,20 +35,21 @@ public class ClientCredentialsFlowTest { public void testFindAuthorizationServer() throws IOException { final ClientCredentialsFlow flow = new ClientCredentialsFlow(ClientConfigHelper.create( "http://localhost:4444", // a local OAuth2 server started by init_hydra_oauth_server.sh - "file:///tmp/not_exist.json" + Objects.requireNonNull( + getClass().getClassLoader().getResource("private_key.json")).toString() )); final ClientCredentialsFlow.Metadata metadata = flow.findAuthorizationServer(); Assert.assertEquals(metadata.getTokenEndPoint(), "http://127.0.0.1:4444/oauth2/token"); } @Test - public void testLoadPrivateKey() throws Exception { - final ClientCredentialsFlow flow = new ClientCredentialsFlow(ClientConfigHelper.create( + public void testLoadPrivateKey() { + ClientConfig clientConfig = ClientConfigHelper.create( "http://localhost:4444", Objects.requireNonNull( getClass().getClassLoader().getResource("private_key.json")).toString() - )); - final ClientCredentialsFlow.ClientInfo clientInfo = flow.loadPrivateKey(); + ); + ClientInfo clientInfo = clientConfig.getClientInfo(); Assert.assertEquals(clientInfo.getId(), "my-id"); Assert.assertEquals(clientInfo.getSecret(), "my-secret"); Assert.assertEquals(clientInfo.getTenant(), "my-tenant"); diff --git a/oauth-client/src/test/resources/private_key_with_group_id.json b/oauth-client/src/test/resources/private_key_with_group_id.json new file mode 100644 index 0000000000..544894ea19 --- /dev/null +++ b/oauth-client/src/test/resources/private_key_with_group_id.json @@ -0,0 +1,6 @@ +{ + "client_id": "my-id", + "client_secret": "my-secret", + "tenant": "my-tenant", + "group_id": "my-group-id" +} diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/HydraOAuthUtils.java b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/HydraOAuthUtils.java index b95dfd813a..062233eb83 100644 --- a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/HydraOAuthUtils.java +++ b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/HydraOAuthUtils.java @@ -17,9 +17,11 @@ import com.fasterxml.jackson.databind.DeserializationFeature; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectReader; +import com.fasterxml.jackson.databind.ObjectWriter; import io.fusionauth.jwks.domain.JSONWebKey; import io.jsonwebtoken.SignatureAlgorithm; import io.jsonwebtoken.security.Keys; +import io.streamnative.pulsar.handlers.kop.security.oauth.ClientInfo; import java.io.BufferedWriter; import java.io.File; import java.io.FileWriter; @@ -41,6 +43,10 @@ public class HydraOAuthUtils { private static final String AUDIENCE = "http://example.com/api/v2/"; private static final AdminApi hydraAdmin = new AdminApi(); + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + + private static final ObjectWriter CLIENT_INFO_WRITER = + OBJECT_MAPPER.writerFor(ClientInfo.class); private static String publicKey; @@ -87,6 +93,11 @@ public static String createOAuthClient(String clientId, String clientSecret) thr } public static String createOAuthClient(String clientId, String clientSecret, String tenant) + throws IOException, ApiException { + return createOAuthClient(clientId, clientSecret, tenant, null); + } + + public static String createOAuthClient(String clientId, String clientSecret, String tenant, String groupId) throws ApiException, IOException { final OAuth2Client oAuth2Client = new OAuth2Client() .audience(Collections.singletonList(AUDIENCE)) @@ -102,18 +113,16 @@ public static String createOAuthClient(String clientId, String clientSecret, Str throw e; } } - return writeCredentialsFile(clientId, clientSecret, tenant, clientId + ".json"); + return writeCredentialsFile(clientId, clientSecret, tenant, groupId, clientId + ".json"); } public static String writeCredentialsFile(String clientId, - String clientSecret, - String tenant, - String basename) throws IOException { - final String content = "{\n" - + " \"client_id\": \"" + clientId + "\",\n" - + " \"client_secret\": \"" + clientSecret + (tenant != null ? "\",\n" : "\"\n") - + (tenant != null ? " \"tenant\": \"" + tenant + "\"\n" : "") - + "}\n"; + String clientSecret, + String tenant, + String groupId, + String basename) throws IOException { + ClientInfo clientInfo = new ClientInfo(clientId, clientSecret, tenant, groupId); + final String content = CLIENT_INFO_WRITER.writeValueAsString(clientInfo); File file = File.createTempFile("oauth-credentials-", basename); try (BufferedWriter writer = new BufferedWriter(new FileWriter(file))) { diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/SaslOAuthKopHandlersTest.java b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/SaslOAuthKopHandlersTest.java index c0f3a5048e..1ca334e96d 100644 --- a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/SaslOAuthKopHandlersTest.java +++ b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/SaslOAuthKopHandlersTest.java @@ -197,7 +197,7 @@ public void testGrantAndRevokePermission() throws Exception { public void testWrongSecret() throws IOException { final Properties producerProps = newKafkaProducerProperties(); internalConfigureOAuth2(producerProps, HydraOAuthUtils - .writeCredentialsFile(ADMIN_USER, ADMIN_SECRET + "-wrong", null, "test-wrong-secret.json")); + .writeCredentialsFile(ADMIN_USER, ADMIN_SECRET + "-wrong", null, null, "test-wrong-secret.json")); try { new KafkaProducer<>(producerProps); } catch (Exception e) { diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/SaslOAuthKopHandlersWithGroupIdTest.java b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/SaslOAuthKopHandlersWithGroupIdTest.java new file mode 100644 index 0000000000..77a5bf7a00 --- /dev/null +++ b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/SaslOAuthKopHandlersWithGroupIdTest.java @@ -0,0 +1,203 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.streamnative.pulsar.handlers.kop; + +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertNotNull; + +import com.google.common.collect.Sets; +import io.streamnative.pulsar.handlers.kop.security.oauth.OauthLoginCallbackHandler; +import io.streamnative.pulsar.handlers.kop.security.oauth.OauthValidatorCallbackHandler; +import java.io.IOException; +import java.net.URL; +import java.time.Duration; +import java.util.Collections; +import java.util.Properties; +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.errors.GroupAuthorizationException; +import org.apache.kafka.common.security.auth.AuthenticateCallbackHandler; +import org.apache.pulsar.broker.authentication.AuthenticationProviderToken; +import org.apache.pulsar.client.admin.PulsarAdmin; +import org.apache.pulsar.client.admin.PulsarAdminException; +import org.apache.pulsar.client.impl.auth.oauth2.AuthenticationFactoryOAuth2; +import org.apache.pulsar.client.impl.auth.oauth2.AuthenticationOAuth2; +import org.apache.pulsar.common.policies.data.AuthAction; +import org.apache.pulsar.common.policies.data.TenantInfo; +import org.testng.Assert; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; +import sh.ory.hydra.ApiException; + +@Slf4j +public class SaslOAuthKopHandlersWithGroupIdTest extends SaslOAuthBearerTestBase { + + private static final String ADMIN_USER = "simple_client_id"; + private static final String ADMIN_SECRET = "admin_secret"; + private static final String ISSUER_URL = "http://localhost:4444"; + private static final String AUDIENCE = "http://example.com/api/v2/"; + private static final String DEFAULT_GROUP_ID = "my-group"; + + private String adminCredentialPath = null; + + private String tenant = "my-tenant"; + + @BeforeClass(timeOut = 20000) + @Override + protected void setup() throws Exception { + String tokenPublicKey = HydraOAuthUtils.getPublicKeyStr(); + adminCredentialPath = HydraOAuthUtils.createOAuthClient(ADMIN_USER, ADMIN_SECRET); + super.resetConfig(); + // Broker's config + conf.setAuthenticationEnabled(true); + conf.setAuthorizationEnabled(true); + conf.setKafkaEnableMultiTenantMetadata(true); + conf.setAuthorizationProvider(SaslOAuthKopHandlersTest.OAuthMockAuthorizationProvider.class.getName()); + conf.setAuthenticationProviders(Sets.newHashSet(AuthenticationProviderToken.class.getName())); + conf.setBrokerClientAuthenticationPlugin(AuthenticationOAuth2.class.getName()); + conf.setBrokerClientAuthenticationParameters(String.format("{\"type\":\"client_credentials\"," + + "\"privateKey\":\"%s\",\"issuerUrl\":\"%s\",\"audience\":\"%s\"}", + adminCredentialPath, ISSUER_URL, AUDIENCE)); + conf.setKafkaEnableAuthorizationForceGroupIdCheck(true); + final Properties properties = new Properties(); + properties.setProperty("tokenPublicKey", tokenPublicKey); + conf.setProperties(properties); + + // KoP's config + conf.setSaslAllowedMechanisms(Sets.newHashSet("OAUTHBEARER")); + conf.setKopOauth2AuthenticateCallbackHandler(OauthValidatorCallbackHandler.class.getName()); + conf.setKopOauth2ConfigFile("src/test/resources/kop-handler-oauth2.properties"); + + super.internalSetup(); + + admin.tenants().createTenant(tenant, + TenantInfo.builder() + .adminRoles(Collections.singleton(ADMIN_USER)) + .allowedClusters(Collections.singleton(configClusterName)) + .build()); + TenantInfo tenantInfo = admin.tenants().getTenantInfo(tenant); + log.info("TenantInfo for {} {} in test", tenant, tenantInfo); + assertNotNull(tenantInfo); + } + + @AfterClass + @Override + protected void cleanup() throws Exception { + super.internalCleanup(); + } + + @Override + protected void createAdmin() throws Exception { + super.admin = PulsarAdmin.builder() + .serviceHttpUrl(brokerUrl.toString()) + .authentication( + AuthenticationFactoryOAuth2.clientCredentials( + new URL(ISSUER_URL), new URL(adminCredentialPath), AUDIENCE)) + .build(); + } + + @Test(timeOut = 30000) + public void testGrantAndRevokePermissionWithGroupId() throws Exception { + SaslOAuthKopHandlersTest.OAuthMockAuthorizationProvider.NULL_ROLE_STACKS.clear(); + + final String namespace = tenant + "/" + "test-grant-and-revoke-permission-with-group-id-ns"; + admin.namespaces().createNamespace(namespace); + final String topic = "persistent://" + namespace + "/test-grant-and-revoke-permission-with-group-id"; + final String role = "normal-role-" + System.currentTimeMillis(); + final String clientCredentialPath = HydraOAuthUtils.createOAuthClient(role, "secret", tenant); + + admin.namespaces().grantPermissionOnNamespace(namespace, role, Collections.singleton(AuthAction.produce)); + + final Properties consumerProps = newKafkaConsumerProperties(); + internalConfigureOAuth2(consumerProps, clientCredentialPath); + final KafkaConsumer consumer = new KafkaConsumer<>(consumerProps); + consumer.subscribe(Collections.singleton(topic)); + + admin.namespaces().grantPermissionOnNamespace(namespace, role, Sets.newHashSet(AuthAction.consume)); + // Only have consume permission, can't consume without subscription permission. + Assert.assertThrows(GroupAuthorizationException.class, () -> consumer.poll(Duration.ofSeconds(5))); + + consumer.close(); + + // Pass subscription permission, can consume now. + final String roleWithGroupId = "role-with-groupId" + System.currentTimeMillis(); + final String clientCredentialPathWithGroupId = + HydraOAuthUtils.createOAuthClient(roleWithGroupId, "secret", tenant, DEFAULT_GROUP_ID); + final Properties consumerPropsWithGroupId = newKafkaConsumerProperties(); + internalConfigureOAuth2(consumerPropsWithGroupId, clientCredentialPathWithGroupId); + final KafkaConsumer consumer2 = new KafkaConsumer<>(consumerPropsWithGroupId); + consumer2.subscribe(Collections.singleton(topic)); + + admin.namespaces().grantPermissionOnNamespace(namespace, roleWithGroupId, Sets.newHashSet(AuthAction.consume)); + admin.namespaces().grantPermissionOnSubscription(namespace, DEFAULT_GROUP_ID, Sets.newHashSet(roleWithGroupId)); + + consumer2.poll(Duration.ofSeconds(5)); + + assertEquals(SaslOAuthKopHandlersTest.OAuthMockAuthorizationProvider.NULL_ROLE_STACKS.size(), 0); + } + + @Test(timeOut = 30000, expectedExceptions = org.apache.kafka.common.errors.GroupAuthorizationException.class) + public void testDifferentGroupId() throws PulsarAdminException, IOException, ApiException { + SaslOAuthKopHandlersTest.OAuthMockAuthorizationProvider.NULL_ROLE_STACKS.clear(); + + final String namespace = tenant + "/" + "test-different-group-id-ns"; + admin.namespaces().createNamespace(namespace); + final String topic = "persistent://" + namespace + "/test-grant-and-revoke-permission"; + // Pass subscription permission, can consume now. + final String roleWithGroupId = "role-with-groupId" + System.currentTimeMillis(); + final String clientCredentialPathWithGroupId = + HydraOAuthUtils.createOAuthClient(roleWithGroupId, "secret", tenant, DEFAULT_GROUP_ID); + admin.namespaces().grantPermissionOnNamespace(namespace, roleWithGroupId, Sets.newHashSet(AuthAction.consume)); + admin.namespaces().grantPermissionOnSubscription(namespace, DEFAULT_GROUP_ID, Sets.newHashSet(roleWithGroupId)); + + final Properties consumerPropsWithGroupId = newKafkaConsumerProperties(); + internalConfigureOAuth2(consumerPropsWithGroupId, clientCredentialPathWithGroupId); + consumerPropsWithGroupId.put(ConsumerConfig.GROUP_ID_CONFIG, "different-group-id"); + final KafkaConsumer consumer = new KafkaConsumer<>(consumerPropsWithGroupId); + consumer.subscribe(Collections.singleton(topic)); + + consumer.poll(Duration.ofSeconds(5)); + + assertEquals(SaslOAuthKopHandlersTest.OAuthMockAuthorizationProvider.NULL_ROLE_STACKS.size(), 0); + } + + private void internalConfigureOAuth2(final Properties props, final String credentialPath, + Class callbackHandler) { + props.setProperty("sasl.login.callback.handler.class", callbackHandler.getName()); + props.setProperty("security.protocol", "SASL_PLAINTEXT"); + props.setProperty("sasl.mechanism", "OAUTHBEARER"); + + final String jaasTemplate = "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required" + + " oauth.issuer.url=\"%s\"" + + " oauth.credentials.url=\"%s\"" + + " oauth.audience=\"%s\";"; + props.setProperty("sasl.jaas.config", String.format(jaasTemplate, + ISSUER_URL, + credentialPath, + AUDIENCE + )); + } + + private void internalConfigureOAuth2(final Properties props, final String credentialPath) { + internalConfigureOAuth2(props, credentialPath, OauthLoginCallbackHandler.class); + } + + @Override + protected void configureOAuth2(final Properties props) { + internalConfigureOAuth2(props, adminCredentialPath); + } + +} diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/security/auth/SimpleAclAuthorizerTest.java b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/security/auth/SimpleAclAuthorizerTest.java index 593e1e0596..b929f72a12 100644 --- a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/security/auth/SimpleAclAuthorizerTest.java +++ b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/security/auth/SimpleAclAuthorizerTest.java @@ -112,7 +112,7 @@ protected void setup() throws Exception { admin.namespaces().grantPermissionOnNamespace(TENANT + "/" + NAMESPACE, CONSUMER_USER, Sets.newHashSet(AuthAction.consume)); - simpleAclAuthorizer = new SimpleAclAuthorizer(pulsar); + simpleAclAuthorizer = new SimpleAclAuthorizer(pulsar, conf); } @Override @@ -131,37 +131,37 @@ protected void cleanup() throws Exception { @Test public void testAuthorizeProduce() throws ExecutionException, InterruptedException { Boolean isAuthorized = simpleAclAuthorizer.canProduceAsync( - new KafkaPrincipal(KafkaPrincipal.USER_TYPE, SIMPLE_USER, null, + new KafkaPrincipal(KafkaPrincipal.USER_TYPE, SIMPLE_USER, null, null, new AuthenticationDataCommand(SIMPLE_USER)), Resource.of(ResourceType.TOPIC, TOPIC)).get(); assertTrue(isAuthorized); isAuthorized = simpleAclAuthorizer.canProduceAsync( - new KafkaPrincipal(KafkaPrincipal.USER_TYPE, PRODUCE_USER, null, + new KafkaPrincipal(KafkaPrincipal.USER_TYPE, PRODUCE_USER, null, null, new AuthenticationDataCommand(PRODUCE_USER)), Resource.of(ResourceType.TOPIC, TOPIC)).get(); assertTrue(isAuthorized); isAuthorized = simpleAclAuthorizer.canProduceAsync( - new KafkaPrincipal(KafkaPrincipal.USER_TYPE, CONSUMER_USER, null, + new KafkaPrincipal(KafkaPrincipal.USER_TYPE, CONSUMER_USER, null, null, new AuthenticationDataCommand(CONSUMER_USER)), Resource.of(ResourceType.TOPIC, TOPIC)).get(); assertFalse(isAuthorized); isAuthorized = simpleAclAuthorizer.canProduceAsync( - new KafkaPrincipal(KafkaPrincipal.USER_TYPE, ANOTHER_USER, null, + new KafkaPrincipal(KafkaPrincipal.USER_TYPE, ANOTHER_USER, null, null, new AuthenticationDataCommand(ANOTHER_USER)), Resource.of(ResourceType.TOPIC, TOPIC)).get(); assertFalse(isAuthorized); isAuthorized = simpleAclAuthorizer.canProduceAsync( - new KafkaPrincipal(KafkaPrincipal.USER_TYPE, SIMPLE_USER, null, + new KafkaPrincipal(KafkaPrincipal.USER_TYPE, SIMPLE_USER, null, null, new AuthenticationDataCommand(SIMPLE_USER)), Resource.of(ResourceType.TOPIC, NOT_EXISTS_TENANT_TOPIC)).get(); assertFalse(isAuthorized); isAuthorized = simpleAclAuthorizer.canProduceAsync( - new KafkaPrincipal(KafkaPrincipal.USER_TYPE, ADMIN_USER, null, + new KafkaPrincipal(KafkaPrincipal.USER_TYPE, ADMIN_USER, null, null, new AuthenticationDataCommand(ADMIN_USER)), Resource.of(ResourceType.TOPIC, TOPIC)).get(); assertTrue(isAuthorized); @@ -170,31 +170,31 @@ public void testAuthorizeProduce() throws ExecutionException, InterruptedExcepti @Test public void testAuthorizeConsume() throws ExecutionException, InterruptedException { Boolean isAuthorized = simpleAclAuthorizer.canConsumeAsync( - new KafkaPrincipal(KafkaPrincipal.USER_TYPE, SIMPLE_USER, null, + new KafkaPrincipal(KafkaPrincipal.USER_TYPE, SIMPLE_USER, null, null, new AuthenticationDataCommand(SIMPLE_USER)), Resource.of(ResourceType.TOPIC, TOPIC)).get(); assertTrue(isAuthorized); isAuthorized = simpleAclAuthorizer.canConsumeAsync( - new KafkaPrincipal(KafkaPrincipal.USER_TYPE, PRODUCE_USER, null, + new KafkaPrincipal(KafkaPrincipal.USER_TYPE, PRODUCE_USER, null, null, new AuthenticationDataCommand(PRODUCE_USER)), Resource.of(ResourceType.TOPIC, TOPIC)).get(); assertFalse(isAuthorized); isAuthorized = simpleAclAuthorizer.canConsumeAsync( - new KafkaPrincipal(KafkaPrincipal.USER_TYPE, CONSUMER_USER, null, + new KafkaPrincipal(KafkaPrincipal.USER_TYPE, CONSUMER_USER, null, null, new AuthenticationDataCommand(CONSUMER_USER)), Resource.of(ResourceType.TOPIC, TOPIC)).get(); assertTrue(isAuthorized); isAuthorized = simpleAclAuthorizer.canConsumeAsync( - new KafkaPrincipal(KafkaPrincipal.USER_TYPE, ANOTHER_USER, null, + new KafkaPrincipal(KafkaPrincipal.USER_TYPE, ANOTHER_USER, null, null, new AuthenticationDataCommand(ANOTHER_USER)), Resource.of(ResourceType.TOPIC, TOPIC)).get(); assertFalse(isAuthorized); isAuthorized = simpleAclAuthorizer.canConsumeAsync( - new KafkaPrincipal(KafkaPrincipal.USER_TYPE, SIMPLE_USER, null, + new KafkaPrincipal(KafkaPrincipal.USER_TYPE, SIMPLE_USER, null, null, new AuthenticationDataCommand(SIMPLE_USER)), Resource.of(ResourceType.TOPIC, NOT_EXISTS_TENANT_TOPIC)).get(); assertFalse(isAuthorized); @@ -203,31 +203,31 @@ public void testAuthorizeConsume() throws ExecutionException, InterruptedExcepti @Test public void testAuthorizeLookup() throws ExecutionException, InterruptedException { Boolean isAuthorized = simpleAclAuthorizer.canLookupAsync( - new KafkaPrincipal(KafkaPrincipal.USER_TYPE, SIMPLE_USER, null, + new KafkaPrincipal(KafkaPrincipal.USER_TYPE, SIMPLE_USER, null, null, new AuthenticationDataCommand(SIMPLE_USER)), Resource.of(ResourceType.TOPIC, TOPIC)).get(); assertTrue(isAuthorized); isAuthorized = simpleAclAuthorizer.canLookupAsync( - new KafkaPrincipal(KafkaPrincipal.USER_TYPE, PRODUCE_USER, null, + new KafkaPrincipal(KafkaPrincipal.USER_TYPE, PRODUCE_USER, null, null, new AuthenticationDataCommand(PRODUCE_USER)), Resource.of(ResourceType.TOPIC, TOPIC)).get(); assertTrue(isAuthorized); isAuthorized = simpleAclAuthorizer.canLookupAsync( - new KafkaPrincipal(KafkaPrincipal.USER_TYPE, CONSUMER_USER, null, + new KafkaPrincipal(KafkaPrincipal.USER_TYPE, CONSUMER_USER, null, null, new AuthenticationDataCommand(CONSUMER_USER)), Resource.of(ResourceType.TOPIC, TOPIC)).get(); assertTrue(isAuthorized); isAuthorized = simpleAclAuthorizer.canLookupAsync( - new KafkaPrincipal(KafkaPrincipal.USER_TYPE, ANOTHER_USER, null, + new KafkaPrincipal(KafkaPrincipal.USER_TYPE, ANOTHER_USER, null, null, new AuthenticationDataCommand(ANOTHER_USER)), Resource.of(ResourceType.TOPIC, TOPIC)).get(); assertFalse(isAuthorized); isAuthorized = simpleAclAuthorizer.canLookupAsync( - new KafkaPrincipal(KafkaPrincipal.USER_TYPE, SIMPLE_USER, null, + new KafkaPrincipal(KafkaPrincipal.USER_TYPE, SIMPLE_USER, null, null, new AuthenticationDataCommand(SIMPLE_USER)), Resource.of(ResourceType.TOPIC, NOT_EXISTS_TENANT_TOPIC)).get(); assertFalse(isAuthorized); @@ -239,28 +239,28 @@ public void testAuthorizeTenantAdmin() throws ExecutionException, InterruptedExc // TENANT_ADMIN_USER can't produce don't exist tenant's topic, // because tenant admin depend on exist tenant. Boolean isAuthorized = simpleAclAuthorizer.canProduceAsync( - new KafkaPrincipal(KafkaPrincipal.USER_TYPE, TENANT_ADMIN_USER, null, + new KafkaPrincipal(KafkaPrincipal.USER_TYPE, TENANT_ADMIN_USER, null, null, new AuthenticationDataCommand(TENANT_ADMIN_USER)), Resource.of(ResourceType.TOPIC, NOT_EXISTS_TENANT_TOPIC)).get(); assertFalse(isAuthorized); // ADMIN_USER can produce don't exist tenant's topic, because is superuser. isAuthorized = simpleAclAuthorizer.canProduceAsync( - new KafkaPrincipal(KafkaPrincipal.USER_TYPE, ADMIN_USER, null, + new KafkaPrincipal(KafkaPrincipal.USER_TYPE, ADMIN_USER, null, null, new AuthenticationDataCommand(ADMIN_USER)), Resource.of(ResourceType.TOPIC, NOT_EXISTS_TENANT_TOPIC)).get(); assertTrue(isAuthorized); // TENANT_ADMIN_USER can produce. isAuthorized = simpleAclAuthorizer.canProduceAsync( - new KafkaPrincipal(KafkaPrincipal.USER_TYPE, TENANT_ADMIN_USER, null, + new KafkaPrincipal(KafkaPrincipal.USER_TYPE, TENANT_ADMIN_USER, null, null, new AuthenticationDataCommand(TENANT_ADMIN_USER)), Resource.of(ResourceType.TOPIC, TOPIC)).get(); assertFalse(isAuthorized); // TENANT_ADMIN_USER can create or delete Topic isAuthorized = simpleAclAuthorizer.canManageTenantAsync( - new KafkaPrincipal(KafkaPrincipal.USER_TYPE, TENANT_ADMIN_USER, null, + new KafkaPrincipal(KafkaPrincipal.USER_TYPE, TENANT_ADMIN_USER, null, null, new AuthenticationDataCommand(TENANT_ADMIN_USER)), Resource.of(ResourceType.TOPIC, TOPIC)).get(); assertTrue(isAuthorized); @@ -273,27 +273,27 @@ public void testTopicLevelPermissions() throws PulsarAdminException, ExecutionEx admin.topics().grantPermission(topic, TOPIC_LEVEL_PERMISSIONS_USER, Sets.newHashSet(AuthAction.produce)); Boolean isAuthorized = simpleAclAuthorizer.canLookupAsync( - new KafkaPrincipal(KafkaPrincipal.USER_TYPE, TOPIC_LEVEL_PERMISSIONS_USER, null, + new KafkaPrincipal(KafkaPrincipal.USER_TYPE, TOPIC_LEVEL_PERMISSIONS_USER, null, null, new AuthenticationDataCommand(TOPIC_LEVEL_PERMISSIONS_USER)), Resource.of(ResourceType.TOPIC, topic)).get(); assertTrue(isAuthorized); isAuthorized = simpleAclAuthorizer.canProduceAsync( - new KafkaPrincipal(KafkaPrincipal.USER_TYPE, TOPIC_LEVEL_PERMISSIONS_USER, null, + new KafkaPrincipal(KafkaPrincipal.USER_TYPE, TOPIC_LEVEL_PERMISSIONS_USER, null, null, new AuthenticationDataCommand(TOPIC_LEVEL_PERMISSIONS_USER)), Resource.of(ResourceType.TOPIC, topic)).get(); assertTrue(isAuthorized); isAuthorized = simpleAclAuthorizer.canConsumeAsync( - new KafkaPrincipal(KafkaPrincipal.USER_TYPE, TOPIC_LEVEL_PERMISSIONS_USER, null, + new KafkaPrincipal(KafkaPrincipal.USER_TYPE, TOPIC_LEVEL_PERMISSIONS_USER, null, null, new AuthenticationDataCommand(TOPIC_LEVEL_PERMISSIONS_USER)), Resource.of(ResourceType.TOPIC, topic)).get(); assertFalse(isAuthorized); isAuthorized = simpleAclAuthorizer.canProduceAsync( - new KafkaPrincipal(KafkaPrincipal.USER_TYPE, TOPIC_LEVEL_PERMISSIONS_USER, null, + new KafkaPrincipal(KafkaPrincipal.USER_TYPE, TOPIC_LEVEL_PERMISSIONS_USER, null, null, new AuthenticationDataCommand(TOPIC_LEVEL_PERMISSIONS_USER)), Resource.of(ResourceType.TOPIC, TOPIC)).get(); assertFalse(isAuthorized); } -} \ No newline at end of file +}