Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
Added message clean up on cancelled publish messages
  • Loading branch information
pazzarpj committed Dec 2, 2022
1 parent 456ed99 commit ecab1de
Show file tree
Hide file tree
Showing 3 changed files with 44 additions and 35 deletions.
35 changes: 20 additions & 15 deletions amqtt/mqtt/protocol/client_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,17 +106,18 @@ async def mqtt_subscribe(self, topics, packet_id):
# Wait for SUBACK is received
waiter = futures.Future()
self._subscriptions_waiter[subscribe.variable_header.packet_id] = waiter
return_codes = await waiter

del self._subscriptions_waiter[subscribe.variable_header.packet_id]
try:
return_codes = await waiter
finally:
del self._subscriptions_waiter[subscribe.variable_header.packet_id]
return return_codes

async def handle_suback(self, suback: SubackPacket):
packet_id = suback.variable_header.packet_id
try:
waiter = self._subscriptions_waiter.get(packet_id)
waiter = self._subscriptions_waiter.get(packet_id)
if waiter is not None:
waiter.set_result(suback.payload.return_codes)
except KeyError:
else:
self.logger.warning(
"Received SUBACK for unknown pending subscription with Id: %s"
% packet_id
Expand All @@ -132,15 +133,17 @@ async def mqtt_unsubscribe(self, topics, packet_id):
await self._send_packet(unsubscribe)
waiter = futures.Future()
self._unsubscriptions_waiter[unsubscribe.variable_header.packet_id] = waiter
await waiter
del self._unsubscriptions_waiter[unsubscribe.variable_header.packet_id]
try:
await waiter
finally:
del self._unsubscriptions_waiter[unsubscribe.variable_header.packet_id]

async def handle_unsuback(self, unsuback: UnsubackPacket):
packet_id = unsuback.variable_header.packet_id
try:
waiter = self._unsubscriptions_waiter.get(packet_id)
waiter = self._unsubscriptions_waiter.get(packet_id)
if waiter is not None:
waiter.set_result(None)
except KeyError:
else:
self.logger.warning(
"Received UNSUBACK for unknown pending subscription with Id: %s"
% packet_id
Expand All @@ -152,10 +155,12 @@ async def mqtt_disconnect(self):

async def mqtt_ping(self):
ping_packet = PingReqPacket()
await self._send_packet(ping_packet)
resp = await self._pingresp_queue.get()
if self._ping_task:
self._ping_task = None
try:
await self._send_packet(ping_packet)
resp = await self._pingresp_queue.get()
finally:
if self._ping_task:
self._ping_task = None
return resp

async def handle_pingresp(self, pingresp: PingRespPacket):
Expand Down
33 changes: 19 additions & 14 deletions amqtt/mqtt/protocol/handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -293,12 +293,13 @@ async def _handle_qos1_message_flow(self, app_message):
# Wait for puback
waiter = asyncio.Future()
self._puback_waiters[app_message.packet_id] = waiter
await waiter
del self._puback_waiters[app_message.packet_id]
app_message.puback_packet = waiter.result()

# Discard inflight message
del self.session.inflight_out[app_message.packet_id]
try:
await waiter
app_message.puback_packet = waiter.result()
finally:
del self._puback_waiters[app_message.packet_id]
# Discard inflight message
del self.session.inflight_out[app_message.packet_id]
elif app_message.direction == INCOMING:
# Initiate delivery
self.logger.debug("Add message to delivery")
Expand Down Expand Up @@ -351,21 +352,25 @@ async def _handle_qos2_message_flow(self, app_message):
raise AMQTTException(message)
waiter = asyncio.Future()
self._pubrec_waiters[app_message.packet_id] = waiter
await waiter
del self._pubrec_waiters[app_message.packet_id]
app_message.pubrec_packet = waiter.result()
try:
await waiter
app_message.pubrec_packet = waiter.result()
finally:
del self._pubrec_waiters[app_message.packet_id]
del self.session.inflight_out[app_message.packet_id]
if not app_message.pubcomp_packet:
# Send pubrel
app_message.pubrel_packet = PubrelPacket.build(app_message.packet_id)
await self._send_packet(app_message.pubrel_packet)
# Wait for PUBCOMP
waiter = asyncio.Future()
self._pubcomp_waiters[app_message.packet_id] = waiter
await waiter
del self._pubcomp_waiters[app_message.packet_id]
app_message.pubcomp_packet = waiter.result()
# Discard inflight message
del self.session.inflight_out[app_message.packet_id]
try:
await waiter
app_message.pubcomp_packet = waiter.result()
finally:
del self._pubcomp_waiters[app_message.packet_id]
del self.session.inflight_out[app_message.packet_id]
elif app_message.direction == INCOMING:
self.session.inflight_in[app_message.packet_id] = app_message
# Send pubrec
Expand Down
11 changes: 5 additions & 6 deletions amqtt/session.py
Original file line number Diff line number Diff line change
Expand Up @@ -159,16 +159,15 @@ def _init_states(self):

@property
def next_packet_id(self):
self._packet_id += 1
if self._packet_id > 65535:
self._packet_id = 1
self._packet_id = (self._packet_id % 65535) + 1
limit = self._packet_id
while (
self._packet_id in self.inflight_in or self._packet_id in self.inflight_out
):
self._packet_id += 1
if self._packet_id > 65535:
self._packet_id = (self._packet_id % 65535) + 1
if self._packet_id == limit:
raise AMQTTException(
"More than 65525 messages pending. No free packet ID"
"More than 65535 messages pending. No free packet ID"
)

return self._packet_id
Expand Down

0 comments on commit ecab1de

Please sign in to comment.