Skip to content

Commit

Permalink
Rename MqttClientOptions willMessage to willMessageBytes
Browse files Browse the repository at this point in the history
Fixes #245

Signed-off-by: Thomas Segismont <[email protected]>
  • Loading branch information
tsegismont committed Nov 8, 2023
1 parent cecbde6 commit 34c91a3
Show file tree
Hide file tree
Showing 5 changed files with 51 additions and 66 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -80,9 +80,9 @@ static void fromJson(Iterable<java.util.Map.Entry<String, Object>> json, MqttCli
obj.setWillFlag((Boolean)member.getValue());
}
break;
case "willMessage":
case "willMessageBytes":
if (member.getValue() instanceof String) {
obj.setWillMessage((String)member.getValue());
obj.setWillMessageBytes(io.vertx.core.buffer.Buffer.buffer(BASE64_DECODER.decode((String)member.getValue())));
}
break;
case "willQoS":
Expand Down Expand Up @@ -127,8 +127,8 @@ static void toJson(MqttClientOptions obj, java.util.Map<String, Object> json) {
json.put("username", obj.getUsername());
}
json.put("willFlag", obj.isWillFlag());
if (obj.getWillMessage() != null) {
json.put("willMessage", obj.getWillMessage());
if (obj.getWillMessageBytes() != null) {
json.put("willMessageBytes", BASE64_ENCODER.encodeToString(obj.getWillMessageBytes().getBytes()));
}
json.put("willQoS", obj.getWillQoS());
json.put("willRetain", obj.isWillRetain());
Expand Down
46 changes: 34 additions & 12 deletions src/main/java/io/vertx/mqtt/MqttClientOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,13 @@
package io.vertx.mqtt;

import io.vertx.codegen.annotations.DataObject;
import io.vertx.codegen.annotations.GenIgnore;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.impl.Arguments;
import io.vertx.core.json.JsonObject;
import io.vertx.core.net.JksOptions;
import io.vertx.core.net.KeyCertOptions;
import io.vertx.core.net.NetClientOptions;
import io.vertx.core.net.PemKeyCertOptions;
import io.vertx.core.net.PemTrustOptions;
import io.vertx.core.net.PfxOptions;
import io.vertx.core.net.TrustOptions;
import io.vertx.core.net.*;

import java.nio.charset.StandardCharsets;

/**
* Represents options used by the MQTT client.
Expand All @@ -51,7 +48,7 @@ public class MqttClientOptions extends NetClientOptions {
private String username;
private String password;
private String willTopic;
private String willMessage;
private Buffer willMessageBytes;
private boolean cleanSession = DEFAULT_CLEAN_SESSION;
private boolean willFlag = DEFAULT_WILL_FLAG;
private int willQoS = DEFAULT_WILL_QOS;
Expand Down Expand Up @@ -95,6 +92,9 @@ public MqttClientOptions(JsonObject json) {
super(json);
init();
MqttClientOptionsConverter.fromJson(json, this);
if (!json.containsKey("willMessageBytes") && json.containsKey("willMessage")) {
willMessageBytes = Buffer.buffer(json.getString("willMessage"));
}
}

/**
Expand All @@ -108,7 +108,7 @@ public MqttClientOptions(MqttClientOptions other) {
this.username = other.username;
this.password = other.password;
this.willTopic = other.willTopic;
this.willMessage = other.willMessage;
this.willMessageBytes = other.willMessageBytes;
this.cleanSession = other.cleanSession;
this.willFlag = other.willFlag;
this.willQoS = other.willQoS;
Expand Down Expand Up @@ -202,8 +202,17 @@ public String getWillTopic() {
/**
* @return will message content
*/
@Deprecated
@GenIgnore
public String getWillMessage() {
return willMessage;
return willMessageBytes.toString(StandardCharsets.UTF_8);
}

/**
* @return will message bytes content
*/
public Buffer getWillMessageBytes() {
return willMessageBytes;
}

/**
Expand Down Expand Up @@ -256,8 +265,21 @@ public MqttClientOptions setWillTopic(String willTopic) {
* @param willMessage content of the will message
* @return current options instance
*/
@Deprecated
@GenIgnore
public MqttClientOptions setWillMessage(String willMessage) {
this.willMessage = willMessage;
this.willMessageBytes = Buffer.buffer(willMessage.getBytes(StandardCharsets.UTF_8));
return this;
}

/**
* Set the content of the will message
*
* @param willMessage content of the will message
* @return current options instance
*/
public MqttClientOptions setWillMessageBytes(Buffer willMessage) {
this.willMessageBytes = willMessage;
return this;
}

Expand Down Expand Up @@ -572,7 +594,7 @@ public String toString() {
", username='" + username + '\'' +
", password='" + password + '\'' +
", willTopic='" + willTopic + '\'' +
", willMessage='" + willMessage + '\'' +
", willMessageBytes='" + willMessageBytes + '\'' +
", cleanSession=" + cleanSession +
", willFlag=" + willFlag +
", willQoS=" + willQoS +
Expand Down
40 changes: 9 additions & 31 deletions src/main/java/io/vertx/mqtt/impl/MqttClientImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,22 +21,7 @@
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPipeline;
import io.netty.handler.codec.DecoderResult;
import io.netty.handler.codec.mqtt.MqttConnectPayload;
import io.netty.handler.codec.mqtt.MqttConnectReturnCode;
import io.netty.handler.codec.mqtt.MqttConnectVariableHeader;
import io.netty.handler.codec.mqtt.MqttDecoder;
import io.netty.handler.codec.mqtt.MqttEncoder;
import io.netty.handler.codec.mqtt.MqttFixedHeader;
import io.netty.handler.codec.mqtt.MqttMessageFactory;
import io.netty.handler.codec.mqtt.MqttMessageIdAndPropertiesVariableHeader;
import io.netty.handler.codec.mqtt.MqttMessageIdVariableHeader;
import io.netty.handler.codec.mqtt.MqttMessageType;
import io.netty.handler.codec.mqtt.MqttProperties;
import io.netty.handler.codec.mqtt.MqttPublishVariableHeader;
import io.netty.handler.codec.mqtt.MqttQoS;
import io.netty.handler.codec.mqtt.MqttSubscribePayload;
import io.netty.handler.codec.mqtt.MqttTopicSubscription;
import io.netty.handler.codec.mqtt.MqttUnsubscribePayload;
import io.netty.handler.codec.mqtt.*;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.handler.timeout.IdleStateHandler;
Expand All @@ -46,11 +31,11 @@
import io.vertx.core.impl.ContextInternal;
import io.vertx.core.impl.VertxInternal;
import io.vertx.core.impl.future.PromiseInternal;
import io.vertx.core.net.impl.NetClientBuilder;
import io.vertx.core.net.impl.NetSocketInternal;
import io.vertx.core.impl.logging.Logger;
import io.vertx.core.impl.logging.LoggerFactory;
import io.vertx.core.net.NetClient;
import io.vertx.core.net.impl.NetClientBuilder;
import io.vertx.core.net.impl.NetSocketInternal;
import io.vertx.core.net.impl.VertxHandler;
import io.vertx.mqtt.MqttClient;
import io.vertx.mqtt.MqttClientOptions;
Expand All @@ -63,14 +48,7 @@
import io.vertx.mqtt.messages.impl.MqttPublishMessageImpl;

import java.io.UnsupportedEncodingException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayDeque;
import java.util.Collections;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.*;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -306,7 +284,7 @@ private Future<MqttConnAckMessage> doConnect(int port, String host, String serve
MqttConnectPayload payload = new MqttConnectPayload(
options.getClientId() == null ? "" : options.getClientId(),
options.getWillTopic(),
options.getWillMessage() != null ? options.getWillMessage().getBytes(StandardCharsets.UTF_8) : null,
options.getWillMessageBytes() != null ? options.getWillMessageBytes().getBytes() : null,
options.hasUsername() ? options.getUsername() : null,
options.hasPassword() ? options.getPassword().getBytes() : null
);
Expand Down Expand Up @@ -1206,14 +1184,14 @@ private void handlePublish(MqttPublishMessage msg) {

switch (msg.qosLevel()) {

case AT_MOST_ONCE:
case AT_MOST_ONCE:
if (handler != null) {
handler.handle(msg);
}
break;

case AT_LEAST_ONCE:
if (options.isAutoAck()) {
case AT_LEAST_ONCE:
if (options.isAutoAck()) {
this.publishAcknowledge(msg.messageId());
} else {
((MqttPublishMessageImpl) msg).setAckCallback(() -> this.publishAcknowledge(msg.messageId()));
Expand Down Expand Up @@ -1247,7 +1225,7 @@ private void handlePubrel(int pubrelMessageId) {
return;
}
}

if (options.isAutoAck()) {
this.publishComplete(pubrelMessageId);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,16 +27,7 @@
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.mqtt.MqttConnectPayload;
import io.netty.handler.codec.mqtt.MqttConnectVariableHeader;
import io.netty.handler.codec.mqtt.MqttEncoder;
import io.netty.handler.codec.mqtt.MqttFixedHeader;
import io.netty.handler.codec.mqtt.MqttMessage;
import io.netty.handler.codec.mqtt.MqttMessageFactory;
import io.netty.handler.codec.mqtt.MqttMessageType;
import io.netty.handler.codec.mqtt.MqttPublishMessage;
import io.netty.handler.codec.mqtt.MqttPublishVariableHeader;
import io.netty.handler.codec.mqtt.MqttQoS;
import io.netty.handler.codec.mqtt.*;
import io.netty.util.CharsetUtil;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.net.NetClient;
Expand Down Expand Up @@ -216,7 +207,7 @@ private MqttMessage createConnectPacket(MqttClientOptions options) {
MqttConnectPayload payload = new MqttConnectPayload(
options.getClientId() == null ? "" : options.getClientId(),
options.getWillTopic(),
options.getWillMessage() != null ? options.getWillMessage().getBytes(StandardCharsets.UTF_8) : null,
options.getWillMessageBytes() != null ? options.getWillMessageBytes().getBytes() : null,
options.hasUsername() ? options.getUsername() : null,
options.hasPassword() ? options.getPassword().getBytes(StandardCharsets.UTF_8) : null
);
Expand Down
10 changes: 2 additions & 8 deletions src/test/java/io/vertx/mqtt/test/server/MqttServerWillTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,10 @@
import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.json.JsonObject;
import io.vertx.core.logging.Logger;
import io.vertx.core.logging.LoggerFactory;
import io.vertx.ext.unit.Async;
import io.vertx.ext.unit.TestContext;
import io.vertx.ext.unit.junit.VertxUnitRunner;
import io.vertx.mqtt.MqttClient;
import io.vertx.mqtt.MqttClientOptions;
import io.vertx.mqtt.MqttServer;
import io.vertx.mqtt.MqttServerOptions;
import io.vertx.mqtt.MqttWill;
import io.vertx.mqtt.*;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
Expand Down Expand Up @@ -104,7 +98,7 @@ public void testWill(TestContext context) {
client = MqttClient.create(vertx, new MqttClientOptions()
.setWillFlag(true)
.setWillQoS(2)
.setWillMessage("the-message")
.setWillMessageBytes(Buffer.buffer("the-message"))
);
client.connect(MQTT_SERVER_PORT, MQTT_SERVER_HOST, context.asyncAssertSuccess(ack -> {
}));
Expand Down

0 comments on commit 34c91a3

Please sign in to comment.