Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Python38 and refactor tests to use pytest-asyncio #18

Merged
merged 14 commits into from
Mar 9, 2021
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Revert "Fixed critical bugs e.g., incorrectly retained messages on qo…
…s0, race conditions on detached sessions, eliminated set_exception on client disconnect tasks, a few debug log message isEnabledFor wrappers."

This reverts commit 1f5efd3.
HerrMuellerluedenscheid authored and FlorianLudwig committed Mar 9, 2021

Verified

This commit was created on GitHub.com and signed with GitHub’s verified signature. The key has expired.
commit e8678d74e00d4eab5a4408795bead8c0ee240344
16 changes: 7 additions & 9 deletions hbmqtt/broker.py
Original file line number Diff line number Diff line change
@@ -693,20 +693,18 @@ async def _broadcast_loop(self):
if 'qos' in broadcast:
qos = broadcast['qos']
if target_session.transitions.state == 'connected':
if self.logger.isEnabledFor(logging.DEBUG):
self.logger.debug("broadcasting application message from %s on topic '%s' to %s" %
(format_client_message(session=broadcast['session']),
broadcast['topic'], format_client_message(session=target_session)))
self.logger.debug("broadcasting application message from %s on topic '%s' to %s" %
(format_client_message(session=broadcast['session']),
broadcast['topic'], format_client_message(session=target_session)))
handler = self._get_handler(target_session)
task = asyncio.ensure_future(
handler.mqtt_publish(broadcast['topic'], broadcast['data'], qos, retain=False),
loop=self._loop)
running_tasks.append(task)
elif qos is not None and qos > 0:
if self.logger.isEnabledFor(logging.DEBUG):
self.logger.debug("retaining application message from %s on topic '%s' to client '%s'" %
(format_client_message(session=broadcast['session']),
broadcast['topic'], format_client_message(session=target_session)))
else:
self.logger.debug("retaining application message from %s on topic '%s' to client '%s'" %
(format_client_message(session=broadcast['session']),
broadcast['topic'], format_client_message(session=target_session)))
retained_message = RetainedApplicationMessage(
broadcast['session'], broadcast['topic'], broadcast['data'], qos)
await target_session.retained_messages.put(retained_message)
2 changes: 1 addition & 1 deletion hbmqtt/client.py
Original file line number Diff line number Diff line change
@@ -443,7 +443,7 @@ def cancel_tasks():
while self.client_tasks:
task = self.client_tasks.popleft()
if not task.done():
task.cancel()
task.set_exception(ClientException("Connection lost"))

self.logger.debug("Watch broker disconnection")
# Wait for disconnection from broker (like connection lost)
6 changes: 3 additions & 3 deletions hbmqtt/mqtt/protocol/handler.py
Original file line number Diff line number Diff line change
@@ -408,18 +408,18 @@ async def _reader_loop(self):
if task:
running_tasks.append(task)
else:
self.logger.debug("No more data (EOF received), stopping reader coro")
self.logger.debug("%s No more data (EOF received), stopping reader coro" % self.session.client_id)
break
except MQTTException:
self.logger.debug("Message discarded")
except asyncio.CancelledError:
self.logger.debug("Task cancelled, reader loop ending")
break
except asyncio.TimeoutError:
self.logger.debug("Input stream read timeout")
self.logger.debug("%s Input stream read timeout" % self.session.client_id)
self.handle_read_timeout()
except NoDataException:
self.logger.debug("No data available")
self.logger.debug("%s No data available" % self.session.client_id)
except BaseException as e:
self.logger.warning("%s Unhandled exception in reader coro: %r" % (type(self).__name__, e))
break
3 changes: 1 addition & 2 deletions hbmqtt/plugins/manager.py
Original file line number Diff line number Diff line change
@@ -147,8 +147,7 @@ def clean_fired_events(future):
if wait:
if tasks:
await asyncio.wait(tasks, loop=self._loop)
if self.logger.isEnabledFor(logging.DEBUG):
self.logger.debug("Plugins len(_fired_events)=%d" % (len(self._fired_events)))
self.logger.debug("Plugins len(_fired_events)=%d" % (len(self._fired_events)))

async def map(self, coro, *args, **kwargs):
"""