From cbc31c54b6858808b7dca39dcf96e7d7e725f382 Mon Sep 17 00:00:00 2001 From: Hylke van der Schaaf Date: Thu, 18 Feb 2021 13:18:11 +0100 Subject: [PATCH] Added a test for #573 --- .../java/io/moquette/BrokerConstants.java | 3 ++ .../main/java/io/moquette/broker/Session.java | 2 - ...ServerLowlevelMessagesIntegrationTest.java | 41 +++++++++++++++++++ 3 files changed, 44 insertions(+), 2 deletions(-) diff --git a/broker/src/main/java/io/moquette/BrokerConstants.java b/broker/src/main/java/io/moquette/BrokerConstants.java index 086d032c7..4949fd462 100644 --- a/broker/src/main/java/io/moquette/BrokerConstants.java +++ b/broker/src/main/java/io/moquette/BrokerConstants.java @@ -82,6 +82,9 @@ public final class BrokerConstants { public static final String STORAGE_CLASS_NAME = "storage_class"; + public static final int FLIGHT_BEFORE_RESEND_MS = 5_000; + public static final int INFLIGHT_WINDOW_SIZE = 10; + private BrokerConstants() { } } diff --git a/broker/src/main/java/io/moquette/broker/Session.java b/broker/src/main/java/io/moquette/broker/Session.java index 4247d2711..155d62b94 100644 --- a/broker/src/main/java/io/moquette/broker/Session.java +++ b/broker/src/main/java/io/moquette/broker/Session.java @@ -34,8 +34,6 @@ class Session { private static final Logger LOG = LoggerFactory.getLogger(Session.class); - private static final int FLIGHT_BEFORE_RESEND_MS = 5_000; - private static final int INFLIGHT_WINDOW_SIZE = 10; static class InFlightPacket implements Delayed { diff --git a/broker/src/test/java/io/moquette/integration/ServerLowlevelMessagesIntegrationTest.java b/broker/src/test/java/io/moquette/integration/ServerLowlevelMessagesIntegrationTest.java index c7a1d2e6d..026dabbc9 100644 --- a/broker/src/test/java/io/moquette/integration/ServerLowlevelMessagesIntegrationTest.java +++ b/broker/src/test/java/io/moquette/integration/ServerLowlevelMessagesIntegrationTest.java @@ -16,6 +16,7 @@ package io.moquette.integration; +import static io.moquette.BrokerConstants.FLIGHT_BEFORE_RESEND_MS; import io.moquette.broker.Server; import io.moquette.broker.config.IConfig; import io.moquette.broker.config.MemoryConfig; @@ -38,6 +39,7 @@ import java.util.concurrent.TimeUnit; import static io.netty.handler.codec.mqtt.MqttConnectReturnCode.*; import static java.nio.charset.StandardCharsets.UTF_8; +import java.util.concurrent.atomic.AtomicBoolean; import static org.junit.jupiter.api.Assertions.*; public class ServerLowlevelMessagesIntegrationTest { @@ -171,4 +173,43 @@ public void testWillMessageIsPublishedOnClientBadDisconnection() throws Interrup m_willSubscriber.disconnect(); } + @Test + public void testResendNotAckedPublishes() throws MqttException, InterruptedException { + LOG.info("*** testResendNotAckedPublishes ***"); + String topic = "/test"; + + MqttClient subscriber = new MqttClient("tcp://localhost:1883", "Subscriber"); + MqttClient publisher = new MqttClient("tcp://localhost:1883", "Publisher"); + + try { + subscriber.connect(); + publisher.connect(); + + AtomicBoolean isFirst = new AtomicBoolean(true); + AtomicBoolean receivedPublish = new AtomicBoolean(false); + subscriber.subscribe(topic, 1, (String topic1, org.eclipse.paho.client.mqttv3.MqttMessage message) -> { + if (isFirst.getAndSet(false)) { + // wait to trigger resending PUBLISH + TimeUnit.SECONDS.sleep(FLIGHT_BEFORE_RESEND_MS * 2); + } else { + receivedPublish.set(true); + } + }); + + publisher.publish(topic, "hello".getBytes(), 1, false); + Awaitility.await("Waiting for resend.") + .atMost(FLIGHT_BEFORE_RESEND_MS * 3, TimeUnit.MILLISECONDS) + .pollDelay(FLIGHT_BEFORE_RESEND_MS * 2, TimeUnit.MILLISECONDS) + .pollInterval(100, TimeUnit.MILLISECONDS) + .untilTrue(receivedPublish); + } finally { + try { + if (subscriber.isConnected()) { + subscriber.disconnect(); + } + } finally { + publisher.disconnect(); + } + } + } }