Skip to content

Commit

Permalink
[#2955] MQTT 5 support
Browse files Browse the repository at this point in the history
  • Loading branch information
harism authored and sophokles73 committed Sep 27, 2024
1 parent 7dc258d commit 4a52110
Show file tree
Hide file tree
Showing 4 changed files with 73 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@

import io.micrometer.core.instrument.Timer.Sample;
import io.netty.handler.codec.mqtt.MqttConnectReturnCode;
import io.netty.handler.codec.mqtt.MqttProperties;
import io.netty.handler.codec.mqtt.MqttQoS;
import io.opentracing.Span;
import io.opentracing.SpanContext;
Expand All @@ -106,6 +107,9 @@
import io.vertx.mqtt.messages.MqttPublishMessage;
import io.vertx.mqtt.messages.MqttSubscribeMessage;
import io.vertx.mqtt.messages.MqttUnsubscribeMessage;
import io.vertx.mqtt.messages.codes.MqttPubAckReasonCode;
import io.vertx.mqtt.messages.codes.MqttSubAckReasonCode;
import io.vertx.mqtt.messages.codes.MqttUnsubAckReasonCode;

/**
* A base class for implementing Vert.x based Hono protocol adapters for publishing events & telemetry data using
Expand Down Expand Up @@ -885,7 +889,7 @@ public final Future<Void> uploadCommandResponseMessage(final MqttContext ctx) {
// check that the remote MQTT client is still connected before sending PUBACK
if (ctx.isAtLeastOnce() && ctx.deviceEndpoint().isConnected()) {
currentSpan.log(EVENT_SENDING_PUBACK);
ctx.acknowledge();
ctx.acknowledge(MqttPubAckReasonCode.SUCCESS);
}
currentSpan.finish();
return Future.<Void> succeededFuture();
Expand Down Expand Up @@ -968,7 +972,7 @@ private Future<Void> uploadMessage(
// check that the remote MQTT client is still connected before sending PUBACK
if (ctx.isAtLeastOnce() && ctx.deviceEndpoint().isConnected()) {
currentSpan.log(EVENT_SENDING_PUBACK);
ctx.acknowledge();
ctx.acknowledge(MqttPubAckReasonCode.SUCCESS);
}
currentSpan.finish();
return ok;
Expand Down Expand Up @@ -1266,7 +1270,7 @@ private void handlePublishedMessageError(final MqttContext context, final Throwa
span.log("skipped sending PUBACK");
} else if (context.deviceEndpoint().isConnected()) {
span.log(EVENT_SENDING_PUBACK);
context.acknowledge();
context.acknowledge(MqttPubAckReasonCode.UNSPECIFIED_ERROR);
}
}
span.finish();
Expand Down Expand Up @@ -1415,11 +1419,11 @@ protected final void onSubscribe(final MqttSubscribeMessage subscribeMsg) {
Future.join(new ArrayList<>(subscriptionOutcomes)).onComplete(v -> {

if (endpoint.isConnected()) {
// return a status code for each topic filter contained in the SUBSCRIBE packet
final List<MqttQoS> grantedQosLevels = subscriptionOutcomes.stream()
.map(future -> future.failed() ? MqttQoS.FAILURE : future.result().getQos())
// return a reason code for each topic filter contained in the SUBSCRIBE packet
final List<MqttSubAckReasonCode> reasonCodes = subscriptionOutcomes.stream()
.map(future -> future.failed() ? MqttSubAckReasonCode.UNSPECIFIED_ERROR : MqttSubAckReasonCode.qosGranted(future.result().getQos()))
.collect(Collectors.toList());
endpoint.subscribeAcknowledge(subscribeMsg.messageId(), grantedQosLevels);
endpoint.subscribeAcknowledge(subscribeMsg.messageId(), reasonCodes, MqttProperties.NO_PROPERTIES);
} else {
TracingHelper.logError(span, "skipped sending command subscription notification - endpoint not connected anymore");
log.debug("skipped sending command subscription notification - endpoint not connected anymore [tenant-id: {}, device-id: {}]",
Expand Down Expand Up @@ -1785,6 +1789,7 @@ protected final void onUnsubscribe(final MqttUnsubscribeMessage unsubscribeMsg)
}
final Span span = newSpan("UNSUBSCRIBE");

final List<MqttUnsubAckReasonCode> reasonCodes = new ArrayList<>();
final List<Future<Void>> removalDoneFutures = new ArrayList<>(unsubscribeMsg.topics().size());
unsubscribeMsg.topics().forEach(topic -> {

Expand All @@ -1810,13 +1815,15 @@ protected final void onUnsubscribe(final MqttUnsubscribeMessage unsubscribeMsg)
if (removedSubscription.get() != null) {
log.debug("removed subscription with topic [{}] for device [tenant-id: {}, device-id: {}]",
topic, removedSubscription.get().getTenant(), removedSubscription.get().getDeviceId());
reasonCodes.add(MqttUnsubAckReasonCode.SUCCESS);
} else {
TracingHelper.logError(span, String.format("no subscription found for topic filter [%s]", topic));
log.debug("cannot unsubscribe - no subscription found for topic filter [{}]", topic);
reasonCodes.add(MqttUnsubAckReasonCode.UNSPECIFIED_ERROR);
}
});
if (endpoint.isConnected()) {
endpoint.unsubscribeAcknowledge(unsubscribeMsg.messageId());
endpoint.unsubscribeAcknowledge(unsubscribeMsg.messageId(), reasonCodes, MqttProperties.NO_PROPERTIES);
}
Future.join(removalDoneFutures).onComplete(r -> span.finish());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,13 @@
import org.eclipse.hono.util.Strings;

import io.micrometer.core.instrument.Timer.Sample;
import io.netty.handler.codec.mqtt.MqttProperties;
import io.netty.handler.codec.mqtt.MqttQoS;
import io.opentracing.Span;
import io.vertx.core.buffer.Buffer;
import io.vertx.mqtt.MqttEndpoint;
import io.vertx.mqtt.messages.MqttPublishMessage;
import io.vertx.mqtt.messages.codes.MqttPubAckReasonCode;

/**
* A dictionary of relevant information required during the
Expand Down Expand Up @@ -162,8 +164,14 @@ public static MqttContext fromPublishPacket(
result.propertyBag = bag;
result.topic = bag.topicWithoutPropertyBag();
result.endpoint = MetricsTags.EndpointType.fromString(result.topic.getEndpoint());
// set the content-type using the corresponding value from the property bag
result.contentType = bag.getProperty(MessageHelper.SYS_PROPERTY_CONTENT_TYPE);
// 1. set the content-type using the corresponding value from the property bag
// 2. set the content-type using the corresponding value from the mqtt message properties (MQTT5)
result.contentType = Optional.ofNullable(bag.getProperty(MessageHelper.SYS_PROPERTY_CONTENT_TYPE))
.orElse(Optional.ofNullable(publishedMessage.properties())
.map(properties -> properties.getProperty(MqttProperties.MqttPropertyType.CONTENT_TYPE.value()))
.map(MqttProperties.MqttProperty::value)
.map(Object::toString)
.orElse(null));
if (result.endpoint == EndpointType.EVENT) {
result.timeToLive = determineTimeToLive(bag);
}
Expand Down Expand Up @@ -423,10 +431,12 @@ public boolean isAtLeastOnce() {

/**
* Sends a PUBACK for the message to the device.
*
* @param reasonCode Mqtt Publish Acknowledge reason code.
*/
public void acknowledge() {
public void acknowledge(final MqttPubAckReasonCode reasonCode) {
if (message != null && deviceEndpoint != null && isAtLeastOnce()) {
deviceEndpoint.publishAcknowledge(message.messageId());
deviceEndpoint.publishAcknowledge(message.messageId(), reasonCode, MqttProperties.NO_PROPERTIES);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@
import org.mockito.ArgumentCaptor;

import io.netty.handler.codec.mqtt.MqttConnectReturnCode;
import io.netty.handler.codec.mqtt.MqttProperties;
import io.netty.handler.codec.mqtt.MqttQoS;
import io.opentracing.Span;
import io.opentracing.SpanContext;
Expand All @@ -97,6 +98,8 @@
import io.vertx.mqtt.MqttTopicSubscription;
import io.vertx.mqtt.messages.MqttPublishMessage;
import io.vertx.mqtt.messages.MqttSubscribeMessage;
import io.vertx.mqtt.messages.codes.MqttPubAckReasonCode;
import io.vertx.mqtt.messages.codes.MqttSubAckReasonCode;

/**
* Verifies behavior of {@link AbstractVertxBasedMqttProtocolAdapter}.
Expand Down Expand Up @@ -737,7 +740,7 @@ private void testUploadQoS1MessageSendsPubAckOnSuccess(
upload.accept(adapter, context);

// THEN the device does not receive a PUBACK
verify(endpoint, never()).publishAcknowledge(anyInt());
verify(endpoint, never()).publishAcknowledge(anyInt(), any(MqttPubAckReasonCode.class), any(MqttProperties.class));
// and the message has not been reported as forwarded
verify(metrics, never()).reportTelemetry(
any(MetricsTags.EndpointType.class),
Expand All @@ -750,7 +753,7 @@ private void testUploadQoS1MessageSendsPubAckOnSuccess(

// until the message has been settled and accepted
outcome.complete();
verify(endpoint).publishAcknowledge(5555555);
verify(endpoint).publishAcknowledge(5555555, MqttPubAckReasonCode.SUCCESS, MqttProperties.NO_PROPERTIES);
verify(metrics).reportTelemetry(
eq(type),
eq("my-tenant"),
Expand Down Expand Up @@ -883,7 +886,7 @@ public void testUploadTelemetryMessageIncludesRetainAnnotation(final VertxTestCo

ctx.verify(() -> {
// THEN the device has received a PUBACK
verify(endpoint).publishAcknowledge(5555555);
verify(endpoint).publishAcknowledge(5555555, MqttPubAckReasonCode.SUCCESS, MqttProperties.NO_PROPERTIES);
// and the message has been sent downstream
// including the "retain" annotation
verify(telemetrySender).sendTelemetry(
Expand Down Expand Up @@ -1148,16 +1151,16 @@ public void testOnSubscribeIncludesStatusCodeForEachFilter() {

// THEN the adapter sends a SUBACK packet to the device
// which contains a failure status code for each unsupported filter
final ArgumentCaptor<List<MqttQoS>> codeCaptor = ArgumentCaptor.forClass(List.class);
verify(endpoint).subscribeAcknowledge(eq(15), codeCaptor.capture());
final ArgumentCaptor<List<MqttSubAckReasonCode>> codeCaptor = ArgumentCaptor.forClass(List.class);
verify(endpoint).subscribeAcknowledge(eq(15), codeCaptor.capture(), eq(MqttProperties.NO_PROPERTIES));
assertThat(codeCaptor.getValue()).hasSize(subscriptions.size());
assertThat(codeCaptor.getValue().get(0)).isEqualTo(MqttQoS.FAILURE);
assertThat(codeCaptor.getValue().get(1)).isEqualTo(MqttQoS.FAILURE);
assertThat(codeCaptor.getValue().get(2)).isEqualTo(MqttQoS.FAILURE);
assertThat(codeCaptor.getValue().get(3)).isEqualTo(MqttQoS.AT_MOST_ONCE);
assertThat(codeCaptor.getValue().get(4)).isEqualTo(MqttQoS.FAILURE);
assertThat(codeCaptor.getValue().get(5)).isEqualTo(MqttQoS.FAILURE);
assertThat(codeCaptor.getValue().get(6)).isEqualTo(MqttQoS.AT_MOST_ONCE);
assertThat(codeCaptor.getValue().get(0)).isEqualTo(MqttSubAckReasonCode.UNSPECIFIED_ERROR);
assertThat(codeCaptor.getValue().get(1)).isEqualTo(MqttSubAckReasonCode.UNSPECIFIED_ERROR);
assertThat(codeCaptor.getValue().get(2)).isEqualTo(MqttSubAckReasonCode.UNSPECIFIED_ERROR);
assertThat(codeCaptor.getValue().get(3)).isEqualTo(MqttSubAckReasonCode.qosGranted(MqttQoS.AT_MOST_ONCE));
assertThat(codeCaptor.getValue().get(4)).isEqualTo(MqttSubAckReasonCode.UNSPECIFIED_ERROR);
assertThat(codeCaptor.getValue().get(5)).isEqualTo(MqttSubAckReasonCode.UNSPECIFIED_ERROR);
assertThat(codeCaptor.getValue().get(6)).isEqualTo(MqttSubAckReasonCode.qosGranted(MqttQoS.AT_MOST_ONCE));
}

private static MqttTopicSubscription newMockTopicSubscription(final String filter, final MqttQoS qos) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,11 @@
* SPDX-License-Identifier: EPL-2.0
*/


package org.eclipse.hono.adapter.mqtt;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

Expand All @@ -32,6 +32,7 @@
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import io.netty.handler.codec.mqtt.MqttProperties;
import io.opentracing.Span;
import io.vertx.mqtt.MqttEndpoint;
import io.vertx.mqtt.messages.MqttPublishMessage;
Expand Down Expand Up @@ -95,17 +96,43 @@ public void verifyPropertyBagRetrievedFromTopic() {
* Verifies the <em>content-type</em> value retrieved from the <em>property-bag</em> in a message's topic.
*/
@Test
public void verifyContentType() {
public void verifyContentTypeFromPropertyBag() {
final String contentType = "application/vnd.eclipse.ditto+json";
final String encodedContentType = URLEncoder.encode(contentType, StandardCharsets.UTF_8);
final var device = new DeviceUser("tenant", "device");
final MqttPublishMessage msg = mock(MqttPublishMessage.class);
final MqttProperties msgProperties = new MqttProperties();
msgProperties.add(new MqttProperties.StringProperty(MqttProperties.MqttPropertyType.CONTENT_TYPE.value(), "offending/value+json"));
when(msg.topicName()).thenReturn(
String.format("event/tenant/device/?Content-Type=%s&param2=value2&param3=value3", encodedContentType));
when(msg.properties()).thenReturn(
msgProperties);
final MqttContext context = MqttContext.fromPublishPacket(msg, mock(MqttEndpoint.class), span, device);

assertNotNull(context.propertyBag());
assertEquals(contentType, context.propertyBag().getProperty(MessageHelper.SYS_PROPERTY_CONTENT_TYPE));
assertEquals(contentType, context.contentType());
}

/**
* Verifies the <em>content-type</em> value retrieved from the <em>properties</em> in a message.
*/
@Test
public void verifyContentTypeFromProperties() {
final String contentType = "application/vnd.eclipse.ditto+json";
final var device = new DeviceUser("tenant", "device");
final MqttPublishMessage msg = mock(MqttPublishMessage.class);
final MqttProperties msgProperties = new MqttProperties();
msgProperties.add(new MqttProperties.StringProperty(MqttProperties.MqttPropertyType.CONTENT_TYPE.value(), contentType));
when(msg.topicName()).thenReturn(
"event/tenant/device/?param2=value2&param3=value3");
when(msg.properties()).thenReturn(
msgProperties);
final MqttContext context = MqttContext.fromPublishPacket(msg, mock(MqttEndpoint.class), span, device);

assertNotNull(context.propertyBag());
assertNull(context.propertyBag().getProperty(MessageHelper.SYS_PROPERTY_CONTENT_TYPE));
assertEquals(contentType, context.contentType());
}

/**
Expand Down

0 comments on commit 4a52110

Please sign in to comment.