Skip to content

Commit

Permalink
Support mqtt broker TLS-PSK. (#157)
Browse files Browse the repository at this point in the history
## Motivation
Support mqtt broker TLS-PSK.
  • Loading branch information
Technoboy- committed Sep 29, 2021
1 parent 58298d8 commit eb94a80
Show file tree
Hide file tree
Showing 9 changed files with 209 additions and 43 deletions.
36 changes: 28 additions & 8 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,7 @@ openssl req -new -x509 -nodes -sha256 -days 365 -key server.key -out server.crt
```conf
...
tlsEnabled=true
mqttListeners=mqtt+ssl://127.0.0.1:8883
tlsCertificateFilePath=/xxx/server.crt
tlsKeyFilePath=/xxx/server.key
...
Expand Down Expand Up @@ -249,17 +250,15 @@ connection.connect();
...
```
#### TLS PSK with proxy
#### TLS PSK with broker
Please reference [here](https://en.wikipedia.org/wiki/TLS-PSK) to learn more about TLS-PSK.
1. Config mqtt proxy to load tls psk config.
1. Config mqtt broker to load tls psk config.
```conf
...
mqttProxyEnable=true
mqttProxyTlsPskEnabled=true
// default tls psk port
mqttProxyTlsPskPort=5684
tlsPskEnabled=true
mqttListeners=mqtt+ssl+psk://127.0.0.1:8884
// any string can be specified
tlsPskIdentityHint=alpha
// identity is semicolon list of string with identity:secret format
Expand All @@ -277,14 +276,35 @@ Optional configs
2. As current known mqtt Java client does not support TLS-PSK, it's better to verify this by `mosquitto cli`
```cli
# Default with tlsv1.2
mosquitto_pub --psk-identity mqtt --psk 6d717474313233 -p 5684 -t "/a/b/c" -m "hello mqtt"
mosquitto_pub --psk-identity mqtt --psk 6d717474313233 -p 8884 -t "/a/b/c" -m "hello mqtt"
# Test with tlsv1.1
mosquitto_pub --psk-identity mqtt --psk 6d717474313233 -p 5684 -t "/a/b/c" -m "hello mqtt" --tls-version tlsv1.1
mosquitto_pub --psk-identity mqtt --psk 6d717474313233 -p 8884 -t "/a/b/c" -m "hello mqtt" --tls-version tlsv1.1
```
- Download [mosquitto](https://mosquitto.org/download/) with Mac version.
- The secret `mqtt123` is converted to `6d717474313233` using [Hex Code Converter](https://www.rapidtables.com/convert/number/ascii-to-hex.html)
#### TLS PSK with proxy
1. Config mqtt proxy to load tls psk config.
```conf
...
mqttProxyEnable=true
mqttProxyTlsPskEnabled=true
// default tls psk port
mqttProxyTlsPskPort=5684
// any string can be specified
tlsPskIdentityHint=alpha
// identity is semicolon list of string with identity:secret format
tlsPskIdentity=mqtt:mqtt123
...
```
2. Test with `mosquitto cli`
```
mosquitto_pub --psk-identity mqtt --psk 6d717474313233 -p 5684 -t "/a/b/c" -m "hello mqtt"
```
## Topic Names & Filters
For Apache Pulsar, The topic name consists of 4 parts:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import io.netty.handler.ssl.SslHandler;
import io.netty.handler.timeout.IdleStateHandler;
import io.streamnative.pulsar.handlers.mqtt.support.DefaultProtocolMethodProcessorImpl;
import io.streamnative.pulsar.handlers.mqtt.support.psk.PSKConfiguration;
import io.streamnative.pulsar.handlers.mqtt.support.psk.PSKUtils;
import java.util.Map;
import lombok.Getter;
import org.apache.pulsar.broker.PulsarService;
Expand All @@ -42,19 +44,29 @@ public class MQTTChannelInitializer extends ChannelInitializer<SocketChannel> {

private final Map<String, AuthenticationProvider> authProviders;
private final boolean enableTls;
private final boolean enableTlsPsk;
private final boolean tlsEnabledWithKeyStore;
private SslContextAutoRefreshBuilder<SslContext> sslCtxRefresher;
private NettySSLContextAutoRefreshBuilder nettySSLContextAutoRefreshBuilder;
private PSKConfiguration pskConfiguration;

public MQTTChannelInitializer(PulsarService pulsarService,
MQTTServerConfiguration mqttConfig,
Map<String, AuthenticationProvider> authProviders,
boolean enableTls) {
this(pulsarService, mqttConfig, authProviders, enableTls, false);
}

public MQTTChannelInitializer(PulsarService pulsarService,
MQTTServerConfiguration mqttConfig,
Map<String, AuthenticationProvider> authProviders,
boolean enableTls, boolean enableTlsPsk) {
super();
this.pulsarService = pulsarService;
this.mqttConfig = mqttConfig;
this.authProviders = authProviders;
this.enableTls = enableTls;
this.enableTlsPsk = enableTlsPsk;
this.tlsEnabledWithKeyStore = mqttConfig.isTlsEnabledWithKeyStore();
if (this.enableTls) {
if (tlsEnabledWithKeyStore) {
Expand Down Expand Up @@ -82,21 +94,30 @@ public MQTTChannelInitializer(PulsarService pulsarService,
mqttConfig.isTlsRequireTrustedClientCertOnConnect(),
mqttConfig.getTlsCertRefreshCheckDurationSec());
}
} else if (this.enableTlsPsk) {
pskConfiguration = new PSKConfiguration();
pskConfiguration.setIdentityHint(mqttConfig.getTlsPskIdentityHint());
pskConfiguration.setIdentity(mqttConfig.getTlsPskIdentity());
pskConfiguration.setIdentityFile(mqttConfig.getTlsPskIdentityFile());
pskConfiguration.setProtocols(mqttConfig.getTlsProtocols());
pskConfiguration.setCiphers(mqttConfig.getTlsCiphers());
} else {
this.sslCtxRefresher = null;
}
}

@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addFirst("idleStateHandler", new IdleStateHandler(10, 0, 0));
ch.pipeline().addFirst("idleStateHandler", new IdleStateHandler(30, 0, 0));
if (this.enableTls) {
if (this.tlsEnabledWithKeyStore) {
ch.pipeline().addLast(TLS_HANDLER,
new SslHandler(nettySSLContextAutoRefreshBuilder.get().createSSLEngine()));
} else {
ch.pipeline().addLast(TLS_HANDLER, sslCtxRefresher.get().newHandler(ch.alloc()));
}
} else if (this.enableTlsPsk) {
ch.pipeline().addLast(TLS_HANDLER, new SslHandler(PSKUtils.createServerEngine(ch, pskConfiguration)));
}
ch.pipeline().addLast("decoder", new MqttDecoder());
ch.pipeline().addLast("encoder", MqttEncoder.INSTANCE);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,12 @@ public class MQTTCommonConfiguration extends ServiceConfiguration {
)
private int mqttProxyNumIOThreads = Runtime.getRuntime().availableProcessors();

@FieldContext(
category = CATEGORY_TLS,
required = false,
doc = "Whether broker start mqtt protocol handler with tls psk"
)
private boolean tlsPskEnabled = false;

@FieldContext(
category = CATEGORY_TLS,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,12 @@
package io.streamnative.pulsar.handlers.mqtt;

import static com.google.common.base.Preconditions.checkArgument;
import static io.streamnative.pulsar.handlers.mqtt.utils.ConfigurationUtils.LISTENER_DEL;
import static io.streamnative.pulsar.handlers.mqtt.utils.ConfigurationUtils.PLAINTEXT_PREFIX;
import static io.streamnative.pulsar.handlers.mqtt.utils.ConfigurationUtils.PROTOCOL_NAME;
import static io.streamnative.pulsar.handlers.mqtt.utils.ConfigurationUtils.SSL_PREFIX;
import static io.streamnative.pulsar.handlers.mqtt.utils.ConfigurationUtils.SSL_PSK_PREFIX;
import static io.streamnative.pulsar.handlers.mqtt.utils.ConfigurationUtils.getListenerPort;
import com.google.common.collect.ImmutableMap;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.socket.SocketChannel;
Expand All @@ -37,12 +43,6 @@
@Slf4j
public class MQTTProtocolHandler implements ProtocolHandler {

public static final String PROTOCOL_NAME = "mqtt";
public static final String PLAINTEXT_PREFIX = "mqtt://";
public static final String SSL_PREFIX = "mqtt+ssl://";
public static final String LISTENER_DEL = ",";
public static final String LISTENER_PATTEN = "^(mqtt)(\\+ssl)?://[-a-zA-Z0-9+&@#/%?=~_|!:,.;]*[-0-9+]";

private Map<String, AuthenticationProvider> authProviders;

@Getter
Expand Down Expand Up @@ -171,13 +171,20 @@ public Map<InetSocketAddress, ChannelInitializer<SocketChannel>> newChannelIniti
builder.put(
new InetSocketAddress(brokerService.pulsar().getBindAddress(), getListenerPort(listener)),
new MQTTChannelInitializer(brokerService.pulsar(), mqttConfig, authProviders, false));
} else if (listener.startsWith(SSL_PREFIX)) {

} else if (listener.startsWith(SSL_PREFIX) && mqttConfig.isTlsEnabled()) {
builder.put(
new InetSocketAddress(brokerService.pulsar().getBindAddress(), getListenerPort(listener)),
new MQTTChannelInitializer(brokerService.pulsar(), mqttConfig, authProviders, true));

} else if (listener.startsWith(SSL_PSK_PREFIX) && mqttConfig.isTlsPskEnabled()) {
builder.put(
new InetSocketAddress(brokerService.pulsar().getBindAddress(), getListenerPort(listener)),
new MQTTChannelInitializer(brokerService.pulsar(), mqttConfig, authProviders, false, true));

} else {
log.error("MQTT listener {} not supported. supports {} or {}",
listener, PLAINTEXT_PREFIX, SSL_PREFIX);
log.error("MQTT listener {} not supported. supports {}, {} or {}",
listener, PLAINTEXT_PREFIX, SSL_PREFIX, SSL_PSK_PREFIX);
}
}

Expand All @@ -195,10 +202,4 @@ public void close() {
}
}

public static int getListenerPort(String listener) {
checkArgument(listener.matches(LISTENER_PATTEN), "listener not match patten");

int lastIndex = listener.lastIndexOf(':');
return Integer.parseInt(listener.substring(lastIndex + 1));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ public MQTTProxyChannelInitializer(MQTTProxyService proxyService, MQTTProxyConfi

@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addFirst("idleStateHandler", new IdleStateHandler(10, 0, 0));
ch.pipeline().addFirst("idleStateHandler", new IdleStateHandler(30, 0, 0));
if (this.enableTls) {
if (serverSslCtxRefresher != null) {
SslContext sslContext = serverSslCtxRefresher.get();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
*/
package io.streamnative.pulsar.handlers.mqtt.utils;

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import static java.lang.String.format;
import static org.apache.pulsar.common.util.FieldParser.setEmptyValue;
Expand All @@ -32,6 +33,13 @@
*/
public final class ConfigurationUtils {

public static final String PROTOCOL_NAME = "mqtt";
public static final String PLAINTEXT_PREFIX = "mqtt://";
public static final String SSL_PREFIX = "mqtt+ssl://";
public static final String SSL_PSK_PREFIX = "mqtt+ssl+psk://";
public static final String LISTENER_DEL = ",";
public static final String LISTENER_PATTERN = "^(mqtt)(\\+ssl)?(\\+psk)?://[-a-zA-Z0-9+&@#/%?=~_|!:,.;]*[-0-9+]";

/**
* Creates PulsarConfiguration and loads it with populated attribute values loaded from provided property file.
*
Expand Down Expand Up @@ -128,6 +136,14 @@ public static <T> void update(Map<String, String> properties, T obj) throws Ille
});
}


public static int getListenerPort(String listener) {
checkArgument(listener.matches(LISTENER_PATTERN), "listener not match pattern");

int lastIndex = listener.lastIndexOf(':');
return Integer.parseInt(listener.substring(lastIndex + 1));
}

private ConfigurationUtils() {}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/**
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.streamnative.pulsar.handlers.mqtt.untils;

import io.streamnative.pulsar.handlers.mqtt.utils.ConfigurationUtils;
import org.junit.Assert;
import org.testng.annotations.Test;

/**
* Unit test for the ConfigurationUtils.
*/
public class ConfigurationUtilsTest {

@Test
public void testGetListenerPort() {
String plainTextListener = "mqtt://127.0.0.1:1883";
Assert.assertEquals(ConfigurationUtils.getListenerPort(plainTextListener), 1883);
String sslListener = "mqtt+ssl://127.0.0.1:8883";
Assert.assertEquals(ConfigurationUtils.getListenerPort(sslListener), 8883);
String sslPskListener = "mqtt+ssl+psk://127.0.0.1:8884";
Assert.assertEquals(ConfigurationUtils.getListenerPort(sslPskListener), 8884);
try {
String sslInvalidListener = "mqtt+ssl+://127.0.0.1:8883";
ConfigurationUtils.getListenerPort(sslInvalidListener);
} catch (IllegalArgumentException ex) {
Assert.assertEquals(ex.getMessage(), "listener not match pattern");
}
try {
String sslPskInvalidListener = "mqtt+ssl+psk+://127.0.0.1:8884";
ConfigurationUtils.getListenerPort(sslPskInvalidListener);
} catch (IllegalArgumentException ex) {
Assert.assertEquals(ex.getMessage(), "listener not match pattern");
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,21 @@
package io.streamnative.pulsar.handlers.mqtt;

import static org.mockito.Mockito.verify;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.streamnative.pulsar.handlers.mqtt.base.MQTTTestBase;
import io.streamnative.pulsar.handlers.mqtt.psk.PSKClient;
import io.streamnative.pulsar.handlers.mqtt.utils.PulsarTopicUtils;
import java.io.EOFException;
import java.nio.charset.StandardCharsets;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Producer;
Expand All @@ -43,6 +52,16 @@ public class SimpleIntegrationTest extends MQTTTestBase {

private final int numMessages = 1000;

@Override
protected MQTTServerConfiguration initConfig() throws Exception {
MQTTServerConfiguration mqtt = super.initConfig();

mqtt.setTlsPskEnabled(true);
mqtt.setTlsPskIdentityHint("alpha");
mqtt.setTlsPskIdentity("mqtt:mqtt123");
return mqtt;
}

@Test(dataProvider = "mqttTopicNames", timeOut = TIMEOUT)
public void testSimpleMqttPubAndSubQos0(String topicName) throws Exception {
MQTT mqtt = createMQTTClient();
Expand Down Expand Up @@ -343,4 +362,22 @@ public void testInvalidClientId() throws Exception {
connection.connect();
verify(connection, Mockito.times(2)).connect();
}

@Test
@SneakyThrows
public void testTlsPskWithTlsv1() {
Bootstrap client = new Bootstrap();
EventLoopGroup group = new NioEventLoopGroup();
client.group(group);
client.channel(NioSocketChannel.class);
client.handler(new PSKClient("alpha", "mqtt", "mqtt123"));
AtomicBoolean connected = new AtomicBoolean(false);
CountDownLatch latch = new CountDownLatch(1);
client.connect("localhost", mqttBrokerPortTlsPskList.get(0)).addListener((ChannelFutureListener) future -> {
connected.set(future.isSuccess());
latch.countDown();
});
latch.await();
Assert.assertTrue(connected.get());
}
}
Loading

0 comments on commit eb94a80

Please sign in to comment.