Skip to content

Commit

Permalink
fix(backend/postgres): allow concurrent pubs
Browse files Browse the repository at this point in the history
This fix adds a lock (asyncio.Event based) to avoid
`asyncpg.exceptions._base.InterfaceError: cannot perform operation: another operation is in progress`

fixes #22
  • Loading branch information
pwoolvett committed May 6, 2022
1 parent 9255c29 commit 01c33e9
Showing 1 changed file with 8 additions and 1 deletion.
9 changes: 8 additions & 1 deletion broadcaster/_backends/postgres.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ def __init__(self, url: str):

async def connect(self) -> None:
self._conn = await asyncpg.connect(self._url)
self._event = asyncio.Event()
self._event.set()
self._listen_queue: asyncio.Queue = asyncio.Queue()

async def disconnect(self) -> None:
Expand All @@ -25,7 +27,12 @@ async def unsubscribe(self, channel: str) -> None:
await self._conn.remove_listener(channel, self._listener)

async def publish(self, channel: str, message: str) -> None:
await self._conn.execute("SELECT pg_notify($1, $2);", channel, message)
await self._event.wait()
self._event.clear()
try:
await self._conn.execute("SELECT pg_notify($1, $2);", channel, message)
finally:
self._event.set()

def _listener(self, *args: Any) -> None:
connection, pid, channel, payload = args
Expand Down

0 comments on commit 01c33e9

Please sign in to comment.