Skip to content

Commit

Permalink
[improve] Pass group ID to authorizer when using OAuth (streamnative#…
Browse files Browse the repository at this point in the history
…1926)

### Motivation
Currently, KoP only supports checking the topic permission when
performing consume authorization, but Kafka supports checking both the
topic and the group ID permissions.

Before introducing this change, let's understand why KoP can't check
group ID permission.

When Kafka does the consume authorization check, it will first check the
group ID permission in `handleJoinGroupRequest` method, then it will
check the topic permission in the `handleFetchRequest` method.

However, Pulsar is using another way to check consume permission. See
the
`org.apache.pulsar.broker.authorization.AuthorizationService#canConsumeAsync`
method: it requires passing the topic name and subscription to check
both permissions in the same place, and the topic name is a required
param.
```
public CompletableFuture<Boolean> canConsumeAsync(TopicName topicName, String role, AuthenticationDataSource authenticationData, String subscription)
```

If we follow the Kafka way to check the consumer permission, we can't
get the topic name when joining a group since the join group request
does not contain the topic name. So we have to authorize permission in
the fetch request.

However, to authorize consume permission in the fetch request, we can
only obtain the topic name from this request. In this case, we can't
authorize the group ID.

### Modifications
Since we can't get the group ID when handling fetch requests, we need
to find a way to pass the group ID through when doing the authentication.

In OAuth, we have a `credentials_file.json` file that needs to be
configured, for example:
```
{
	"client_id": "my-id",
	"client_secret": "my-secret",
	"tenant": "my-tenant"
}
```
Here we can add a new parameter into the config:
```
{
	"client_id": "my-id",
	"client_secret": "my-secret",
	"tenant": "my-tenant",
	"group_id": "my-group-id"
}
```
Then we can add these parameters to `SaslExtensions`, and send it to the
broker.

(cherry picked from commit e108e44)
  • Loading branch information
Demogorgon314 authored and BewareMyPower committed Jul 12, 2023
1 parent 70ae5ec commit 88fd648
Show file tree
Hide file tree
Showing 22 changed files with 582 additions and 134 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -344,7 +344,7 @@ public KafkaRequestHandler(PulsarService pulsarService,
: null;
final boolean authorizationEnabled = pulsarService.getBrokerService().isAuthorizationEnabled();
this.authorizer = authorizationEnabled && authenticationEnabled
? new SimpleAclAuthorizer(pulsarService)
? new SimpleAclAuthorizer(pulsarService, kafkaConfig)
: null;
this.adminManager = adminManager;
this.producePurgatory = producePurgatory;
Expand Down Expand Up @@ -964,58 +964,86 @@ protected void handleFindCoordinatorRequest(KafkaHeaderAndRequest findCoordinato
checkArgument(findCoordinator.getRequest() instanceof FindCoordinatorRequest);
FindCoordinatorRequest request = (FindCoordinatorRequest) findCoordinator.getRequest();

String pulsarTopicName;
int partition;
CompletableFuture<Void> storeGroupIdFuture;
if (request.data().keyType() == FindCoordinatorRequest.CoordinatorType.TRANSACTION.id()) {
TransactionCoordinator transactionCoordinator = getTransactionCoordinator();
partition = transactionCoordinator.partitionFor(request.data().key());
pulsarTopicName = transactionCoordinator.getTopicPartitionName(partition);
storeGroupIdFuture = CompletableFuture.completedFuture(null);
} else if (request.data().keyType() == FindCoordinatorRequest.CoordinatorType.GROUP.id()) {
partition = getGroupCoordinator().partitionFor(request.data().key());
pulsarTopicName = getGroupCoordinator().getTopicPartitionName(partition);
if (kafkaConfig.isKopEnableGroupLevelConsumerMetrics()) {
String groupId = request.data().key();
String groupIdPath = GroupIdUtils.groupIdPathFormat(findCoordinator.getClientHost(),
findCoordinator.getHeader().clientId());
currentConnectedClientId.add(findCoordinator.getHeader().clientId());
int partition = transactionCoordinator.partitionFor(request.data().key());
String pulsarTopicName = transactionCoordinator.getTopicPartitionName(partition);
findBroker(TopicName.get(pulsarTopicName))
.whenComplete((KafkaResponseUtils.BrokerLookupResult result, Throwable throwable) -> {
if (result.error != Errors.NONE || throwable != null) {
log.error("[{}] Request {}: Error while find coordinator.",
ctx.channel(), findCoordinator.getHeader(), throwable);

resultFuture.complete(KafkaResponseUtils
.newFindCoordinator(Errors.LEADER_NOT_AVAILABLE));
return;
}

// Store group name to metadata store for current client, use to collect consumer metrics.
storeGroupIdFuture = storeGroupId(groupId, groupIdPath);
} else {
storeGroupIdFuture = CompletableFuture.completedFuture(null);
}
if (log.isDebugEnabled()) {
log.debug("[{}] Found node {} as coordinator for key {} partition {}.",
ctx.channel(), result.node, request.data().key(), partition);
}
resultFuture.complete(KafkaResponseUtils.newFindCoordinator(result.node));
});
} else if (request.data().keyType() == FindCoordinatorRequest.CoordinatorType.GROUP.id()) {
authorize(AclOperation.DESCRIBE, Resource.of(ResourceType.GROUP, request.data().key()))
.whenComplete((isAuthorized, ex) -> {
if (ex != null) {
log.error("Describe group authorize failed, group - {}. {}",
request.data().key(), ex.getMessage());
resultFuture.complete(KafkaResponseUtils
.newFindCoordinator(Errors.GROUP_AUTHORIZATION_FAILED));
return;
}
if (!isAuthorized) {
resultFuture.complete(
KafkaResponseUtils
.newFindCoordinator(Errors.GROUP_AUTHORIZATION_FAILED));
return;
}
CompletableFuture<Void> storeGroupIdFuture;
int partition = getGroupCoordinator().partitionFor(request.data().key());
String pulsarTopicName = getGroupCoordinator().getTopicPartitionName(partition);
if (kafkaConfig.isKopEnableGroupLevelConsumerMetrics()) {
String groupId = request.data().key();
String groupIdPath = GroupIdUtils.groupIdPathFormat(findCoordinator.getClientHost(),
findCoordinator.getHeader().clientId());
currentConnectedClientId.add(findCoordinator.getHeader().clientId());

// Store group name to metadata store for current client, use to collect consumer metrics.
storeGroupIdFuture = storeGroupId(groupId, groupIdPath);
} else {
storeGroupIdFuture = CompletableFuture.completedFuture(null);
}
// Store group name to metadata store for current client, use to collect consumer metrics.
storeGroupIdFuture.whenComplete((__, e) -> {
if (e != null) {
log.warn("Store groupId failed, the groupId might already stored.", e);
}
findBroker(TopicName.get(pulsarTopicName))
.whenComplete((KafkaResponseUtils.BrokerLookupResult result,
Throwable throwable) -> {
if (result.error != Errors.NONE || throwable != null) {
log.error("[{}] Request {}: Error while find coordinator.",
ctx.channel(), findCoordinator.getHeader(), throwable);

resultFuture.complete(KafkaResponseUtils
.newFindCoordinator(Errors.LEADER_NOT_AVAILABLE));
return;
}

if (log.isDebugEnabled()) {
log.debug("[{}] Found node {} as coordinator for key {} partition {}.",
ctx.channel(), result.node, request.data().key(), partition);
}
resultFuture.complete(KafkaResponseUtils.newFindCoordinator(result.node));
});
});
});
} else {
throw new NotImplementedException("FindCoordinatorRequest not support unknown type "
+ request.data().keyType());
}

// Store group name to metadata store for current client, use to collect consumer metrics.
storeGroupIdFuture
.whenComplete((__, ex) -> {
if (ex != null) {
log.warn("Store groupId failed, the groupId might already stored.", ex);
}
findBroker(TopicName.get(pulsarTopicName))
.whenComplete((KafkaResponseUtils.BrokerLookupResult result, Throwable throwable) -> {
if (result.error != Errors.NONE || throwable != null) {
log.error("[{}] Request {}: Error while find coordinator.",
ctx.channel(), findCoordinator.getHeader(), throwable);

resultFuture.complete(KafkaResponseUtils
.newFindCoordinator(Errors.LEADER_NOT_AVAILABLE));
return;
}

if (log.isDebugEnabled()) {
log.debug("[{}] Found node {} as coordinator for key {} partition {}.",
ctx.channel(), result.node, request.data().key(), partition);
}
resultFuture.complete(KafkaResponseUtils.newFindCoordinator(result.node));
});
});
}

@VisibleForTesting
Expand Down Expand Up @@ -2877,6 +2905,8 @@ protected CompletableFuture<Boolean> authorize(AclOperation operation, Resource
isAuthorizedFuture = authorizer.canLookupAsync(session.getPrincipal(), resource);
} else if (resource.getResourceType() == ResourceType.NAMESPACE) {
isAuthorizedFuture = authorizer.canGetTopicList(session.getPrincipal(), resource);
} else if (resource.getResourceType() == ResourceType.GROUP) {
isAuthorizedFuture = authorizer.canDescribeConsumerGroup(session.getPrincipal(), resource);
}
break;
case CREATE:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,13 @@ public class KafkaServiceConfiguration extends ServiceConfiguration {
)
private boolean kafkaEnableMultiTenantMetadata = true;

@FieldContext(
category = CATEGORY_KOP,
required = true,
doc = "Use to enable/disable Kafka authorization force groupId check."
)
private boolean kafkaEnableAuthorizationForceGroupIdCheck = false;

@FieldContext(
category = CATEGORY_KOP,
required = true,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ public SchemaRegistryManager(KafkaServiceConfiguration kafkaConfig,
this.kafkaConfig = kafkaConfig;
this.pulsarClient = SystemTopicClient.createPulsarClient(pulsar, kafkaConfig, (___) -> {});
this.pulsar = pulsar;
Authorizer authorizer = new SimpleAclAuthorizer(pulsar);
Authorizer authorizer = new SimpleAclAuthorizer(pulsar, kafkaConfig);
this.schemaRegistryRequestAuthenticator = new HttpRequestAuthenticator(this.kafkaConfig,
authenticationService, authorizer);
}
Expand Down Expand Up @@ -136,7 +136,8 @@ public String authenticate(FullHttpRequest request) throws SchemaStorageExceptio
private void performAuthorizationValidation(String username, String role, String tenant)
throws SchemaStorageException {
if (kafkaConfig.isAuthorizationEnabled() && kafkaConfig.isKafkaEnableMultiTenantMetadata()) {
KafkaPrincipal kafkaPrincipal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, role, username, null);
KafkaPrincipal kafkaPrincipal =
new KafkaPrincipal(KafkaPrincipal.USER_TYPE, role, username, null, null);
String topicName = MetadataUtils.constructSchemaRegistryTopicName(tenant, kafkaConfig);
try {
Boolean tenantExists =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,5 +44,7 @@ public class KafkaPrincipal implements Principal {
*/
private final String tenantSpec;

private final String groupId;

private final AuthenticationDataSource authenticationData;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package io.streamnative.pulsar.handlers.kop.security;

import static io.streamnative.pulsar.handlers.kop.security.SaslAuthenticator.AUTH_DATA_SOURCE_PROP;
import static io.streamnative.pulsar.handlers.kop.security.SaslAuthenticator.GROUP_ID_PROP;
import static io.streamnative.pulsar.handlers.kop.security.SaslAuthenticator.USER_NAME_PROP;

import io.streamnative.pulsar.handlers.kop.KafkaServiceConfiguration;
Expand Down Expand Up @@ -156,6 +157,9 @@ public Object getNegotiatedProperty(String propName) {
if (USER_NAME_PROP.equals(propName)) {
return username;
}
if (GROUP_ID_PROP.equals(propName)) {
return "";
}
if (AUTH_DATA_SOURCE_PROP.equals(propName)) {
return authDataSource;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@
public class SaslAuthenticator {

public static final String USER_NAME_PROP = "username";
public static final String GROUP_ID_PROP = "groupId";
public static final String AUTH_DATA_SOURCE_PROP = "authDataSource";
public static final String AUTHENTICATION_SERVER_OBJ = "authenticationServerObj";
public static final String REQUEST_TIMEOUT_MS = "requestTimeoutMs";
Expand Down Expand Up @@ -443,6 +444,7 @@ private void handleSaslToken(ChannelHandlerContext ctx,
newSession = new Session(
new KafkaPrincipal(KafkaPrincipal.USER_TYPE, saslServer.getAuthorizationID(),
safeGetProperty(saslServer, USER_NAME_PROP),
safeGetProperty(saslServer, GROUP_ID_PROP),
safeGetProperty(saslServer, AUTH_DATA_SOURCE_PROP)),
"old-clientId");
if (!tenantAccessValidationFunction.apply(newSession)) {
Expand Down Expand Up @@ -502,6 +504,7 @@ private void handleSaslToken(ChannelHandlerContext ctx,
this.session = new Session(
new KafkaPrincipal(KafkaPrincipal.USER_TYPE, pulsarRole,
safeGetProperty(saslServer, USER_NAME_PROP),
safeGetProperty(saslServer, GROUP_ID_PROP),
safeGetProperty(saslServer, AUTH_DATA_SOURCE_PROP)),
header.clientId());
if (log.isDebugEnabled()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,4 +122,6 @@ public interface Authorizer {
* @return a boolean to determine whether authorized or not
*/
CompletableFuture<Boolean> canDeleteGroupAsync(KafkaPrincipal principal, Resource resource);

CompletableFuture<Boolean> canDescribeConsumerGroup(KafkaPrincipal principal, Resource resource);
}
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@ public enum ResourceType {
* The Consumer Group.
*/
GROUP((byte) 4)

;


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,12 @@
*/
package io.streamnative.pulsar.handlers.kop.security.auth;

import io.streamnative.pulsar.handlers.kop.KafkaServiceConfiguration;
import io.streamnative.pulsar.handlers.kop.security.KafkaPrincipal;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.authorization.AuthorizationService;
import org.apache.pulsar.common.naming.NamespaceName;
Expand All @@ -35,9 +38,12 @@ public class SimpleAclAuthorizer implements Authorizer {

private final AuthorizationService authorizationService;

public SimpleAclAuthorizer(PulsarService pulsarService) {
private final boolean forceCheckGroupId;

public SimpleAclAuthorizer(PulsarService pulsarService, KafkaServiceConfiguration config) {
this.pulsarService = pulsarService;
this.authorizationService = pulsarService.getBrokerService().getAuthorizationService();
this.forceCheckGroupId = config.isKafkaEnableAuthorizationForceGroupIdCheck();
}

protected PulsarService getPulsarService() {
Expand Down Expand Up @@ -152,8 +158,27 @@ public CompletableFuture<Boolean> canProduceAsync(KafkaPrincipal principal, Reso
public CompletableFuture<Boolean> canConsumeAsync(KafkaPrincipal principal, Resource resource) {
checkResourceType(resource, ResourceType.TOPIC);
TopicName topicName = TopicName.get(resource.getName());
if (forceCheckGroupId && StringUtils.isBlank(principal.getGroupId())) {
return CompletableFuture.completedFuture(false);
}
return authorizationService.canConsumeAsync(
topicName, principal.getName(), principal.getAuthenticationData(), "");
topicName, principal.getName(), principal.getAuthenticationData(), principal.getGroupId());
}

@Override
public CompletableFuture<Boolean> canDescribeConsumerGroup(KafkaPrincipal principal, Resource resource) {
    // When group ID enforcement is disabled, any principal may describe any group.
    if (!forceCheckGroupId) {
        return CompletableFuture.completedFuture(true);
    }
    final String groupId = principal.getGroupId();
    // Enforcement is on: a principal that carries no group ID is denied outright.
    if (StringUtils.isBlank(groupId)) {
        return CompletableFuture.completedFuture(false);
    }
    // Authorized only when the principal's bound group ID matches the requested group resource.
    final boolean isSameGroup = Objects.equals(groupId, resource.getName());
    if (log.isDebugEnabled()) {
        log.debug("Principal [{}] for resource [{}] isSameGroup [{}]", principal, resource, isSameGroup);
    }
    return CompletableFuture.completedFuture(isSameGroup);
}

/**
Expand Down
Loading

0 comments on commit 88fd648

Please sign in to comment.