Skip to content

Commit

Permalink
Revert "Fixed critical bugs e.g., incorrectly retained messages on qo…
Browse files Browse the repository at this point in the history
…s0, race conditions on detached sessions, eliminated set_exception on client disconnect tasks, a few debug log message isEnabledFor wrappers."

This reverts commit 1f5efd3.
  • Loading branch information
HerrMuellerluedenscheid committed Jan 10, 2021
1 parent 87cc9fc commit 009cc25
Show file tree
Hide file tree
Showing 4 changed files with 12 additions and 15 deletions.
16 changes: 7 additions & 9 deletions hbmqtt/broker.py
Original file line number Diff line number Diff line change
Expand Up @@ -693,20 +693,18 @@ async def _broadcast_loop(self):
if 'qos' in broadcast:
qos = broadcast['qos']
if target_session.transitions.state == 'connected':
if self.logger.isEnabledFor(logging.DEBUG):
self.logger.debug("broadcasting application message from %s on topic '%s' to %s" %
(format_client_message(session=broadcast['session']),
broadcast['topic'], format_client_message(session=target_session)))
self.logger.debug("broadcasting application message from %s on topic '%s' to %s" %
(format_client_message(session=broadcast['session']),
broadcast['topic'], format_client_message(session=target_session)))
handler = self._get_handler(target_session)
task = asyncio.ensure_future(
handler.mqtt_publish(broadcast['topic'], broadcast['data'], qos, retain=False),
loop=self._loop)
running_tasks.append(task)
elif qos is not None and qos > 0:
if self.logger.isEnabledFor(logging.DEBUG):
self.logger.debug("retaining application message from %s on topic '%s' to client '%s'" %
(format_client_message(session=broadcast['session']),
broadcast['topic'], format_client_message(session=target_session)))
else:
self.logger.debug("retaining application message from %s on topic '%s' to client '%s'" %
(format_client_message(session=broadcast['session']),
broadcast['topic'], format_client_message(session=target_session)))
retained_message = RetainedApplicationMessage(
broadcast['session'], broadcast['topic'], broadcast['data'], qos)
await target_session.retained_messages.put(retained_message)
Expand Down
2 changes: 1 addition & 1 deletion hbmqtt/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -443,7 +443,7 @@ def cancel_tasks():
while self.client_tasks:
task = self.client_tasks.popleft()
if not task.done():
task.cancel()
task.set_exception(ClientException("Connection lost"))

self.logger.debug("Watch broker disconnection")
# Wait for disconnection from broker (like connection lost)
Expand Down
6 changes: 3 additions & 3 deletions hbmqtt/mqtt/protocol/handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -408,18 +408,18 @@ async def _reader_loop(self):
if task:
running_tasks.append(task)
else:
self.logger.debug("No more data (EOF received), stopping reader coro")
self.logger.debug("%s No more data (EOF received), stopping reader coro" % self.session.client_id)
break
except MQTTException:
self.logger.debug("Message discarded")
except asyncio.CancelledError:
self.logger.debug("Task cancelled, reader loop ending")
break
except asyncio.TimeoutError:
self.logger.debug("Input stream read timeout")
self.logger.debug("%s Input stream read timeout" % self.session.client_id)
self.handle_read_timeout()
except NoDataException:
self.logger.debug("No data available")
self.logger.debug("%s No data available" % self.session.client_id)
except BaseException as e:
self.logger.warning("%s Unhandled exception in reader coro: %r" % (type(self).__name__, e))
break
Expand Down
3 changes: 1 addition & 2 deletions hbmqtt/plugins/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -147,8 +147,7 @@ def clean_fired_events(future):
if wait:
if tasks:
await asyncio.wait(tasks, loop=self._loop)
if self.logger.isEnabledFor(logging.DEBUG):
self.logger.debug("Plugins len(_fired_events)=%d" % (len(self._fired_events)))
self.logger.debug("Plugins len(_fired_events)=%d" % (len(self._fired_events)))

async def map(self, coro, *args, **kwargs):
"""
Expand Down

0 comments on commit 009cc25

Please sign in to comment.