Skip to content

Commit

Permalink
Fix subscribes to multiple topics, and sends multiple heartbeat packe…
Browse files Browse the repository at this point in the history
…ts (#241). (#244)
  • Loading branch information
Technoboy- committed Nov 1, 2021
1 parent 2829258 commit 5afca3f
Showing 1 changed file with 8 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -175,9 +175,7 @@ public void processPubComp(Channel channel, MqttMessage msg) {
@Override
public void processPingReq(Channel channel) {
channel.writeAndFlush(pingResp());
topicBrokers.forEach((k, v) -> v.whenComplete((exchanger, error) -> {
exchanger.writeAndFlush(pingReq());
}));
brokerPool.forEach((k, v) -> v.writeAndFlush(pingReq()));
}

@Override
Expand All @@ -186,12 +184,12 @@ public void processDisconnect(Channel channel, MqttMessage msg) {
if (log.isDebugEnabled()) {
log.debug("[Proxy Disconnect] [{}] ", clientId);
}
topicBrokers.forEach((k, v) -> v.whenComplete((exchanger, error) -> {
exchanger.writeAndFlush(msg);
exchanger.close();
}));
topicBrokers.clear();
brokerPool.forEach((k, v) -> {
v.writeAndFlush(msg);
v.close();
});
brokerPool.clear();
topicBrokers.clear();
// When login, checkState(msg) failed, connection is null.
Connection connection = NettyUtils.getConnection(channel);
if (connection == null) {
Expand All @@ -211,11 +209,9 @@ public void processConnectionLost(Channel channel) {
}
Connection connection = NettyUtils.getConnection(channel);
connectionManager.removeConnection(connection);
topicBrokers.forEach((k, v) -> v.whenComplete((exchanger, error) -> {
exchanger.close();
}));
topicBrokers.clear();
brokerPool.forEach((k, v) -> v.close());
brokerPool.clear();
topicBrokers.clear();
}

@Override
Expand Down

0 comments on commit 5afca3f

Please sign in to comment.