diff --git a/broker/src/main/java/io/moquette/BrokerConstants.java b/broker/src/main/java/io/moquette/BrokerConstants.java index 086d032c7..4949fd462 100644 --- a/broker/src/main/java/io/moquette/BrokerConstants.java +++ b/broker/src/main/java/io/moquette/BrokerConstants.java @@ -82,6 +82,9 @@ public final class BrokerConstants { public static final String STORAGE_CLASS_NAME = "storage_class"; + public static final int FLIGHT_BEFORE_RESEND_MS = 5_000; + public static final int INFLIGHT_WINDOW_SIZE = 10; + private BrokerConstants() { } } diff --git a/broker/src/main/java/io/moquette/broker/Session.java b/broker/src/main/java/io/moquette/broker/Session.java index 4247d2711..140d7b28a 100644 --- a/broker/src/main/java/io/moquette/broker/Session.java +++ b/broker/src/main/java/io/moquette/broker/Session.java @@ -15,6 +15,8 @@ */ package io.moquette.broker; +import static io.moquette.BrokerConstants.FLIGHT_BEFORE_RESEND_MS; +import static io.moquette.BrokerConstants.INFLIGHT_WINDOW_SIZE; import io.moquette.broker.subscriptions.Subscription; import io.moquette.broker.subscriptions.Topic; import io.netty.buffer.ByteBuf; @@ -34,8 +36,6 @@ class Session { private static final Logger LOG = LoggerFactory.getLogger(Session.class); - private static final int FLIGHT_BEFORE_RESEND_MS = 5_000; - private static final int INFLIGHT_WINDOW_SIZE = 10; static class InFlightPacket implements Delayed { diff --git a/broker/src/test/java/io/moquette/integration/ServerLowlevelMessagesIntegrationTests.java b/broker/src/test/java/io/moquette/integration/ServerLowlevelMessagesIntegrationTest.java similarity index 79% rename from broker/src/test/java/io/moquette/integration/ServerLowlevelMessagesIntegrationTests.java rename to broker/src/test/java/io/moquette/integration/ServerLowlevelMessagesIntegrationTest.java index 1a6f91ceb..026dabbc9 100644 --- a/broker/src/test/java/io/moquette/integration/ServerLowlevelMessagesIntegrationTests.java +++ b/broker/src/test/java/io/moquette/integration/ServerLowlevelMessagesIntegrationTest.java @@ -16,6 +16,7 @@ package io.moquette.integration; +import static io.moquette.BrokerConstants.FLIGHT_BEFORE_RESEND_MS; import io.moquette.broker.Server; import io.moquette.broker.config.IConfig; import io.moquette.broker.config.MemoryConfig; @@ -38,11 +39,12 @@ import java.util.concurrent.TimeUnit; import static io.netty.handler.codec.mqtt.MqttConnectReturnCode.*; import static java.nio.charset.StandardCharsets.UTF_8; +import java.util.concurrent.atomic.AtomicBoolean; import static org.junit.jupiter.api.Assertions.*; -public class ServerLowlevelMessagesIntegrationTests { +public class ServerLowlevelMessagesIntegrationTest { - private static final Logger LOG = LoggerFactory.getLogger(ServerLowlevelMessagesIntegrationTests.class); + private static final Logger LOG = LoggerFactory.getLogger(ServerLowlevelMessagesIntegrationTest.class); static MqttClientPersistence s_dataStore; Server m_server; Client m_client; @@ -171,4 +173,43 @@ public void testWillMessageIsPublishedOnClientBadDisconnection() throws Interrup m_willSubscriber.disconnect(); } + @Test + public void testResendNotAckedPublishes() throws MqttException, InterruptedException { + LOG.info("*** testResendNotAckedPublishes ***"); + String topic = "/test"; + + MqttClient subscriber = new MqttClient("tcp://localhost:1883", "Subscriber"); + MqttClient publisher = new MqttClient("tcp://localhost:1883", "Publisher"); + + try { + subscriber.connect(); + publisher.connect(); + + AtomicBoolean isFirst = new AtomicBoolean(true); + AtomicBoolean receivedPublish = new AtomicBoolean(false); + subscriber.subscribe(topic, 1, (String topic1, org.eclipse.paho.client.mqttv3.MqttMessage message) -> { + if (isFirst.getAndSet(false)) { + // wait to trigger resending PUBLISH + TimeUnit.SECONDS.sleep(FLIGHT_BEFORE_RESEND_MS * 2); + } else { + receivedPublish.set(true); + } + }); + + publisher.publish(topic, "hello".getBytes(), 1, false); + Awaitility.await("Waiting for resend.") + .atMost(FLIGHT_BEFORE_RESEND_MS * 3, TimeUnit.MILLISECONDS) + .pollDelay(FLIGHT_BEFORE_RESEND_MS * 2, TimeUnit.MILLISECONDS) + .pollInterval(100, TimeUnit.MILLISECONDS) + .untilTrue(receivedPublish); + } finally { + try { + if (subscriber.isConnected()) { + subscriber.disconnect(); + } + } finally { + publisher.disconnect(); + } + } + } }