Skip to content

Commit

Permalink
Support some simple feature (#254)
Browse files Browse the repository at this point in the history
* Support user properties

* Supoort ContentType and payloadFormatIndicator

* add CheckPacket

* delete check packet

* fix bug: reset topic alias

* fix bug: reset topic alias

* support SUBSCRIPTION_IDENTIFIER RESPONSE_TOPIC CORRELATION_DATA CONTENT_TYPE
  • Loading branch information
DongyuanPan authored May 23, 2024
1 parent 48968cd commit 5397e58
Show file tree
Hide file tree
Showing 6 changed files with 157 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ public static Message toMessage(MqttPublishMessage mqttMessage) {
message.putUserProperty(Message.propertyCorrelationData, new String(correlationData.value(), StandardCharsets.UTF_8));
}

// User Properties
// The user properties of publish packets need to be stored, and when pushing, they need to be brought with them
List<MqttProperties.UserProperty> userProperties = (List<MqttProperties.UserProperty>) mqttProperties.getProperties(USER_PROPERTY.value());
List<MqttProperties.StringPair> userPropertyList = new ArrayList<>();
for (MqttProperties.UserProperty userProperty : userProperties) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,29 +17,50 @@

package org.apache.rocketmq.mqtt.common.test.util;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.TypeReference;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.Unpooled;
import io.netty.buffer.UnpooledByteBufAllocator;
import io.netty.handler.codec.mqtt.MqttFixedHeader;
import io.netty.handler.codec.mqtt.MqttMessageType;
import io.netty.handler.codec.mqtt.MqttProperties;
import io.netty.handler.codec.mqtt.MqttPublishMessage;
import io.netty.handler.codec.mqtt.MqttPublishVariableHeader;
import io.netty.handler.codec.mqtt.MqttQoS;
import io.netty.util.CharsetUtil;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.mqtt.common.model.Message;
import org.apache.rocketmq.mqtt.common.util.MessageUtil;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.ArgumentCaptor;

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

import static io.netty.handler.codec.mqtt.MqttProperties.MqttPropertyType.CONTENT_TYPE;
import static io.netty.handler.codec.mqtt.MqttProperties.MqttPropertyType.CORRELATION_DATA;
import static io.netty.handler.codec.mqtt.MqttProperties.MqttPropertyType.PAYLOAD_FORMAT_INDICATOR;
import static io.netty.handler.codec.mqtt.MqttProperties.MqttPropertyType.PUBLICATION_EXPIRY_INTERVAL;
import static io.netty.handler.codec.mqtt.MqttProperties.MqttPropertyType.RESPONSE_TOPIC;
import static io.netty.handler.codec.mqtt.MqttProperties.MqttPropertyType.SUBSCRIPTION_IDENTIFIER;
import static io.netty.handler.codec.mqtt.MqttProperties.MqttPropertyType.TOPIC_ALIAS;
import static io.netty.handler.codec.mqtt.MqttProperties.MqttPropertyType.USER_PROPERTY;
import static org.apache.rocketmq.mqtt.common.util.MessageUtil.EMPTYSTRING;
import static org.apache.rocketmq.mqtt.common.util.MessageUtil.dealEmptyMessage;
import static org.apache.rocketmq.mqtt.common.util.MessageUtil.removeRetainedFlag;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.verify;

public class TestMessageUtil {

Expand Down Expand Up @@ -119,4 +140,40 @@ public void TestDealEmptyMessage() {
newEmptyMessage.payload().readBytes(newBody);
Assert.assertArrayEquals(EMPTYSTRING.getBytes(), newBody);
}

@Test
public void TestMqtt5Message() {

MqttProperties props = new MqttProperties();
props.add(new MqttProperties.UserProperty("isSecret", "true"));
props.add(new MqttProperties.UserProperty("tag", "firstTag"));
props.add(new MqttProperties.UserProperty("tag", "secondTag"));

props.add(new MqttProperties.IntegerProperty(SUBSCRIPTION_IDENTIFIER.value(), 100));
props.add(new MqttProperties.IntegerProperty(SUBSCRIPTION_IDENTIFIER.value(), 101));

MqttFixedHeader mqttFixedHeader = new MqttFixedHeader(MqttMessageType.PUBLISH, false, MqttQoS.AT_LEAST_ONCE, false, 1);
MqttPublishVariableHeader variableHeader = new MqttPublishVariableHeader("test", 0, props);
ByteBuf payload = Unpooled.copiedBuffer("test".getBytes(StandardCharsets.UTF_8));
MqttPublishMessage publishMessage = new MqttPublishMessage(mqttFixedHeader, variableHeader, payload);
Message message = MessageUtil.toMessage(publishMessage);

String mqtt5UserProperties = message.getUserProperty(Message.propertyMqtt5UserProperty);
MqttPublishMessage newPublishMessage = null;

if (StringUtils.isNotBlank(mqtt5UserProperties)) {
ArrayList<MqttProperties.StringPair> userProperties = JSON.parseObject(mqtt5UserProperties,
new TypeReference<ArrayList<MqttProperties.StringPair>>() {}
);
MqttProperties newProps = new MqttProperties();
newProps.add(new MqttProperties.UserProperties(userProperties));
MqttPublishVariableHeader newVariableHeader = new MqttPublishVariableHeader("test", 0, props);
newPublishMessage = new MqttPublishMessage(mqttFixedHeader, variableHeader, payload);
}

MqttProperties checkProps = newPublishMessage.variableHeader().properties();
Assert.assertEquals("true", ((MqttProperties.StringPair)checkProps.getProperties(USER_PROPERTY.value()).get(0).value()).value);
Assert.assertEquals("firstTag", ((MqttProperties.StringPair)checkProps.getProperties(USER_PROPERTY.value()).get(1).value()).value);
Assert.assertEquals("secondTag", ((MqttProperties.StringPair)checkProps.getProperties(USER_PROPERTY.value()).get(2).value()).value);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,14 @@

package org.apache.rocketmq.mqtt.cs.session.infly;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.TypeReference;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.handler.codec.mqtt.MqttEncoder;
import io.netty.handler.codec.mqtt.MqttProperties;
import io.netty.handler.codec.mqtt.MqttPublishMessage;
import io.netty.handler.codec.mqtt.MqttVersion;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.common.MixAll;
Expand All @@ -39,8 +43,13 @@
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import java.util.ArrayList;
import java.util.List;

import static io.netty.handler.codec.mqtt.MqttProperties.MqttPropertyType.CONTENT_TYPE;
import static io.netty.handler.codec.mqtt.MqttProperties.MqttPropertyType.CORRELATION_DATA;
import static io.netty.handler.codec.mqtt.MqttProperties.MqttPropertyType.RESPONSE_TOPIC;
import static io.netty.handler.codec.mqtt.MqttProperties.MqttPropertyType.SUBSCRIPTION_IDENTIFIER;
import static io.netty.handler.codec.mqtt.MqttProperties.MqttPropertyType.TOPIC_ALIAS;
import static java.lang.Math.min;
import static java.util.Objects.hash;
Expand Down Expand Up @@ -95,9 +104,9 @@ public void push(Message message, Subscription subscription, Session session, Qu
try {
if (session.isClean()) {
if (message.getStoreTimestamp() > 0 &&
message.getStoreTimestamp() < session.getStartTime()) {
message.getStoreTimestamp() < session.getStartTime()) {
logger.warn("old msg:{},{},{},{}", session.getClientId(), message.getMsgId(),
message.getStoreTimestamp(), session.getStartTime());
message.getStoreTimestamp(), session.getStartTime());
rollNext(session, mqttId);
return;
}
Expand Down Expand Up @@ -172,28 +181,37 @@ public void write(Session session, Message message, int mqttId, int qos, Subscri
data = MqttMessageFactory.buildPublishMessage(topicName, message.getPayload(), qos, retained, mqttId);
break;
case MQTT_5:
// add content type
if (StringUtils.isNotBlank(message.getUserProperty(Message.propertyContentType))) {
mqttProperties.add(new MqttProperties.StringProperty(CONTENT_TYPE.value(), message.getUserProperty(Message.propertyContentType)));
}

// add Response Topic
if (StringUtils.isNotBlank(message.getUserProperty(Message.propertyResponseTopic))) {
mqttProperties.add(new MqttProperties.StringProperty(RESPONSE_TOPIC.value(), message.getUserProperty(Message.propertyResponseTopic)));
}

// add Correlation Data
if (StringUtils.isNotBlank(message.getUserProperty(Message.propertyCorrelationData))) {
mqttProperties.add(new MqttProperties.StringProperty(CORRELATION_DATA.value(), message.getUserProperty(Message.propertyCorrelationData)));
}

// process publish user properties
processUserProperties(message, mqttProperties);

// process subscription identifier
if (subscription.getSubscriptionIdentifier() > 0) {
mqttProperties.add(new MqttProperties.IntegerProperty(SUBSCRIPTION_IDENTIFIER.value(), subscription.getSubscriptionIdentifier()));
}

// TODO retain flag should be set by subscription option
int topicAlias = ChannelInfo.getTopicAliasMaximum(channel);
if (topicAlias > 0) {
String topicNameTmp = "";
if (ChannelInfo.getServerTopicAlias(channel, topicName) == null) {
// allocate topic alias
int allocateAlias = genServerTopicAlias(topicName, topicAlias);

if (ChannelInfo.getServerAliasTopic(channel, allocateAlias) != null) {
// conflict, reset topic <-> alias
topicNameTmp = topicName;
}

ChannelInfo.setServerTopicAlias(channel, topicName, allocateAlias);
ChannelInfo.setServerAliasTopic(channel, allocateAlias, topicName);
}
boolean isRetained = message.isRetained();

mqttProperties.add(new MqttProperties.IntegerProperty(TOPIC_ALIAS.value(), ChannelInfo.getServerTopicAlias(channel, topicName)));
data = MqttMessageFactory.buildMqtt5PublishMessage(topicNameTmp, message.getPayload(), qos, retained, mqttId, mqttProperties);
// process topic alias
if (!processTopicAlias(channel, topicName, mqttProperties)) {
data = MqttMessageFactory.buildMqtt5PublishMessage("", message.getPayload(), qos, isRetained, mqttId, mqttProperties);
} else {
// no alias
data = MqttMessageFactory.buildMqtt5PublishMessage(topicName, message.getPayload(), qos, retained, mqttId, mqttProperties);
data = MqttMessageFactory.buildMqtt5PublishMessage(topicName, message.getPayload(), qos, isRetained, mqttId, mqttProperties);
}
break;
default:
Expand All @@ -207,12 +225,54 @@ public void write(Session session, Message message, int mqttId, int qos, Subscri
message.setRetry(message.getRetry() + 1);
logger.warn("retryPush:{},{},{}", session.getClientId(), message.getMsgId(), message.getRetry());
} else if (subscription.isShare()) {
String lmqTopic = MixAll.LMQ_PREFIX + StringUtils.replace(message.getOriginTopic(), "/","%");
String lmqTopic = MixAll.LMQ_PREFIX + StringUtils.replace(message.getOriginTopic(), "/", "%");
lmqQueueStore.popAck(lmqTopic, subscription.getSharedName(), message);
}
});
}

public void processUserProperties(Message message, MqttProperties mqttProperties) {
String mqtt5UserProperties = message.getUserProperty(Message.propertyMqtt5UserProperty);
if (StringUtils.isNotBlank(mqtt5UserProperties)) {
ArrayList<MqttProperties.StringPair> userProperties = JSON.parseObject(mqtt5UserProperties,
new TypeReference<ArrayList<MqttProperties.StringPair>>() {
}
);
mqttProperties.add(new MqttProperties.UserProperties(userProperties));
}
}

/**
* process topic alias
* @param channel
* @param topicName
* @param mqttProperties
* @return true: conflict when allocated topic alias
*/
public boolean processTopicAlias(Channel channel, String topicName, MqttProperties mqttProperties) {
int topicAlias = ChannelInfo.getTopicAliasMaximum(channel);
boolean conflict = false;

if (topicAlias > 0) {
if (ChannelInfo.getServerTopicAlias(channel, topicName) == null) {
// allocate topic alias
int allocateAlias = genServerTopicAlias(topicName, topicAlias);

if (ChannelInfo.getServerAliasTopic(channel, allocateAlias) != null) {
// conflict, client will reset topic <-> alias
conflict = true;
}

ChannelInfo.setServerTopicAlias(channel, topicName, allocateAlias);
ChannelInfo.setServerAliasTopic(channel, allocateAlias, topicName);
}

// topic has allocated topic alias,just set to mqttProperties
mqttProperties.add(new MqttProperties.IntegerProperty(TOPIC_ALIAS.value(), ChannelInfo.getServerTopicAlias(channel, topicName)));
}
return conflict;
}

public int genServerTopicAlias(String topicName, int topicAliasMaximum) {
return hash(topicName) % topicAliasMaximum + 1;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ public CompletableFuture<HookResult> process(MqttMessageUpContext context, MqttM
public CompletableFuture<StoreResult> put(MqttMessageUpContext context, MqttMessage mqttMessage) {
MqttPublishMessage mqttPublishMessage = (MqttPublishMessage) mqttMessage;

// process topic alias
final MqttPublishVariableHeader variableHeaderTmp = mqttPublishMessage.variableHeader();
MqttProperties mqttProperties = variableHeaderTmp.properties();
if (mqttProperties != null && context.getClientTopicAliasMap() != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ public void messageArrived(String topic, MqttMessage message) throws Exception {
try {
String payload = new String(message.getPayload());
String[] ss = payload.split("_");
System.out.println(now() + "receive:" + topic + "," + payload);
System.out.println(now() + "receive:" + topic + "," + payload + ", properties: " + message.getProperties());
} catch (Exception e) {
e.printStackTrace();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,15 @@
import org.eclipse.paho.mqttv5.common.MqttException;
import org.eclipse.paho.mqttv5.common.MqttMessage;
import org.eclipse.paho.mqttv5.common.packet.MqttProperties;
import org.eclipse.paho.mqttv5.common.packet.UserProperty;

import java.nio.charset.StandardCharsets;
import java.security.InvalidKeyException;
import java.security.NoSuchAlgorithmException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;

public class Mqtt5Producer {
public static void main(String[] args) throws InterruptedException, MqttException, NoSuchAlgorithmException, InvalidKeyException {
Expand Down Expand Up @@ -82,6 +85,18 @@ public void authPacketArrived(int reasonCode, MqttProperties properties) {
for (int i = 0; i < 1000; i++) {
String msg = "r1_" + System.currentTimeMillis() + "_" + i;
MqttMessage message = new MqttMessage(msg.getBytes(StandardCharsets.UTF_8));

// user properties
MqttProperties mqttProperties = new MqttProperties();
List<UserProperty> userProperties = new ArrayList<>();
userProperties.add(new UserProperty("tag", "r1"));
userProperties.add(new UserProperty("tag", "r11"));
mqttProperties.setUserProperties(userProperties);

// content type
mqttProperties.setContentType("text/plain");
message.setProperties(mqttProperties);

message.setQos(1);
String mqttSendTopic = firstTopic + "/r1";
mqttClient.publish(mqttSendTopic, message);
Expand Down

0 comments on commit 5397e58

Please sign in to comment.