Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support mTLS authentication for MoP #1414

Merged
merged 23 commits into from
Sep 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 12 additions & 12 deletions .github/workflows/pr_test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -23,17 +23,17 @@ jobs:
check:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
- uses: actions/checkout@v3

- name: Cache Maven packages
uses: actions/cache@v2
uses: actions/cache@v3
with:
path: ~/.m2
key: ${{ runner.os }}-m2-${{ hashFiles('**/pom.xml') }}
restore-keys: ${{ runner.os }}-m2

- name: Set up JDK 17
uses: actions/setup-java@v2
uses: actions/setup-java@v3
with:
distribution: 'temurin'
java-version: 17
Expand All @@ -55,7 +55,7 @@ jobs:
runs-on: ubuntu-latest
timeout-minutes: 5
steps:
- uses: actions/checkout@v2
- uses: actions/checkout@v3

- name: Get All Tests
id: list-test
Expand All @@ -78,17 +78,17 @@ jobs:
runs-on: ubuntu-latest
timeout-minutes: 20
steps:
- uses: actions/checkout@v2
- uses: actions/checkout@v3

- name: Cache Maven packages
uses: actions/cache@v2
uses: actions/cache@v3
with:
path: ~/.m2
key: ${{ runner.os }}-m2-${{ hashFiles('**/pom.xml') }}
restore-keys: ${{ runner.os }}-m2

- name: Set up JDK 17
uses: actions/setup-java@v2
uses: actions/setup-java@v3
with:
distribution: 'temurin'
java-version: 17
Expand All @@ -100,7 +100,7 @@ jobs:
run: ./scripts/retry.sh mvn -B -ntp test -Dtest=${{ matrix.test }} -DfailIfNoTests=false

- name: Upload jacoco artifact
uses: actions/upload-artifact@v2
uses: actions/upload-artifact@v3
with:
name: ${{ matrix.test }}-jacoco-artifact
path: '**/*.exec'
Expand All @@ -125,17 +125,17 @@ jobs:
runs-on: ubuntu-latest
timeout-minutes: 10
steps:
- uses: actions/checkout@v2
- uses: actions/checkout@v3

- name: Cache Maven packages
uses: actions/cache@v2
uses: actions/cache@v3
with:
path: ~/.m2
key: ${{ runner.os }}-m2-${{ hashFiles('**/pom.xml') }}
restore-keys: ${{ runner.os }}-m2

- name: Set up JDK 17
uses: actions/setup-java@v2
uses: actions/setup-java@v3
with:
distribution: 'temurin'
java-version: 17
Expand All @@ -144,7 +144,7 @@ jobs:
run: mvn clean install -DskipTests

- name: Download jacoco artifact
uses: actions/download-artifact@v2
uses: actions/download-artifact@v3
with:
path: mqtt-impl/target

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.broker.authentication.AuthenticationDataSource;

Expand All @@ -69,7 +70,8 @@ public class Connection {
@Getter
private final TopicSubscriptionManager topicSubscriptionManager;
@Getter
private final MqttConnectMessage connectMessage;
@Setter
private MqttConnectMessage connectMessage;
@Getter
private final ClientRestrictions clientRestrictions;
@Getter
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ public final class Constants {
public static final String AUTH_BASIC = "basic";
public static final String AUTH_TOKEN = "token";

public static final String AUTH_MTLS = "mTls";

public static final String ATTR_TOPIC_SUBS = "topicSubs";

public static final String MQTT_PROPERTIES = "MQTT_PROPERTIES_%d_";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,21 +14,25 @@
package io.streamnative.pulsar.handlers.mqtt;

import static io.streamnative.pulsar.handlers.mqtt.Constants.AUTH_BASIC;
import static io.streamnative.pulsar.handlers.mqtt.Constants.AUTH_MTLS;
import static io.streamnative.pulsar.handlers.mqtt.Constants.AUTH_TOKEN;
import io.netty.handler.codec.mqtt.MqttConnectMessage;
import io.netty.handler.codec.mqtt.MqttConnectPayload;
import io.streamnative.pulsar.handlers.mqtt.identitypool.AuthenticationProviderMTls;
import io.streamnative.pulsar.handlers.mqtt.utils.MqttMessageUtils;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import javax.naming.AuthenticationException;
import javax.net.ssl.SSLSession;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.broker.authentication.AuthenticationDataCommand;
import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
import org.apache.pulsar.broker.authentication.AuthenticationProvider;
import org.apache.pulsar.broker.authentication.AuthenticationService;
import org.apache.pulsar.broker.service.BrokerService;

/**
* MQTT authentication service.
Expand All @@ -42,8 +46,15 @@ public class MQTTAuthenticationService {
@Getter
private final Map<String, AuthenticationProvider> authenticationProviders;

public MQTTAuthenticationService(AuthenticationService authenticationService, List<String> authenticationMethods) {
this.authenticationService = authenticationService;
private final BrokerService brokerService;

private final boolean mqttProxyMTlsAuthenticationEnabled;

public MQTTAuthenticationService(BrokerService brokerService, List<String> authenticationMethods, boolean
mqttProxyMTlsAuthenticationEnabled) {
this.brokerService = brokerService;
this.mqttProxyMTlsAuthenticationEnabled = mqttProxyMTlsAuthenticationEnabled;
this.authenticationService = brokerService.getAuthenticationService();
this.authenticationProviders = getAuthenticationProviders(authenticationMethods);
}

Expand All @@ -57,34 +68,49 @@ private Map<String, AuthenticationProvider> getAuthenticationProviders(List<Stri
log.error("MQTT authentication method {} is not enabled in Pulsar configuration!", method);
}
}
if (mqttProxyMTlsAuthenticationEnabled) {
AuthenticationProviderMTls providerMTls = new AuthenticationProviderMTls();
try {
providerMTls.initialize(brokerService.pulsar().getLocalMetadataStore());
providers.put(AUTH_MTLS, providerMTls);
} catch (Exception e) {
log.error("Failed to initialize MQTT authentication method {} ", AUTH_MTLS, e);
}
}
if (providers.isEmpty()) {
throw new IllegalArgumentException(
"MQTT authentication is enabled but no providers were successfully configured");
}

return providers;
}

public AuthenticationResult authenticate(MqttConnectMessage connectMessage) {
public AuthenticationResult authenticate(boolean fromProxy,
SSLSession session, MqttConnectMessage connectMessage) {
String authMethod = MqttMessageUtils.getAuthMethod(connectMessage);
if (authMethod != null) {
byte[] authData = MqttMessageUtils.getAuthData(connectMessage);
if (authData == null) {
return AuthenticationResult.FAILED;
}
if (fromProxy && AUTH_MTLS.equalsIgnoreCase(authMethod)) {
return new AuthenticationResult(true, new String(authData),
new AuthenticationDataCommand(new String(authData), null, session));
}
return authenticate(connectMessage.payload().clientIdentifier(), authMethod,
new AuthenticationDataCommand(new String(authData)));
new AuthenticationDataCommand(new String(authData), null, session));
}
return authenticate(connectMessage.payload());
return authenticate(connectMessage.payload(), session);
}

public AuthenticationResult authenticate(MqttConnectPayload payload) {
public AuthenticationResult authenticate(MqttConnectPayload payload, SSLSession session) {
String userRole = null;
boolean authenticated = false;
AuthenticationDataSource authenticationDataSource = null;
for (Map.Entry<String, AuthenticationProvider> entry : authenticationProviders.entrySet()) {
String authMethod = entry.getKey();
try {
AuthenticationDataSource authData = getAuthData(authMethod, payload);
AuthenticationDataSource authData = getAuthData(authMethod, payload, session);
userRole = entry.getValue().authenticate(authData);
authenticated = true;
authenticationDataSource = authData;
Expand Down Expand Up @@ -116,12 +142,14 @@ public AuthenticationResult authenticate(String clientIdentifier,
return new AuthenticationResult(authenticated, userRole, command);
}

public AuthenticationDataSource getAuthData(String authMethod, MqttConnectPayload payload) {
public AuthenticationDataSource getAuthData(String authMethod, MqttConnectPayload payload, SSLSession session) {
switch (authMethod) {
case AUTH_BASIC:
return new AuthenticationDataCommand(payload.userName() + ":" + payload.password());
case AUTH_TOKEN:
return new AuthenticationDataCommand(payload.password());
case AUTH_MTLS:
return new AuthenticationDataCommand(null, null, session);
default:
throw new IllegalArgumentException(
String.format("Unsupported authentication method : %s!", authMethod));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,13 @@ public class MQTTCommonConfiguration extends ServiceConfiguration {
)
private boolean mqttProxyTlsEnabled = false;

@FieldContext(
category = CATEGORY_MQTT_PROXY,
required = false,
doc = "Whether use mTLS authenticate for mTLS connection"
)
private boolean mqttProxyMTlsAuthenticationEnabled = false;

@FieldContext(
category = CATEGORY_MQTT_PROXY,
required = false,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,8 +96,9 @@ public MQTTService(BrokerService brokerService, MQTTServerConfiguration serverCo
this.metricsProvider = new MQTTMetricsProvider(metricsCollector);
this.pulsarService.addPrometheusRawMetricsProvider(metricsProvider);
this.authenticationService = serverConfiguration.isMqttAuthenticationEnabled()
? new MQTTAuthenticationService(brokerService.getAuthenticationService(),
serverConfiguration.getMqttAuthenticationMethods()) : null;
? new MQTTAuthenticationService(brokerService,
serverConfiguration.getMqttAuthenticationMethods(),
serverConfiguration.isMqttProxyMTlsAuthenticationEnabled()) : null;
this.connectionManager = new MQTTConnectionManager(pulsarService.getAdvertisedAddress());
this.subscriptionManager = new MQTTSubscriptionManager();
if (getServerConfiguration().isMqttProxyEnabled()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ public CompletableFuture<Void> writeAndFlush(final MqttAdapterMessage adapterMsg
});
future.exceptionally(ex -> {
log.warn("[AdapterChannel][{}] Proxy write to broker {} failed."
+ " error message: {}", clientId, broker, ex.getMessage());
+ " adapterMsg message: {}", clientId, broker, adapterMsg, ex);
return null;
});
return future;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/**
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.streamnative.pulsar.handlers.mqtt.exception;

/**
* Internal server exception.
*/
public class MQTTAuthException extends Exception {

public MQTTAuthException() {
}

public MQTTAuthException(String message) {
super(message);
}

public MQTTAuthException(String message, Throwable cause) {
super(message, cause);
}

public MQTTAuthException(Throwable cause) {
super(cause);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
*/
package io.streamnative.pulsar.handlers.mqtt.identitypool;


import static io.streamnative.pulsar.handlers.mqtt.identitypool.ExpressionCompiler.DN;
import static io.streamnative.pulsar.handlers.mqtt.identitypool.ExpressionCompiler.DN_KEYS;
import static io.streamnative.pulsar.handlers.mqtt.identitypool.ExpressionCompiler.SAN;
Expand Down
Loading
Loading