From 5397e58cb7e2831af52a5725b6c4bfd682da7a09 Mon Sep 17 00:00:00 2001 From: Dongyuan Pan Date: Thu, 23 May 2024 10:29:09 +0800 Subject: [PATCH] Support some simple feature (#254) * Support user properties * Supoort ContentType and payloadFormatIndicator * add CheckPacket * delete check packet * fix bug: reset topic alias * fix bug: reset topic alias * support SUBSCRIPTION_IDENTIFIER RESPONSE_TOPIC CORRELATION_DATA CONTENT_TYPE --- .../mqtt/common/util/MessageUtil.java | 2 +- .../common/test/util/TestMessageUtil.java | 57 ++++++++++ .../mqtt/cs/session/infly/PushAction.java | 104 ++++++++++++++---- .../mqtt5/processor/PublishProcessor5.java | 1 + .../mqtt/example/mqtt5/Mqtt5Consumer.java | 2 +- .../mqtt/example/mqtt5/Mqtt5Producer.java | 15 +++ 6 files changed, 157 insertions(+), 24 deletions(-) diff --git a/mqtt-common/src/main/java/org/apache/rocketmq/mqtt/common/util/MessageUtil.java b/mqtt-common/src/main/java/org/apache/rocketmq/mqtt/common/util/MessageUtil.java index 536244515..6b864686d 100644 --- a/mqtt-common/src/main/java/org/apache/rocketmq/mqtt/common/util/MessageUtil.java +++ b/mqtt-common/src/main/java/org/apache/rocketmq/mqtt/common/util/MessageUtil.java @@ -108,7 +108,7 @@ public static Message toMessage(MqttPublishMessage mqttMessage) { message.putUserProperty(Message.propertyCorrelationData, new String(correlationData.value(), StandardCharsets.UTF_8)); } - // User Properties + // The user properties of publish packets need to be stored, and when pushing, they need to be brought with them List userProperties = (List) mqttProperties.getProperties(USER_PROPERTY.value()); List userPropertyList = new ArrayList<>(); for (MqttProperties.UserProperty userProperty : userProperties) { diff --git a/mqtt-common/src/test/java/org/apache/rocketmq/mqtt/common/test/util/TestMessageUtil.java b/mqtt-common/src/test/java/org/apache/rocketmq/mqtt/common/test/util/TestMessageUtil.java index 7318210e8..6788dbd19 100644 --- a/mqtt-common/src/test/java/org/apache/rocketmq/mqtt/common/test/util/TestMessageUtil.java +++ b/mqtt-common/src/test/java/org/apache/rocketmq/mqtt/common/test/util/TestMessageUtil.java @@ -17,29 +17,50 @@ package org.apache.rocketmq.mqtt.common.test.util; +import com.alibaba.fastjson.JSON; +import com.alibaba.fastjson.TypeReference; import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufAllocator; +import io.netty.buffer.Unpooled; import io.netty.buffer.UnpooledByteBufAllocator; import io.netty.handler.codec.mqtt.MqttFixedHeader; import io.netty.handler.codec.mqtt.MqttMessageType; +import io.netty.handler.codec.mqtt.MqttProperties; import io.netty.handler.codec.mqtt.MqttPublishMessage; import io.netty.handler.codec.mqtt.MqttPublishVariableHeader; import io.netty.handler.codec.mqtt.MqttQoS; import io.netty.util.CharsetUtil; +import org.apache.commons.lang3.StringUtils; +import org.apache.rocketmq.client.producer.SendCallback; +import org.apache.rocketmq.common.message.MessageConst; import org.apache.rocketmq.mqtt.common.model.Message; import org.apache.rocketmq.mqtt.common.util.MessageUtil; import org.junit.Assert; import org.junit.Before; import org.junit.Test; +import org.mockito.ArgumentCaptor; import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashSet; import java.util.List; +import java.util.Set; +import static io.netty.handler.codec.mqtt.MqttProperties.MqttPropertyType.CONTENT_TYPE; +import static io.netty.handler.codec.mqtt.MqttProperties.MqttPropertyType.CORRELATION_DATA; +import static io.netty.handler.codec.mqtt.MqttProperties.MqttPropertyType.PAYLOAD_FORMAT_INDICATOR; +import static io.netty.handler.codec.mqtt.MqttProperties.MqttPropertyType.PUBLICATION_EXPIRY_INTERVAL; +import static io.netty.handler.codec.mqtt.MqttProperties.MqttPropertyType.RESPONSE_TOPIC; +import static io.netty.handler.codec.mqtt.MqttProperties.MqttPropertyType.SUBSCRIPTION_IDENTIFIER; +import static io.netty.handler.codec.mqtt.MqttProperties.MqttPropertyType.TOPIC_ALIAS; +import static io.netty.handler.codec.mqtt.MqttProperties.MqttPropertyType.USER_PROPERTY; import static org.apache.rocketmq.mqtt.common.util.MessageUtil.EMPTYSTRING; import static org.apache.rocketmq.mqtt.common.util.MessageUtil.dealEmptyMessage; import static org.apache.rocketmq.mqtt.common.util.MessageUtil.removeRetainedFlag; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.verify; public class TestMessageUtil { @@ -119,4 +140,40 @@ public void TestDealEmptyMessage() { newEmptyMessage.payload().readBytes(newBody); Assert.assertArrayEquals(EMPTYSTRING.getBytes(), newBody); } + + @Test + public void TestMqtt5Message() { + + MqttProperties props = new MqttProperties(); + props.add(new MqttProperties.UserProperty("isSecret", "true")); + props.add(new MqttProperties.UserProperty("tag", "firstTag")); + props.add(new MqttProperties.UserProperty("tag", "secondTag")); + + props.add(new MqttProperties.IntegerProperty(SUBSCRIPTION_IDENTIFIER.value(), 100)); + props.add(new MqttProperties.IntegerProperty(SUBSCRIPTION_IDENTIFIER.value(), 101)); + + MqttFixedHeader mqttFixedHeader = new MqttFixedHeader(MqttMessageType.PUBLISH, false, MqttQoS.AT_LEAST_ONCE, false, 1); + MqttPublishVariableHeader variableHeader = new MqttPublishVariableHeader("test", 0, props); + ByteBuf payload = Unpooled.copiedBuffer("test".getBytes(StandardCharsets.UTF_8)); + MqttPublishMessage publishMessage = new MqttPublishMessage(mqttFixedHeader, variableHeader, payload); + Message message = MessageUtil.toMessage(publishMessage); + + String mqtt5UserProperties = message.getUserProperty(Message.propertyMqtt5UserProperty); + MqttPublishMessage newPublishMessage = null; + + if (StringUtils.isNotBlank(mqtt5UserProperties)) { + ArrayList userProperties = JSON.parseObject(mqtt5UserProperties, + new TypeReference>() {} + ); + MqttProperties newProps = new MqttProperties(); + newProps.add(new MqttProperties.UserProperties(userProperties)); + MqttPublishVariableHeader newVariableHeader = new MqttPublishVariableHeader("test", 0, props); + newPublishMessage = new MqttPublishMessage(mqttFixedHeader, variableHeader, payload); + } + + MqttProperties checkProps = newPublishMessage.variableHeader().properties(); + Assert.assertEquals("true", ((MqttProperties.StringPair)checkProps.getProperties(USER_PROPERTY.value()).get(0).value()).value); + Assert.assertEquals("firstTag", ((MqttProperties.StringPair)checkProps.getProperties(USER_PROPERTY.value()).get(1).value()).value); + Assert.assertEquals("secondTag", ((MqttProperties.StringPair)checkProps.getProperties(USER_PROPERTY.value()).get(2).value()).value); + } } diff --git a/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/session/infly/PushAction.java b/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/session/infly/PushAction.java index ca044ac4a..feccc55b5 100644 --- a/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/session/infly/PushAction.java +++ b/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/session/infly/PushAction.java @@ -17,10 +17,14 @@ package org.apache.rocketmq.mqtt.cs.session.infly; +import com.alibaba.fastjson.JSON; +import com.alibaba.fastjson.TypeReference; import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; +import io.netty.handler.codec.mqtt.MqttEncoder; import io.netty.handler.codec.mqtt.MqttProperties; +import io.netty.handler.codec.mqtt.MqttPublishMessage; import io.netty.handler.codec.mqtt.MqttVersion; import org.apache.commons.lang3.StringUtils; import org.apache.rocketmq.common.MixAll; @@ -39,8 +43,13 @@ import org.springframework.stereotype.Component; import javax.annotation.Resource; +import java.util.ArrayList; import java.util.List; +import static io.netty.handler.codec.mqtt.MqttProperties.MqttPropertyType.CONTENT_TYPE; +import static io.netty.handler.codec.mqtt.MqttProperties.MqttPropertyType.CORRELATION_DATA; +import static io.netty.handler.codec.mqtt.MqttProperties.MqttPropertyType.RESPONSE_TOPIC; +import static io.netty.handler.codec.mqtt.MqttProperties.MqttPropertyType.SUBSCRIPTION_IDENTIFIER; import static io.netty.handler.codec.mqtt.MqttProperties.MqttPropertyType.TOPIC_ALIAS; import static java.lang.Math.min; import static java.util.Objects.hash; @@ -95,9 +104,9 @@ public void push(Message message, Subscription subscription, Session session, Qu try { if (session.isClean()) { if (message.getStoreTimestamp() > 0 && - message.getStoreTimestamp() < session.getStartTime()) { + message.getStoreTimestamp() < session.getStartTime()) { logger.warn("old msg:{},{},{},{}", session.getClientId(), message.getMsgId(), - message.getStoreTimestamp(), session.getStartTime()); + message.getStoreTimestamp(), session.getStartTime()); rollNext(session, mqttId); return; } @@ -172,28 +181,37 @@ public void write(Session session, Message message, int mqttId, int qos, Subscri data = MqttMessageFactory.buildPublishMessage(topicName, message.getPayload(), qos, retained, mqttId); break; case MQTT_5: + // add content type + if (StringUtils.isNotBlank(message.getUserProperty(Message.propertyContentType))) { + mqttProperties.add(new MqttProperties.StringProperty(CONTENT_TYPE.value(), message.getUserProperty(Message.propertyContentType))); + } + + // add Response Topic + if (StringUtils.isNotBlank(message.getUserProperty(Message.propertyResponseTopic))) { + mqttProperties.add(new MqttProperties.StringProperty(RESPONSE_TOPIC.value(), message.getUserProperty(Message.propertyResponseTopic))); + } + + // add Correlation Data + if (StringUtils.isNotBlank(message.getUserProperty(Message.propertyCorrelationData))) { + mqttProperties.add(new MqttProperties.StringProperty(CORRELATION_DATA.value(), message.getUserProperty(Message.propertyCorrelationData))); + } + + // process publish user properties + processUserProperties(message, mqttProperties); + + // process subscription identifier + if (subscription.getSubscriptionIdentifier() > 0) { + mqttProperties.add(new MqttProperties.IntegerProperty(SUBSCRIPTION_IDENTIFIER.value(), subscription.getSubscriptionIdentifier())); + } + // TODO retain flag should be set by subscription option - int topicAlias = ChannelInfo.getTopicAliasMaximum(channel); - if (topicAlias > 0) { - String topicNameTmp = ""; - if (ChannelInfo.getServerTopicAlias(channel, topicName) == null) { - // allocate topic alias - int allocateAlias = genServerTopicAlias(topicName, topicAlias); - - if (ChannelInfo.getServerAliasTopic(channel, allocateAlias) != null) { - // conflict, reset topic <-> alias - topicNameTmp = topicName; - } - - ChannelInfo.setServerTopicAlias(channel, topicName, allocateAlias); - ChannelInfo.setServerAliasTopic(channel, allocateAlias, topicName); - } + boolean isRetained = message.isRetained(); - mqttProperties.add(new MqttProperties.IntegerProperty(TOPIC_ALIAS.value(), ChannelInfo.getServerTopicAlias(channel, topicName))); - data = MqttMessageFactory.buildMqtt5PublishMessage(topicNameTmp, message.getPayload(), qos, retained, mqttId, mqttProperties); + // process topic alias + if (!processTopicAlias(channel, topicName, mqttProperties)) { + data = MqttMessageFactory.buildMqtt5PublishMessage("", message.getPayload(), qos, isRetained, mqttId, mqttProperties); } else { - // no alias - data = MqttMessageFactory.buildMqtt5PublishMessage(topicName, message.getPayload(), qos, retained, mqttId, mqttProperties); + data = MqttMessageFactory.buildMqtt5PublishMessage(topicName, message.getPayload(), qos, isRetained, mqttId, mqttProperties); } break; default: @@ -207,12 +225,54 @@ public void write(Session session, Message message, int mqttId, int qos, Subscri message.setRetry(message.getRetry() + 1); logger.warn("retryPush:{},{},{}", session.getClientId(), message.getMsgId(), message.getRetry()); } else if (subscription.isShare()) { - String lmqTopic = MixAll.LMQ_PREFIX + StringUtils.replace(message.getOriginTopic(), "/","%"); + String lmqTopic = MixAll.LMQ_PREFIX + StringUtils.replace(message.getOriginTopic(), "/", "%"); lmqQueueStore.popAck(lmqTopic, subscription.getSharedName(), message); } }); } + public void processUserProperties(Message message, MqttProperties mqttProperties) { + String mqtt5UserProperties = message.getUserProperty(Message.propertyMqtt5UserProperty); + if (StringUtils.isNotBlank(mqtt5UserProperties)) { + ArrayList userProperties = JSON.parseObject(mqtt5UserProperties, + new TypeReference>() { + } + ); + mqttProperties.add(new MqttProperties.UserProperties(userProperties)); + } + } + + /** + * process topic alias + * @param channel + * @param topicName + * @param mqttProperties + * @return true: conflict when allocated topic alias + */ + public boolean processTopicAlias(Channel channel, String topicName, MqttProperties mqttProperties) { + int topicAlias = ChannelInfo.getTopicAliasMaximum(channel); + boolean conflict = false; + + if (topicAlias > 0) { + if (ChannelInfo.getServerTopicAlias(channel, topicName) == null) { + // allocate topic alias + int allocateAlias = genServerTopicAlias(topicName, topicAlias); + + if (ChannelInfo.getServerAliasTopic(channel, allocateAlias) != null) { + // conflict, client will reset topic <-> alias + conflict = true; + } + + ChannelInfo.setServerTopicAlias(channel, topicName, allocateAlias); + ChannelInfo.setServerAliasTopic(channel, allocateAlias, topicName); + } + + // topic has allocated topic alias,just set to mqttProperties + mqttProperties.add(new MqttProperties.IntegerProperty(TOPIC_ALIAS.value(), ChannelInfo.getServerTopicAlias(channel, topicName))); + } + return conflict; + } + public int genServerTopicAlias(String topicName, int topicAliasMaximum) { return hash(topicName) % topicAliasMaximum + 1; } diff --git a/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/upstream/mqtt5/processor/PublishProcessor5.java b/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/upstream/mqtt5/processor/PublishProcessor5.java index 2cdee6659..4c3bb89fe 100644 --- a/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/upstream/mqtt5/processor/PublishProcessor5.java +++ b/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/upstream/mqtt5/processor/PublishProcessor5.java @@ -72,6 +72,7 @@ public CompletableFuture process(MqttMessageUpContext context, MqttM public CompletableFuture put(MqttMessageUpContext context, MqttMessage mqttMessage) { MqttPublishMessage mqttPublishMessage = (MqttPublishMessage) mqttMessage; + // process topic alias final MqttPublishVariableHeader variableHeaderTmp = mqttPublishMessage.variableHeader(); MqttProperties mqttProperties = variableHeaderTmp.properties(); if (mqttProperties != null && context.getClientTopicAliasMap() != null) { diff --git a/mqtt-example/src/main/java/org/apache/rocketmq/mqtt/example/mqtt5/Mqtt5Consumer.java b/mqtt-example/src/main/java/org/apache/rocketmq/mqtt/example/mqtt5/Mqtt5Consumer.java index f40dd944c..c139d40e9 100644 --- a/mqtt-example/src/main/java/org/apache/rocketmq/mqtt/example/mqtt5/Mqtt5Consumer.java +++ b/mqtt-example/src/main/java/org/apache/rocketmq/mqtt/example/mqtt5/Mqtt5Consumer.java @@ -62,7 +62,7 @@ public void messageArrived(String topic, MqttMessage message) throws Exception { try { String payload = new String(message.getPayload()); String[] ss = payload.split("_"); - System.out.println(now() + "receive:" + topic + "," + payload); + System.out.println(now() + "receive:" + topic + "," + payload + ", properties: " + message.getProperties()); } catch (Exception e) { e.printStackTrace(); } diff --git a/mqtt-example/src/main/java/org/apache/rocketmq/mqtt/example/mqtt5/Mqtt5Producer.java b/mqtt-example/src/main/java/org/apache/rocketmq/mqtt/example/mqtt5/Mqtt5Producer.java index 4968f9e7f..4ae1876b3 100644 --- a/mqtt-example/src/main/java/org/apache/rocketmq/mqtt/example/mqtt5/Mqtt5Producer.java +++ b/mqtt-example/src/main/java/org/apache/rocketmq/mqtt/example/mqtt5/Mqtt5Producer.java @@ -27,12 +27,15 @@ import org.eclipse.paho.mqttv5.common.MqttException; import org.eclipse.paho.mqttv5.common.MqttMessage; import org.eclipse.paho.mqttv5.common.packet.MqttProperties; +import org.eclipse.paho.mqttv5.common.packet.UserProperty; import java.nio.charset.StandardCharsets; import java.security.InvalidKeyException; import java.security.NoSuchAlgorithmException; import java.text.SimpleDateFormat; +import java.util.ArrayList; import java.util.Date; +import java.util.List; public class Mqtt5Producer { public static void main(String[] args) throws InterruptedException, MqttException, NoSuchAlgorithmException, InvalidKeyException { @@ -82,6 +85,18 @@ public void authPacketArrived(int reasonCode, MqttProperties properties) { for (int i = 0; i < 1000; i++) { String msg = "r1_" + System.currentTimeMillis() + "_" + i; MqttMessage message = new MqttMessage(msg.getBytes(StandardCharsets.UTF_8)); + + // user properties + MqttProperties mqttProperties = new MqttProperties(); + List userProperties = new ArrayList<>(); + userProperties.add(new UserProperty("tag", "r1")); + userProperties.add(new UserProperty("tag", "r11")); + mqttProperties.setUserProperties(userProperties); + + // content type + mqttProperties.setContentType("text/plain"); + message.setProperties(mqttProperties); + message.setQos(1); String mqttSendTopic = firstTopic + "/r1"; mqttClient.publish(mqttSendTopic, message);