diff --git a/gaia-sdk-java/gaia-sdk-mqtt/src/main/kotlin/gaia/sdk/mqtt/MqttSensorQueue.kt b/gaia-sdk-java/gaia-sdk-mqtt/src/main/kotlin/gaia/sdk/mqtt/MqttSensorQueue.kt index 981abf98..8a86a32d 100644 --- a/gaia-sdk-java/gaia-sdk-mqtt/src/main/kotlin/gaia/sdk/mqtt/MqttSensorQueue.kt +++ b/gaia-sdk-java/gaia-sdk-mqtt/src/main/kotlin/gaia/sdk/mqtt/MqttSensorQueue.kt @@ -19,6 +19,7 @@ package gaia.sdk.mqtt import com.fasterxml.jackson.databind.ObjectMapper import com.hivemq.client.mqtt.MqttGlobalPublishFilter.SUBSCRIBED import com.hivemq.client.mqtt.MqttWebSocketConfig +import com.hivemq.client.mqtt.datatypes.MqttQos import com.hivemq.client.mqtt.lifecycle.MqttClientAutoReconnect import com.hivemq.client.mqtt.mqtt5.Mqtt5Client import com.hivemq.client.mqtt.mqtt5.Mqtt5RxClient @@ -72,7 +73,7 @@ class MqttSensorQueue(private val options: QueueOptions) : ISensorQueue { override fun subscribe(type: IQueueType, header: QueueHeader, consumer: (QueuePayload) -> Unit): Completable { return client.subscribeWith() - .topicFilter(getTopic(type, header)).noLocal(true) + .topicFilter(getTopic(type, header)).noLocal(true).qos(MqttQos.AT_LEAST_ONCE) .applySubscribe() .timeout(options.subscribeTimeout, TimeUnit.SECONDS) .doOnSuccess { @@ -99,6 +100,7 @@ class MqttSensorQueue(private val options: QueueOptions) : ISensorQueue { override fun publish(type: IQueueType, header: QueueHeader, payload: QueuePayload): Completable { val publish = Mqtt5Publish.builder() .topic(getTopic(type, header)) + .qos(MqttQos.AT_LEAST_ONCE) .userProperties() .add("identityId", header.identityId.toString()) .add("userId", options.userId) diff --git a/gaia-sdk-javascript/src/mqtt/MqttSensorQueue.ts b/gaia-sdk-javascript/src/mqtt/MqttSensorQueue.ts index 66b6c9e8..773316ee 100644 --- a/gaia-sdk-javascript/src/mqtt/MqttSensorQueue.ts +++ b/gaia-sdk-javascript/src/mqtt/MqttSensorQueue.ts @@ -33,7 +33,7 @@ export class MqttSensorQueue { deviceId: this.options.deviceId, userId: this.options.userId }); - const opts: IMqttPublishOpts = {properties: {userProperties}}; + const opts: IMqttPublishOpts = {properties: {userProperties}, qos: 1}; this.client.publish(topic, payloadStr, opts, this.mqttCallback(JSON.parse(payloadStr))); } @@ -45,7 +45,7 @@ export class MqttSensorQueue { } if (callback) { console.debug('subscribe to', topic); - this.client.subscribe(topic, callback); + this.client.subscribe(topic, {qos: 1}, callback); this.subscriptions.set(topic, callback); } }