Skip to content
This repository has been archived by the owner on Jan 24, 2024. It is now read-only.

Commit

Permalink
Pass groupId to authorizer when using OAuth
Browse files Browse the repository at this point in the history
  • Loading branch information
Demogorgon314 committed Jun 28, 2023
1 parent dfb3285 commit d123a7c
Show file tree
Hide file tree
Showing 31 changed files with 802 additions and 99 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -340,7 +340,7 @@ public KafkaRequestHandler(PulsarService pulsarService,
: null;
final boolean authorizationEnabled = pulsarService.getBrokerService().isAuthorizationEnabled();
this.authorizer = authorizationEnabled && authenticationEnabled
? new SimpleAclAuthorizer(pulsarService)
? new SimpleAclAuthorizer(pulsarService, kafkaConfig)
: null;
this.adminManager = adminManager;
this.producePurgatory = producePurgatory;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,13 @@ public class KafkaServiceConfiguration extends ServiceConfiguration {
)
private boolean kafkaEnableMultiTenantMetadata = true;

@FieldContext(
category = CATEGORY_KOP,
required = true,
doc = "Use to enable/disable Kafka authorization force groupId check."
)
private boolean kafkaEnableAuthorizationForceGroupIdCheck = false;

@FieldContext(
category = CATEGORY_KOP,
required = true,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ public SchemaRegistryManager(KafkaServiceConfiguration kafkaConfig,
this.kafkaConfig = kafkaConfig;
this.pulsarClient = SystemTopicClient.createPulsarClient(pulsar, kafkaConfig, (___) -> {});
this.pulsar = pulsar;
Authorizer authorizer = new SimpleAclAuthorizer(pulsar);
Authorizer authorizer = new SimpleAclAuthorizer(pulsar, kafkaConfig);
this.schemaRegistryRequestAuthenticator = new HttpRequestAuthenticator(this.kafkaConfig,
authenticationService, authorizer);
}
Expand Down Expand Up @@ -136,7 +136,8 @@ public String authenticate(FullHttpRequest request) throws SchemaStorageExceptio
private void performAuthorizationValidation(String username, String role, String tenant)
throws SchemaStorageException {
if (kafkaConfig.isAuthorizationEnabled() && kafkaConfig.isKafkaEnableMultiTenantMetadata()) {
KafkaPrincipal kafkaPrincipal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, role, username, null);
KafkaPrincipal kafkaPrincipal =
new KafkaPrincipal(KafkaPrincipal.USER_TYPE, role, username, null, null);
String topicName = MetadataUtils.constructSchemaRegistryTopicName(tenant, kafkaConfig);
try {
Boolean tenantExists =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,5 +44,7 @@ public class KafkaPrincipal implements Principal {
*/
private final String tenantSpec;

private final String groupId;

private final AuthenticationDataSource authenticationData;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package io.streamnative.pulsar.handlers.kop.security;

import static io.streamnative.pulsar.handlers.kop.security.SaslAuthenticator.AUTH_DATA_SOURCE_PROP;
import static io.streamnative.pulsar.handlers.kop.security.SaslAuthenticator.GROUP_ID_PROP;
import static io.streamnative.pulsar.handlers.kop.security.SaslAuthenticator.USER_NAME_PROP;

import io.streamnative.pulsar.handlers.kop.KafkaServiceConfiguration;
Expand Down Expand Up @@ -156,6 +157,9 @@ public Object getNegotiatedProperty(String propName) {
if (USER_NAME_PROP.equals(propName)) {
return username;
}
if (GROUP_ID_PROP.equals(propName)) {
return "";
}
if (AUTH_DATA_SOURCE_PROP.equals(propName)) {
return authDataSource;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@
public class SaslAuthenticator {

public static final String USER_NAME_PROP = "username";
public static final String GROUP_ID_PROP = "groupId";
public static final String AUTH_DATA_SOURCE_PROP = "authDataSource";
public static final String AUTHENTICATION_SERVER_OBJ = "authenticationServerObj";
public static final String REQUEST_TIMEOUT_MS = "requestTimeoutMs";
Expand Down Expand Up @@ -443,6 +444,7 @@ private void handleSaslToken(ChannelHandlerContext ctx,
newSession = new Session(
new KafkaPrincipal(KafkaPrincipal.USER_TYPE, saslServer.getAuthorizationID(),
safeGetProperty(saslServer, USER_NAME_PROP),
safeGetProperty(saslServer, GROUP_ID_PROP),
safeGetProperty(saslServer, AUTH_DATA_SOURCE_PROP)),
"old-clientId");
if (!tenantAccessValidationFunction.apply(newSession)) {
Expand Down Expand Up @@ -502,6 +504,7 @@ private void handleSaslToken(ChannelHandlerContext ctx,
this.session = new Session(
new KafkaPrincipal(KafkaPrincipal.USER_TYPE, pulsarRole,
safeGetProperty(saslServer, USER_NAME_PROP),
safeGetProperty(saslServer, GROUP_ID_PROP),
safeGetProperty(saslServer, AUTH_DATA_SOURCE_PROP)),
header.clientId());
if (log.isDebugEnabled()) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,197 @@
/**
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.streamnative.pulsar.handlers.kop.security.auth;

import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.Objects;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import javax.security.sasl.SaslException;
import org.apache.kafka.common.security.auth.SaslExtensions;
import org.apache.kafka.common.utils.Utils;


public class OAuthBearerClientInitialResponse {
static final String SEPARATOR = "\u0001";

private static final String SASLNAME = "(?:[\\x01-\\x7F&&[^=,]]|=2C|=3D)+";
private static final String KEY = "[A-Za-z]+";
private static final String VALUE = "[\\x21-\\x7E \t\r\n]+";

private static final String KVPAIRS = String.format("(%s=%s%s)*", KEY, VALUE, SEPARATOR);
private static final Pattern AUTH_PATTERN =
Pattern.compile("(?<scheme>[\\w]+)[ ]+(?<token>[-_\\.a-zA-Z0-9\\=\\/\\+]+)");
private static final Pattern CLIENT_INITIAL_RESPONSE_PATTERN = Pattern.compile(
String.format("n,(a=(?<authzid>%s))?,%s(?<kvpairs>%s)%s", SASLNAME, SEPARATOR, KVPAIRS, SEPARATOR));
public static final String AUTH_KEY = "auth";

private final String tokenValue;
private final String authorizationId;
private SaslExtensions saslExtensions;

public static final Pattern EXTENSION_KEY_PATTERN = Pattern.compile(KEY);
public static final Pattern EXTENSION_VALUE_PATTERN = Pattern.compile(VALUE);

public OAuthBearerClientInitialResponse(byte[] response) throws SaslException {
String responseMsg = new String(response, StandardCharsets.UTF_8);
Matcher matcher = CLIENT_INITIAL_RESPONSE_PATTERN.matcher(responseMsg);
if (!matcher.matches()) {
throw new SaslException("Invalid OAUTHBEARER client first message");
}
String authzid = matcher.group("authzid");
this.authorizationId = authzid == null ? "" : authzid;
String kvPairs = matcher.group("kvpairs");
Map<String, String> properties = Utils.parseMap(kvPairs, "=", SEPARATOR);
String auth = properties.get(AUTH_KEY);
if (auth == null) {
throw new SaslException("Invalid OAUTHBEARER client first message: 'auth' not specified");
}
properties.remove(AUTH_KEY);
SaslExtensions extensions = new SaslExtensions(properties);
validateExtensions(extensions);
this.saslExtensions = extensions;

Matcher authMatcher = AUTH_PATTERN.matcher(auth);
if (!authMatcher.matches()) {
throw new SaslException("Invalid OAUTHBEARER client first message: invalid 'auth' format");
}
if (!"bearer".equalsIgnoreCase(authMatcher.group("scheme"))) {
String msg = String.format("Invalid scheme in OAUTHBEARER client first message: %s",
matcher.group("scheme"));
throw new SaslException(msg);
}
this.tokenValue = authMatcher.group("token");
}

/**
* Constructor.
*
* @param tokenValue
* the mandatory token value
* @param extensions
* the optional extensions
* @throws SaslException
* if any extension name or value fails to conform to the required
* regular expression as defined by the specification, or if the
* reserved {@code auth} appears as a key
*/
public OAuthBearerClientInitialResponse(String tokenValue, SaslExtensions extensions) throws SaslException {
this(tokenValue, "", extensions);
}

/**
* Constructor.
*
* @param tokenValue
* the mandatory token value
* @param authorizationId
* the optional authorization ID
* @param extensions
* the optional extensions
* @throws SaslException
* if any extension name or value fails to conform to the required
* regular expression as defined by the specification, or if the
* reserved {@code auth} appears as a key
*/
public OAuthBearerClientInitialResponse(String tokenValue, String authorizationId, SaslExtensions extensions)
throws SaslException {
this.tokenValue = Objects.requireNonNull(tokenValue, "token value must not be null");
this.authorizationId = authorizationId == null ? "" : authorizationId;
validateExtensions(extensions);
this.saslExtensions = extensions != null ? extensions : SaslExtensions.NO_SASL_EXTENSIONS;
}

/**
* Return the always non-null extensions.
*
* @return the always non-null extensions
*/
public SaslExtensions extensions() {
return saslExtensions;
}

public byte[] toBytes() {
String authzid = authorizationId.isEmpty() ? "" : "a=" + authorizationId;
String extensions = extensionsMessage();
if (extensions.length() > 0) {
extensions = SEPARATOR + extensions;
}
String message = String.format("n,%s,%sauth=Bearer %s%s%s%s", authzid,
SEPARATOR, tokenValue, extensions, SEPARATOR, SEPARATOR);

return message.getBytes(StandardCharsets.UTF_8);
}

/**
* Return the always non-null token value.
*
* @return the always non-null toklen value
*/
public String tokenValue() {
return tokenValue;
}

/**
* Return the always non-null authorization ID.
*
* @return the always non-null authorization ID
*/
public String authorizationId() {
return authorizationId;
}

/**
* Validates that the given extensions conform to the standard.
* They should also not contain the reserve key name {@link OAuthBearerClientInitialResponse#AUTH_KEY}
*
* @param extensions
* optional extensions to validate
* @throws SaslException
* if any extension name or value fails to conform to the required
* regular expression as defined by the specification, or if the
* reserved {@code auth} appears as a key
*
* @see <a href="https://tools.ietf.org/html/rfc7628#section-3.1">RFC 7628,
* Section 3.1</a>
*/
public static void validateExtensions(SaslExtensions extensions) throws SaslException {
if (extensions == null) {
return;
}
if (extensions.map().containsKey(OAuthBearerClientInitialResponse.AUTH_KEY)) {
throw new SaslException("Extension name " + OAuthBearerClientInitialResponse.AUTH_KEY + " is invalid");
}
for (Map.Entry<String, String> entry : extensions.map().entrySet()) {
String extensionName = entry.getKey();
String extensionValue = entry.getValue();

if (!EXTENSION_KEY_PATTERN.matcher(extensionName).matches()) {
throw new SaslException("Extension name " + extensionName + " is invalid");
}

if (!EXTENSION_VALUE_PATTERN.matcher(extensionValue).matches()) {
throw new SaslException("Extension value (" + extensionValue + ") for extension "
+ extensionName + " is invalid");
}
}
}

/**
* Converts the SASLExtensions to an OAuth protocol-friendly string.
*/
private String extensionsMessage() {
return Utils.mkString(saslExtensions.map(), "", "", "=", SEPARATOR);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,11 @@

import static com.google.common.base.Preconditions.checkArgument;

import io.streamnative.pulsar.handlers.kop.KafkaServiceConfiguration;
import io.streamnative.pulsar.handlers.kop.security.KafkaPrincipal;
import java.util.concurrent.CompletableFuture;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.authorization.AuthorizationService;
import org.apache.pulsar.common.naming.NamespaceName;
Expand All @@ -38,9 +40,12 @@ public class SimpleAclAuthorizer implements Authorizer {

private final AuthorizationService authorizationService;

public SimpleAclAuthorizer(PulsarService pulsarService) {
private final boolean forceCheckGroupId;

public SimpleAclAuthorizer(PulsarService pulsarService, KafkaServiceConfiguration config) {
this.pulsarService = pulsarService;
this.authorizationService = pulsarService.getBrokerService().getAuthorizationService();
this.forceCheckGroupId = config.isKafkaEnableAuthorizationForceGroupIdCheck();
}

protected PulsarService getPulsarService() {
Expand Down Expand Up @@ -169,8 +174,11 @@ public CompletableFuture<Boolean> canConsumeAsync(KafkaPrincipal principal, Reso
checkArgument(resource.getResourceType() == ResourceType.TOPIC,
String.format("Expected resource type is TOPIC, but have [%s]", resource.getResourceType()));
TopicName topicName = TopicName.get(resource.getName());
if (forceCheckGroupId && StringUtils.isBlank(principal.getGroupId())) {
return CompletableFuture.completedFuture(false);
}
return authorizationService.canConsumeAsync(
topicName, principal.getName(), principal.getAuthenticationData(), "");
topicName, principal.getName(), principal.getAuthenticationData(), principal.getGroupId());
}

}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/**
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.streamnative.pulsar.handlers.kop.security.oauth;

import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.ObjectReader;
import com.fasterxml.jackson.databind.ObjectWriter;
import java.io.IOException;
import java.util.Base64;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.NoArgsConstructor;
import lombok.ToString;

@Data
@ToString
@EqualsAndHashCode
@NoArgsConstructor
@AllArgsConstructor
@JsonInclude(JsonInclude.Include.NON_NULL)
@JsonIgnoreProperties(ignoreUnknown = true)
public class ExtensionTokenData {

private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
private static final ObjectWriter EXTENSION_WRITER = OBJECT_MAPPER.writer();
private static final ObjectReader EXTENSION_READER = OBJECT_MAPPER.readerFor(ExtensionTokenData.class);

@JsonProperty("tenant")
private String tenant;

@JsonProperty("groupId")
private String groupId;

public static ExtensionTokenData decode(String extensionData) throws IOException {
return EXTENSION_READER.readValue(Base64.getDecoder().decode(extensionData));
}

public boolean hasExtensionData() {
return tenant != null || groupId != null;
}

public String encode() throws JsonProcessingException {
return Base64.getEncoder().encodeToString(EXTENSION_WRITER.writeValueAsBytes(this));
}
}
Loading

0 comments on commit d123a7c

Please sign in to comment.