Skip to content

Commit

Permalink
Add robustness to the PubSub library for IPython
Browse files Browse the repository at this point in the history
  • Loading branch information
albireox committed Sep 6, 2024
1 parent 0a27391 commit 7c99049
Showing 1 changed file with 18 additions and 5 deletions.
23 changes: 18 additions & 5 deletions src/gort/pubsub.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

from __future__ import annotations

import asyncio
import json
import time
import uuid
Expand All @@ -26,6 +27,7 @@

import aio_pika
from aio_pika.abc import AbstractIncomingMessage
from aio_pika.exceptions import AMQPConnectionError
from pydantic import BaseModel, Field

from gort import config
Expand Down Expand Up @@ -143,7 +145,7 @@ async def disconnect(self):
await self.connection.close()

async def __aenter__(self):
if not self.connection or self.connection.is_closed:
if not self.connection or self.connection.is_closed or self.channel.is_closed:
await self.connect()

return self
Expand Down Expand Up @@ -184,15 +186,26 @@ async def publish(self, message: dict, routing_key: str | None = None):
"""

# Give the event loop a chance to run. This should only matter if the
# library is being used in IPython.
await asyncio.sleep(0.1)

if not self.channel or not self.exchange or self.channel.is_closed:
await self.connect()

assert self.exchange, "exchange not defined."

await self.exchange.publish(
aio_pika.Message(body=json.dumps(message).encode()),
routing_key=routing_key or config["services.pubsub.routing_key"],
)
for _ in range(3):
try:
await self.exchange.publish(
aio_pika.Message(body=json.dumps(message).encode()),
routing_key=routing_key or config["services.pubsub.routing_key"],
)
except AMQPConnectionError:
# Try a reconnect.
await self.connect()
else:
break


class GortSubscriber(BasePubSub):
Expand Down

0 comments on commit 7c99049

Please sign in to comment.