Skip to content

Commit

Permalink
Use Netty's MqttMessageBuilders when possible
Browse files Browse the repository at this point in the history
  • Loading branch information
François Travais committed Oct 24, 2019
1 parent 44c8a2f commit 45ba163
Show file tree
Hide file tree
Showing 3 changed files with 56 additions and 112 deletions.
92 changes: 28 additions & 64 deletions src/main/java/io/vertx/mqtt/impl/MqttClientImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,20 +22,19 @@
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPipeline;
import io.netty.handler.codec.DecoderResult;
import io.netty.handler.codec.mqtt.MqttConnectPayload;
import io.netty.handler.codec.mqtt.MqttConnectMessage;
import io.netty.handler.codec.mqtt.MqttConnectReturnCode;
import io.netty.handler.codec.mqtt.MqttConnectVariableHeader;
import io.netty.handler.codec.mqtt.MqttDecoder;
import io.netty.handler.codec.mqtt.MqttEncoder;
import io.netty.handler.codec.mqtt.MqttFixedHeader;
import io.netty.handler.codec.mqtt.MqttMessageBuilders;
import io.netty.handler.codec.mqtt.MqttMessageFactory;
import io.netty.handler.codec.mqtt.MqttMessageIdVariableHeader;
import io.netty.handler.codec.mqtt.MqttMessageType;
import io.netty.handler.codec.mqtt.MqttPublishVariableHeader;
import io.netty.handler.codec.mqtt.MqttQoS;
import io.netty.handler.codec.mqtt.MqttSubscribePayload;
import io.netty.handler.codec.mqtt.MqttTopicSubscription;
import io.netty.handler.codec.mqtt.MqttUnsubscribePayload;
import io.netty.handler.codec.mqtt.MqttSubscribeMessage;
import io.netty.handler.codec.mqtt.MqttUnsubscribeMessage;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.handler.timeout.IdleStateHandler;
Expand All @@ -60,13 +59,11 @@
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static io.netty.handler.codec.mqtt.MqttQoS.*;

Expand All @@ -83,8 +80,6 @@ public class MqttClientImpl implements MqttClient {
private static final int MAX_MESSAGE_ID = 65535;
private static final int MAX_TOPIC_LEN = 65535;
private static final int MIN_TOPIC_LEN = 1;
private static final String PROTOCOL_NAME = "MQTT";
private static final int PROTOCOL_VERSION = 4;
private static final int DEFAULT_IDLE_TIMEOUT = 0;

private final MqttClientOptions options;
Expand Down Expand Up @@ -211,33 +206,20 @@ private void doConnect(int port, String host, String serverName, Handler<AsyncRe
// an exception at connection level
soi.exceptionHandler(this::handleException);

MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.CONNECT,
false,
AT_MOST_ONCE,
false,
0);

MqttConnectVariableHeader variableHeader = new MqttConnectVariableHeader(
PROTOCOL_NAME,
PROTOCOL_VERSION,
options.hasUsername(),
options.hasPassword(),
options.isWillRetain(),
options.getWillQoS(),
options.isWillFlag(),
options.isCleanSession(),
options.getKeepAliveTimeSeconds()
);

MqttConnectPayload payload = new MqttConnectPayload(
options.getClientId() == null ? "" : options.getClientId(),
options.getWillTopic(),
options.getWillMessage() != null ? options.getWillMessage().getBytes(StandardCharsets.UTF_8) : null,
options.hasUsername() ? options.getUsername() : null,
options.hasPassword() ? options.getPassword().getBytes() : null
);

io.netty.handler.codec.mqtt.MqttMessage connect = MqttMessageFactory.newMessage(fixedHeader, variableHeader, payload);
final MqttConnectMessage connect = MqttMessageBuilders.connect()
.hasUser(options.hasUsername())
.hasPassword(options.hasPassword())
.willRetain(options.isWillRetain())
.willQoS(MqttQoS.valueOf(options.getWillQoS()))
.willFlag(options.isWillFlag())
.willTopic(options.getWillTopic())
.willMessage(options.getWillMessage() != null ? options.getWillMessage().getBytes(StandardCharsets.UTF_8) : null)
.cleanSession(options.isCleanSession())
.keepAlive(options.getKeepAliveTimeSeconds())
.clientId(options.getClientId() == null ? "" : options.getClientId())
.username(options.hasUsername() ? options.getUsername() : null)
.password(options.hasPassword() ? options.getPassword().getBytes() : null)
.build();

this.write(connect);
}
Expand Down Expand Up @@ -440,27 +422,17 @@ public MqttClient subscribe(Map<String, Integer> topics, Handler<AsyncResult<Int
return this;
}

MqttFixedHeader fixedHeader = new MqttFixedHeader(
MqttMessageType.SUBSCRIBE,
false,
AT_LEAST_ONCE,
false,
0);
final MqttMessageBuilders.SubscribeBuilder subscribeBuilder = MqttMessageBuilders.subscribe()
.messageId(nextMessageId());

MqttMessageIdVariableHeader variableHeader = MqttMessageIdVariableHeader.from(nextMessageId());
List<MqttTopicSubscription> subscriptions = topics.entrySet()
.stream()
.map(e -> new MqttTopicSubscription(e.getKey(), valueOf(e.getValue())))
.collect(Collectors.toList());

MqttSubscribePayload payload = new MqttSubscribePayload(subscriptions);
topics.forEach((topic, qos) -> subscribeBuilder.addSubscription(MqttQoS.valueOf(qos), topic));

io.netty.handler.codec.mqtt.MqttMessage subscribe = MqttMessageFactory.newMessage(fixedHeader, variableHeader, payload);
final MqttSubscribeMessage subscribe = subscribeBuilder.build();

this.write(subscribe);

if (subscribeSentHandler != null) {
subscribeSentHandler.handle(Future.succeededFuture(variableHeader.messageId()));
subscribeSentHandler.handle(Future.succeededFuture(subscribe.variableHeader().messageId()));
}
return this;
}
Expand All @@ -486,23 +458,15 @@ private synchronized Handler<Integer> unsubscribeCompletionHandler() {
@Override
public MqttClient unsubscribe(String topic, Handler<AsyncResult<Integer>> unsubscribeSentHandler) {

MqttFixedHeader fixedHeader = new MqttFixedHeader(
MqttMessageType.UNSUBSCRIBE,
false,
AT_LEAST_ONCE,
false,
0);

MqttMessageIdVariableHeader variableHeader = MqttMessageIdVariableHeader.from(nextMessageId());

MqttUnsubscribePayload payload = new MqttUnsubscribePayload(Stream.of(topic).collect(Collectors.toList()));

io.netty.handler.codec.mqtt.MqttMessage unsubscribe = MqttMessageFactory.newMessage(fixedHeader, variableHeader, payload);
final MqttUnsubscribeMessage unsubscribe = MqttMessageBuilders.unsubscribe()
.addTopicFilter(topic)
.messageId(nextMessageId())
.build();

this.write(unsubscribe);

if (unsubscribeSentHandler != null) {
unsubscribeSentHandler.handle(Future.succeededFuture(variableHeader.messageId()));
unsubscribeSentHandler.handle(Future.succeededFuture(unsubscribe.variableHeader().messageId()));
}
return this;
}
Expand Down
13 changes: 6 additions & 7 deletions src/main/java/io/vertx/mqtt/impl/MqttEndpointImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,10 @@

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.handler.codec.mqtt.MqttConnAckVariableHeader;
import io.netty.handler.codec.mqtt.MqttConnAckMessage;
import io.netty.handler.codec.mqtt.MqttConnectReturnCode;
import io.netty.handler.codec.mqtt.MqttFixedHeader;
import io.netty.handler.codec.mqtt.MqttMessageBuilders;
import io.netty.handler.codec.mqtt.MqttMessageFactory;
import io.netty.handler.codec.mqtt.MqttMessageIdVariableHeader;
import io.netty.handler.codec.mqtt.MqttMessageType;
Expand Down Expand Up @@ -298,12 +299,10 @@ public MqttEndpointImpl exceptionHandler(Handler<Throwable> handler) {

private MqttEndpointImpl connack(MqttConnectReturnCode returnCode, boolean sessionPresent) {

MqttFixedHeader fixedHeader =
new MqttFixedHeader(MqttMessageType.CONNACK, false, MqttQoS.AT_MOST_ONCE, false, 0);
MqttConnAckVariableHeader variableHeader =
new MqttConnAckVariableHeader(returnCode, sessionPresent);

io.netty.handler.codec.mqtt.MqttMessage connack = MqttMessageFactory.newMessage(fixedHeader, variableHeader, null);
MqttConnAckMessage connack = MqttMessageBuilders.connAck()
.returnCode(returnCode)
.sessionPresent(sessionPresent)
.build();

this.write(connack);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,15 +27,10 @@
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.mqtt.MqttConnectPayload;
import io.netty.handler.codec.mqtt.MqttConnectVariableHeader;
import io.netty.handler.codec.mqtt.MqttEncoder;
import io.netty.handler.codec.mqtt.MqttFixedHeader;
import io.netty.handler.codec.mqtt.MqttMessage;
import io.netty.handler.codec.mqtt.MqttMessageFactory;
import io.netty.handler.codec.mqtt.MqttMessageType;
import io.netty.handler.codec.mqtt.MqttMessageBuilders;
import io.netty.handler.codec.mqtt.MqttPublishMessage;
import io.netty.handler.codec.mqtt.MqttPublishVariableHeader;
import io.netty.handler.codec.mqtt.MqttQoS;
import io.netty.util.CharsetUtil;
import io.vertx.core.buffer.Buffer;
Expand All @@ -57,8 +52,6 @@
@RunWith(VertxUnitRunner.class)
public class MqttServerBadClientTest extends MqttServerBaseTest {

private static final String PROTOCOL_NAME = "MQTT";
private static final int PROTOCOL_VERSION = 4;
private static final String MQTT_TOPIC = "/my_topic";
private static final String MQTT_MESSAGE = "I'm a bad client";

Expand Down Expand Up @@ -183,44 +176,32 @@ public void unknownMessageType(TestContext context) {

private MqttPublishMessage createPublishMessage() {

MqttFixedHeader mqttFixedHeader =
new MqttFixedHeader(MqttMessageType.PUBLISH, false, MqttQoS.AT_LEAST_ONCE, true, 0);

MqttPublishVariableHeader mqttPublishVariableHeader = new MqttPublishVariableHeader(MQTT_TOPIC, 1);

ByteBuf payload = ALLOCATOR.buffer();
payload.writeBytes(MQTT_MESSAGE.getBytes(CharsetUtil.UTF_8));

return new MqttPublishMessage(mqttFixedHeader, mqttPublishVariableHeader, payload);
return MqttMessageBuilders.publish()
.qos(MqttQoS.AT_LEAST_ONCE)
.retained(true)
.topicName(MQTT_TOPIC)
.messageId(1)
.payload(payload)
.build();
}

private MqttMessage createConnectPacket(MqttClientOptions options) {
MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.CONNECT,
false,
MqttQoS.AT_MOST_ONCE,
false,
0);

MqttConnectVariableHeader variableHeader = new MqttConnectVariableHeader(
PROTOCOL_NAME,
PROTOCOL_VERSION,
options.hasUsername(),
options.hasPassword(),
options.isWillRetain(),
options.getWillQoS(),
options.isWillFlag(),
options.isCleanSession(),
options.getKeepAliveTimeSeconds()
);

MqttConnectPayload payload = new MqttConnectPayload(
options.getClientId() == null ? "" : options.getClientId(),
options.getWillTopic(),
options.getWillMessage() != null ? options.getWillMessage().getBytes(StandardCharsets.UTF_8) : null,
options.hasUsername() ? options.getUsername() : null,
options.hasPassword() ? options.getPassword().getBytes(StandardCharsets.UTF_8) : null
);

return MqttMessageFactory.newMessage(fixedHeader, variableHeader, payload);
return MqttMessageBuilders.connect()
.hasUser(options.hasUsername())
.hasPassword(options.hasPassword())
.willRetain(options.isWillRetain())
.willQoS(MqttQoS.valueOf(options.getWillQoS()))
.willFlag(options.isWillFlag())
.willTopic(options.getWillTopic())
.willMessage(options.getWillMessage() != null ? options.getWillMessage().getBytes(StandardCharsets.UTF_8) : null)
.cleanSession(options.isCleanSession())
.keepAlive(options.getKeepAliveTimeSeconds())
.clientId(options.getClientId() == null ? "" : options.getClientId())
.username(options.hasUsername() ? options.getUsername() : null)
.password(options.hasPassword() ? options.getPassword().getBytes() : null)
.build();
}
}

0 comments on commit 45ba163

Please sign in to comment.