Skip to content

Commit

Permalink
Automatically re-create queue if durable changed (#1637)
Browse files Browse the repository at this point in the history
Co-authored-by: Jan Klopper <[email protected]>
  • Loading branch information
dekkers and underdarknl authored Aug 22, 2023
1 parent 0915c90 commit 86594e3
Showing 1 changed file with 13 additions and 1 deletion.
14 changes: 13 additions & 1 deletion mula/scheduler/connectors/listeners/listeners.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,19 @@ def dispatch(self, body: bytes) -> None:
raise NotImplementedError

def basic_consume(self, queue: str, durable: bool, prefetch_count: int) -> None:
self.channel.queue_declare(queue=queue, durable=durable)
try:
self.channel.queue_declare(queue=queue, durable=durable)
except pika.exceptions.ChannelClosedByBroker as exc:
if "inequivalent arg 'durable'" in exc.reply_text:
# Queue changed from non-durable to durable. Given that
# previously they weren't durable and contents would also be
# lost if RabbitMQ restarted, we will just delete the queue and
# recreate it to provide for a smooth upgrade.
self.channel = self.connection.channel()
self.channel.queue_delete(queue=queue)
self.channel.queue_declare(queue=queue, durable=durable)
else:
raise
self.channel.basic_qos(prefetch_count=prefetch_count)
self.channel.basic_consume(queue, on_message_callback=self.callback)
self.channel.start_consuming()
Expand Down

0 comments on commit 86594e3

Please sign in to comment.