From 66d7b650599f91f2d5f20843dbe0f030e87bb989 Mon Sep 17 00:00:00 2001 From: Demogorgon314 Date: Thu, 29 Jun 2023 13:05:36 +0800 Subject: [PATCH] Use SaslExtensions pass groupId --- .../OAuthBearerClientInitialResponse.java | 197 ------------------ .../security/oauth/ExtensionTokenData.java | 61 ------ .../oauth/KopOAuthBearerSaslServer.java | 91 ++++++-- .../security/oauth/KopOAuthBearerToken.java | 5 - .../oauth/KopOAuthBearerUnsecuredJws.java | 7 - ...arerUnsecuredValidatorCallbackHandler.java | 45 ++-- .../kop/security/oauth/OAuthTokenDecoder.java | 25 +-- .../oauth/OauthValidatorCallbackHandler.java | 29 +-- .../security/oauth/OAuthTokenDecoderTest.java | 18 +- .../kop/security/oauth/ClientConfig.java | 53 ++++- .../security/oauth/ClientCredentialsFlow.java | 42 +--- .../security/oauth/ExtensionTokenData.java | 49 ----- .../security/oauth/OAuthBearerTokenImpl.java | 17 +- .../oauth/OauthLoginCallbackHandler.java | 31 +++ .../oauth/ClientCredentialsFlowTest.java | 30 ++- .../src/test/resources/private_key.json | 3 +- .../resources/private_key_with_tenant.json | 5 - .../private_key_with_tenant_and_groupId.json | 6 - .../CustomOAuthBearerCallbackHandlerTest.java | 5 - .../pulsar/handlers/kop/HydraOAuthUtils.java | 8 +- .../oauth/ExtensionTokenDataTest.java | 33 --- .../OauthValidatorCallbackHandlerTest.java | 5 +- 22 files changed, 227 insertions(+), 538 deletions(-) delete mode 100644 kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/security/auth/OAuthBearerClientInitialResponse.java delete mode 100644 kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/security/oauth/ExtensionTokenData.java delete mode 100644 oauth-client/src/main/java/io/streamnative/pulsar/handlers/kop/security/oauth/ExtensionTokenData.java delete mode 100644 oauth-client/src/test/resources/private_key_with_tenant.json delete mode 100644 oauth-client/src/test/resources/private_key_with_tenant_and_groupId.json delete mode 100644 tests/src/test/java/io/streamnative/pulsar/handlers/kop/security/oauth/ExtensionTokenDataTest.java diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/security/auth/OAuthBearerClientInitialResponse.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/security/auth/OAuthBearerClientInitialResponse.java deleted file mode 100644 index de997ae4c8..0000000000 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/security/auth/OAuthBearerClientInitialResponse.java +++ /dev/null @@ -1,197 +0,0 @@ -/** - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.streamnative.pulsar.handlers.kop.security.auth; - -import java.nio.charset.StandardCharsets; -import java.util.Map; -import java.util.Objects; -import java.util.regex.Matcher; -import java.util.regex.Pattern; -import javax.security.sasl.SaslException; -import org.apache.kafka.common.security.auth.SaslExtensions; -import org.apache.kafka.common.utils.Utils; - - -public class OAuthBearerClientInitialResponse { - static final String SEPARATOR = "\u0001"; - - private static final String SASLNAME = "(?:[\\x01-\\x7F&&[^=,]]|=2C|=3D)+"; - private static final String KEY = "[A-Za-z]+"; - private static final String VALUE = "[\\x21-\\x7E \t\r\n]+"; - - private static final String KVPAIRS = String.format("(%s=%s%s)*", KEY, VALUE, SEPARATOR); - private static final Pattern AUTH_PATTERN = - Pattern.compile("(?[\\w]+)[ ]+(?[-_\\.a-zA-Z0-9\\=\\/\\+]+)"); - private static final Pattern CLIENT_INITIAL_RESPONSE_PATTERN = Pattern.compile( - String.format("n,(a=(?%s))?,%s(?%s)%s", SASLNAME, SEPARATOR, KVPAIRS, SEPARATOR)); - public static final String AUTH_KEY = "auth"; - - private final String tokenValue; - private final String authorizationId; - private SaslExtensions saslExtensions; - - public static final Pattern EXTENSION_KEY_PATTERN = Pattern.compile(KEY); - public static final Pattern EXTENSION_VALUE_PATTERN = Pattern.compile(VALUE); - - public OAuthBearerClientInitialResponse(byte[] response) throws SaslException { - String responseMsg = new String(response, StandardCharsets.UTF_8); - Matcher matcher = CLIENT_INITIAL_RESPONSE_PATTERN.matcher(responseMsg); - if (!matcher.matches()) { - throw new SaslException("Invalid OAUTHBEARER client first message"); - } - String authzid = matcher.group("authzid"); - this.authorizationId = authzid == null ? "" : authzid; - String kvPairs = matcher.group("kvpairs"); - Map properties = Utils.parseMap(kvPairs, "=", SEPARATOR); - String auth = properties.get(AUTH_KEY); - if (auth == null) { - throw new SaslException("Invalid OAUTHBEARER client first message: 'auth' not specified"); - } - properties.remove(AUTH_KEY); - SaslExtensions extensions = new SaslExtensions(properties); - validateExtensions(extensions); - this.saslExtensions = extensions; - - Matcher authMatcher = AUTH_PATTERN.matcher(auth); - if (!authMatcher.matches()) { - throw new SaslException("Invalid OAUTHBEARER client first message: invalid 'auth' format"); - } - if (!"bearer".equalsIgnoreCase(authMatcher.group("scheme"))) { - String msg = String.format("Invalid scheme in OAUTHBEARER client first message: %s", - matcher.group("scheme")); - throw new SaslException(msg); - } - this.tokenValue = authMatcher.group("token"); - } - - /** - * Constructor. - * - * @param tokenValue - * the mandatory token value - * @param extensions - * the optional extensions - * @throws SaslException - * if any extension name or value fails to conform to the required - * regular expression as defined by the specification, or if the - * reserved {@code auth} appears as a key - */ - public OAuthBearerClientInitialResponse(String tokenValue, SaslExtensions extensions) throws SaslException { - this(tokenValue, "", extensions); - } - - /** - * Constructor. - * - * @param tokenValue - * the mandatory token value - * @param authorizationId - * the optional authorization ID - * @param extensions - * the optional extensions - * @throws SaslException - * if any extension name or value fails to conform to the required - * regular expression as defined by the specification, or if the - * reserved {@code auth} appears as a key - */ - public OAuthBearerClientInitialResponse(String tokenValue, String authorizationId, SaslExtensions extensions) - throws SaslException { - this.tokenValue = Objects.requireNonNull(tokenValue, "token value must not be null"); - this.authorizationId = authorizationId == null ? "" : authorizationId; - validateExtensions(extensions); - this.saslExtensions = extensions != null ? extensions : SaslExtensions.NO_SASL_EXTENSIONS; - } - - /** - * Return the always non-null extensions. - * - * @return the always non-null extensions - */ - public SaslExtensions extensions() { - return saslExtensions; - } - - public byte[] toBytes() { - String authzid = authorizationId.isEmpty() ? "" : "a=" + authorizationId; - String extensions = extensionsMessage(); - if (extensions.length() > 0) { - extensions = SEPARATOR + extensions; - } - String message = String.format("n,%s,%sauth=Bearer %s%s%s%s", authzid, - SEPARATOR, tokenValue, extensions, SEPARATOR, SEPARATOR); - - return message.getBytes(StandardCharsets.UTF_8); - } - - /** - * Return the always non-null token value. - * - * @return the always non-null toklen value - */ - public String tokenValue() { - return tokenValue; - } - - /** - * Return the always non-null authorization ID. - * - * @return the always non-null authorization ID - */ - public String authorizationId() { - return authorizationId; - } - - /** - * Validates that the given extensions conform to the standard. - * They should also not contain the reserve key name {@link OAuthBearerClientInitialResponse#AUTH_KEY} - * - * @param extensions - * optional extensions to validate - * @throws SaslException - * if any extension name or value fails to conform to the required - * regular expression as defined by the specification, or if the - * reserved {@code auth} appears as a key - * - * @see RFC 7628, - * Section 3.1 - */ - public static void validateExtensions(SaslExtensions extensions) throws SaslException { - if (extensions == null) { - return; - } - if (extensions.map().containsKey(OAuthBearerClientInitialResponse.AUTH_KEY)) { - throw new SaslException("Extension name " + OAuthBearerClientInitialResponse.AUTH_KEY + " is invalid"); - } - for (Map.Entry entry : extensions.map().entrySet()) { - String extensionName = entry.getKey(); - String extensionValue = entry.getValue(); - - if (!EXTENSION_KEY_PATTERN.matcher(extensionName).matches()) { - throw new SaslException("Extension name " + extensionName + " is invalid"); - } - - if (!EXTENSION_VALUE_PATTERN.matcher(extensionValue).matches()) { - throw new SaslException("Extension value (" + extensionValue + ") for extension " - + extensionName + " is invalid"); - } - } - } - - /** - * Converts the SASLExtensions to an OAuth protocol-friendly string. - */ - private String extensionsMessage() { - return Utils.mkString(saslExtensions.map(), "", "", "=", SEPARATOR); - } -} diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/security/oauth/ExtensionTokenData.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/security/oauth/ExtensionTokenData.java deleted file mode 100644 index e9603e72b2..0000000000 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/security/oauth/ExtensionTokenData.java +++ /dev/null @@ -1,61 +0,0 @@ -/** - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.streamnative.pulsar.handlers.kop.security.oauth; - -import com.fasterxml.jackson.annotation.JsonIgnoreProperties; -import com.fasterxml.jackson.annotation.JsonInclude; -import com.fasterxml.jackson.annotation.JsonProperty; -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.databind.ObjectReader; -import com.fasterxml.jackson.databind.ObjectWriter; -import java.io.IOException; -import java.util.Base64; -import lombok.AllArgsConstructor; -import lombok.Data; -import lombok.EqualsAndHashCode; -import lombok.NoArgsConstructor; -import lombok.ToString; - -@Data -@ToString -@EqualsAndHashCode -@NoArgsConstructor -@AllArgsConstructor -@JsonInclude(JsonInclude.Include.NON_NULL) -@JsonIgnoreProperties(ignoreUnknown = true) -public class ExtensionTokenData { - - private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); - private static final ObjectWriter EXTENSION_WRITER = OBJECT_MAPPER.writer(); - private static final ObjectReader EXTENSION_READER = OBJECT_MAPPER.readerFor(ExtensionTokenData.class); - - @JsonProperty("tenant") - private String tenant; - - @JsonProperty("groupId") - private String groupId; - - public static ExtensionTokenData decode(String extensionData) throws IOException { - return EXTENSION_READER.readValue(Base64.getDecoder().decode(extensionData)); - } - - public boolean hasExtensionData() { - return tenant != null || groupId != null; - } - - public String encode() throws JsonProcessingException { - return Base64.getEncoder().encodeToString(EXTENSION_WRITER.writeValueAsBytes(this)); - } -} diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/security/oauth/KopOAuthBearerSaslServer.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/security/oauth/KopOAuthBearerSaslServer.java index 92605a28f8..093fffe91f 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/security/oauth/KopOAuthBearerSaslServer.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/security/oauth/KopOAuthBearerSaslServer.java @@ -17,10 +17,10 @@ import static io.streamnative.pulsar.handlers.kop.security.SaslAuthenticator.GROUP_ID_PROP; import static io.streamnative.pulsar.handlers.kop.security.SaslAuthenticator.USER_NAME_PROP; -import io.streamnative.pulsar.handlers.kop.security.auth.OAuthBearerClientInitialResponse; import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.Arrays; +import java.util.Map; import java.util.Objects; import javax.security.auth.callback.Callback; import javax.security.auth.callback.CallbackHandler; @@ -30,12 +30,23 @@ import lombok.extern.slf4j.Slf4j; import org.apache.kafka.common.errors.SaslAuthenticationException; import org.apache.kafka.common.security.auth.AuthenticateCallbackHandler; +import org.apache.kafka.common.security.auth.SaslExtensions; +import org.apache.kafka.common.security.authenticator.SaslInternalConfigs; +import org.apache.kafka.common.security.oauthbearer.OAuthBearerExtensionsValidatorCallback; import org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule; +import org.apache.kafka.common.security.oauthbearer.OAuthBearerToken; +import org.apache.kafka.common.security.oauthbearer.internals.OAuthBearerClientInitialResponse; +import org.apache.kafka.common.utils.Utils; +/** + * {@code SaslServer} implementation for SASL/OAUTHBEARER in Kafka. An instance + * of {@link OAuthBearerToken} is available upon successful authentication via + * the negotiated property "{@code OAUTHBEARER.token}"; the token could be used + * in a custom authorizer (to authorize based on JWT claims rather than ACLs, + * for example). + */ @Slf4j public class KopOAuthBearerSaslServer implements SaslServer { - - // Copy from OAuthBearerSaslClient.BYTE_CONTROL_A private static final byte BYTE_CONTROL_A = (byte) 0x01; private static final String NEGOTIATED_PROPERTY_KEY_TOKEN = OAuthBearerLoginModule.OAUTHBEARER_MECHANISM + ".token"; private static final String INTERNAL_ERROR_ON_SERVER = @@ -47,6 +58,7 @@ public class KopOAuthBearerSaslServer implements SaslServer { private boolean complete; private KopOAuthBearerToken tokenForNegotiatedProperty = null; private String errorMessage = null; + private SaslExtensions extensions; public KopOAuthBearerSaslServer(CallbackHandler callbackHandler, String defaultKafkaMetadataTenant) { if (!(Objects.requireNonNull(callbackHandler) instanceof AuthenticateCallbackHandler)) { @@ -73,7 +85,7 @@ public KopOAuthBearerSaslServer(CallbackHandler callbackHandler, String defaultK @Override public byte[] evaluateResponse(byte[] response) throws SaslException, SaslAuthenticationException { if (response.length == 1 && response[0] == BYTE_CONTROL_A && errorMessage != null) { - if (log.isDebugEnabled()){ + if (log.isDebugEnabled()) { log.debug("Received %x01 response from client after it received our error"); } throw new SaslAuthenticationException(errorMessage); @@ -86,7 +98,7 @@ public byte[] evaluateResponse(byte[] response) throws SaslException, SaslAuthen log.debug(e.getMessage()); throw e; } - return process(clientResponse.tokenValue(), clientResponse.authorizationId()); + return process(clientResponse.tokenValue(), clientResponse.authorizationId(), clientResponse.extensions()); } @Override @@ -108,6 +120,12 @@ public Object getNegotiatedProperty(String propName) { throw new IllegalStateException("Authentication exchange has not completed"); } + if (NEGOTIATED_PROPERTY_KEY_TOKEN.equals(propName)) { + return tokenForNegotiatedProperty; + } + if (SaslInternalConfigs.CREDENTIAL_LIFETIME_MS_SASL_NEGOTIATED_PROPERTY_KEY.equals(propName)) { + return tokenForNegotiatedProperty.lifetimeMs(); + } if (AUTH_DATA_SOURCE_PROP.equals(propName)) { return tokenForNegotiatedProperty.authDataSource(); } @@ -115,16 +133,20 @@ public Object getNegotiatedProperty(String propName) { if (tokenForNegotiatedProperty.tenant() != null) { return tokenForNegotiatedProperty.tenant(); } + String tenant = extensions.map().get(propName); + if (tenant != null) { + return tenant; + } return defaultKafkaMetadataTenant; } if (GROUP_ID_PROP.equals(propName)) { - log.info("getNegotiatedProperty: {}", tokenForNegotiatedProperty.groupId()); - if (tokenForNegotiatedProperty.groupId() == null) { - return ""; + String groupId = extensions.map().get(propName); + if (groupId != null) { + return groupId; } - return tokenForNegotiatedProperty.groupId(); + return ""; } - return NEGOTIATED_PROPERTY_KEY_TOKEN.equals(propName) ? tokenForNegotiatedProperty : null; + return extensions.map().get(propName); } @Override @@ -133,7 +155,7 @@ public boolean isComplete() { } @Override - public byte[] unwrap(byte[] incoming, int offset, int len) throws SaslException { + public byte[] unwrap(byte[] incoming, int offset, int len) { if (!complete) { throw new IllegalStateException("Authentication exchange has not completed"); } @@ -141,7 +163,7 @@ public byte[] unwrap(byte[] incoming, int offset, int len) throws SaslException } @Override - public byte[] wrap(byte[] outgoing, int offset, int len) throws SaslException { + public byte[] wrap(byte[] outgoing, int offset, int len) { if (!complete) { throw new IllegalStateException("Authentication exchange has not completed"); } @@ -149,21 +171,19 @@ public byte[] wrap(byte[] outgoing, int offset, int len) throws SaslException { } @Override - public void dispose() throws SaslException { + public void dispose() { complete = false; tokenForNegotiatedProperty = null; + extensions = null; } - private byte[] process(String tokenValue, String authorizationId) throws SaslException { + private byte[] process(String tokenValue, String authorizationId, SaslExtensions extensions) + throws SaslException { KopOAuthBearerValidatorCallback callback = new KopOAuthBearerValidatorCallback(tokenValue); try { - callbackHandler.handle(new Callback[]{callback}); + callbackHandler.handle(new Callback[] {callback}); } catch (IOException | UnsupportedCallbackException e) { - String msg = String.format("%s: %s", INTERNAL_ERROR_ON_SERVER, e.getMessage()); - if (log.isDebugEnabled()) { - log.debug(msg, e); - } - throw new SaslException(msg); + handleCallbackError(e); } KopOAuthBearerToken token = callback.token(); if (token == null) { @@ -184,8 +204,10 @@ private byte[] process(String tokenValue, String authorizationId) throws SaslExc + "that is different from the token's principal name (%s)", authorizationId, token.principalName())); } + Map validExtensions = processExtensions(token, extensions); tokenForNegotiatedProperty = token; + this.extensions = new SaslExtensions(validExtensions); complete = true; if (log.isDebugEnabled()) { log.debug("Successfully authenticate User={}", token.principalName()); @@ -193,6 +215,30 @@ private byte[] process(String tokenValue, String authorizationId) throws SaslExc return new byte[0]; } + private Map processExtensions(OAuthBearerToken token, SaslExtensions extensions) + throws SaslException { + OAuthBearerExtensionsValidatorCallback extensionsCallback = + new OAuthBearerExtensionsValidatorCallback(token, extensions); + try { + callbackHandler.handle(new Callback[] {extensionsCallback}); + } catch (UnsupportedCallbackException e) { + // backwards compatibility - no extensions will be added + } catch (IOException e) { + handleCallbackError(e); + } + if (!extensionsCallback.invalidExtensions().isEmpty()) { + String errorMessage = String.format("Authentication failed: %d extensions are invalid! They are: %s", + extensionsCallback.invalidExtensions().size(), + Utils.mkString(extensionsCallback.invalidExtensions(), "", "", ": ", "; ")); + if (log.isDebugEnabled()) { + log.debug(errorMessage); + } + throw new SaslAuthenticationException(errorMessage); + } + + return extensionsCallback.validatedExtensions(); + } + private static String jsonErrorResponse(String errorStatus, String errorScope, String errorOpenIDConfiguration) { String jsonErrorResponse = String.format("{\"status\":\"%s\"", errorStatus); if (errorScope != null) { @@ -206,4 +252,9 @@ private static String jsonErrorResponse(String errorStatus, String errorScope, S return jsonErrorResponse; } + private void handleCallbackError(Exception e) throws SaslException { + String msg = String.format("%s: %s", INTERNAL_ERROR_ON_SERVER, e.getMessage()); + log.debug(msg, e); + throw new SaslException(msg); + } } diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/security/oauth/KopOAuthBearerToken.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/security/oauth/KopOAuthBearerToken.java index 68437de95e..127eb976f9 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/security/oauth/KopOAuthBearerToken.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/security/oauth/KopOAuthBearerToken.java @@ -32,10 +32,5 @@ public interface KopOAuthBearerToken extends OAuthBearerToken { * Pass the tenant to oauth server if credentials set. */ String tenant(); - - /** - * Pass the groupId to oauth server if credentials set. - */ - String groupId(); } diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/security/oauth/KopOAuthBearerUnsecuredJws.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/security/oauth/KopOAuthBearerUnsecuredJws.java index 71e59acf85..108bac47fb 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/security/oauth/KopOAuthBearerUnsecuredJws.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/security/oauth/KopOAuthBearerUnsecuredJws.java @@ -31,7 +31,6 @@ public class KopOAuthBearerUnsecuredJws extends OAuthBearerUnsecuredJws implemen private final String tenant; - private final String groupId; /** * Constructor with the given principal and scope claim names. * @@ -47,14 +46,12 @@ public class KopOAuthBearerUnsecuredJws extends OAuthBearerUnsecuredJws implemen */ public KopOAuthBearerUnsecuredJws(String compactSerialization, String tenant, - String groupId, String principalClaimName, String scopeClaimName) throws OAuthBearerIllegalTokenException { super(compactSerialization, principalClaimName, scopeClaimName); this.authData = new AuthenticationDataCommand(compactSerialization); this.tenant = tenant; - this.groupId = groupId; } @Override @@ -67,8 +64,4 @@ public String tenant() { return tenant; } - @Override - public String groupId() { - return groupId; - } } diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/security/oauth/KopOAuthBearerUnsecuredValidatorCallbackHandler.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/security/oauth/KopOAuthBearerUnsecuredValidatorCallbackHandler.java index 2a2bbcab94..6cd2b2c163 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/security/oauth/KopOAuthBearerUnsecuredValidatorCallbackHandler.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/security/oauth/KopOAuthBearerUnsecuredValidatorCallbackHandler.java @@ -107,7 +107,7 @@ public void close() { // empty } - private void handleCallback(KopOAuthBearerValidatorCallback callback) throws OAuthBearerIllegalTokenException { + private void handleCallback(KopOAuthBearerValidatorCallback callback) { String tokenValue = callback.tokenValue(); if (tokenValue == null) { throw new IllegalArgumentException("Callback missing required token value"); @@ -117,31 +117,24 @@ private void handleCallback(KopOAuthBearerValidatorCallback callback) throws OAu List requiredScope = requiredScope(); int allowableClockSkewMs = allowableClockSkewMs(); // Extract real token. - try { - Pair tokenAndTenant = OAuthTokenDecoder.decode(tokenValue); - final String token = tokenAndTenant.getLeft(); - final ExtensionTokenData extensionTokenData = tokenAndTenant.getRight(); - final String tenant = extensionTokenData != null ? extensionTokenData.getTenant() : null; - final String groupId = extensionTokenData != null ? extensionTokenData.getGroupId() : null; - KopOAuthBearerUnsecuredJws unsecuredJwt = new KopOAuthBearerUnsecuredJws(token, tenant, groupId, - principalClaimName, scopeClaimName); - long now = time.milliseconds(); - OAuthBearerValidationUtils - .validateClaimForExistenceAndType(unsecuredJwt, true, principalClaimName, String.class) - .throwExceptionIfFailed(); - OAuthBearerValidationUtils.validateIssuedAt(unsecuredJwt, false, now, allowableClockSkewMs) - .throwExceptionIfFailed(); - OAuthBearerValidationUtils.validateExpirationTime(unsecuredJwt, now, allowableClockSkewMs) - .throwExceptionIfFailed(); - OAuthBearerValidationUtils.validateTimeConsistency(unsecuredJwt).throwExceptionIfFailed(); - OAuthBearerValidationUtils.validateScope(unsecuredJwt, requiredScope).throwExceptionIfFailed(); - log.info("Successfully validated token with principal {}: {}", unsecuredJwt.principalName(), - unsecuredJwt.claims().toString()); - callback.token(unsecuredJwt); - } catch (IOException e) { - throw new OAuthBearerIllegalTokenException(OAuthBearerValidationResult.newFailure(e.getMessage())); - } - + Pair tokenAndTenant = OAuthTokenDecoder.decode(tokenValue); + final String token = tokenAndTenant.getLeft(); + final String tenant = tokenAndTenant.getRight(); + KopOAuthBearerUnsecuredJws unsecuredJwt = new KopOAuthBearerUnsecuredJws(token, tenant, + principalClaimName, scopeClaimName); + long now = time.milliseconds(); + OAuthBearerValidationUtils + .validateClaimForExistenceAndType(unsecuredJwt, true, principalClaimName, String.class) + .throwExceptionIfFailed(); + OAuthBearerValidationUtils.validateIssuedAt(unsecuredJwt, false, now, allowableClockSkewMs) + .throwExceptionIfFailed(); + OAuthBearerValidationUtils.validateExpirationTime(unsecuredJwt, now, allowableClockSkewMs) + .throwExceptionIfFailed(); + OAuthBearerValidationUtils.validateTimeConsistency(unsecuredJwt).throwExceptionIfFailed(); + OAuthBearerValidationUtils.validateScope(unsecuredJwt, requiredScope).throwExceptionIfFailed(); + log.info("Successfully validated token with principal {}: {}", unsecuredJwt.principalName(), + unsecuredJwt.claims().toString()); + callback.token(unsecuredJwt); } private String principalClaimName() { diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/security/oauth/OAuthTokenDecoder.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/security/oauth/OAuthTokenDecoder.java index 44cbb8f7a2..bf8a8dd986 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/security/oauth/OAuthTokenDecoder.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/security/oauth/OAuthTokenDecoder.java @@ -13,38 +13,31 @@ */ package io.streamnative.pulsar.handlers.kop.security.oauth; -import java.io.IOException; import lombok.NonNull; import org.apache.commons.lang3.tuple.Pair; - public class OAuthTokenDecoder { - protected static final String EXTENSION_DATA_DELIMITER = "__with_extension_data_"; + protected static final String DELIMITER = "__with_tenant_"; /** * Decode the raw token to token and tenant. * - * @param rawToken Raw token, it contains token and tenant. - * Format: Token + "__with_extension_data_" + base64(json()). + * @param rawToken Raw token, it contains token and tenant. Format: Tenant + "__with_tenant_" + Token. * @return Token and tenant pair. Left is token, right is tenant. */ - public static Pair decode(@NonNull String rawToken) throws IOException { + public static Pair decode(@NonNull String rawToken) { final String token; - final String extensionData; + final String tenant; // Extract real token. - int idx = rawToken.indexOf(EXTENSION_DATA_DELIMITER); + int idx = rawToken.indexOf(DELIMITER); if (idx != -1) { - extensionData = rawToken.substring(idx + EXTENSION_DATA_DELIMITER.length()); - token = rawToken.substring(0, idx); + token = rawToken.substring(idx + DELIMITER.length()); + tenant = rawToken.substring(0, idx); } else { token = rawToken; - extensionData = null; - } - - if (extensionData == null || extensionData.isEmpty()) { - return Pair.of(token, null); + tenant = null; } - return Pair.of(token, ExtensionTokenData.decode(extensionData)); + return Pair.of(token, tenant); } } diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/security/oauth/OauthValidatorCallbackHandler.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/security/oauth/OauthValidatorCallbackHandler.java index e4b02cdbc9..1b0a2094ba 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/security/oauth/OauthValidatorCallbackHandler.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/security/oauth/OauthValidatorCallbackHandler.java @@ -31,6 +31,7 @@ import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.tuple.Pair; import org.apache.kafka.common.security.auth.AuthenticateCallbackHandler; +import org.apache.kafka.common.security.oauthbearer.OAuthBearerExtensionsValidatorCallback; import org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule; import org.apache.kafka.common.security.oauthbearer.internals.unsecured.OAuthBearerIllegalTokenException; import org.apache.kafka.common.security.oauthbearer.internals.unsecured.OAuthBearerValidationResult; @@ -102,12 +103,20 @@ public void handle(Callback[] callbacks) throws IOException, UnsupportedCallback validatorCallback.error(failureScope != null ? "insufficient_scope" : "invalid_token", failureScope, failureReason.failureOpenIdConfig()); } + } else if (callback instanceof OAuthBearerExtensionsValidatorCallback) { + handleExtensionsValidatorCallback((OAuthBearerExtensionsValidatorCallback) callback); } else { throw new UnsupportedCallbackException(callback); } } } + private void handleExtensionsValidatorCallback( + OAuthBearerExtensionsValidatorCallback extensionsValidatorCallback) { + extensionsValidatorCallback.inputExtensions().map() + .forEach((extensionName, v) -> extensionsValidatorCallback.valid(extensionName)); + } + @VisibleForTesting protected void handleCallback(KopOAuthBearerValidatorCallback callback) { if (callback.tokenValue() == null) { @@ -124,14 +133,12 @@ protected void handleCallback(KopOAuthBearerValidatorCallback callback) { final String tokenWithTenant = callback.tokenValue(); - + // Extract real token. + Pair tokenAndTenant = OAuthTokenDecoder.decode(tokenWithTenant); + final String token = tokenAndTenant.getLeft(); + final String tenant = tokenAndTenant.getRight(); try { - // Extract real token. - Pair tokenAndTenant = OAuthTokenDecoder.decode(tokenWithTenant); - final String token = tokenAndTenant.getLeft(); - ExtensionTokenData extensionTokenData = tokenAndTenant.getRight(); - log.info("ExtensionTokenData: {}", extensionTokenData); AuthData authData = AuthData.of(token.getBytes(StandardCharsets.UTF_8)); final AuthenticationState authState = authenticationProvider.newAuthState( authData, null, null); @@ -167,12 +174,7 @@ public AuthenticationDataSource authDataSource() { @Override public String tenant() { - return extensionTokenData != null ? extensionTokenData.getTenant() : null; - } - - @Override - public String groupId() { - return extensionTokenData != null ? extensionTokenData.getGroupId() : null; + return tenant; } @Override @@ -181,8 +183,7 @@ public Long startTimeMs() { return Long.MAX_VALUE; } }); - } catch (AuthenticationException | InterruptedException | ExecutionException | TimeoutException - | IOException e) { + } catch (AuthenticationException | InterruptedException | ExecutionException | TimeoutException e) { log.error("OAuth validator callback handler new auth state failed: ", e); throw new OAuthBearerIllegalTokenException(OAuthBearerValidationResult.newFailure(e.getMessage())); } diff --git a/kafka-impl/src/test/java/io/streamnative/pulsar/handlers/kop/security/oauth/OAuthTokenDecoderTest.java b/kafka-impl/src/test/java/io/streamnative/pulsar/handlers/kop/security/oauth/OAuthTokenDecoderTest.java index 92a0928322..eab6891472 100644 --- a/kafka-impl/src/test/java/io/streamnative/pulsar/handlers/kop/security/oauth/OAuthTokenDecoderTest.java +++ b/kafka-impl/src/test/java/io/streamnative/pulsar/handlers/kop/security/oauth/OAuthTokenDecoderTest.java @@ -16,25 +16,21 @@ import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertNull; -import java.io.IOException; import org.apache.commons.lang3.tuple.Pair; import org.testng.annotations.Test; - /** * Test {@link OAuthTokenDecoder}. */ public class OAuthTokenDecoderTest { @Test - public void testDecode() throws IOException { - Pair result = OAuthTokenDecoder.decode("my-token"); - assertEquals(result.getLeft(), "my-token"); - assertNull(result.getRight()); - result = OAuthTokenDecoder.decode("my-token" - + OAuthTokenDecoder.EXTENSION_DATA_DELIMITER - + "eyJ0ZW5hbnQiOiJteS10ZW5hbnQiLCJncm91cElkIjoibXktZ3JvdXAtaWQifQ=="); - assertEquals(result.getLeft(), "my-token"); - assertEquals(result.getRight(), new ExtensionTokenData("my-tenant", "my-group-id")); + public void testDecode() { + Pair tokenAndTenant = OAuthTokenDecoder.decode("my-token"); + assertEquals(tokenAndTenant.getLeft(), "my-token"); + assertNull(tokenAndTenant.getRight()); + tokenAndTenant = OAuthTokenDecoder.decode("my-tenant" + OAuthTokenDecoder.DELIMITER + "my-token"); + assertEquals(tokenAndTenant.getLeft(), "my-token"); + assertEquals(tokenAndTenant.getRight(), "my-tenant"); } } diff --git a/oauth-client/src/main/java/io/streamnative/pulsar/handlers/kop/security/oauth/ClientConfig.java b/oauth-client/src/main/java/io/streamnative/pulsar/handlers/kop/security/oauth/ClientConfig.java index 1f51442d29..ae741b25bf 100644 --- a/oauth-client/src/main/java/io/streamnative/pulsar/handlers/kop/security/oauth/ClientConfig.java +++ b/oauth-client/src/main/java/io/streamnative/pulsar/handlers/kop/security/oauth/ClientConfig.java @@ -13,10 +13,22 @@ */ package io.streamnative.pulsar.handlers.kop.security.oauth; +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import com.fasterxml.jackson.annotation.JsonInclude; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.ObjectReader; +import com.google.common.annotations.VisibleForTesting; +import java.io.IOException; +import java.io.InputStream; import java.net.MalformedURLException; import java.net.URL; +import java.net.URLConnection; import java.util.Map; +import lombok.AllArgsConstructor; import lombok.Getter; +import lombok.NoArgsConstructor; +import lombok.ToString; /** * The client configs associated with OauthLoginCallbackHandler. @@ -26,6 +38,10 @@ @Getter public class ClientConfig { + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + + private static final ObjectReader CLIENT_INFO_READER = OBJECT_MAPPER.readerFor(ClientInfo.class); + public static final String OAUTH_ISSUER_URL = "oauth.issuer.url"; public static final String OAUTH_CREDENTIALS_URL = "oauth.credentials.url"; public static final String OAUTH_AUDIENCE = "oauth.audience"; @@ -35,6 +51,7 @@ public class ClientConfig { private final URL credentialsUrl; private final String audience; private final String scope; + private final ClientInfo clientInfo; public ClientConfig(Map configs) { final String issuerUrlString = configs.get(OAUTH_ISSUER_URL); @@ -58,8 +75,42 @@ public ClientConfig(Map configs) { throw new IllegalArgumentException(String.format( "invalid %s \"%s\": %s", OAUTH_CREDENTIALS_URL, credentialsUrlString, e.getMessage())); } - + try { + this.clientInfo = loadPrivateKey(); + } catch (IOException e) { + throw new IllegalArgumentException(String.format( + "failed to load client credentials from %s: %s", credentialsUrlString, e.getMessage())); + } this.audience = configs.getOrDefault(OAUTH_AUDIENCE, null); this.scope = configs.getOrDefault(OAUTH_SCOPE, null); } + + @VisibleForTesting + ClientInfo loadPrivateKey() throws IOException { + final URLConnection connection = getCredentialsUrl().openConnection(); + try (InputStream inputStream = connection.getInputStream()) { + return CLIENT_INFO_READER.readValue(inputStream); + } + } + + @Getter + @ToString + @NoArgsConstructor + @AllArgsConstructor + @JsonIgnoreProperties(ignoreUnknown = true) + @JsonInclude(JsonInclude.Include.NON_NULL) + public static class ClientInfo { + + @JsonProperty("client_id") + private String id; + + @JsonProperty("client_secret") + private String secret; + + @JsonProperty("tenant") + private String tenant; + + @JsonProperty("group_id") + private String groupId; + } } diff --git a/oauth-client/src/main/java/io/streamnative/pulsar/handlers/kop/security/oauth/ClientCredentialsFlow.java b/oauth-client/src/main/java/io/streamnative/pulsar/handlers/kop/security/oauth/ClientCredentialsFlow.java index 85d76b04a3..7245f9877c 100644 --- a/oauth-client/src/main/java/io/streamnative/pulsar/handlers/kop/security/oauth/ClientCredentialsFlow.java +++ b/oauth-client/src/main/java/io/streamnative/pulsar/handlers/kop/security/oauth/ClientCredentialsFlow.java @@ -14,7 +14,6 @@ package io.streamnative.pulsar.handlers.kop.security.oauth; import com.fasterxml.jackson.annotation.JsonIgnoreProperties; -import com.fasterxml.jackson.annotation.JsonInclude; import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; @@ -34,10 +33,7 @@ import java.util.Map; import java.util.concurrent.ExecutionException; import java.util.stream.Collectors; -import lombok.AllArgsConstructor; import lombok.Getter; -import lombok.NoArgsConstructor; -import lombok.ToString; import org.asynchttpclient.AsyncHttpClient; import org.asynchttpclient.DefaultAsyncHttpClient; import org.asynchttpclient.DefaultAsyncHttpClientConfig; @@ -50,7 +46,6 @@ public class ClientCredentialsFlow implements Closeable { private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); private static final ObjectReader METADATA_READER = OBJECT_MAPPER.readerFor(Metadata.class); - private static final ObjectReader CLIENT_INFO_READER = OBJECT_MAPPER.readerFor(ClientInfo.class); private static final ObjectReader TOKEN_RESULT_READER = OBJECT_MAPPER.readerFor(OAuthBearerTokenImpl.class); private static final ObjectReader TOKEN_ERROR_READER = OBJECT_MAPPER.readerFor(TokenError.class); @@ -76,7 +71,7 @@ protected ClientCredentialsFlow(ClientConfig clientConfig, AsyncHttpClient httpC public OAuthBearerTokenImpl authenticate() throws IOException { final String tokenEndPoint = findAuthorizationServer().getTokenEndPoint(); - final ClientInfo clientInfo = loadPrivateKey(); + final ClientConfig.ClientInfo clientInfo = clientConfig.getClientInfo(); try { final String body = buildClientCredentialsBody(clientInfo); final Response response = httpClient.preparePost(tokenEndPoint) @@ -93,11 +88,6 @@ public OAuthBearerTokenImpl authenticate() throws IOException { if (tenant != null) { token.setTenant(tenant); } - String groupId = clientInfo.getGroupId(); - if (groupId != null) { - token.setGroupId(groupId); - } - token.setExtensionTokenData(); return token; case 400: // Bad request case 401: // Unauthorized @@ -133,19 +123,12 @@ Metadata findAuthorizationServer() throws IOException { } } - @VisibleForTesting - ClientInfo loadPrivateKey() throws IOException { - final URLConnection connection = clientConfig.getCredentialsUrl().openConnection(); - try (InputStream inputStream = connection.getInputStream()) { - return CLIENT_INFO_READER.readValue(inputStream); - } - } private static String encode(String s) throws UnsupportedEncodingException { return URLEncoder.encode(s, StandardCharsets.UTF_8.name()); } - private String buildClientCredentialsBody(ClientInfo clientInfo) throws UnsupportedEncodingException { + private String buildClientCredentialsBody(ClientConfig.ClientInfo clientInfo) throws UnsupportedEncodingException { final Map bodyMap = new HashMap<>(); bodyMap.put("grant_type", "client_credentials"); bodyMap.put("client_id", encode(clientInfo.getId())); @@ -167,27 +150,6 @@ public static class Metadata { private String tokenEndPoint; } - @Getter - @ToString - @NoArgsConstructor - @AllArgsConstructor - @JsonIgnoreProperties(ignoreUnknown = true) - @JsonInclude(JsonInclude.Include.NON_NULL) - public static class ClientInfo { - - @JsonProperty("client_id") - private String id; - - @JsonProperty("client_secret") - private String secret; - - @JsonProperty("tenant") - private String tenant; - - @JsonProperty("group_id") - private String groupId; - } - @Getter @JsonIgnoreProperties(ignoreUnknown = true) public static class TokenError { diff --git a/oauth-client/src/main/java/io/streamnative/pulsar/handlers/kop/security/oauth/ExtensionTokenData.java b/oauth-client/src/main/java/io/streamnative/pulsar/handlers/kop/security/oauth/ExtensionTokenData.java deleted file mode 100644 index a166e25b1a..0000000000 --- a/oauth-client/src/main/java/io/streamnative/pulsar/handlers/kop/security/oauth/ExtensionTokenData.java +++ /dev/null @@ -1,49 +0,0 @@ -/** - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.streamnative.pulsar.handlers.kop.security.oauth; - -import com.fasterxml.jackson.annotation.JsonIgnoreProperties; -import com.fasterxml.jackson.annotation.JsonInclude; -import com.fasterxml.jackson.annotation.JsonProperty; -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.databind.ObjectWriter; -import java.util.Base64; -import lombok.Data; -import lombok.ToString; - - -@Data -@ToString -@JsonInclude(JsonInclude.Include.NON_NULL) -@JsonIgnoreProperties(ignoreUnknown = true) -public class ExtensionTokenData { - - private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); - private static final ObjectWriter EXTENSION_WRITER = OBJECT_MAPPER.writer(); - - @JsonProperty("tenant") - private String tenant; - - @JsonProperty("groupId") - private String groupId; - - public boolean hasExtensionData() { - return tenant != null || groupId != null; - } - - public String encode() throws JsonProcessingException { - return Base64.getEncoder().encodeToString(EXTENSION_WRITER.writeValueAsBytes(this)); - } -} diff --git a/oauth-client/src/main/java/io/streamnative/pulsar/handlers/kop/security/oauth/OAuthBearerTokenImpl.java b/oauth-client/src/main/java/io/streamnative/pulsar/handlers/kop/security/oauth/OAuthBearerTokenImpl.java index cb63cb1269..af929363a0 100644 --- a/oauth-client/src/main/java/io/streamnative/pulsar/handlers/kop/security/oauth/OAuthBearerTokenImpl.java +++ b/oauth-client/src/main/java/io/streamnative/pulsar/handlers/kop/security/oauth/OAuthBearerTokenImpl.java @@ -15,7 +15,6 @@ import com.fasterxml.jackson.annotation.JsonIgnoreProperties; import com.fasterxml.jackson.annotation.JsonProperty; -import com.fasterxml.jackson.core.JsonProcessingException; import java.util.Arrays; import java.util.Collections; import java.util.Set; @@ -25,9 +24,7 @@ @JsonIgnoreProperties(ignoreUnknown = true) public class OAuthBearerTokenImpl implements OAuthBearerToken { - protected static final String EXTENSION_DATA_DELIMITER = "__with_extension_data_"; - - protected ExtensionTokenData extensionTokenData = new ExtensionTokenData(); + protected static final String DELIMITER = "__with_tenant_"; @JsonProperty("access_token") private String accessToken; @@ -46,17 +43,7 @@ public String value() { } public void setTenant(String tenant) { - this.extensionTokenData.setTenant(tenant); - } - - public void setGroupId(String groupId) { - this.extensionTokenData.setGroupId(groupId); - } - - public void setExtensionTokenData() throws JsonProcessingException { - if (extensionTokenData.hasExtensionData()){ - this.accessToken = this.accessToken + EXTENSION_DATA_DELIMITER + this.extensionTokenData.encode(); - } + this.accessToken = tenant + DELIMITER + accessToken; } @Override diff --git a/oauth-client/src/main/java/io/streamnative/pulsar/handlers/kop/security/oauth/OauthLoginCallbackHandler.java b/oauth-client/src/main/java/io/streamnative/pulsar/handlers/kop/security/oauth/OauthLoginCallbackHandler.java index b16763815d..9c24e6fa86 100644 --- a/oauth-client/src/main/java/io/streamnative/pulsar/handlers/kop/security/oauth/OauthLoginCallbackHandler.java +++ b/oauth-client/src/main/java/io/streamnative/pulsar/handlers/kop/security/oauth/OauthLoginCallbackHandler.java @@ -14,16 +14,22 @@ package io.streamnative.pulsar.handlers.kop.security.oauth; import java.io.IOException; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Objects; import javax.security.auth.callback.Callback; import javax.security.auth.callback.UnsupportedCallbackException; import javax.security.auth.login.AppConfigurationEntry; +import javax.security.sasl.SaslException; import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.config.ConfigException; import org.apache.kafka.common.security.auth.AuthenticateCallbackHandler; +import org.apache.kafka.common.security.auth.SaslExtensions; +import org.apache.kafka.common.security.auth.SaslExtensionsCallback; import org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule; import org.apache.kafka.common.security.oauthbearer.OAuthBearerTokenCallback; +import org.apache.kafka.common.security.oauthbearer.internals.OAuthBearerClientInitialResponse; /** * OAuth 2.0 login callback handler. @@ -49,6 +55,7 @@ public void configure(Map configs, String saslMechanism, List extensions = new HashMap<>(); + ClientConfig.ClientInfo clientInfo = clientConfig.getClientInfo(); + + if (clientInfo.getTenant() != null) { + extensions.put("tenant", clientInfo.getTenant()); + } + if (clientInfo.getGroupId() != null) { + extensions.put("groupId", clientInfo.getGroupId()); + } + SaslExtensions saslExtensions = new SaslExtensions(extensions); + + try { + OAuthBearerClientInitialResponse.validateExtensions(saslExtensions); + } catch (SaslException e) { + throw new ConfigException(e.getMessage()); + } + + callback.extensions(saslExtensions); + } } diff --git a/oauth-client/src/test/java/io/streamnative/pulsar/handlers/kop/security/oauth/ClientCredentialsFlowTest.java b/oauth-client/src/test/java/io/streamnative/pulsar/handlers/kop/security/oauth/ClientCredentialsFlowTest.java index 86b5817bc7..64e114ffe4 100644 --- a/oauth-client/src/test/java/io/streamnative/pulsar/handlers/kop/security/oauth/ClientCredentialsFlowTest.java +++ b/oauth-client/src/test/java/io/streamnative/pulsar/handlers/kop/security/oauth/ClientCredentialsFlowTest.java @@ -19,11 +19,9 @@ import static org.mockito.Mockito.spy; import java.io.IOException; -import java.net.URL; import java.util.Collections; import java.util.Objects; import java.util.concurrent.ExecutionException; -import org.apache.commons.lang3.StringUtils; import org.asynchttpclient.AsyncHttpClient; import org.asynchttpclient.BoundRequestBuilder; import org.asynchttpclient.ListenableFuture; @@ -58,38 +56,38 @@ public void testFindAuthorizationServer() throws IOException { @Test public void testLoadPrivateKey() throws Exception { - final ClientCredentialsFlow flow = new ClientCredentialsFlow(ClientConfigHelper.create( + ClientConfig clientConfig = ClientConfigHelper.create( "http://localhost:4444", Objects.requireNonNull( getClass().getClassLoader().getResource("private_key.json")).toString() - )); - final ClientCredentialsFlow.ClientInfo clientInfo = flow.loadPrivateKey(); + ); + ClientConfig.ClientInfo clientInfo = clientConfig.getClientInfo(); Assert.assertEquals(clientInfo.getId(), "my-id"); Assert.assertEquals(clientInfo.getSecret(), "my-secret"); } @Test public void testLoadPrivateKeyWithExtensionData() throws Exception { - final ClientCredentialsFlow flow = new ClientCredentialsFlow(ClientConfigHelper.create( + ClientConfig clientConfig = ClientConfigHelper.create( "http://localhost:4444", Objects.requireNonNull( - getClass().getClassLoader().getResource("private_key_with_tenant_and_groupId.json")) + getClass().getClassLoader().getResource("private_key_with_tenant_and_groupId.json")) .toString() - )); - final ClientCredentialsFlow.ClientInfo clientInfo = flow.loadPrivateKey(); + ); + ClientConfig.ClientInfo clientInfo = clientConfig.getClientInfo(); Assert.assertEquals(clientInfo.getId(), "my-id"); Assert.assertEquals(clientInfo.getSecret(), "my-secret"); Assert.assertEquals(clientInfo.getTenant(), "my-tenant"); Assert.assertEquals(clientInfo.getGroupId(), "my-group-id"); } - @Test(dataProvider = "extensionDataProvider") - public void testTenantToken(URL credentialsUrl, String expectedExtensionData) - throws ExecutionException, InterruptedException, IOException { + @Test + public void testTenantToken() throws ExecutionException, InterruptedException, IOException { AsyncHttpClient mockHttpClient = mock(AsyncHttpClient.class); final ClientCredentialsFlow flow = spy(new ClientCredentialsFlow(ClientConfigHelper.create( "http://localhost:4444", - Objects.requireNonNull(credentialsUrl).toString() + Objects.requireNonNull( + getClass().getClassLoader().getResource("private_key.json")).toString() ), mockHttpClient)); ClientCredentialsFlow.Metadata mockMetadata = mock(ClientCredentialsFlow.Metadata.class); @@ -114,11 +112,7 @@ public void testTenantToken(URL credentialsUrl, String expectedExtensionData) doReturn(mockBuilder).when(mockBuilder).setBody(anyString()); OAuthBearerTokenImpl token = flow.authenticate(); - String expactedToken = "my-token"; - if (StringUtils.isNotBlank(expectedExtensionData)) { - expactedToken = expactedToken + OAuthBearerTokenImpl.EXTENSION_DATA_DELIMITER + expectedExtensionData; - } - Assert.assertEquals(token.value(), expactedToken); + Assert.assertEquals(token.value(), "my-tenant" + OAuthBearerTokenImpl.DELIMITER + "my-token"); Assert.assertEquals(token.scope(), Collections.singleton("test")); } } diff --git a/oauth-client/src/test/resources/private_key.json b/oauth-client/src/test/resources/private_key.json index df809c989c..0730189e0f 100644 --- a/oauth-client/src/test/resources/private_key.json +++ b/oauth-client/src/test/resources/private_key.json @@ -1,4 +1,5 @@ { "client_id": "my-id", - "client_secret": "my-secret" + "client_secret": "my-secret", + "tenant": "my-tenant" } diff --git a/oauth-client/src/test/resources/private_key_with_tenant.json b/oauth-client/src/test/resources/private_key_with_tenant.json deleted file mode 100644 index 0730189e0f..0000000000 --- a/oauth-client/src/test/resources/private_key_with_tenant.json +++ /dev/null @@ -1,5 +0,0 @@ -{ - "client_id": "my-id", - "client_secret": "my-secret", - "tenant": "my-tenant" -} diff --git a/oauth-client/src/test/resources/private_key_with_tenant_and_groupId.json b/oauth-client/src/test/resources/private_key_with_tenant_and_groupId.json deleted file mode 100644 index 544894ea19..0000000000 --- a/oauth-client/src/test/resources/private_key_with_tenant_and_groupId.json +++ /dev/null @@ -1,6 +0,0 @@ -{ - "client_id": "my-id", - "client_secret": "my-secret", - "tenant": "my-tenant", - "group_id": "my-group-id" -} diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/CustomOAuthBearerCallbackHandlerTest.java b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/CustomOAuthBearerCallbackHandlerTest.java index 9f311087b3..6c27876775 100644 --- a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/CustomOAuthBearerCallbackHandlerTest.java +++ b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/CustomOAuthBearerCallbackHandlerTest.java @@ -182,11 +182,6 @@ public String tenant() { return null; } - @Override - public String groupId() { - return null; - } - @Override public Long startTimeMs() { return null; diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/HydraOAuthUtils.java b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/HydraOAuthUtils.java index 11c435fc5f..53faa8b26d 100644 --- a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/HydraOAuthUtils.java +++ b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/HydraOAuthUtils.java @@ -21,7 +21,7 @@ import io.fusionauth.jwks.domain.JSONWebKey; import io.jsonwebtoken.SignatureAlgorithm; import io.jsonwebtoken.security.Keys; -import io.streamnative.pulsar.handlers.kop.security.oauth.ClientCredentialsFlow; +import io.streamnative.pulsar.handlers.kop.security.oauth.ClientConfig; import java.io.BufferedWriter; import java.io.File; import java.io.FileWriter; @@ -46,7 +46,7 @@ public class HydraOAuthUtils { private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); private static final ObjectWriter CLIENT_INFO_WRITER = - OBJECT_MAPPER.writerFor(ClientCredentialsFlow.ClientInfo.class); + OBJECT_MAPPER.writerFor(ClientConfig.ClientInfo.class); private static String publicKey; @@ -121,8 +121,8 @@ public static String writeCredentialsFile(String clientId, String tenant, String groupId, String basename) throws IOException { - ClientCredentialsFlow.ClientInfo clientInfo = - new ClientCredentialsFlow.ClientInfo(clientId, clientSecret, tenant, groupId); + ClientConfig.ClientInfo clientInfo = + new ClientConfig.ClientInfo(clientId, clientSecret, tenant, groupId); final String content = CLIENT_INFO_WRITER.writeValueAsString(clientInfo); File file = File.createTempFile("oauth-credentials-", basename); diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/security/oauth/ExtensionTokenDataTest.java b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/security/oauth/ExtensionTokenDataTest.java deleted file mode 100644 index 5efaa6838c..0000000000 --- a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/security/oauth/ExtensionTokenDataTest.java +++ /dev/null @@ -1,33 +0,0 @@ -/** - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.streamnative.pulsar.handlers.kop.security.oauth; - -import static org.testng.Assert.assertEquals; - -import java.io.IOException; -import lombok.extern.slf4j.Slf4j; -import org.testng.annotations.Test; - -@Slf4j -public class ExtensionTokenDataTest { - - @Test - public void testEncodeAndDecode() throws IOException { - ExtensionTokenData raw = new ExtensionTokenData("my-tenant", "my-group-id"); - String encode = raw.encode(); - log.info("The encode string: {}", encode); - ExtensionTokenData decode = ExtensionTokenData.decode(encode); - assertEquals(decode, raw); - } -} diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/security/oauth/OauthValidatorCallbackHandlerTest.java b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/security/oauth/OauthValidatorCallbackHandlerTest.java index b31885ed1a..85becdcd1c 100644 --- a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/security/oauth/OauthValidatorCallbackHandlerTest.java +++ b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/security/oauth/OauthValidatorCallbackHandlerTest.java @@ -51,15 +51,12 @@ public void testHandleCallback() throws AuthenticationException { doReturn(CompletableFuture.completedFuture(null)).when(state).authenticateAsync(any()); KopOAuthBearerValidatorCallback callbackWithTenant = - new KopOAuthBearerValidatorCallback("my-token" - + OAuthTokenDecoder.EXTENSION_DATA_DELIMITER - + "eyJ0ZW5hbnQiOiJteS10ZW5hbnQiLCJncm91cElkIjoibXktZ3JvdXAtaWQifQ=="); + new KopOAuthBearerValidatorCallback("my-tenant" + OAuthTokenDecoder.DELIMITER + "my-token"); handler.handleCallback(callbackWithTenant); KopOAuthBearerToken token = callbackWithTenant.token(); assertEquals(token.tenant(), "my-tenant"); - assertEquals(token.groupId(), "my-group-id"); assertEquals(token.value(), "my-token"); KopOAuthBearerValidatorCallback callbackWithoutTenant =