Skip to content

Commit

Permalink
Fix bug flushing Data frames from asyncio client
Browse files Browse the repository at this point in the history
When AsyncioClient.data() was called with non-zero flush_period
then it only yielded the Data after flush when a new packet of data
appeared on the wire or at the end of acquisition. This changes it to be
more eager and yield as soon as it is flushed.
  • Loading branch information
coretl committed Sep 1, 2023
1 parent e24fdfc commit abfaa31
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 28 deletions.
50 changes: 25 additions & 25 deletions pandablocks/asyncio.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
import asyncio
import logging
from asyncio.streams import StreamReader, StreamWriter
from collections import deque
from typing import AsyncGenerator, Deque, Dict, Optional
from typing import AsyncGenerator, Dict, Iterable, Optional

from .commands import Command, T
from .connections import ControlConnection, DataConnection
Expand Down Expand Up @@ -142,35 +141,36 @@ async def data(
`asyncio.TimeoutError`
"""

data_stream = _StreamHelper()
await data_stream.connect(self._host, 8889),

stream = _StreamHelper()
connection = DataConnection()
data: Deque[Data] = deque()
reader = data_stream.reader
# Should we flush every FrameData?
flush_every_frame = flush_period is None

async def queue_flushed_data():
data.extend(connection.flush())
queue: asyncio.Queue[Iterable[Data]] = asyncio.Queue()

async def periodic_flush():
if not flush_every_frame:
if flush_period is not None:
while True:
# Every flush_period seconds flush and queue data
await asyncio.gather(
asyncio.sleep(flush_period), queue_flushed_data()
)
await asyncio.sleep(flush_period)
queue.put_nowait(connection.flush())

flush_task = asyncio.create_task(periodic_flush())
async def read_from_stream():
reader = stream.reader
# Should we flush every FrameData?
flush_every_frame = flush_period is None
while True:
try:
recv = await asyncio.wait_for(reader.read(4096), frame_timeout)
except asyncio.TimeoutError as e:
queue.put_nowait([e])
else:
queue.put_nowait(connection.receive_bytes(recv, flush_every_frame))

await stream.connect(self._host, 8889)
await stream.write_and_drain(connection.connect(scaled))
fut = asyncio.gather(periodic_flush(), read_from_stream())
try:
await data_stream.write_and_drain(connection.connect(scaled))
while True:
received = await asyncio.wait_for(reader.read(4096), frame_timeout)
for d in connection.receive_bytes(received, flush_every_frame):
data.append(d)
while data:
yield data.popleft()
for data in await queue.get():
yield data
finally:
flush_task.cancel()
await data_stream.close()
fut.cancel()
await stream.close()
12 changes: 9 additions & 3 deletions tests/test_asyncio.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,19 @@ async def test_asyncio_bad_put_raises(dummy_server_async):


@pytest.mark.asyncio
async def test_asyncio_data(dummy_server_async, fast_dump, fast_dump_expected):
@pytest.mark.parametrize("disarmed", [True, False])
@pytest.mark.parametrize("flush_period", [0.1, None])
async def test_asyncio_data(dummy_server_async, fast_dump, fast_dump_expected, disarmed, flush_period):
if not disarmed:
# simulate getting the data without the END marker as if arm was not pressed
fast_dump = map(lambda x: x.split(b"END")[0], fast_dump)
fast_dump_expected = list(fast_dump_expected)[:-1]
dummy_server_async.data = fast_dump
events = []
async with AsyncioClient("localhost") as client:
async for data in client.data(frame_timeout=1):
async for data in client.data(frame_timeout=1, flush_period=flush_period):
events.append(data)
if len(events) == 9:
if len(events) == len(fast_dump_expected):
break
assert fast_dump_expected == events

Expand Down

0 comments on commit abfaa31

Please sign in to comment.