Skip to content

Commit

Permalink
added a break condition to _ctrl_read_forever
Browse files Browse the repository at this point in the history
  • Loading branch information
evalott100 committed Apr 8, 2024
1 parent 8d95812 commit eb15736
Show file tree
Hide file tree
Showing 2 changed files with 58 additions and 0 deletions.
15 changes: 15 additions & 0 deletions src/pandablocks/asyncio.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@ class AsyncioClient:
# Control and data ports are now disconnected
"""

_EMPTY_PACKET_RETRIES = 10

def __init__(self, host: str):
self._host = host
self._ctrl_connection = ControlConnection()
Expand Down Expand Up @@ -99,8 +101,21 @@ async def __aexit__(self, exc_type, exc_val, exc_tb):
await self.close()

async def _ctrl_read_forever(self, reader: asyncio.StreamReader):
"""Continually read data from the stream reader and add to the data queue.
Args:
reader: The `StreamReader` to read from
"""
empty_packets = 0
while True:
received = await reader.read(4096)
empty_packets = empty_packets + 1 if received == b"" else 0
if empty_packets == self._EMPTY_PACKET_RETRIES:
raise ConnectionError(
f"Received {self._EMPTY_PACKET_RETRIES} empty packets in a row, "
"closing connection"
)

try:
to_send = self._ctrl_connection.receive_bytes(received)
await self._ctrl_stream.write_and_drain(to_send)
Expand Down
43 changes: 43 additions & 0 deletions tests/test_asyncio.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,28 @@ async def test_asyncio_data_timeout(dummy_server_async, fast_dump):
"This goes forever, when it runs out of data we will get our timeout"


@pytest.mark.parametrize("empty_packet_retries", [10, 100, 1000])
async def test_asyncio_empty_frame_error(empty_packet_retries):
dummy_data = iter([b""] * empty_packet_retries)

async def dummy_read(n):
return dummy_data.__next__()

reader = asyncio.StreamReader()
reader.read = dummy_read

client = AsyncioClient("localhost")
client._EMPTY_PACKET_RETRIES = empty_packet_retries
with pytest.raises(
ConnectionError,
match=(
f"Received {empty_packet_retries} empty packets in a "
"row, closing connection"
),
):
await client._ctrl_read_forever(reader)


@pytest.mark.asyncio
async def test_asyncio_connects(dummy_server_async: DummyServer):
async with AsyncioClient("localhost") as client:
Expand Down Expand Up @@ -138,3 +160,24 @@ async def test_asyncio_client_fails_when_cannot_drain(dummy_server_async: DummyS
# Can't use client.close() as it gets endlessly stuck. Do the important part.
assert client._ctrl_task
client._ctrl_task.cancel()


@pytest.mark.asyncio
async def test_asyncio_client_breaks_after_n_empty_bytearrays(
dummy_server_async: DummyServer,
):
"""Test that we don't hang indefinitely when failing to drain data from the OS
send buffer"""

# Note this value is probably OS-dependant. I found it experimentally.
large_data = b"" * 100000000

client = AsyncioClient("localhost")
await client.connect()
await dummy_server_async.close()
with pytest.raises(asyncio.TimeoutError):
await client._ctrl_stream.write_and_drain(large_data, timeout=TIMEOUT)

# Can't use client.close() as it gets endlessly stuck. Do the important part.
assert client._ctrl_task
client._ctrl_task.cancel()

0 comments on commit eb15736

Please sign in to comment.