Skip to content

Commit

Permalink
Fixed moquette-io#573 ByteBuf reference counting
Browse files Browse the repository at this point in the history
The main place where ByteBufs were not released was the inflightWindow.
  • Loading branch information
hylkevds committed May 8, 2021
1 parent f6f3f6e commit f0904f7
Show file tree
Hide file tree
Showing 9 changed files with 75 additions and 6 deletions.
9 changes: 8 additions & 1 deletion broker/src/main/java/io/moquette/broker/MQTTConnection.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import io.moquette.broker.subscriptions.Topic;
import io.moquette.broker.security.IAuthenticator;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufHolder;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
Expand Down Expand Up @@ -376,8 +377,9 @@ void processPublish(MqttPublishMessage msg) {
}
case EXACTLY_ONCE: {
bindedSession.receivedPublishQos2(messageID, msg);
// Second pass-on, retain
msg.payload().retain();
postOffice.receivedPublishQos2(this, msg, username);
// msg.release();
break;
}
default:
Expand Down Expand Up @@ -426,6 +428,11 @@ void sendIfWritableElseDrop(MqttMessage msg) {
channelFuture = channel.write(msg);
}
channelFuture.addListener(FIRE_EXCEPTION_ON_FAILURE);
} else {
// msg not passed on, release.
if (msg instanceof ByteBufHolder) {
((ByteBufHolder) msg).release();
}
}
}

Expand Down
4 changes: 4 additions & 0 deletions broker/src/main/java/io/moquette/broker/PostOffice.java
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,8 @@ void receivedPublishQos2(MQTTConnection connection, MqttPublishMessage mqttPubli
final String clientId = connection.getClientId();
if (!authorizator.canWrite(topic, username, clientId)) {
LOG.error("MQTT client is not authorized to publish on topic: {}", topic);
// msg not passed on, release payload.
payload.release();
return;
}

Expand All @@ -254,6 +256,8 @@ void receivedPublishQos2(MQTTConnection connection, MqttPublishMessage mqttPubli

String clientID = connection.getClientId();
interceptor.notifyTopicPublished(mqttPublishMessage, clientID, username);
// none of the methods above released the payload, do it now.
payload.release();
}

static MqttQoS lowerQosToTheSubscriptionDesired(Subscription sub, MqttQoS qos) {
Expand Down
1 change: 1 addition & 0 deletions broker/src/main/java/io/moquette/broker/Server.java
Original file line number Diff line number Diff line change
Expand Up @@ -308,6 +308,7 @@ public void internalPublish(MqttPublishMessage msg, final String clientId) {
}
LOG.trace("Internal publishing message CId: {}, messageId: {}", clientId, messageID);
dispatcher.internalPublish(msg);
msg.payload().release();
}

public void stopServer() {
Expand Down
35 changes: 32 additions & 3 deletions broker/src/main/java/io/moquette/broker/Session.java
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,12 @@ boolean isClean() {
}

public void processPubRec(int packetId) {
inflightWindow.remove(packetId);
// Message discarded, make sure any buffers in it are released
SessionRegistry.EnqueuedMessage removed = inflightWindow.remove(packetId);
if (removed != null) {
removed.release();
}

inflightSlots.incrementAndGet();
if (canSkipQueue()) {
inflightSlots.decrementAndGet();
Expand All @@ -200,7 +205,12 @@ public void processPubRec(int packetId) {
}

public void processPubComp(int messageID) {
inflightWindow.remove(messageID);
// Message discarded, make sure any buffers in it are released
SessionRegistry.EnqueuedMessage removed = inflightWindow.remove(messageID);
if (removed != null) {
removed.release();
}

inflightSlots.incrementAndGet();

drainQueueToConnection();
Expand All @@ -216,6 +226,9 @@ public void sendPublishOnSessionAtQos(Topic topic, MqttQoS qos, ByteBuf payload)
case AT_MOST_ONCE:
if (connected()) {
mqttConnection.sendPublishNotRetainedQos0(topic, qos, payload);
} else {
// buffer not passed on, release it.
payload.release();
}
break;
case AT_LEAST_ONCE:
Expand All @@ -226,12 +239,16 @@ public void sendPublishOnSessionAtQos(Topic topic, MqttQoS qos, ByteBuf payload)
break;
case FAILURE:
LOG.error("Not admissible");
// buffer not passed on, release it.
payload.release();
}
}

private void sendPublishQos1(Topic topic, MqttQoS qos, ByteBuf payload) {
if (!connected() && isClean()) {
//pushing messages to disconnected not clean session
//buffer not passed on, release it.
payload.release();
return;
}

Expand All @@ -240,6 +257,9 @@ private void sendPublishQos1(Topic topic, MqttQoS qos, ByteBuf payload) {
int packetId = mqttConnection.nextPacketId();
inflightWindow.put(packetId, new SessionRegistry.PublishedMessage(topic, qos, payload));
inflightTimeouts.add(new InFlightPacket(packetId, FLIGHT_BEFORE_RESEND_MS));

// second pass-on, add retain
payload.retain();
MqttPublishMessage publishMsg = MQTTConnection.notRetainedPublishWithMessageId(topic.toString(), qos,
payload, packetId);
mqttConnection.sendPublish(publishMsg);
Expand All @@ -257,6 +277,9 @@ private void sendPublishQos2(Topic topic, MqttQoS qos, ByteBuf payload) {
int packetId = mqttConnection.nextPacketId();
inflightWindow.put(packetId, new SessionRegistry.PublishedMessage(topic, qos, payload));
inflightTimeouts.add(new InFlightPacket(packetId, FLIGHT_BEFORE_RESEND_MS));

// second pass-on, add retain
payload.retain();
MqttPublishMessage publishMsg = MQTTConnection.notRetainedPublishWithMessageId(topic.toString(), qos,
payload, packetId);
mqttConnection.sendPublish(publishMsg);
Expand All @@ -283,7 +306,11 @@ private boolean inflighHasSlotsAndConnectionIsUp() {

void pubAckReceived(int ackPacketId) {
// TODO remain to invoke in somehow m_interceptor.notifyMessageAcknowledged
inflightWindow.remove(ackPacketId);
SessionRegistry.EnqueuedMessage removed = inflightWindow.remove(ackPacketId);
if (removed != null) {
removed.release();
}

inflightSlots.incrementAndGet();
drainQueueToConnection();
}
Expand Down Expand Up @@ -347,6 +374,8 @@ private void drainQueueToConnection() {
mqttConnection.sendIfWritableElseDrop(pubRel);
} else {
final SessionRegistry.PublishedMessage msgPub = (SessionRegistry.PublishedMessage) msg;
// Second pass-on.
msgPub.payload.retain();
MqttPublishMessage publishMsg = MQTTConnection.notRetainedPublishWithMessageId(msgPub.topic.toString(),
msgPub.publishingQos,
msgPub.payload, sendPacketId);
Expand Down
11 changes: 11 additions & 0 deletions broker/src/main/java/io/moquette/broker/SessionRegistry.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,12 @@
public class SessionRegistry {

public abstract static class EnqueuedMessage {

/**
* Releases any held resources. Must be called when the EnqueuedMessage is no
* longer needed.
*/
public void release() {}
}

public static class PublishedMessage extends EnqueuedMessage {
Expand All @@ -63,6 +69,11 @@ public MqttQoS getPublishingQos() {
public ByteBuf getPayload() {
return payload;
}

@Override
public void release() {
payload.release();
}
}

public static final class PubRelMarker extends EnqueuedMessage {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import io.moquette.broker.subscriptions.ISubscriptionsDirectory;
import io.moquette.broker.security.IAuthenticator;
import io.moquette.persistence.MemorySubscriptionsRepository;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.embedded.EmbeddedChannel;
Expand Down Expand Up @@ -81,16 +82,18 @@ private MQTTConnection createMQTTConnection(BrokerConfiguration config, Channel
@Test
public void dropConnectionOnPublishWithInvalidTopicFormat() {
// Connect message with clean session set to true and client id is null.
final ByteBuf payload = Unpooled.copiedBuffer("Hello MQTT world!".getBytes(UTF_8));
MqttPublishMessage publish = MqttMessageBuilders.publish()
.topicName("")
.retained(false)
.qos(MqttQoS.AT_MOST_ONCE)
.payload(Unpooled.copiedBuffer("Hello MQTT world!".getBytes(UTF_8))).build();
.payload(payload).build();

sut.processPublish(publish);

// Verify
assertFalse(channel.isOpen(), "Connection should be closed by the broker");
payload.release();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -228,10 +228,13 @@ public void testPublishWithEmptyPayloadClearRetainedStore() {
connection.processConnect(connectMessage);
ConnectionTestUtils.assertConnectAccepted(channel);

final ByteBuf payload1 = ByteBufUtil.writeAscii(UnpooledByteBufAllocator.DEFAULT, "Hello world!");
this.retainedRepository.retain(new Topic(NEWS_TOPIC), MqttMessageBuilders.publish()
.payload(ByteBufUtil.writeAscii(UnpooledByteBufAllocator.DEFAULT, "Hello world!"))
.payload(payload1)
.qos(AT_LEAST_ONCE)
.build());
// Retaining a msg does not release the payload.
payload1.release();

// Exercise
final ByteBuf anyPayload = Unpooled.copiedBuffer("Any payload", Charset.defaultCharset());
Expand All @@ -241,6 +244,8 @@ public void testPublishWithEmptyPayloadClearRetainedStore() {
.qos(MqttQoS.AT_MOST_ONCE)
.retained(false)
.topicName(NEWS_TOPIC).build());
// receivedPublishQos0 does not release payload.
anyPayload.release();

// Verify
assertTrue(retainedRepository.isEmpty(), "QoS0 MUST clean retained message for topic");
Expand Down
7 changes: 7 additions & 0 deletions broker/src/test/java/io/moquette/broker/SessionTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,13 @@ public void testPubAckDrainMessagesRemainingInQueue() {

// Verify
assertTrue(queuedMessages.isEmpty(), "Messages should be drained");

// release the rest, to avoid leaking buffers
for (int i = 2; i <= 11; i++) {
client.pubAckReceived(i);
}
client.closeImmediately();
testChannel.close();
}

private void sendQoS1To(Session client, Topic destinationTopic, String message) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,8 @@ public void shouldNotInternalPublishOnReadBlockedSubscriptionTopic() throws Exce
.payload(Unpooled.copiedBuffer("Hello World!!".getBytes(UTF_8)))
.build();

// We will be sending the same message again, retain the payload.
message.payload().retain();
m_server.internalPublish(message, "INTRLPUB");

Awaitility.await().until(m_messagesCollector::isMessageReceived);
Expand Down

0 comments on commit f0904f7

Please sign in to comment.