diff --git a/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/proxy/MQTTProxyProtocolMethodProcessor.java b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/proxy/MQTTProxyProtocolMethodProcessor.java index aca851419..ec864f3ef 100644 --- a/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/proxy/MQTTProxyProtocolMethodProcessor.java +++ b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/proxy/MQTTProxyProtocolMethodProcessor.java @@ -175,9 +175,7 @@ public void processPubComp(Channel channel, MqttMessage msg) { @Override public void processPingReq(Channel channel) { channel.writeAndFlush(pingResp()); - topicBrokers.forEach((k, v) -> v.whenComplete((exchanger, error) -> { - exchanger.writeAndFlush(pingReq()); - })); + brokerPool.forEach((k, v) -> v.writeAndFlush(pingReq())); } @Override @@ -186,12 +184,12 @@ public void processDisconnect(Channel channel, MqttMessage msg) { if (log.isDebugEnabled()) { log.debug("[Proxy Disconnect] [{}] ", clientId); } - topicBrokers.forEach((k, v) -> v.whenComplete((exchanger, error) -> { - exchanger.writeAndFlush(msg); - exchanger.close(); - })); - topicBrokers.clear(); + brokerPool.forEach((k, v) -> { + v.writeAndFlush(msg); + v.close(); + }); brokerPool.clear(); + topicBrokers.clear(); // When login, checkState(msg) failed, connection is null. Connection connection = NettyUtils.getConnection(channel); if (connection == null) { @@ -211,11 +209,9 @@ public void processConnectionLost(Channel channel) { } Connection connection = NettyUtils.getConnection(channel); connectionManager.removeConnection(connection); - topicBrokers.forEach((k, v) -> v.whenComplete((exchanger, error) -> { - exchanger.close(); - })); - topicBrokers.clear(); + brokerPool.forEach((k, v) -> v.close()); brokerPool.clear(); + topicBrokers.clear(); } @Override