diff --git a/src/main/generated/io/vertx/mqtt/MqttClientOptionsConverter.java b/src/main/generated/io/vertx/mqtt/MqttClientOptionsConverter.java index 63eb15b0..f200fe10 100644 --- a/src/main/generated/io/vertx/mqtt/MqttClientOptionsConverter.java +++ b/src/main/generated/io/vertx/mqtt/MqttClientOptionsConverter.java @@ -80,9 +80,9 @@ static void fromJson(Iterable> json, MqttCli obj.setWillFlag((Boolean)member.getValue()); } break; - case "willMessage": + case "willMessageBytes": if (member.getValue() instanceof String) { - obj.setWillMessage((String)member.getValue()); + obj.setWillMessageBytes(io.vertx.core.buffer.Buffer.buffer(BASE64_DECODER.decode((String)member.getValue()))); } break; case "willQoS": @@ -127,8 +127,8 @@ static void toJson(MqttClientOptions obj, java.util.Map json) { json.put("username", obj.getUsername()); } json.put("willFlag", obj.isWillFlag()); - if (obj.getWillMessage() != null) { - json.put("willMessage", obj.getWillMessage()); + if (obj.getWillMessageBytes() != null) { + json.put("willMessageBytes", BASE64_ENCODER.encodeToString(obj.getWillMessageBytes().getBytes())); } json.put("willQoS", obj.getWillQoS()); json.put("willRetain", obj.isWillRetain()); diff --git a/src/main/java/io/vertx/mqtt/MqttClientOptions.java b/src/main/java/io/vertx/mqtt/MqttClientOptions.java index 12e50c00..6f0692f9 100644 --- a/src/main/java/io/vertx/mqtt/MqttClientOptions.java +++ b/src/main/java/io/vertx/mqtt/MqttClientOptions.java @@ -17,16 +17,13 @@ package io.vertx.mqtt; import io.vertx.codegen.annotations.DataObject; +import io.vertx.codegen.annotations.GenIgnore; import io.vertx.core.buffer.Buffer; import io.vertx.core.impl.Arguments; import io.vertx.core.json.JsonObject; -import io.vertx.core.net.JksOptions; -import io.vertx.core.net.KeyCertOptions; -import io.vertx.core.net.NetClientOptions; -import io.vertx.core.net.PemKeyCertOptions; -import io.vertx.core.net.PemTrustOptions; -import io.vertx.core.net.PfxOptions; -import io.vertx.core.net.TrustOptions; +import io.vertx.core.net.*; + +import java.nio.charset.StandardCharsets; /** * Represents options used by the MQTT client. @@ -51,7 +48,7 @@ public class MqttClientOptions extends NetClientOptions { private String username; private String password; private String willTopic; - private String willMessage; + private Buffer willMessageBytes; private boolean cleanSession = DEFAULT_CLEAN_SESSION; private boolean willFlag = DEFAULT_WILL_FLAG; private int willQoS = DEFAULT_WILL_QOS; @@ -95,6 +92,9 @@ public MqttClientOptions(JsonObject json) { super(json); init(); MqttClientOptionsConverter.fromJson(json, this); + if (!json.containsKey("willMessageBytes") && json.containsKey("willMessage")) { + willMessageBytes = Buffer.buffer(json.getString("willMessage")); + } } /** @@ -108,7 +108,7 @@ public MqttClientOptions(MqttClientOptions other) { this.username = other.username; this.password = other.password; this.willTopic = other.willTopic; - this.willMessage = other.willMessage; + this.willMessageBytes = other.willMessageBytes; this.cleanSession = other.cleanSession; this.willFlag = other.willFlag; this.willQoS = other.willQoS; @@ -202,8 +202,17 @@ public String getWillTopic() { /** * @return will message content */ + @Deprecated + @GenIgnore public String getWillMessage() { - return willMessage; + return willMessageBytes.toString(StandardCharsets.UTF_8); + } + + /** + * @return will message bytes content + */ + public Buffer getWillMessageBytes() { + return willMessageBytes; } /** @@ -256,8 +265,21 @@ public MqttClientOptions setWillTopic(String willTopic) { * @param willMessage content of the will message * @return current options instance */ + @Deprecated + @GenIgnore public MqttClientOptions setWillMessage(String willMessage) { - this.willMessage = willMessage; + this.willMessageBytes = Buffer.buffer(willMessage.getBytes(StandardCharsets.UTF_8)); + return this; + } + + /** + * Set the content of the will message + * + * @param willMessage content of the will message + * @return current options instance + */ + public MqttClientOptions setWillMessageBytes(Buffer willMessage) { + this.willMessageBytes = willMessage; return this; } @@ -572,7 +594,7 @@ public String toString() { ", username='" + username + '\'' + ", password='" + password + '\'' + ", willTopic='" + willTopic + '\'' + - ", willMessage='" + willMessage + '\'' + + ", willMessageBytes='" + willMessageBytes + '\'' + ", cleanSession=" + cleanSession + ", willFlag=" + willFlag + ", willQoS=" + willQoS + diff --git a/src/main/java/io/vertx/mqtt/impl/MqttClientImpl.java b/src/main/java/io/vertx/mqtt/impl/MqttClientImpl.java index 5d365e33..b6209f34 100644 --- a/src/main/java/io/vertx/mqtt/impl/MqttClientImpl.java +++ b/src/main/java/io/vertx/mqtt/impl/MqttClientImpl.java @@ -21,22 +21,7 @@ import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelPipeline; import io.netty.handler.codec.DecoderResult; -import io.netty.handler.codec.mqtt.MqttConnectPayload; -import io.netty.handler.codec.mqtt.MqttConnectReturnCode; -import io.netty.handler.codec.mqtt.MqttConnectVariableHeader; -import io.netty.handler.codec.mqtt.MqttDecoder; -import io.netty.handler.codec.mqtt.MqttEncoder; -import io.netty.handler.codec.mqtt.MqttFixedHeader; -import io.netty.handler.codec.mqtt.MqttMessageFactory; -import io.netty.handler.codec.mqtt.MqttMessageIdAndPropertiesVariableHeader; -import io.netty.handler.codec.mqtt.MqttMessageIdVariableHeader; -import io.netty.handler.codec.mqtt.MqttMessageType; -import io.netty.handler.codec.mqtt.MqttProperties; -import io.netty.handler.codec.mqtt.MqttPublishVariableHeader; -import io.netty.handler.codec.mqtt.MqttQoS; -import io.netty.handler.codec.mqtt.MqttSubscribePayload; -import io.netty.handler.codec.mqtt.MqttTopicSubscription; -import io.netty.handler.codec.mqtt.MqttUnsubscribePayload; +import io.netty.handler.codec.mqtt.*; import io.netty.handler.timeout.IdleState; import io.netty.handler.timeout.IdleStateEvent; import io.netty.handler.timeout.IdleStateHandler; @@ -46,11 +31,11 @@ import io.vertx.core.impl.ContextInternal; import io.vertx.core.impl.VertxInternal; import io.vertx.core.impl.future.PromiseInternal; -import io.vertx.core.net.impl.NetClientBuilder; -import io.vertx.core.net.impl.NetSocketInternal; import io.vertx.core.impl.logging.Logger; import io.vertx.core.impl.logging.LoggerFactory; import io.vertx.core.net.NetClient; +import io.vertx.core.net.impl.NetClientBuilder; +import io.vertx.core.net.impl.NetSocketInternal; import io.vertx.core.net.impl.VertxHandler; import io.vertx.mqtt.MqttClient; import io.vertx.mqtt.MqttClientOptions; @@ -63,14 +48,7 @@ import io.vertx.mqtt.messages.impl.MqttPublishMessageImpl; import java.io.UnsupportedEncodingException; -import java.nio.charset.StandardCharsets; -import java.util.ArrayDeque; -import java.util.Collections; -import java.util.Deque; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.UUID; +import java.util.*; import java.util.regex.Matcher; import java.util.regex.Pattern; import java.util.stream.Collectors; @@ -306,7 +284,7 @@ private Future doConnect(int port, String host, String serve MqttConnectPayload payload = new MqttConnectPayload( options.getClientId() == null ? "" : options.getClientId(), options.getWillTopic(), - options.getWillMessage() != null ? options.getWillMessage().getBytes(StandardCharsets.UTF_8) : null, + options.getWillMessageBytes() != null ? options.getWillMessageBytes().getBytes() : null, options.hasUsername() ? options.getUsername() : null, options.hasPassword() ? options.getPassword().getBytes() : null ); @@ -1206,14 +1184,14 @@ private void handlePublish(MqttPublishMessage msg) { switch (msg.qosLevel()) { - case AT_MOST_ONCE: + case AT_MOST_ONCE: if (handler != null) { handler.handle(msg); } break; - case AT_LEAST_ONCE: - if (options.isAutoAck()) { + case AT_LEAST_ONCE: + if (options.isAutoAck()) { this.publishAcknowledge(msg.messageId()); } else { ((MqttPublishMessageImpl) msg).setAckCallback(() -> this.publishAcknowledge(msg.messageId())); @@ -1247,7 +1225,7 @@ private void handlePubrel(int pubrelMessageId) { return; } } - + if (options.isAutoAck()) { this.publishComplete(pubrelMessageId); } else { diff --git a/src/test/java/io/vertx/mqtt/test/server/MqttServerBadClientTest.java b/src/test/java/io/vertx/mqtt/test/server/MqttServerBadClientTest.java index edff0a18..b4c82e5e 100644 --- a/src/test/java/io/vertx/mqtt/test/server/MqttServerBadClientTest.java +++ b/src/test/java/io/vertx/mqtt/test/server/MqttServerBadClientTest.java @@ -27,16 +27,7 @@ import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; -import io.netty.handler.codec.mqtt.MqttConnectPayload; -import io.netty.handler.codec.mqtt.MqttConnectVariableHeader; -import io.netty.handler.codec.mqtt.MqttEncoder; -import io.netty.handler.codec.mqtt.MqttFixedHeader; -import io.netty.handler.codec.mqtt.MqttMessage; -import io.netty.handler.codec.mqtt.MqttMessageFactory; -import io.netty.handler.codec.mqtt.MqttMessageType; -import io.netty.handler.codec.mqtt.MqttPublishMessage; -import io.netty.handler.codec.mqtt.MqttPublishVariableHeader; -import io.netty.handler.codec.mqtt.MqttQoS; +import io.netty.handler.codec.mqtt.*; import io.netty.util.CharsetUtil; import io.vertx.core.buffer.Buffer; import io.vertx.core.net.NetClient; @@ -216,7 +207,7 @@ private MqttMessage createConnectPacket(MqttClientOptions options) { MqttConnectPayload payload = new MqttConnectPayload( options.getClientId() == null ? "" : options.getClientId(), options.getWillTopic(), - options.getWillMessage() != null ? options.getWillMessage().getBytes(StandardCharsets.UTF_8) : null, + options.getWillMessageBytes() != null ? options.getWillMessageBytes().getBytes() : null, options.hasUsername() ? options.getUsername() : null, options.hasPassword() ? options.getPassword().getBytes(StandardCharsets.UTF_8) : null ); diff --git a/src/test/java/io/vertx/mqtt/test/server/MqttServerWillTest.java b/src/test/java/io/vertx/mqtt/test/server/MqttServerWillTest.java index 5ef3e257..75feabbe 100644 --- a/src/test/java/io/vertx/mqtt/test/server/MqttServerWillTest.java +++ b/src/test/java/io/vertx/mqtt/test/server/MqttServerWillTest.java @@ -20,16 +20,10 @@ import io.vertx.core.Vertx; import io.vertx.core.buffer.Buffer; import io.vertx.core.json.JsonObject; -import io.vertx.core.logging.Logger; -import io.vertx.core.logging.LoggerFactory; import io.vertx.ext.unit.Async; import io.vertx.ext.unit.TestContext; import io.vertx.ext.unit.junit.VertxUnitRunner; -import io.vertx.mqtt.MqttClient; -import io.vertx.mqtt.MqttClientOptions; -import io.vertx.mqtt.MqttServer; -import io.vertx.mqtt.MqttServerOptions; -import io.vertx.mqtt.MqttWill; +import io.vertx.mqtt.*; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -104,7 +98,7 @@ public void testWill(TestContext context) { client = MqttClient.create(vertx, new MqttClientOptions() .setWillFlag(true) .setWillQoS(2) - .setWillMessage("the-message") + .setWillMessageBytes(Buffer.buffer("the-message")) ); client.connect(MQTT_SERVER_PORT, MQTT_SERVER_HOST, context.asyncAssertSuccess(ack -> { }));