diff --git a/.github/workflows/pr_test.yml b/.github/workflows/pr_test.yml index af2eaf6ad..3b74a0051 100644 --- a/.github/workflows/pr_test.yml +++ b/.github/workflows/pr_test.yml @@ -23,17 +23,17 @@ jobs: check: runs-on: ubuntu-latest steps: - - uses: actions/checkout@v2 + - uses: actions/checkout@v3 - name: Cache Maven packages - uses: actions/cache@v2 + uses: actions/cache@v3 with: path: ~/.m2 key: ${{ runner.os }}-m2-${{ hashFiles('**/pom.xml') }} restore-keys: ${{ runner.os }}-m2 - name: Set up JDK 17 - uses: actions/setup-java@v2 + uses: actions/setup-java@v3 with: distribution: 'temurin' java-version: 17 @@ -55,7 +55,7 @@ jobs: runs-on: ubuntu-latest timeout-minutes: 5 steps: - - uses: actions/checkout@v2 + - uses: actions/checkout@v3 - name: Get All Tests id: list-test @@ -78,17 +78,17 @@ jobs: runs-on: ubuntu-latest timeout-minutes: 20 steps: - - uses: actions/checkout@v2 + - uses: actions/checkout@v3 - name: Cache Maven packages - uses: actions/cache@v2 + uses: actions/cache@v3 with: path: ~/.m2 key: ${{ runner.os }}-m2-${{ hashFiles('**/pom.xml') }} restore-keys: ${{ runner.os }}-m2 - name: Set up JDK 17 - uses: actions/setup-java@v2 + uses: actions/setup-java@v3 with: distribution: 'temurin' java-version: 17 @@ -100,7 +100,7 @@ jobs: run: ./scripts/retry.sh mvn -B -ntp test -Dtest=${{ matrix.test }} -DfailIfNoTests=false - name: Upload jacoco artifact - uses: actions/upload-artifact@v2 + uses: actions/upload-artifact@v3 with: name: ${{ matrix.test }}-jacoco-artifact path: '**/*.exec' @@ -125,17 +125,17 @@ jobs: runs-on: ubuntu-latest timeout-minutes: 10 steps: - - uses: actions/checkout@v2 + - uses: actions/checkout@v3 - name: Cache Maven packages - uses: actions/cache@v2 + uses: actions/cache@v3 with: path: ~/.m2 key: ${{ runner.os }}-m2-${{ hashFiles('**/pom.xml') }} restore-keys: ${{ runner.os }}-m2 - name: Set up JDK 17 - uses: actions/setup-java@v2 + uses: actions/setup-java@v3 with: distribution: 'temurin' java-version: 17 @@ -144,7 +144,7 @@ jobs: run: mvn clean install -DskipTests - name: Download jacoco artifact - uses: actions/download-artifact@v2 + uses: actions/download-artifact@v3 with: path: mqtt-impl/target diff --git a/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/Connection.java b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/Connection.java index a2eda7925..4c30d1062 100644 --- a/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/Connection.java +++ b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/Connection.java @@ -44,6 +44,7 @@ import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; import lombok.Getter; +import lombok.Setter; import lombok.extern.slf4j.Slf4j; import org.apache.pulsar.broker.authentication.AuthenticationDataSource; @@ -69,7 +70,8 @@ public class Connection { @Getter private final TopicSubscriptionManager topicSubscriptionManager; @Getter - private final MqttConnectMessage connectMessage; + @Setter + private MqttConnectMessage connectMessage; @Getter private final ClientRestrictions clientRestrictions; @Getter diff --git a/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/Constants.java b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/Constants.java index a0c663936..c5da59cf8 100644 --- a/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/Constants.java +++ b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/Constants.java @@ -24,6 +24,8 @@ public final class Constants { public static final String AUTH_BASIC = "basic"; public static final String AUTH_TOKEN = "token"; + public static final String AUTH_MTLS = "mTls"; + public static final String ATTR_TOPIC_SUBS = "topicSubs"; public static final String MQTT_PROPERTIES = "MQTT_PROPERTIES_%d_"; diff --git a/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/MQTTAuthenticationService.java b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/MQTTAuthenticationService.java index c474d2103..25dd2d74f 100644 --- a/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/MQTTAuthenticationService.java +++ b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/MQTTAuthenticationService.java @@ -14,14 +14,17 @@ package io.streamnative.pulsar.handlers.mqtt; import static io.streamnative.pulsar.handlers.mqtt.Constants.AUTH_BASIC; +import static io.streamnative.pulsar.handlers.mqtt.Constants.AUTH_MTLS; import static io.streamnative.pulsar.handlers.mqtt.Constants.AUTH_TOKEN; import io.netty.handler.codec.mqtt.MqttConnectMessage; import io.netty.handler.codec.mqtt.MqttConnectPayload; +import io.streamnative.pulsar.handlers.mqtt.identitypool.AuthenticationProviderMTls; import io.streamnative.pulsar.handlers.mqtt.utils.MqttMessageUtils; import java.util.HashMap; import java.util.List; import java.util.Map; import javax.naming.AuthenticationException; +import javax.net.ssl.SSLSession; import lombok.Getter; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; @@ -29,6 +32,7 @@ import org.apache.pulsar.broker.authentication.AuthenticationDataSource; import org.apache.pulsar.broker.authentication.AuthenticationProvider; import org.apache.pulsar.broker.authentication.AuthenticationService; +import org.apache.pulsar.broker.service.BrokerService; /** * MQTT authentication service. @@ -42,8 +46,15 @@ public class MQTTAuthenticationService { @Getter private final Map authenticationProviders; - public MQTTAuthenticationService(AuthenticationService authenticationService, List authenticationMethods) { - this.authenticationService = authenticationService; + private final BrokerService brokerService; + + private final boolean mqttProxyMTlsAuthenticationEnabled; + + public MQTTAuthenticationService(BrokerService brokerService, List authenticationMethods, boolean + mqttProxyMTlsAuthenticationEnabled) { + this.brokerService = brokerService; + this.mqttProxyMTlsAuthenticationEnabled = mqttProxyMTlsAuthenticationEnabled; + this.authenticationService = brokerService.getAuthenticationService(); this.authenticationProviders = getAuthenticationProviders(authenticationMethods); } @@ -57,34 +68,49 @@ private Map getAuthenticationProviders(List entry : authenticationProviders.entrySet()) { String authMethod = entry.getKey(); try { - AuthenticationDataSource authData = getAuthData(authMethod, payload); + AuthenticationDataSource authData = getAuthData(authMethod, payload, session); userRole = entry.getValue().authenticate(authData); authenticated = true; authenticationDataSource = authData; @@ -116,12 +142,14 @@ public AuthenticationResult authenticate(String clientIdentifier, return new AuthenticationResult(authenticated, userRole, command); } - public AuthenticationDataSource getAuthData(String authMethod, MqttConnectPayload payload) { + public AuthenticationDataSource getAuthData(String authMethod, MqttConnectPayload payload, SSLSession session) { switch (authMethod) { case AUTH_BASIC: return new AuthenticationDataCommand(payload.userName() + ":" + payload.password()); case AUTH_TOKEN: return new AuthenticationDataCommand(payload.password()); + case AUTH_MTLS: + return new AuthenticationDataCommand(null, null, session); default: throw new IllegalArgumentException( String.format("Unsupported authentication method : %s!", authMethod)); diff --git a/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/MQTTCommonConfiguration.java b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/MQTTCommonConfiguration.java index 676dc5a0e..a1a0c7f08 100644 --- a/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/MQTTCommonConfiguration.java +++ b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/MQTTCommonConfiguration.java @@ -145,6 +145,13 @@ public class MQTTCommonConfiguration extends ServiceConfiguration { ) private boolean mqttProxyTlsEnabled = false; + @FieldContext( + category = CATEGORY_MQTT_PROXY, + required = false, + doc = "Whether use mTLS authenticate for mTLS connection" + ) + private boolean mqttProxyMTlsAuthenticationEnabled = false; + @FieldContext( category = CATEGORY_MQTT_PROXY, required = false, diff --git a/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/MQTTService.java b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/MQTTService.java index 26db52fa4..b47ccf460 100644 --- a/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/MQTTService.java +++ b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/MQTTService.java @@ -96,8 +96,9 @@ public MQTTService(BrokerService brokerService, MQTTServerConfiguration serverCo this.metricsProvider = new MQTTMetricsProvider(metricsCollector); this.pulsarService.addPrometheusRawMetricsProvider(metricsProvider); this.authenticationService = serverConfiguration.isMqttAuthenticationEnabled() - ? new MQTTAuthenticationService(brokerService.getAuthenticationService(), - serverConfiguration.getMqttAuthenticationMethods()) : null; + ? new MQTTAuthenticationService(brokerService, + serverConfiguration.getMqttAuthenticationMethods(), + serverConfiguration.isMqttProxyMTlsAuthenticationEnabled()) : null; this.connectionManager = new MQTTConnectionManager(pulsarService.getAdvertisedAddress()); this.subscriptionManager = new MQTTSubscriptionManager(); if (getServerConfiguration().isMqttProxyEnabled()) { diff --git a/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/adapter/AdapterChannel.java b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/adapter/AdapterChannel.java index ccf483d4c..1e43b74af 100644 --- a/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/adapter/AdapterChannel.java +++ b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/adapter/AdapterChannel.java @@ -51,7 +51,7 @@ public CompletableFuture writeAndFlush(final MqttAdapterMessage adapterMsg }); future.exceptionally(ex -> { log.warn("[AdapterChannel][{}] Proxy write to broker {} failed." - + " error message: {}", clientId, broker, ex.getMessage()); + + " adapterMsg message: {}", clientId, broker, adapterMsg, ex); return null; }); return future; diff --git a/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/exception/MQTTAuthException.java b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/exception/MQTTAuthException.java new file mode 100644 index 000000000..267536539 --- /dev/null +++ b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/exception/MQTTAuthException.java @@ -0,0 +1,35 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.streamnative.pulsar.handlers.mqtt.exception; + +/** + * Internal server exception. + */ +public class MQTTAuthException extends Exception { + + public MQTTAuthException() { + } + + public MQTTAuthException(String message) { + super(message); + } + + public MQTTAuthException(String message, Throwable cause) { + super(message, cause); + } + + public MQTTAuthException(Throwable cause) { + super(cause); + } +} diff --git a/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/identitypool/AuthenticationProviderMTls.java b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/identitypool/AuthenticationProviderMTls.java index 6cad681cf..2f043aeda 100644 --- a/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/identitypool/AuthenticationProviderMTls.java +++ b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/identitypool/AuthenticationProviderMTls.java @@ -13,7 +13,6 @@ */ package io.streamnative.pulsar.handlers.mqtt.identitypool; - import static io.streamnative.pulsar.handlers.mqtt.identitypool.ExpressionCompiler.DN; import static io.streamnative.pulsar.handlers.mqtt.identitypool.ExpressionCompiler.DN_KEYS; import static io.streamnative.pulsar.handlers.mqtt.identitypool.ExpressionCompiler.SAN; diff --git a/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/proxy/MQTTProxyProtocolMethodProcessor.java b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/proxy/MQTTProxyProtocolMethodProcessor.java index a64df78fa..6dafd3db5 100644 --- a/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/proxy/MQTTProxyProtocolMethodProcessor.java +++ b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/proxy/MQTTProxyProtocolMethodProcessor.java @@ -13,6 +13,10 @@ */ package io.streamnative.pulsar.handlers.mqtt.proxy; +import static io.streamnative.pulsar.handlers.mqtt.Constants.AUTH_MTLS; +import static io.streamnative.pulsar.handlers.mqtt.utils.MqttMessageUtils.createMqttConnectMessage; +import static io.streamnative.pulsar.handlers.mqtt.utils.MqttMessageUtils.createMqttPublishMessage; +import static io.streamnative.pulsar.handlers.mqtt.utils.MqttMessageUtils.createMqttSubscribeMessage; import com.google.common.collect.Lists; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.mqtt.MqttConnectMessage; @@ -64,17 +68,19 @@ import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.util.Codec; import org.apache.pulsar.common.util.FutureUtil; + /** * Proxy inbound handler is the bridge between proxy and MoP. */ @Slf4j public class MQTTProxyProtocolMethodProcessor extends AbstractCommonProtocolMethodProcessor { + private final PulsarService pulsarService; + @Getter private Connection connection; private final LookupHandler lookupHandler; private final MQTTProxyConfiguration proxyConfig; - private final PulsarService pulsarService; private final Map> topicBrokers; private final Map adapterChannels; @Getter @@ -86,6 +92,7 @@ public class MQTTProxyProtocolMethodProcessor extends AbstractCommonProtocolMeth private final MQTTConnectionManager connectionManager; private final SystemEventService eventService; private final MQTTProxyAdapter proxyAdapter; + private final AtomicBoolean isDisconnected = new AtomicBoolean(false); private final AutoSubscribeHandler autoSubscribeHandler; @@ -95,8 +102,9 @@ public class MQTTProxyProtocolMethodProcessor extends AbstractCommonProtocolMeth public MQTTProxyProtocolMethodProcessor(MQTTProxyService proxyService, ChannelHandlerContext ctx) { super(proxyService.getAuthenticationService(), - proxyService.getProxyConfig().isMqttAuthenticationEnabled(), ctx); - this.pulsarService = proxyService.getPulsarService(); + proxyService.getProxyConfig().isMqttAuthenticationEnabled(), + ctx); + pulsarService = proxyService.getPulsarService(); this.lookupHandler = proxyService.getLookupHandler(); this.proxyConfig = proxyService.getProxyConfig(); this.connectionManager = proxyService.getConnectionManager(); @@ -115,7 +123,7 @@ public MQTTProxyProtocolMethodProcessor(MQTTProxyService proxyService, ChannelHa @Override public void doProcessConnect(MqttAdapterMessage adapter, String userRole, AuthenticationDataSource authData, ClientRestrictions clientRestrictions) { - final MqttConnectMessage msg = (MqttConnectMessage) adapter.getMqttMessage(); + MqttConnectMessage msg = (MqttConnectMessage) adapter.getMqttMessage(); final ServerRestrictions serverRestrictions = ServerRestrictions.builder() .receiveMaximum(proxyConfig.getReceiveMaximum()) .maximumPacketSize(proxyConfig.getMqttMessageMaxLength()) @@ -133,6 +141,12 @@ public void doProcessConnect(MqttAdapterMessage adapter, String userRole, .processor(this) .build(); connection.sendConnAck(); + if (proxyConfig.isMqttProxyMTlsAuthenticationEnabled()) { + MqttConnectMessage connectMessage = createMqttConnectMessage(msg, AUTH_MTLS, userRole); + msg = connectMessage; + connection.setConnectMessage(msg); + } + ConnectEvent connectEvent = ConnectEvent.builder() .clientId(connection.getClientId()) .address(pulsarService.getAdvertisedAddress()) @@ -152,6 +166,10 @@ public void processPublish(MqttAdapterMessage adapter) { proxyConfig.getDefaultTenant(), proxyConfig.getDefaultNamespace(), TopicDomain.getEnum(proxyConfig.getDefaultTopicDomain())); adapter.setClientId(connection.getClientId()); + if (proxyConfig.isMqttProxyMTlsAuthenticationEnabled()) { + MqttPublishMessage mqttMessage = createMqttPublishMessage(msg, AUTH_MTLS, connection.getUserRole()); + adapter.setMqttMessage(mqttMessage); + } startPublish() .thenCompose(__ -> writeToBroker(pulsarTopicName, adapter)) .whenComplete((unused, ex) -> { @@ -282,6 +300,10 @@ public void processSubscribe(final MqttAdapterMessage adapter) { log.debug("[Proxy Subscribe] [{}] msg: {}", clientId, msg); } registerTopicListener(adapter); + if (proxyConfig.isMqttProxyMTlsAuthenticationEnabled()) { + MqttSubscribeMessage mqttMessage = createMqttSubscribeMessage(msg, AUTH_MTLS, connection.getUserRole()); + adapter.setMqttMessage(mqttMessage); + } doSubscribe(adapter, false) .exceptionally(ex -> { Throwable realCause = FutureUtil.unwrapCompletionException(ex); @@ -447,8 +469,10 @@ private CompletableFuture connectToBroker(final String topic) { key -> lookupHandler.findBroker(TopicName.get(topic)).thenApply(mqttBroker -> adapterChannels.computeIfAbsent(mqttBroker, key1 -> { AdapterChannel adapterChannel = proxyAdapter.getAdapterChannel(mqttBroker); + final MqttConnectMessage connectMessage = connection.getConnectMessage(); + adapterChannel.writeAndFlush(new MqttAdapterMessage(connection.getClientId(), - connection.getConnectMessage())); + connectMessage)); return adapterChannel; }) ) diff --git a/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/proxy/MQTTProxyService.java b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/proxy/MQTTProxyService.java index bc17954ab..968f2a40b 100644 --- a/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/proxy/MQTTProxyService.java +++ b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/proxy/MQTTProxyService.java @@ -117,7 +117,7 @@ public void start() throws MQTTProxyException { throw new MQTTProxyException(e); } - if (proxyConfig.isMqttProxyTlsEnabled()) { + if (proxyConfig.isMqttProxyTlsEnabled() || proxyConfig.isMqttProxyMTlsAuthenticationEnabled()) { ServerBootstrap tlsBootstrap = serverBootstrap.clone(); tlsBootstrap.childHandler(new MQTTProxyChannelInitializer( this, proxyConfig, true, sslContextRefresher)); @@ -148,7 +148,6 @@ public void start() throws MQTTProxyException { throw new MQTTProxyException(e); } } - this.lookupHandler = new PulsarServiceLookupHandler(pulsarService, proxyConfig); this.eventService.start(); } diff --git a/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/support/AbstractCommonProtocolMethodProcessor.java b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/support/AbstractCommonProtocolMethodProcessor.java index 678e4c1e3..57f25a99f 100644 --- a/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/support/AbstractCommonProtocolMethodProcessor.java +++ b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/support/AbstractCommonProtocolMethodProcessor.java @@ -21,10 +21,12 @@ import io.netty.handler.codec.mqtt.MqttMessageBuilders; import io.netty.handler.codec.mqtt.MqttProperties; import io.netty.handler.codec.mqtt.MqttReasonCodeAndPropertiesVariableHeader; +import io.netty.handler.ssl.SslHandler; import io.streamnative.pulsar.handlers.mqtt.Connection; import io.streamnative.pulsar.handlers.mqtt.MQTTAuthenticationService; import io.streamnative.pulsar.handlers.mqtt.ProtocolMethodProcessor; import io.streamnative.pulsar.handlers.mqtt.adapter.MqttAdapterMessage; +import io.streamnative.pulsar.handlers.mqtt.exception.MQTTAuthException; import io.streamnative.pulsar.handlers.mqtt.exception.restrictions.InvalidReceiveMaximumException; import io.streamnative.pulsar.handlers.mqtt.messages.MqttPropertyUtils; import io.streamnative.pulsar.handlers.mqtt.messages.ack.MqttConnectAck; @@ -33,6 +35,7 @@ import io.streamnative.pulsar.handlers.mqtt.utils.MqttMessageUtils; import io.streamnative.pulsar.handlers.mqtt.utils.MqttUtils; import io.streamnative.pulsar.handlers.mqtt.utils.NettyUtils; +import javax.net.ssl.SSLSession; import lombok.Getter; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; @@ -123,8 +126,10 @@ public void processConnect(MqttAdapterMessage adapter) { clientId, username); } } else { - MQTTAuthenticationService.AuthenticationResult authResult = authenticationService - .authenticate(connectMessage); + MQTTAuthenticationService.AuthenticationResult authResult; + SslHandler sslHandler = ctx.pipeline().get(SslHandler.class); + SSLSession session = (sslHandler != null) ? sslHandler.engine().getSession() : null; + authResult = authenticationService.authenticate(adapter.fromProxy(), session, connectMessage); if (authResult.isFailed()) { MqttMessage mqttMessage = MqttConnectAck.errorBuilder().authFail(protocolVersion); log.error("[CONNECT] Invalid or incorrect authentication. CId={}, username={}", clientId, username); @@ -157,6 +162,10 @@ public void processConnect(MqttAdapterMessage adapter) { } } + protected MQTTAuthenticationService.AuthenticationResult mtlsAuth(boolean fromProxy) throws MQTTAuthException { + return MQTTAuthenticationService.AuthenticationResult.FAILED; + } + @Override public void processPubAck(MqttAdapterMessage msg) { if (log.isDebugEnabled()) { diff --git a/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/support/MQTTBrokerProtocolMethodProcessor.java b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/support/MQTTBrokerProtocolMethodProcessor.java index df7e5b4b7..3a4bed839 100644 --- a/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/support/MQTTBrokerProtocolMethodProcessor.java +++ b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/support/MQTTBrokerProtocolMethodProcessor.java @@ -14,6 +14,7 @@ package io.streamnative.pulsar.handlers.mqtt.support; import static io.streamnative.pulsar.handlers.mqtt.utils.MqttMessageUtils.createWillMessage; +import static io.streamnative.pulsar.handlers.mqtt.utils.MqttMessageUtils.getMtlsAuthMethodAndData; import static io.streamnative.pulsar.handlers.mqtt.utils.MqttMessageUtils.pingResp; import static io.streamnative.pulsar.handlers.mqtt.utils.MqttMessageUtils.topicSubscriptions; import io.netty.channel.ChannelHandlerContext; @@ -74,7 +75,9 @@ import org.apache.bookkeeper.mledger.Position; import org.apache.bookkeeper.mledger.PositionFactory; import org.apache.bookkeeper.mledger.impl.AckSetStateUtil; +import org.apache.commons.lang3.tuple.Pair; import org.apache.pulsar.broker.PulsarService; +import org.apache.pulsar.broker.authentication.AuthenticationDataCommand; import org.apache.pulsar.broker.authentication.AuthenticationDataSource; import org.apache.pulsar.broker.authorization.AuthorizationService; import org.apache.pulsar.broker.service.BrokerServiceException; @@ -91,6 +94,7 @@ */ @Slf4j public class MQTTBrokerProtocolMethodProcessor extends AbstractCommonProtocolMethodProcessor { + private final PulsarService pulsarService; private final QosPublishHandlers qosPublishHandlers; private final MQTTServerConfiguration configuration; @@ -104,6 +108,7 @@ public class MQTTBrokerProtocolMethodProcessor extends AbstractCommonProtocolMet private final WillMessageHandler willMessageHandler; private final RetainedMessageHandler retainedMessageHandler; private final AutoSubscribeHandler autoSubscribeHandler; + @Getter private final CompletableFuture inactiveFuture = new CompletableFuture<>(); @@ -180,19 +185,28 @@ public void processPubAck(MqttAdapterMessage adapter) { @Override public void processPublish(MqttAdapterMessage adapter) { if (log.isDebugEnabled()) { - log.debug("[Publish] [{}] msg: {}", connection.getClientId(), adapter); + log.debug("[Publish] [{}] msg: {}", adapter.getClientId(), adapter); } MqttPublishMessage msg = (MqttPublishMessage) adapter.getMqttMessage(); CompletableFuture result; if (!configuration.isMqttAuthorizationEnabled()) { if (log.isDebugEnabled()) { log.debug("[Publish] authorization is disabled, allowing client. CId={}, userRole={}", - connection.getClientId(), connection.getUserRole()); + adapter.getClientId(), connection.getUserRole()); } result = doPublish(adapter); } else { + String userRole = connection.getUserRole(); + AuthenticationDataSource authData = connection.getAuthData(); + if (adapter.fromProxy()) { + final Optional> mtlsAuthMethodAndData = getMtlsAuthMethodAndData(msg); + if (mtlsAuthMethodAndData.isPresent()) { + userRole = mtlsAuthMethodAndData.get().getKey(); + authData = new AuthenticationDataCommand(new String(mtlsAuthMethodAndData.get().getValue())); + } + } result = this.authorizationService.canProduceAsync(TopicName.get(msg.variableHeader().topicName()), - connection.getUserRole(), connection.getAuthData()) + userRole, authData) .thenCompose(authorized -> authorized ? doPublish(adapter) : doUnauthorized(adapter)); } result.thenAccept(__ -> msg.release()) @@ -340,7 +354,7 @@ public boolean connectionEstablished() { public void processSubscribe(MqttAdapterMessage adapter) { MqttSubscribeMessage msg = (MqttSubscribeMessage) adapter.getMqttMessage(); final String clientId = connection.getClientId(); - final String userRole = connection.getUserRole(); + String userRole = connection.getUserRole(); final int packetId = msg.variableHeader().messageId(); if (log.isDebugEnabled()) { log.debug("[Subscribe] [{}] msg: {}", clientId, msg); @@ -351,15 +365,24 @@ public void processSubscribe(MqttAdapterMessage adapter) { } doSubscribe(msg); } else { + AuthenticationDataSource authData = connection.getAuthData(); + if (adapter.fromProxy()) { + final Optional> mtlsAuthMethodAndData = getMtlsAuthMethodAndData(msg); + if (mtlsAuthMethodAndData.isPresent()) { + userRole = mtlsAuthMethodAndData.get().getKey(); + authData = new AuthenticationDataCommand(new String(mtlsAuthMethodAndData.get().getValue())); + } + } List> authorizationFutures = new ArrayList<>(); AtomicBoolean authorizedFlag = new AtomicBoolean(true); for (MqttTopicSubscription topic: msg.payload().topicSubscriptions()) { + String finalUserRole = userRole; authorizationFutures.add(this.authorizationService.canConsumeAsync(TopicName.get(topic.topicName()), - userRole, connection.getAuthData(), userRole).thenAccept((authorized) -> { + userRole, authData, userRole).thenAccept((authorized) -> { if (!authorized) { authorizedFlag.set(false); log.warn("[Subscribe] no authorization to sub topic={}, userRole={}, CId= {}", - topic.topicName(), userRole, clientId); + topic.topicName(), finalUserRole, clientId); } })); } diff --git a/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/utils/MqttMessageUtils.java b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/utils/MqttMessageUtils.java index 706e1964c..dac7a9f24 100644 --- a/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/utils/MqttMessageUtils.java +++ b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/utils/MqttMessageUtils.java @@ -15,26 +15,33 @@ import static com.google.common.base.Preconditions.checkArgument; import static io.netty.handler.codec.mqtt.MqttQoS.AT_MOST_ONCE; +import static io.streamnative.pulsar.handlers.mqtt.Constants.AUTH_MTLS; import io.netty.buffer.Unpooled; import io.netty.channel.Channel; import io.netty.handler.codec.mqtt.MqttConnectMessage; import io.netty.handler.codec.mqtt.MqttConnectPayload; +import io.netty.handler.codec.mqtt.MqttConnectVariableHeader; import io.netty.handler.codec.mqtt.MqttFixedHeader; import io.netty.handler.codec.mqtt.MqttMessage; +import io.netty.handler.codec.mqtt.MqttMessageIdAndPropertiesVariableHeader; import io.netty.handler.codec.mqtt.MqttMessageType; import io.netty.handler.codec.mqtt.MqttProperties; import io.netty.handler.codec.mqtt.MqttPublishMessage; +import io.netty.handler.codec.mqtt.MqttPublishVariableHeader; import io.netty.handler.codec.mqtt.MqttQoS; import io.netty.handler.codec.mqtt.MqttSubscribeMessage; import io.netty.handler.codec.mqtt.MqttTopicSubscription; +import io.netty.handler.codec.mqtt.MqttVersion; import io.streamnative.pulsar.handlers.mqtt.support.MessageBuilder; import java.net.InetSocketAddress; import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.List; +import java.util.Optional; import java.util.UUID; import java.util.stream.Collectors; import org.apache.commons.codec.binary.Hex; +import org.apache.commons.lang3.tuple.Pair; /** * Mqtt message utils. @@ -182,6 +189,99 @@ public static MqttPublishMessage createMqttWillMessage(WillMessage willMessage) return builder.build(); } + public static MqttConnectMessage createMqttConnectMessage(MqttConnectMessage connectMessage, + String authMethod, + String authData) { + final MqttConnectVariableHeader header = connectMessage.variableHeader(); + MqttProperties properties = new MqttProperties(); + properties.add(new MqttProperties.StringProperty(MqttProperties.MqttPropertyType.AUTHENTICATION_METHOD.value() + , authMethod)); + properties.add(new MqttProperties.BinaryProperty(MqttProperties.MqttPropertyType.AUTHENTICATION_DATA.value() + , authData.getBytes())); + MqttConnectVariableHeader variableHeader = new MqttConnectVariableHeader( + MqttVersion.MQTT_5.protocolName(), MqttVersion.MQTT_5.protocolLevel(), header.hasUserName(), + header.hasPassword(), header.isWillRetain(), header.willQos(), header.isWillFlag(), + header.isCleanSession(), header.keepAliveTimeSeconds(), properties + ); + MqttConnectMessage newConnectMessage = new MqttConnectMessage(connectMessage.fixedHeader(), variableHeader, + connectMessage.payload()); + return newConnectMessage; + } + + public static MqttPublishMessage createMqttPublishMessage(MqttPublishMessage publishMessage, + String authMethod, + String authData) { + final MqttPublishVariableHeader header = publishMessage.variableHeader(); + MqttProperties properties = new MqttProperties(); + properties.add(new MqttProperties.StringProperty(MqttProperties.MqttPropertyType.AUTHENTICATION_METHOD.value() + , authMethod)); + properties.add(new MqttProperties.BinaryProperty(MqttProperties.MqttPropertyType.AUTHENTICATION_DATA.value() + , authData.getBytes())); + MqttPublishVariableHeader variableHeader = new MqttPublishVariableHeader( + header.topicName(), header.packetId(), properties); + MqttPublishMessage newPublishMessage = new MqttPublishMessage(publishMessage.fixedHeader(), variableHeader, + publishMessage.payload()); + return newPublishMessage; + } + + public static Optional> getMtlsAuthMethodAndData(MqttConnectMessage connectMessage) { + final MqttConnectVariableHeader header = connectMessage.variableHeader(); + MqttProperties properties = header.properties(); + final MqttProperties.MqttProperty property = properties.getProperty( + MqttProperties.MqttPropertyType.AUTHENTICATION_METHOD.value()); + if (property != null && property.value() instanceof String + && ((String) property.value()).equalsIgnoreCase(AUTH_MTLS)) { + final MqttProperties.MqttProperty data = properties.getProperty( + MqttProperties.MqttPropertyType.AUTHENTICATION_DATA.value()); + return Optional.of(Pair.of((String) property.value(), (byte[]) data.value())); + } + return Optional.empty(); + } + + public static Optional> getMtlsAuthMethodAndData(MqttPublishMessage publishMessage) { + final MqttPublishVariableHeader header = publishMessage.variableHeader(); + MqttProperties properties = header.properties(); + final MqttProperties.MqttProperty property = properties.getProperty( + MqttProperties.MqttPropertyType.AUTHENTICATION_METHOD.value()); + if (property != null && property.value() instanceof String + && ((String) property.value()).equalsIgnoreCase(AUTH_MTLS)) { + final MqttProperties.MqttProperty data = properties.getProperty( + MqttProperties.MqttPropertyType.AUTHENTICATION_DATA.value()); + return Optional.of(Pair.of((String) property.value(), (byte[]) data.value())); + } + return Optional.empty(); + } + + public static Optional> getMtlsAuthMethodAndData(MqttSubscribeMessage subscribeMessage) { + final MqttMessageIdAndPropertiesVariableHeader header = subscribeMessage.idAndPropertiesVariableHeader(); + MqttProperties properties = header.properties(); + final MqttProperties.MqttProperty property = properties.getProperty( + MqttProperties.MqttPropertyType.AUTHENTICATION_METHOD.value()); + if (property != null && property.value() instanceof String + && ((String) property.value()).equalsIgnoreCase(AUTH_MTLS)) { + final MqttProperties.MqttProperty data = properties.getProperty( + MqttProperties.MqttPropertyType.AUTHENTICATION_DATA.value()); + return Optional.of(Pair.of((String) property.value(), (byte[]) data.value())); + } + return Optional.empty(); + } + + public static MqttSubscribeMessage createMqttSubscribeMessage(MqttSubscribeMessage subscribeMessage, + String authMethod, + String authData) { + final MqttMessageIdAndPropertiesVariableHeader header = subscribeMessage.idAndPropertiesVariableHeader(); + MqttProperties properties = new MqttProperties(); + properties.add(new MqttProperties.StringProperty(MqttProperties.MqttPropertyType.AUTHENTICATION_METHOD.value() + , authMethod)); + properties.add(new MqttProperties.BinaryProperty(MqttProperties.MqttPropertyType.AUTHENTICATION_DATA.value() + , authData.getBytes())); + MqttMessageIdAndPropertiesVariableHeader variableHeader = new MqttMessageIdAndPropertiesVariableHeader( + header.messageId(), properties); + MqttSubscribeMessage newSubscribeMessage = new MqttSubscribeMessage(subscribeMessage.fixedHeader(), + variableHeader, subscribeMessage.payload()); + return newSubscribeMessage; + } + public static MqttMessage createMqttDisconnectMessage() { return MessageBuilder.disconnect().build(); } diff --git a/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/utils/MqttUtils.java b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/utils/MqttUtils.java index c33529516..b64b851e7 100644 --- a/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/utils/MqttUtils.java +++ b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/utils/MqttUtils.java @@ -42,6 +42,10 @@ public static boolean isMqtt3(int version) { public static boolean isNotMqtt3(int version) { return !isMqtt3(version); } + + public static boolean isMqtt5(int version) { + return version == MqttVersion.MQTT_5.protocolLevel(); + } public static boolean isQosSupported(MqttConnectMessage msg) { return isQosSupported(msg.fixedHeader().qosLevel()); } diff --git a/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/utils/Paths.java b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/utils/Paths.java new file mode 100644 index 000000000..14d76b29e --- /dev/null +++ b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/utils/Paths.java @@ -0,0 +1,32 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.streamnative.pulsar.handlers.mqtt.utils; + +import java.net.URLDecoder; +import java.net.URLEncoder; +import java.nio.charset.StandardCharsets; +import javax.validation.constraints.NotNull; +import lombok.experimental.UtilityClass; + +@UtilityClass +public final class Paths { + + public String getUrlEncodedPath(@NotNull String name) { + return URLEncoder.encode(name, StandardCharsets.UTF_8); + } + + public String getUrlDecodedPath(@NotNull String name) { + return URLDecoder.decode(name, StandardCharsets.UTF_8); + } +} diff --git a/pom.xml b/pom.xml index d5041e5b5..c95faa726 100644 --- a/pom.xml +++ b/pom.xml @@ -95,6 +95,8 @@ + + io.grpc grpc-all diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/mqtt/mqtt3/fusesource/proxy/ProxyTest.java b/tests/src/test/java/io/streamnative/pulsar/handlers/mqtt/mqtt3/fusesource/proxy/ProxyTest.java index 636c03c55..37a39d05f 100644 --- a/tests/src/test/java/io/streamnative/pulsar/handlers/mqtt/mqtt3/fusesource/proxy/ProxyTest.java +++ b/tests/src/test/java/io/streamnative/pulsar/handlers/mqtt/mqtt3/fusesource/proxy/ProxyTest.java @@ -96,7 +96,7 @@ public void testBacklogShouldBeZeroWithQos0() throws Exception { Topic[] topics = { new Topic(topicName, QoS.AT_MOST_ONCE) }; connection.subscribe(topics); String message = "Hello MQTT"; - int numMessages = 20000; + int numMessages = 200; for (int i = 0; i < numMessages; i++) { connection.publish(topicName, (message + i).getBytes(), QoS.AT_MOST_ONCE, false); } @@ -127,7 +127,7 @@ public void testBacklogShouldBeZeroWithQos1() throws Exception { Topic[] topics = { new Topic(topicName, QoS.AT_LEAST_ONCE) }; connection.subscribe(topics); String message = "Hello MQTT"; - int numMessages = 20000; + int numMessages = 200; for (int i = 0; i < numMessages; i++) { connection.publish(topicName, (message + i).getBytes(), QoS.AT_LEAST_ONCE, false); } diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/mqtt/mqtt5/hivemq/base/MQTT5ClientUtils.java b/tests/src/test/java/io/streamnative/pulsar/handlers/mqtt/mqtt5/hivemq/base/MQTT5ClientUtils.java index 8183f4281..7307f152a 100644 --- a/tests/src/test/java/io/streamnative/pulsar/handlers/mqtt/mqtt5/hivemq/base/MQTT5ClientUtils.java +++ b/tests/src/test/java/io/streamnative/pulsar/handlers/mqtt/mqtt5/hivemq/base/MQTT5ClientUtils.java @@ -36,6 +36,14 @@ public static Mqtt5BlockingClient createMqtt5ProxyClient(int proxyPort) { .buildBlocking(); } + public static Mqtt5BlockingClient createMqtt5ProxyClient(int proxyPort, String x) { + return Mqtt5Client.builder() + .identifier(UUID.randomUUID().toString()) + .serverHost("127.0.0.1") + .serverPort(proxyPort) + .buildBlocking(); + } + public static void publishQos1ARandomMsg(Mqtt5BlockingClient client, String topic) { client.publishWith() .topic(topic) diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/mqtt/mqtt5/hivemq/base/ProxyMtlsTest.java b/tests/src/test/java/io/streamnative/pulsar/handlers/mqtt/mqtt5/hivemq/base/ProxyMtlsTest.java new file mode 100644 index 000000000..3bb89651f --- /dev/null +++ b/tests/src/test/java/io/streamnative/pulsar/handlers/mqtt/mqtt5/hivemq/base/ProxyMtlsTest.java @@ -0,0 +1,244 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.streamnative.pulsar.handlers.mqtt.mqtt5.hivemq.base; + +import static io.streamnative.oidc.broker.common.pojo.Pool.AUTH_TYPE_MTLS; +import static org.mockito.Mockito.spy; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Sets; +import com.hivemq.client.mqtt.MqttClientSslConfig; +import com.hivemq.client.mqtt.MqttGlobalPublishFilter; +import com.hivemq.client.mqtt.datatypes.MqttQos; +import com.hivemq.client.mqtt.datatypes.MqttTopic; +import com.hivemq.client.mqtt.mqtt5.Mqtt5BlockingClient; +import com.hivemq.client.mqtt.mqtt5.Mqtt5Client; +import com.hivemq.client.mqtt.mqtt5.message.publish.Mqtt5Publish; +import io.jsonwebtoken.SignatureAlgorithm; +import io.streamnative.oidc.broker.common.OIDCPoolResources; +import io.streamnative.oidc.broker.common.pojo.Pool; +import io.streamnative.pulsar.handlers.mqtt.MQTTCommonConfiguration; +import io.streamnative.pulsar.handlers.mqtt.base.MQTTTestBase; +import java.io.File; +import java.io.FileInputStream; +import java.security.KeyStore; +import java.security.PrivateKey; +import java.security.cert.Certificate; +import java.security.cert.CertificateFactory; +import java.util.Optional; +import java.util.Properties; +import java.util.Random; +import java.util.concurrent.TimeUnit; +import javax.crypto.SecretKey; +import javax.net.ssl.KeyManagerFactory; +import javax.net.ssl.SSLContext; +import javax.net.ssl.TrustManagerFactory; +import org.apache.pulsar.broker.PulsarService; +import org.apache.pulsar.broker.authentication.AuthenticationProviderToken; +import org.apache.pulsar.broker.authentication.utils.AuthTokenUtils; +import org.apache.pulsar.client.admin.PulsarAdmin; +import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.client.impl.auth.AuthenticationToken; +import org.apache.pulsar.common.util.SecurityUtility; +import org.awaitility.Awaitility; +import org.fusesource.mqtt.client.BlockingConnection; +import org.fusesource.mqtt.client.MQTT; +import org.fusesource.mqtt.client.Message; +import org.fusesource.mqtt.client.QoS; +import org.fusesource.mqtt.client.Topic; +import org.testng.Assert; +import org.testng.annotations.Test; + +public class ProxyMtlsTest extends MQTTTestBase { + + + String path = "./src/test/resources/mtls/proxy/"; + + String token; + + MQTTCommonConfiguration localConfig; + + @Override + protected MQTTCommonConfiguration initConfig() throws Exception { + System.setProperty("jdk.security.allowNonCaAnchor", "true"); + enableTls = true; + MQTTCommonConfiguration mqtt = super.initConfig(); + + mqtt.setSystemTopicEnabled(false); + mqtt.setMqttProxyEnabled(true); + mqtt.setMqttProxyMTlsAuthenticationEnabled(true); + mqtt.setMqttProxyTlsEnabled(true); + mqtt.setMqttTlsRequireTrustedClientCertOnConnect(true); + mqtt.setMqttTlsAllowInsecureConnection(false); + + mqtt.setMqttTlsEnabledWithKeyStore(true); + mqtt.setMqttTlsKeyStoreType("JKS"); + mqtt.setMqttTlsKeyStore(path + "serverkeystore.jks"); + mqtt.setMqttTlsKeyStorePassword("123456"); + mqtt.setMqttTlsTrustStoreType("JKS"); + mqtt.setMqttTlsTrustStore(path + "truststore.jks"); + mqtt.setMqttTlsTrustStorePassword("123456"); + + SecretKey secretKey = AuthTokenUtils.createSecretKey(SignatureAlgorithm.HS256); + Properties properties = new Properties(); + properties.setProperty("tokenSecretKey", AuthTokenUtils.encodeKeyBase64(secretKey)); + token = AuthTokenUtils.createToken(secretKey, "superUser", Optional.empty()); + + mqtt.setAuthenticationEnabled(true); + mqtt.setMqttAuthenticationEnabled(true); + mqtt.setMqttAuthenticationMethods(ImmutableList.of("token")); + mqtt.setSuperUserRoles(ImmutableSet.of("superUser")); + mqtt.setAuthenticationProviders(Sets.newHashSet(AuthenticationProviderToken.class.getName())); + + mqtt.setBrokerClientAuthenticationPlugin(AuthenticationToken.class.getName()); + mqtt.setBrokerClientAuthenticationParameters("token:" + token); + mqtt.setProperties(properties); + + localConfig = mqtt; + + return mqtt; + } + + @Override + public void afterSetup() throws Exception { + AuthenticationToken authToken = new AuthenticationToken(); + authToken.configure("token:" + token); + + pulsarClient = PulsarClient.builder() + .serviceUrl(brokerUrl.toString()) + .authentication(authToken) + .statsInterval(0, TimeUnit.SECONDS) + .build(); + admin = spy(PulsarAdmin.builder() + .serviceHttpUrl(brokerUrl.toString()) + .authentication(authToken) + .build()); + + final PulsarService pulsarService = pulsarServiceList.get(0); + OIDCPoolResources oidcPoolResources = new OIDCPoolResources(pulsarService.getLocalMetadataStore()); + + Pool pool = new Pool("test-pool", AUTH_TYPE_MTLS, "d", "provider-1", "CN=='CLIENT'"); + oidcPoolResources.createPool(pool); + + Awaitility.await().until(() -> oidcPoolResources.getPool("test-pool") != null); + } + + public SSLContext createSSLContext() throws Exception { + File caCertFile = new File(path + "ca.cer"); + Certificate caCert = CertificateFactory + .getInstance("X.509").generateCertificate(new FileInputStream(caCertFile)); + + File clientCertFile = new File(path + "client.cer"); + Certificate clientCert = CertificateFactory + .getInstance("X.509").generateCertificate(new FileInputStream(clientCertFile)); + + PrivateKey privateKey = SecurityUtility.loadPrivateKeyFromPemFile(path + "client.key"); + + final SSLContext sslContext = SecurityUtility.createSslContext(true, + new Certificate[]{caCert}, new Certificate[]{clientCert}, privateKey); + + return sslContext; + } + + @Test + public void testMqtt3() throws Exception { + SSLContext sslContext = createSSLContext(); + MQTT mqtt = createMQTTProxyTlsClient(); + mqtt.setSslContext(sslContext); + + String topicName = "mqtt3"; + BlockingConnection connection = mqtt.blockingConnection(); + connection.connect(); + Topic[] topics = { new Topic(topicName, QoS.AT_MOST_ONCE) }; + connection.subscribe(topics); + String message = "Hello MQTT3"; + connection.publish(topicName, message.getBytes(), QoS.AT_MOST_ONCE, false); + Message received = connection.receive(); + Assert.assertEquals(received.getTopic(), topicName); + Assert.assertEquals(new String(received.getPayload()), message); + received.ack(); + connection.disconnect(); + } + + @Test + public void testMqtt5() throws Exception { + + KeyStore keyStore = KeyStore.getInstance("JKS"); + keyStore.load(new FileInputStream(path + "clientkeystore.jks"), "123456".toCharArray()); + + KeyStore trustStore = KeyStore.getInstance("JKS"); + trustStore.load(new FileInputStream(path + "truststore.jks"), "123456".toCharArray()); + + KeyManagerFactory keyManagerFactory = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm()); + keyManagerFactory.init(keyStore, "123456".toCharArray()); + + TrustManagerFactory trustManagerFactory = TrustManagerFactory.getInstance( + TrustManagerFactory.getDefaultAlgorithm()); + trustManagerFactory.init(trustStore); + + final MqttClientSslConfig sslConfig = MqttClientSslConfig.builder().keyManagerFactory(keyManagerFactory) + .trustManagerFactory(trustManagerFactory) + .build(); + + Random random = new Random(); + final Mqtt5BlockingClient client1 = Mqtt5Client.builder() + .identifier("client-1") + .serverHost("localhost") + .sslConfig(sslConfig) + .serverPort(getMqttProxyPortTlsList().get(random.nextInt(getMqttProxyPortTlsList().size()))) + .buildBlocking(); + final Mqtt5BlockingClient client2 = Mqtt5Client.builder() + .identifier("client-2") + .serverHost("localhost") + .sslConfig(sslConfig) + .serverPort(getMqttProxyPortTlsList().get(random.nextInt(getMqttProxyPortTlsList().size()))) + .buildBlocking(); + + String topic1 = "testMqtt5-client-1"; + String topic2 = "testMqtt5-client-2"; + + client1.connect(); + client2.connect(); + client1.subscribeWith().topicFilter(topic1).qos(MqttQos.AT_LEAST_ONCE).send(); + client2.subscribeWith().topicFilter(topic2).qos(MqttQos.AT_LEAST_ONCE).send(); + byte[] msg1 = "client-1-payload".getBytes(); + byte[] msg2 = "client-2-payload".getBytes(); + client1.publishWith() + .topic(topic1) + .qos(MqttQos.AT_LEAST_ONCE) + .payload(msg1) + .send(); + + try (Mqtt5BlockingClient.Mqtt5Publishes publishes = client1.publishes(MqttGlobalPublishFilter.ALL)) { + Mqtt5Publish publish = publishes.receive(); + Assert.assertEquals(publish.getTopic(), MqttTopic.of(topic1)); + Assert.assertEquals(publish.getPayloadAsBytes(), msg1); + } + // + client2.publishWith() + .topic(topic2) + .qos(MqttQos.AT_LEAST_ONCE) + .payload(msg2) + .send(); + try (Mqtt5BlockingClient.Mqtt5Publishes publishes = client2.publishes(MqttGlobalPublishFilter.ALL)) { + Mqtt5Publish publish = publishes.receive(); + Assert.assertEquals(publish.getTopic(), MqttTopic.of(topic2)); + Assert.assertEquals(publish.getPayloadAsBytes(), msg2); + } + client1.unsubscribeWith().topicFilter(topic1).send(); + client1.disconnect(); + client2.unsubscribeWith().topicFilter(topic1).send(); + client2.disconnect(); + } +} diff --git a/tests/src/test/resources/mtls/proxy/ca.cer b/tests/src/test/resources/mtls/proxy/ca.cer new file mode 100644 index 000000000..0fd518e1f --- /dev/null +++ b/tests/src/test/resources/mtls/proxy/ca.cer @@ -0,0 +1,19 @@ +-----BEGIN CERTIFICATE----- +MIIDEDCCAfigAwIBAgIUMsRb5hhnGL727b9Lbdj/kVI2H1cwDQYJKoZIhvcNAQEL +BQAwETEPMA0GA1UEAwwGUk9PVENBMB4XDTI0MDgyMzEwNTcxN1oXDTM0MDgyMTEw +NTcxN1owETEPMA0GA1UEAwwGUk9PVENBMIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8A +MIIBCgKCAQEAoEpINahEpHzitKLvYJYuEGz1lDrdLqxbV39P2Lkxy9o+4nunNvwH +1dT3e0FBEzNBmA1ply0465MXGoTzljgnugeit2hfAfYTYLoDWZnKXfEz6qs1Oe4/ +K9LanD+W30nzoE5huq9jsoxHX6FDDu7driV/U+ffxGzYEteVAHj6EpwdOdLjW2uV +zDlL5V23NR3sSI23xIbIKng6/U6t8SSX70gCM7HG38KK8LzBDcaz10tixVQ0vtIm +g+xkNoDG1re21e6hI6Q62KNQqzygqEQ7AzbW0fe2M8RdviFGe2UW131FqD0DQ3J8 +C2F5aq6bjWPZc7vr/HQJf4n4od1P9cSg5QIDAQABo2AwXjAdBgNVHQ4EFgQUlwCH +1eZpUIPJWoAaFjn2GpUNiWcwHwYDVR0jBBgwFoAUlwCH1eZpUIPJWoAaFjn2GpUN +iWcwDwYDVR0TAQH/BAUwAwEB/zALBgNVHQ8EBAMCAYYwDQYJKoZIhvcNAQELBQAD +ggEBAHcjg7XIVgjKwAq79FVDc+l53YDpw0PpGvL6mI29Wrp2cR2gx0A89hyXGfZD +SxxWmQyx2Z/S+bocaJdJrFIikA1XgHD2LQYjLx2D5ONM8uFcsM1KfPcCAc18D0Y4 +o1lAkC4rk/b/Mv9OKGjtvpwHxDpgoXVBR4yFE9h9dvpOvae8pOPWpeY/P2dEVOo/ +1N7bc8K/yhOjdT/umB1x3xdpTX6mO90tzDSjWerCJTUZgADjHui5N8CHc4RH9+1b +xQNPFBIXgHi/UOYD6ekxoAfbnoAjC3BBKwsICnBDz6qX07uSl/5zbXl06CFWnOQr +O9LMX4bSgPDmi437cnBB0np3NvA= +-----END CERTIFICATE----- diff --git a/tests/src/test/resources/mtls/proxy/ca.crt b/tests/src/test/resources/mtls/proxy/ca.crt new file mode 100644 index 000000000..e0203cd22 --- /dev/null +++ b/tests/src/test/resources/mtls/proxy/ca.crt @@ -0,0 +1,19 @@ +-----BEGIN CERTIFICATE----- +MIIDEDCCAfigAwIBAgIUEJ+TKT+jKJrdq1wAsjhIukssB04wDQYJKoZIhvcNAQEL +BQAwETEPMA0GA1UEAwwGUk9PVENBMB4XDTI0MDgyMzEwNTA1OFoXDTM0MDgyMTEw +NTA1OFowETEPMA0GA1UEAwwGUk9PVENBMIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8A +MIIBCgKCAQEAoEpINahEpHzitKLvYJYuEGz1lDrdLqxbV39P2Lkxy9o+4nunNvwH +1dT3e0FBEzNBmA1ply0465MXGoTzljgnugeit2hfAfYTYLoDWZnKXfEz6qs1Oe4/ +K9LanD+W30nzoE5huq9jsoxHX6FDDu7driV/U+ffxGzYEteVAHj6EpwdOdLjW2uV +zDlL5V23NR3sSI23xIbIKng6/U6t8SSX70gCM7HG38KK8LzBDcaz10tixVQ0vtIm +g+xkNoDG1re21e6hI6Q62KNQqzygqEQ7AzbW0fe2M8RdviFGe2UW131FqD0DQ3J8 +C2F5aq6bjWPZc7vr/HQJf4n4od1P9cSg5QIDAQABo2AwXjAdBgNVHQ4EFgQUlwCH +1eZpUIPJWoAaFjn2GpUNiWcwHwYDVR0jBBgwFoAUlwCH1eZpUIPJWoAaFjn2GpUN +iWcwDwYDVR0TAQH/BAUwAwEB/zALBgNVHQ8EBAMCAYYwDQYJKoZIhvcNAQELBQAD +ggEBACh6PR76yNLMErrEqZcUhJngOQK0yZR9zjcDoAM4YDSx/qHP/rZA9j2HJdVz +oPAf1rU7QGmMojyQ7sj0jtViWtX9QJFtPcCR8vnwONDyC4hOwNAWLx9TeRp6ZZVL +Y8TN9ydo3jTj4ZjG9rPzQBH3+vErr+WtuIK5H5AvQCzRZPv1r78FBdhA6uQtm6EV +WZ81xJPLQirIDl7yWtCkE+325pOW8J48I6wa3EG/30quXjcaGzlPCAQMGsyjPBvC +m4c8dtEYbYEynMdRDhvgd3OI91L6eeGvEMQvOrLJWtB1zDHZ1xleHmNkBV625UTX +h+Ojtlo/0Nlw3mPABR/B++Rdeo8= +-----END CERTIFICATE----- diff --git a/tests/src/test/resources/mtls/proxy/ca.key b/tests/src/test/resources/mtls/proxy/ca.key new file mode 100644 index 000000000..f846ae7c3 --- /dev/null +++ b/tests/src/test/resources/mtls/proxy/ca.key @@ -0,0 +1,28 @@ +-----BEGIN PRIVATE KEY----- +MIIEvgIBADANBgkqhkiG9w0BAQEFAASCBKgwggSkAgEAAoIBAQCgSkg1qESkfOK0 +ou9gli4QbPWUOt0urFtXf0/YuTHL2j7ie6c2/AfV1Pd7QUETM0GYDWmXLTjrkxca +hPOWOCe6B6K3aF8B9hNgugNZmcpd8TPqqzU57j8r0tqcP5bfSfOgTmG6r2OyjEdf +oUMO7t2uJX9T59/EbNgS15UAePoSnB050uNba5XMOUvlXbc1HexIjbfEhsgqeDr9 +Tq3xJJfvSAIzscbfworwvMENxrPXS2LFVDS+0iaD7GQ2gMbWt7bV7qEjpDrYo1Cr +PKCoRDsDNtbR97YzxF2+IUZ7ZRbXfUWoPQNDcnwLYXlqrpuNY9lzu+v8dAl/ifih +3U/1xKDlAgMBAAECggEAANcS1NEqdvA+ofK+rXNsD2L60ImUcjOuEOHWcczasXZV +4QsD39pnUhwZJfi8FhUtMVZUqNmwVC/DrbxnqHBn6OY0WKC/6rs288lVzijrsh1b +B85Y65JPm3Ox+KKusEHreHogsgFMVPm+QAaQ2umumMSSi8aJ2jY11bdyjPuVV/ae +YeFxs7C6Mk9uTb9KvESxjfxk0rOznXX5WGZLy1p4F2o1NX8I1ueVneeR7h5fDlzF +Tedl+BSU86JT1AFdbhsZJhzGShLPILACGQX6Rteim+C0sJyykuHQy+vh6/LKnYbW +j+JUkJWP3Hdk8gPyxwJ2foYh7Hw725RlPY5JlGtaYQKBgQDX4yf4UeuGgtNhGvVX +WshUrO37zlKsMu8FWBTyTfudDVcbcojeU8Kv0vW7PlCfyCvXnFE80/ipL/zedHYf +ZVCaZEPA+rOeCdBgVZ3FY59lw1h1DwQqYybwj/IuSzw3AqQ5f9tzxNG7noxdFX1T +BrY4VHbFYWPEMJJc5v2zGgALkQKBgQC+EprVUvZE6wbPdKFmg++fTKnwDybUrDKH +uB/garImkTqXOzPWVUdiw3XpWXvpCCwWxJ8yb9eaO4GfmCmhgKjgql1w2vxjDVNI +/3xZubeAR3WfAmIUgzB76L2Oz9xvzc1t/rnpvGQNihSGOnEogsXZBvRT+raVMPe+ +DHWu9qXOFQKBgAw2Gh2urI7YOZKljrkZNnmrqm5y1jRNUT3RJKYsCQ5yIbo4uUsy +G7IMUb/8n1zaWriAbAvvxYH0Z+5BUikmdu+0uixhQeWvkmzQivMOVobQDOHaLpcj +MqGq0r0Rnl9SM+3YsJYUzPQ63J+rRoJ6v7Xh+THi91yyjqTYoAMQdm4xAoGBAIYb +WmNpRZkaupNlFvvd2xPqY3ydNCiZ1o0rvFH69feAQHazrr9rLBLjFi6ulF63BWSL +Fkff4Z9QnQSdt8HbpUve6E7YM3svy7OVj4c/IdnAkZy/cbRHW84RSK2au02nR2p0 +b3gbE/z5j8GlOnH60t1tqrYWDvz0r9fHssDgBdyBAoGBAKB8sZCKpyKL66weEz+H +LFAqw+Fm3MdXFLx9tRm/0H0jlZXWviViYvbT1A+gZFMDqlX6Ivqxcxy/W0a3BXMn +9QKYMJFvbOjCI4qh+M/eyEofOtEAh/c5x7g1MYPaE08YaxoFyutP8MJszvWjnsuq +NgdFFE0gyRsHGmJdCDQ5dmVi +-----END PRIVATE KEY----- diff --git a/tests/src/test/resources/mtls/proxy/ca.srl b/tests/src/test/resources/mtls/proxy/ca.srl new file mode 100644 index 000000000..e46db4553 --- /dev/null +++ b/tests/src/test/resources/mtls/proxy/ca.srl @@ -0,0 +1 @@ +1EA9E0336372A6164A1F22BFB7C56CD4EA142F9F diff --git a/tests/src/test/resources/mtls/proxy/ca_config.cnf b/tests/src/test/resources/mtls/proxy/ca_config.cnf new file mode 100644 index 000000000..6c1edb524 --- /dev/null +++ b/tests/src/test/resources/mtls/proxy/ca_config.cnf @@ -0,0 +1,15 @@ +[ req ] +default_bits = 2048 +prompt = no +default_md = sha256 +distinguished_name = dn +x509_extensions = v3_ca + +[ dn ] +CN = ROOTCA + +[ v3_ca ] +subjectKeyIdentifier = hash +authorityKeyIdentifier = keyid:always,issuer +basicConstraints = critical,CA:true +keyUsage = digitalSignature, cRLSign, keyCertSign diff --git a/tests/src/test/resources/mtls/proxy/client.cer b/tests/src/test/resources/mtls/proxy/client.cer new file mode 100644 index 000000000..36f70ea7d --- /dev/null +++ b/tests/src/test/resources/mtls/proxy/client.cer @@ -0,0 +1,18 @@ +-----BEGIN CERTIFICATE----- +MIIC9DCCAdygAwIBAgIUHqngM2NyphZKHyK/t8Vs1OoUL58wDQYJKoZIhvcNAQEL +BQAwETEPMA0GA1UEAwwGUk9PVENBMCAXDTI0MDgyMzEwNTgzNloYDzIxMjQwNzMw +MTA1ODM2WjARMQ8wDQYDVQQDDAZDTElFTlQwggEiMA0GCSqGSIb3DQEBAQUAA4IB +DwAwggEKAoIBAQC0S6AnsYUBJJP3ZiOqPdqvjW4nbslKvTYFo2tfLnobOakuSKyD +Iisx9FlUHLFMslL3QZ6mp/U8pmZU6YORWo/f6eElLuGtQPSVxqTEZH0v+GY8N8HA +XshvXmh7CSkGFzbKJ9vyAN2rZ/XrgEP3bBSQZxVSloLaNjV+lXzs3JaULxvQL+EJ +ef38RC//qI7qPflaNveIpnaCpG+Rx8QnFDYeyi8BDS2PdeKwLzj4aXaW5qjoBC3v +qk9/XLqxMXTAX8Ty32E7HrgxhG0mA48gYg2T/Ba8RPykNGvM/cncRgC+B0IJJ1jY +XAgdg/C5Xsz7DveWeezWibripMQN0UrkTjcLAgMBAAGjQjBAMB0GA1UdDgQWBBQY +djFGcYoRFXb7lyZkoFqQJnML1DAfBgNVHSMEGDAWgBSXAIfV5mlQg8lagBoWOfYa +lQ2JZzANBgkqhkiG9w0BAQsFAAOCAQEAiATbrV3/uIuIJQbXBssuVOlTcUWXPAHf +bFGz31NgPnb0za2ApoJ+912orSxSAvlG10g+2Z34iW8+bciH8e1DF+cPQKVg0lkm +jaMQO1cUVw6e2aRHagIMvgEwcz+PqVJoLvWWp5sjqnlafu/ZuXjzeyefuFbxD3kI +EAuSp4juklHrjLZbDulUgGuodKnJ/plzRLKUYoKArdmCAulZRmxcKBw6oYjQWo3A +6lLLHgqaV2g2RuAQFP6qCTMGEWXu4F8ZWJC6vV0zEDMh6QKJdNH1RShbsWlfIxsW +TU6Pswt9EyDIo2Wd72n/sAC8pFxvM8tfhFsAOihHB1XBb2YhtNCORQ== +-----END CERTIFICATE----- diff --git a/tests/src/test/resources/mtls/proxy/client.csr b/tests/src/test/resources/mtls/proxy/client.csr new file mode 100644 index 000000000..0be3c4a35 --- /dev/null +++ b/tests/src/test/resources/mtls/proxy/client.csr @@ -0,0 +1,15 @@ +-----BEGIN CERTIFICATE REQUEST----- +MIICVjCCAT4CAQAwETEPMA0GA1UEAwwGQ0xJRU5UMIIBIjANBgkqhkiG9w0BAQEF +AAOCAQ8AMIIBCgKCAQEAtEugJ7GFASST92Yjqj3ar41uJ27JSr02BaNrXy56Gzmp +LkisgyIrMfRZVByxTLJS90Gepqf1PKZmVOmDkVqP3+nhJS7hrUD0lcakxGR9L/hm +PDfBwF7Ib15oewkpBhc2yifb8gDdq2f164BD92wUkGcVUpaC2jY1fpV87NyWlC8b +0C/hCXn9/EQv/6iO6j35Wjb3iKZ2gqRvkcfEJxQ2HsovAQ0tj3XisC84+Gl2luao +6AQt76pPf1y6sTF0wF/E8t9hOx64MYRtJgOPIGINk/wWvET8pDRrzP3J3EYAvgdC +CSdY2FwIHYPwuV7M+w73lnns1om64qTEDdFK5E43CwIDAQABoAAwDQYJKoZIhvcN +AQELBQADggEBAIoQFcm5h17AMtuZ2hdERwptzzQ8HxOC6Eb3NpfItRHLDKKMSuD9 +WZPixwz/53uF4jlcN+Uc5AONSKognp70pR3Ku9G47cEBb/iYF20OU6nd6a9+X4wr +r9SwkS/FlNb3A1UM0HuO9pXX3Raq3WR04gSWdJ2S7gbGWJp5vJiHbEwbIqb05lYG +sRdkVlJdGRbPmpVJmPib3wMidbm8K8SzLIDyLyTiMl92z7dBZq83tYAKdZ7D0TEb +k4rKu19Wi+EkOVe9qSdzWlgXLoNxct4rxpqbrWUg8DROtWFyQLA4WvjDzRnxFJgU +fhYb2IQXM1GUhNLGAdJOu8iaV2Dpd7fPGSw= +-----END CERTIFICATE REQUEST----- diff --git a/tests/src/test/resources/mtls/proxy/client.key b/tests/src/test/resources/mtls/proxy/client.key new file mode 100644 index 000000000..7d95fe83d --- /dev/null +++ b/tests/src/test/resources/mtls/proxy/client.key @@ -0,0 +1,28 @@ +-----BEGIN PRIVATE KEY----- +MIIEvQIBADANBgkqhkiG9w0BAQEFAASCBKcwggSjAgEAAoIBAQC0S6AnsYUBJJP3 +ZiOqPdqvjW4nbslKvTYFo2tfLnobOakuSKyDIisx9FlUHLFMslL3QZ6mp/U8pmZU +6YORWo/f6eElLuGtQPSVxqTEZH0v+GY8N8HAXshvXmh7CSkGFzbKJ9vyAN2rZ/Xr +gEP3bBSQZxVSloLaNjV+lXzs3JaULxvQL+EJef38RC//qI7qPflaNveIpnaCpG+R +x8QnFDYeyi8BDS2PdeKwLzj4aXaW5qjoBC3vqk9/XLqxMXTAX8Ty32E7HrgxhG0m +A48gYg2T/Ba8RPykNGvM/cncRgC+B0IJJ1jYXAgdg/C5Xsz7DveWeezWibripMQN +0UrkTjcLAgMBAAECggEAQ8Q9IU5HGMPf3diFRULUhLGbGrU4caAmwv3GqNL2UG9e +2Ke2N9/K7o7SWJwkRBiuuILwl+F/etlskzPmIOcyNs5YsropVw6YIAe2/J5ss3Ah +NTcb2yuFGN8aVEyAH+rvzBIpSI/swbVkqKzgXwo/vHsSd6Vc75n6h2a2uuy6qF1f +C4Jr1XXD4TBFRxzfdxpJOZlEHlDQ7Qj9csrHNrrN3QUtFzIPV8XYqAxqd/aoRkXL +VnmTZSH2fJvQR1XWdEcvLF9okulczUdTw0wUrYJAEh+rzhiGY/At3A1UVtzwqprZ +hVv866sC6PYY8q/tOCkTphBK5efzZ04dhJTNtKWqyQKBgQDYy7IsDFtDOmTQCL2h +sRO7qd+QBRnaIWNy6dcfo1WWGdsfBF8CpEnENBDaUT1v7NHYxkZdDeVeufXX6PDv +m55Yaz3NXC0kHyhH0PpWIhtOjEzCiyocdohiZalTFG+1sbpp7/XIagU+Vgoynotm +Nd0FipskzQmNbno8QWwZz4FxzQKBgQDU5jEe4TsGYmz6pLiphsIqm/ovndqAJ74G +dV4Q3Lm+qzQ79EAa1StYd05eG9EZWdbwkxgToulO3ANTh5PxgXCwmGYSAzGO2RXO ++/kv6hhBFsxgd8Ve+9AfLx1Wd9e2IJoUkhBwr3BRFIv2MhiV8PSBs4i4kidxUIZP +HDJJNUXUNwKBgQCC9cGClFBI8yxU8wLCevqFoZ9YG4y7VPIDR7jY9szLqIDSYsyW +BvI8oIsRpoOrae51uYhly/Aj4cfdjmyFAYeMt/OUazsll+C4SUf/4giG0X/JAVIF +8aB/eBPqCO1WX69RMVBSqaDTQBxW6akhrCYAo/MGLwm3MuaKIacQjGYQfQKBgC9r +OgObvO7WG1nUOIEhz7t31EioyxMCRxPfLl1pHEH4lgDIjUKsuiPRJvZVEcSouvQI +fzNYdMiovmDrcKs43mWm/A0FAxPDDFV2z/C5Hj/ZGRpfcumOArP/ZXRt6vDY4Bi2 +08yVdtnITsg+LjWvXnZJC6m4e+qEOfYC3LxrjisPAoGAUH1jW7oGLXUY/4YJpbRe +RNuRjX7K8ynJ/ztwsTFPM8AIl0lEdwdi1U6/Fo6CT1ls7jBPASOt8KCmYg9g8us5 +SWhMKvRD6ryWFqpOJUDQq3b7N2illdNW/kACi8aI5Ghup56iZT7yq+J+h0jxlj9y +gbL73lKp6ZOHeGKBOqDJLhI= +-----END PRIVATE KEY----- diff --git a/tests/src/test/resources/mtls/proxy/client.p12 b/tests/src/test/resources/mtls/proxy/client.p12 new file mode 100644 index 000000000..bffb931b5 Binary files /dev/null and b/tests/src/test/resources/mtls/proxy/client.p12 differ diff --git a/tests/src/test/resources/mtls/proxy/clientkeystore.jks b/tests/src/test/resources/mtls/proxy/clientkeystore.jks new file mode 100644 index 000000000..a5033dc9f Binary files /dev/null and b/tests/src/test/resources/mtls/proxy/clientkeystore.jks differ diff --git a/tests/src/test/resources/mtls/proxy/server.cer b/tests/src/test/resources/mtls/proxy/server.cer new file mode 100644 index 000000000..4da91bbc4 --- /dev/null +++ b/tests/src/test/resources/mtls/proxy/server.cer @@ -0,0 +1,18 @@ +-----BEGIN CERTIFICATE----- +MIIC9zCCAd+gAwIBAgIUHqngM2NyphZKHyK/t8Vs1OoUL54wDQYJKoZIhvcNAQEL +BQAwETEPMA0GA1UEAwwGUk9PVENBMCAXDTI0MDgyMzEwNTcyN1oYDzIxMjQwNzMw +MTA1NzI3WjAUMRIwEAYDVQQDDAlsb2NhbGhvc3QwggEiMA0GCSqGSIb3DQEBAQUA +A4IBDwAwggEKAoIBAQCOLCxMqDrQQr+BME/PpoYlhpLFwi6YN0SdDlGRUfGZalfU +lpNc0S0frK4Oo4rVxpCPVlaAQYiw08vvjO4m3B9rZaizj34WqO+2NaowX/O/8Oc/ +oL1WwBd8cG2Le85sZ4Pz/8RKQePwTlHL6JaXT2UV0VOFF4J4msdvT1ybmKnTJEpK +2UC9t6h+FVH3BtFezIBECCPHzg5zOYlkk5ghPkCSAZkBmae6RgF4uCmd+fWDBGF6 +mfS6Ro9wXLGut1noSAJ5iJdJukiyKSUsiZcLEnY2VlHZ1rOY8O89gof2y8flQOFr +TWdap7hr7QtqzL1hrWxXhB2pywjlG3qmvajKZlzhAgMBAAGjQjBAMB0GA1UdDgQW +BBR3wGlX1K6gFp8wB6ezQbAVQ9SXPTAfBgNVHSMEGDAWgBSXAIfV5mlQg8lagBoW +OfYalQ2JZzANBgkqhkiG9w0BAQsFAAOCAQEAaiGKs18tRjVhTjaSx6K2qwSR6oWQ +YjXV2NhI39kq86NEs5SBBSwPJE2iTN25C/a5H3ijxuFokUGad26Q6vBJKHkXCPR8 +Ut/QCKJ4XID1WwBxzImWvRLV23ErrCL8QjavRdv4uRtPhJNk6EHB9T0BNODG98ua +t7/bsCVlfXXr9QKRwzuijbjbUXyY9TnslbfuY9I60TkVYhELHE9DEelQjh3WMZsA +vh6KA6rCYkhekiKG1MSppIZrqzOXi1KxEl6LUrmjML0kyLvTNGXAYv1yl9ZicAC+ +AdxYVn/+HiUNMfTnoLpnUMDQuVslq71Zxjd4NBWXsrZEpBX6iwjEkXmbRQ== +-----END CERTIFICATE----- diff --git a/tests/src/test/resources/mtls/proxy/server.csr b/tests/src/test/resources/mtls/proxy/server.csr new file mode 100644 index 000000000..2f074dc04 --- /dev/null +++ b/tests/src/test/resources/mtls/proxy/server.csr @@ -0,0 +1,15 @@ +-----BEGIN CERTIFICATE REQUEST----- +MIICWTCCAUECAQAwFDESMBAGA1UEAwwJbG9jYWxob3N0MIIBIjANBgkqhkiG9w0B +AQEFAAOCAQ8AMIIBCgKCAQEAjiwsTKg60EK/gTBPz6aGJYaSxcIumDdEnQ5RkVHx +mWpX1JaTXNEtH6yuDqOK1caQj1ZWgEGIsNPL74zuJtwfa2Wos49+FqjvtjWqMF/z +v/DnP6C9VsAXfHBti3vObGeD8//ESkHj8E5Ry+iWl09lFdFThReCeJrHb09cm5ip +0yRKStlAvbeofhVR9wbRXsyARAgjx84OczmJZJOYIT5AkgGZAZmnukYBeLgpnfn1 +gwRhepn0ukaPcFyxrrdZ6EgCeYiXSbpIsiklLImXCxJ2NlZR2dazmPDvPYKH9svH +5UDha01nWqe4a+0Lasy9Ya1sV4QdqcsI5Rt6pr2oymZc4QIDAQABoAAwDQYJKoZI +hvcNAQELBQADggEBAEYLR+oszYUnbhtJNSom3sIOrQbJhB4taBiixduF9PjCZSxs +EwyNcHnC3mw6NrYTdAJmPEIYIc567Wakqb03pZDeGnlmhZYjWpmI6/GTB1awgKsw +NP9x9HdDHq5U8LDjutPUvQD+CeWmtfHEdL6l44Vfg+G69sW06RlMt8vM+02eJbN2 +JY0zWBTQingn7BiIxPDkwzlMNo7uoxDa7fw7BWRBvg4F8VufJ4MR+gHCTkRHPjcT +TckJcOzzSMSxc1lD8Yhvam8sb9BDyWHid66YgYBht2+eXSBfwR1U0bXjay/uuzM9 +13JhknHfP4BLjvwqx8uySUfDhrx4AKBX7rnfMuQ= +-----END CERTIFICATE REQUEST----- diff --git a/tests/src/test/resources/mtls/proxy/server.key b/tests/src/test/resources/mtls/proxy/server.key new file mode 100644 index 000000000..03109a6b4 --- /dev/null +++ b/tests/src/test/resources/mtls/proxy/server.key @@ -0,0 +1,28 @@ +-----BEGIN PRIVATE KEY----- +MIIEvQIBADANBgkqhkiG9w0BAQEFAASCBKcwggSjAgEAAoIBAQCOLCxMqDrQQr+B +ME/PpoYlhpLFwi6YN0SdDlGRUfGZalfUlpNc0S0frK4Oo4rVxpCPVlaAQYiw08vv +jO4m3B9rZaizj34WqO+2NaowX/O/8Oc/oL1WwBd8cG2Le85sZ4Pz/8RKQePwTlHL +6JaXT2UV0VOFF4J4msdvT1ybmKnTJEpK2UC9t6h+FVH3BtFezIBECCPHzg5zOYlk +k5ghPkCSAZkBmae6RgF4uCmd+fWDBGF6mfS6Ro9wXLGut1noSAJ5iJdJukiyKSUs +iZcLEnY2VlHZ1rOY8O89gof2y8flQOFrTWdap7hr7QtqzL1hrWxXhB2pywjlG3qm +vajKZlzhAgMBAAECggEAJUZalPyUcu7VI8tlWXV4/VSznYrESZ9P63eKFjYsSENT +5MZXu4GSOZg+CFODfnnxg8soobbM3kOhV9yUwfZ7fF5qZS2NauZ+9sBKLyWxFBVW +o92CVsDFR5h4eBYNMGnRdEMOFUQCPYpjMrl35hLoV1iK872OtQ0rlbjyeE8F03fv +vc5OwiJwJAClKbmYN1T5bI4+NpRnqBUjq4EVNJVVsRPv/2us98MaHGwZwu/QkS7L +szMyp0PRFy35z5QOrLXVqmAKME8UIvT91GRxR/7fjUfoPc1gMCeNdsw/129N3m9t +EiUoNymDnq/fl5fQHm7i8Xw0/hkscrsBN6u4Nn2k4wKBgQDClwqXGd0BvTPHMZ3Q +W69L9xxZsD2rD4eg/5a9HVTjk6oF7jW+RlsKAaqkhqcgzC3VFLtNiqGEl4Iy7dye +vVXDRLwZPZvzqLKxNXSWyb1Oi2S2ZCQ8UNXEdguOTeWKqN0YTpoMjpS9aE2lnqjI +j+AtejT3QBYM5Gk7Tjq8H5by0wKBgQC7ClEfrWLvTe6RlPSGf/EGeoBueVnepeay +vdGKlrHadzrL0+OACWWytNPu18GgWiWQ/opCs4AO3MVakCq2RP1IB9xYBSvvh8YE +JLLHDUf2FuTrqEUGv/5q/tzw6zg1VBbqK/I64u7chy4K9Z7PMFH1jIwtGvx7wSz0 +lJ80TEOY+wKBgQC9uHfZ1JyXKnpGNwjv8cRa42Zfx6aIls6c2TWF0whGDl2SBKuf +hYcTnYZbfPGL16bD/r+TnZtW5EVMFH+qwVzR0r02OsxHZ245uq2eqrkI0H10GvT9 +Xa7eKl2f9DDd3UAbh99IaOd6OCykUqmrSiO/E0x62J/nF/fFm8NGUeJhgwKBgGZj +i20v2Uzu+H4xcHqAZ00Imm3feFbtspB+YMrhG5NgWYti/tMEUeu2GZ5R9ej18EKj +VMSWxq9cI50j9n243n4j73cvvZFuXfO+sy9MEeyhM8fdYYwEZh3kTsaDvU1ULwcZ +74xU1jVautW9B6ab6QHmYoX+k+CbKyTohw52ATspAoGAVgfESvUZiUJ+O78Nouoc +etCA594/JxAqOPg0pZuvoagiU2PP+B3ND7i9B89idT2tyj2FUYdpe1EoijllazZ6 +0Qm+hERo5vRykzUtpyUNcrf4AE66y2B2Rj/k0kQtc8T9VEbyKdX6SJeBxMCF4rQ7 +Jl7BtimswvszVVHLCcU0s3M= +-----END PRIVATE KEY----- diff --git a/tests/src/test/resources/mtls/proxy/server.p12 b/tests/src/test/resources/mtls/proxy/server.p12 new file mode 100644 index 000000000..630d515aa Binary files /dev/null and b/tests/src/test/resources/mtls/proxy/server.p12 differ diff --git a/tests/src/test/resources/mtls/proxy/serverkeystore.jks b/tests/src/test/resources/mtls/proxy/serverkeystore.jks new file mode 100644 index 000000000..6f9e628d3 Binary files /dev/null and b/tests/src/test/resources/mtls/proxy/serverkeystore.jks differ diff --git a/tests/src/test/resources/mtls/proxy/truststore.jks b/tests/src/test/resources/mtls/proxy/truststore.jks new file mode 100644 index 000000000..cb0fe1562 Binary files /dev/null and b/tests/src/test/resources/mtls/proxy/truststore.jks differ