diff --git a/osgp/shared/osgp-kafka-config/src/main/java/org/opensmartgridplatform/shared/application/config/kafka/oauth/BasicOAuthBearerToken.java b/osgp/shared/osgp-kafka-config/src/main/java/org/opensmartgridplatform/shared/application/config/kafka/oauth/BasicOAuthBearerToken.java
deleted file mode 100644
index 2cf96e34559..00000000000
--- a/osgp/shared/osgp-kafka-config/src/main/java/org/opensmartgridplatform/shared/application/config/kafka/oauth/BasicOAuthBearerToken.java
+++ /dev/null
@@ -1,133 +0,0 @@
-// Copyright 2022 Alliander N.V.
-// SPDX-FileCopyrightText: Contributors to the GXF project
-//
-// SPDX-License-Identifier: Apache-2.0
-
-package org.opensmartgridplatform.shared.application.config.kafka.oauth;
-
-import java.util.Set;
-import org.apache.commons.lang3.builder.ToStringBuilder;
-import org.apache.commons.lang3.builder.ToStringStyle;
-import org.apache.kafka.common.security.oauthbearer.OAuthBearerToken;
-
-/**
- * An implementation of the {@link OAuthBearerToken} that fairly straightforwardly stores the values
- * given to its constructor (except the scope set which is copied to avoid modifications).
- *
- *
Very little validation is applied here with respect to the validity of the given values. All
- * validation is assumed to happen by users of this class.
- *
- * @see RFC 7515: JSON Web Signature (JWS)
- */
-public class BasicOAuthBearerToken implements OAuthBearerToken {
-
- private final String token;
-
- private final Set scopes;
-
- private final Long lifetimeMs;
-
- private final String principalName;
-
- private final Long startTimeMs;
-
- /**
- * Creates a new OAuthBearerToken instance around the given values.
- *
- * @param token Value containing the compact serialization as a base 64 string that can be parsed,
- * decoded, and validated as a well-formed JWS. Must be non-null
, non-blank, and
- * non-whitespace only.
- * @param scopes Set of non-null
scopes. May contain case-sensitive "duplicates". The
- * given set is copied and made unmodifiable so neither the caller of this constructor nor any
- * downstream users can modify it.
- * @param lifetimeMs The token's lifetime, expressed as the number of milliseconds since the
- * epoch. Must be non-negative.
- * @param principalName The name of the principal to which this credential applies. Must be non-
- * null
, non-blank, and non-whitespace only.
- * @param startTimeMs The token's start time, expressed as the number of milliseconds since the
- * epoch, if available, otherwise null
. Must be non-negative if a non-null
- *
value is provided.
- */
- public BasicOAuthBearerToken(
- String token, Set scopes, long lifetimeMs, String principalName, Long startTimeMs) {
- this.token = token;
- this.scopes = scopes;
- this.lifetimeMs = lifetimeMs;
- this.principalName = principalName;
- this.startTimeMs = startTimeMs;
- }
-
- /**
- * The b64token
value as defined in RFC 6750 Section 2.1
- *
- * @return b64token
value as defined in RFC 6750 Section 2.1
- */
- @Override
- public String value() {
- return token;
- }
-
- /**
- * The token's scope of access, as per RFC 6749 Section 1.4
- *
- * @return the token's (always non-null but potentially empty) scope of access, as per RFC 6749 Section 1.4. Note that
- * all values in the returned set will be trimmed of preceding and trailing whitespace, and
- * the result will never contain the empty string.
- */
- @Override
- public Set scope() {
- // Immutability of the set is performed in the constructor/validation utils class, so
- // we don't need to repeat it here.
- return scopes;
- }
-
- /**
- * The token's lifetime, expressed as the number of milliseconds since the epoch, as per RFC 6749 Section 1.4
- *
- * @return the token's lifetime, expressed as the number of milliseconds since the epoch, as per
- * RFC 6749 Section 1.4.
- */
- @Override
- public long lifetimeMs() {
- return lifetimeMs;
- }
-
- /**
- * The name of the principal to which this credential applies
- *
- * @return the always non-null/non-empty principal name
- */
- @Override
- public String principalName() {
- return principalName;
- }
-
- /**
- * When the credential became valid, in terms of the number of milliseconds since the epoch, if
- * known, otherwise null. An expiring credential may not necessarily indicate when it was created
- * -- just when it expires -- so we need to support a null return value here.
- *
- * @return the time when the credential became valid, in terms of the number of milliseconds since
- * the epoch, if known, otherwise null
- */
- @Override
- public Long startTimeMs() {
- return startTimeMs;
- }
-
- @Override
- public String toString() {
- return new ToStringBuilder(this, ToStringStyle.MULTI_LINE_STYLE)
- .append("token", this.token)
- .append("scopes", this.scopes)
- .append("lifetimeMs", this.lifetimeMs)
- .append("principalName", this.principalName)
- .append("startTimeMs", this.startTimeMs)
- .build();
- }
-}
diff --git a/osgp/shared/osgp-kafka-config/src/main/java/org/opensmartgridplatform/shared/application/config/kafka/oauth/KafkaOAuthException.java b/osgp/shared/osgp-kafka-config/src/main/java/org/opensmartgridplatform/shared/application/config/kafka/oauth/KafkaOAuthException.java
deleted file mode 100644
index dc30255398e..00000000000
--- a/osgp/shared/osgp-kafka-config/src/main/java/org/opensmartgridplatform/shared/application/config/kafka/oauth/KafkaOAuthException.java
+++ /dev/null
@@ -1,11 +0,0 @@
-// SPDX-FileCopyrightText: Contributors to the GXF project
-//
-// SPDX-License-Identifier: Apache-2.0
-
-package org.opensmartgridplatform.shared.application.config.kafka.oauth;
-
-public class KafkaOAuthException extends RuntimeException {
- public KafkaOAuthException(String s, Throwable throwable) {
- super(s, throwable);
- }
-}
diff --git a/osgp/shared/osgp-kafka-config/src/main/java/org/opensmartgridplatform/shared/application/config/kafka/oauth/OAuthAuthenticateCallbackHandler.java b/osgp/shared/osgp-kafka-config/src/main/java/org/opensmartgridplatform/shared/application/config/kafka/oauth/OAuthAuthenticateCallbackHandler.java
deleted file mode 100644
index 201adfe5c9c..00000000000
--- a/osgp/shared/osgp-kafka-config/src/main/java/org/opensmartgridplatform/shared/application/config/kafka/oauth/OAuthAuthenticateCallbackHandler.java
+++ /dev/null
@@ -1,164 +0,0 @@
-// Copyright 2022 Alliander N.V.
-// SPDX-FileCopyrightText: Contributors to the GXF project
-//
-// SPDX-License-Identifier: Apache-2.0
-
-package org.opensmartgridplatform.shared.application.config.kafka.oauth;
-
-import com.microsoft.aad.msal4j.ClientCredentialFactory;
-import com.microsoft.aad.msal4j.ClientCredentialParameters;
-import com.microsoft.aad.msal4j.ConfidentialClientApplication;
-import com.microsoft.aad.msal4j.IAuthenticationResult;
-import com.microsoft.aad.msal4j.IClientAssertion;
-import java.io.File;
-import java.io.IOException;
-import java.nio.charset.StandardCharsets;
-import java.nio.file.Files;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-import java.util.Set;
-import java.util.stream.Collectors;
-import javax.security.auth.callback.Callback;
-import javax.security.auth.callback.UnsupportedCallbackException;
-import javax.security.auth.login.AppConfigurationEntry;
-import org.apache.kafka.common.config.ConfigException;
-import org.apache.kafka.common.security.auth.AuthenticateCallbackHandler;
-import org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule;
-import org.apache.kafka.common.security.oauthbearer.OAuthBearerToken;
-import org.apache.kafka.common.security.oauthbearer.OAuthBearerTokenCallback;
-import org.apache.kafka.common.security.oauthbearer.OAuthBearerValidatorCallback;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class OAuthAuthenticateCallbackHandler implements AuthenticateCallbackHandler {
- private static final Logger LOGGER =
- LoggerFactory.getLogger(OAuthAuthenticateCallbackHandler.class);
-
- public static final String CLIENT_ID_CONFIG = "clientId";
- public static final String CLIENT_ID_DOC = "Client id of the azure ad OAuth client";
- public static final String TOKEN_ENDPOINT_CONFIG = "tokenEndpoint";
- public static final String TOKEN_ENDPOINT_DOC = "Token endpoint of the azure ad OAuth client";
- public static final String SCOPE_CONFIG = "scope";
- public static final String SCOPE_DOC = "Scope of the OAuth JWT token";
- public static final String TOKEN_FILE_CONFIG = "tokenFile";
- public static final String TOKEN_FILE_DOC =
- "Path of the file containing the token needed for retrieving the OAuth JWT token";
-
- protected String tokenFilePath;
- protected String tokenEndPoint;
- protected String clientId;
- protected Set scope;
-
- @Override
- public void configure(
- final Map configs,
- final String saslMechanism,
- final List jaasConfigEntries) {
- Map options = getOptions(saslMechanism, jaasConfigEntries);
- setFields(options);
- }
-
- void setFields(final Map options) {
- this.clientId = getProperty(CLIENT_ID_CONFIG, options);
- this.tokenEndPoint = getProperty(TOKEN_ENDPOINT_CONFIG, options);
- this.scope =
- Arrays.stream(getProperty(SCOPE_CONFIG, options).split(",")).collect(Collectors.toSet());
- this.tokenFilePath = getProperty(TOKEN_FILE_CONFIG, options);
- }
-
- private Map getOptions(
- String saslMechanism, List jaasConfigEntries) {
- if (!OAuthBearerLoginModule.OAUTHBEARER_MECHANISM.equals(saslMechanism))
- throw new IllegalArgumentException(
- String.format("Unexpected SASL mechanism: %s", saslMechanism));
-
- if (Objects.requireNonNull(jaasConfigEntries).size() != 1 || jaasConfigEntries.get(0) == null)
- throw new IllegalArgumentException(
- String.format(
- "Must supply exactly 1 non-null JAAS mechanism configuration (size was %d)",
- jaasConfigEntries.size()));
-
- return Collections.unmodifiableMap(jaasConfigEntries.get(0).getOptions());
- }
-
- private String getProperty(final String propertyName, final Map properties) {
- if (!properties.containsKey(propertyName)) {
- throw new ConfigException(String.format("Kafka property: %s, not supplied", propertyName));
- }
- if (properties.get(propertyName) == null) {
- throw new ConfigException(String.format("Kafka property: %s, is null", propertyName));
- }
- if (!(properties.get(propertyName) instanceof String)) {
- throw new ConfigException(
- String.format("Kafka property: %s, is not of type String", propertyName));
- }
- return (String) properties.get(propertyName);
- }
-
- @Override
- public void handle(final Callback[] callbacks) throws IOException, UnsupportedCallbackException {
- for (final Callback callback : callbacks) {
- if (callback instanceof OAuthBearerTokenCallback) {
- final OAuthBearerToken token = this.getToken();
- ((OAuthBearerTokenCallback) callback).token(token);
- } else if (callback instanceof OAuthBearerValidatorCallback) {
- LOGGER.info("Validate callback");
- throw new UnsupportedCallbackException(callback, "Validate not yet implemented");
- } else {
- throw new UnsupportedCallbackException(
- callback, "Unknown callback type " + callback.getClass().getName());
- }
- }
- }
-
- /** Retrieves a new JWT token from Azure Active Directory. */
- protected OAuthBearerToken getToken() {
- try {
- LOGGER.debug("Retrieving Kafka OAuth Token");
-
- String token = readTokenFile(tokenFilePath);
- final IClientAssertion credential = ClientCredentialFactory.createFromClientAssertion(token);
- ClientCredentialParameters aadParameters = ClientCredentialParameters.builder(scope).build();
- ConfidentialClientApplication aadClient =
- ConfidentialClientApplication.builder(clientId, credential)
- .authority(tokenEndPoint)
- .build();
- final IAuthenticationResult authResult = aadClient.acquireToken(aadParameters).get();
-
- return new BasicOAuthBearerToken(
- authResult.accessToken(),
- scope,
- authResult.expiresOnDate().toInstant().toEpochMilli(),
- aadClient.clientId(),
- System.currentTimeMillis());
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new KafkaOAuthException("Retrieving JWT token was interrupted", e);
- } catch (final Exception e) {
- throw new KafkaOAuthException("Caught an exception while retrieving JWT token", e);
- }
- }
-
- /**
- * Reads the content of the token file
- *
- * @return content of the token file
- */
- protected static String readTokenFile(final String tokenFilePath) {
- try {
- File tokenFile = new File(tokenFilePath);
- final byte[] bytes = Files.readAllBytes(tokenFile.toPath());
- return new String(bytes, StandardCharsets.UTF_8);
- } catch (IOException e) {
- throw new KafkaOAuthException("Could not read Token file from: " + tokenFilePath, e);
- }
- }
-
- @Override
- public void close() {
- // No need to close an oauth session, the token will expire automatically
- }
-}
diff --git a/osgp/shared/osgp-kafka-config/src/test/java/org/opensmartgridplatform/shared/application/config/kafka/oauth/OAuthAuthenticateCallbackHandlerTest.java b/osgp/shared/osgp-kafka-config/src/test/java/org/opensmartgridplatform/shared/application/config/kafka/oauth/OAuthAuthenticateCallbackHandlerTest.java
deleted file mode 100644
index 7b7d258277f..00000000000
--- a/osgp/shared/osgp-kafka-config/src/test/java/org/opensmartgridplatform/shared/application/config/kafka/oauth/OAuthAuthenticateCallbackHandlerTest.java
+++ /dev/null
@@ -1,157 +0,0 @@
-// SPDX-FileCopyrightText: Contributors to the GXF project
-//
-// SPDX-License-Identifier: Apache-2.0
-
-package org.opensmartgridplatform.shared.application.config.kafka.oauth;
-
-import static org.junit.jupiter.api.Assertions.*;
-import static org.opensmartgridplatform.shared.application.config.kafka.oauth.OAuthAuthenticateCallbackHandler.*;
-
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import javax.security.auth.callback.Callback;
-import org.apache.kafka.common.KafkaException;
-import org.apache.kafka.common.security.oauthbearer.OAuthBearerTokenCallback;
-import org.junit.jupiter.api.Test;
-import org.mockito.Mockito;
-
-class OAuthAuthenticateCallbackHandlerTest {
-
- private Map correctProperties() {
- Map properties = new HashMap<>();
- properties.put(CLIENT_ID_CONFIG, "client-id");
- properties.put(TOKEN_ENDPOINT_CONFIG, "https://token.server.com");
- properties.put(TOKEN_FILE_CONFIG, "file");
- properties.put(SCOPE_CONFIG, "scope-one,scope-two");
- return properties;
- }
-
- @Test
- void successfulConfigure() {
- final Map properties = correctProperties();
-
- OAuthAuthenticateCallbackHandler handler = new OAuthAuthenticateCallbackHandler();
- handler.setFields(properties);
-
- assertEquals("client-id", handler.clientId);
- assertEquals("file", handler.tokenFilePath);
- assertEquals("https://token.server.com", handler.tokenEndPoint);
- assertEquals(new HashSet<>(Arrays.asList("scope-one", "scope-two")), handler.scope);
- }
-
- @Test
- void noClientIdConfigured() {
- final Map properties = correctProperties();
- properties.remove(CLIENT_ID_CONFIG);
-
- OAuthAuthenticateCallbackHandler handler = new OAuthAuthenticateCallbackHandler();
- KafkaException kafkaException =
- assertThrows(KafkaException.class, () -> handler.setFields(properties));
-
- assertEquals("Kafka property: clientId, not supplied", kafkaException.getMessage());
- }
-
- @Test
- void noTokenEndpointConfigured() {
- final Map properties = correctProperties();
- properties.remove(TOKEN_ENDPOINT_CONFIG);
-
- OAuthAuthenticateCallbackHandler handler = new OAuthAuthenticateCallbackHandler();
-
- KafkaException kafkaException =
- assertThrows(KafkaException.class, () -> handler.setFields(properties));
- assertEquals("Kafka property: tokenEndpoint, not supplied", kafkaException.getMessage());
- }
-
- @Test
- void noFileConfigured() {
- final Map properties = correctProperties();
- properties.remove(TOKEN_FILE_CONFIG);
-
- OAuthAuthenticateCallbackHandler handler = new OAuthAuthenticateCallbackHandler();
- KafkaException kafkaException =
- assertThrows(KafkaException.class, () -> handler.setFields(properties));
- assertEquals("Kafka property: tokenFile, not supplied", kafkaException.getMessage());
- }
-
- @Test
- void noScopeConfigured() {
- final Map properties = correctProperties();
- properties.remove(SCOPE_CONFIG);
-
- OAuthAuthenticateCallbackHandler handler = new OAuthAuthenticateCallbackHandler();
-
- KafkaException kafkaException =
- assertThrows(KafkaException.class, () -> handler.setFields(properties));
- assertEquals("Kafka property: scope, not supplied", kafkaException.getMessage());
- }
-
- @Test
- void incorrectConfig() {
- final Map properties = correctProperties();
- properties.replace(SCOPE_CONFIG, new String[] {"one", "two"});
-
- OAuthAuthenticateCallbackHandler handler = new OAuthAuthenticateCallbackHandler();
-
- KafkaException kafkaException =
- assertThrows(KafkaException.class, () -> handler.setFields(properties));
- assertEquals("Kafka property: scope, is not of type String", kafkaException.getMessage());
- }
-
- @Test
- void nullConfig() {
- final Map properties = correctProperties();
- properties.replace(SCOPE_CONFIG, null);
-
- OAuthAuthenticateCallbackHandler handler = new OAuthAuthenticateCallbackHandler();
-
- KafkaException kafkaException =
- assertThrows(KafkaException.class, () -> handler.setFields(properties));
- assertEquals("Kafka property: scope, is null", kafkaException.getMessage());
- }
-
- @Test
- void testTokenHandle() {
- OAuthAuthenticateCallbackHandler handler = Mockito.mock(OAuthAuthenticateCallbackHandler.class);
- Mockito.when(handler.getToken())
- .thenReturn(
- new BasicOAuthBearerToken(
- "test-jwt-token",
- new HashSet<>(Arrays.asList("scope-one", "scope-two")),
- 100,
- "principal-name",
- 10000L));
-
- final Map properties = correctProperties();
-
- handler.setFields(properties);
-
- try {
- OAuthBearerTokenCallback callback = new OAuthBearerTokenCallback();
- handler.handle(new Callback[] {callback});
- assertEquals("test-jwt-token", callback.token().value());
- } catch (Exception ignored) {
- }
- }
-
- @Test
- void readFile() {
- String testTokenPath =
- OAuthAuthenticateCallbackHandlerTest.class.getResource("/token-file").getPath();
- String token = OAuthAuthenticateCallbackHandler.readTokenFile(testTokenPath);
-
- assertEquals("test-token-data\n", token);
- }
-
- @Test
- void readNonExistentFile() {
- KafkaOAuthException kafkaOAuthException =
- assertThrows(
- KafkaOAuthException.class,
- () -> OAuthAuthenticateCallbackHandler.readTokenFile("/non-existent-file"));
- assertEquals(
- "Could not read Token file from: /non-existent-file", kafkaOAuthException.getMessage());
- }
-}