From c1a887db218498c28b7d1bfdceda7d8a38b02b57 Mon Sep 17 00:00:00 2001 From: ptg Date: Wed, 13 Nov 2024 16:59:09 -0500 Subject: [PATCH] fix async graph --- ptgctl/async_graph.py | 16 ++++++++++++---- ptgctl/tools/display.py | 22 ++++++++++++++++++++-- ptgctl/tools/mock.py | 4 +++- 3 files changed, 35 insertions(+), 7 deletions(-) diff --git a/ptgctl/async_graph.py b/ptgctl/async_graph.py index 93f2466..1841ed7 100644 --- a/ptgctl/async_graph.py +++ b/ptgctl/async_graph.py @@ -64,15 +64,23 @@ def __init__(self, maxsize=1, buffersize=8): super().__init__(maxsize) def _init(self, maxsize): - self._queue = collections.deque(maxlen=maxsize) + self._queue = collections.deque(maxlen=1000) + self._latest = collections.deque(maxlen=1) self._buffer = collections.deque(maxlen=self.buffersize) def _put(self, item): + self._latest.append(item) self._queue.append(item) self._buffer.append(item) - def read_buffer(self): - return list(self._buffer) + def read_buffer(self): # TODO: rename + output = [] + for i in range(len(self._queue)): + try: + output.append(self._queue.popleft()) + except IndexError: + pass + return output def push(self, item): full = self.full() @@ -122,4 +130,4 @@ async def main(): # g.add_consumer(sample_consumer, q, sleep=3, name='get3') if __name__ == '__main__': - asyncio.run(main()) \ No newline at end of file + asyncio.run(main()) diff --git a/ptgctl/tools/display.py b/ptgctl/tools/display.py index 734b008..d05bf48 100644 --- a/ptgctl/tools/display.py +++ b/ptgctl/tools/display.py @@ -121,16 +121,34 @@ async def raw(api, stream_id, utf=False, **kw): @util.async2sync @util.interruptable -async def file(api, stream_id, out_dir='', include_timestamps=False, **kw): +async def file(api, stream_id, out_dir='', include_timestamps=False, group_sessions=True, **kw): os.makedirs(out_dir or '.', exist_ok=True) + kw.setdefault('latest', False) + SESSION_ID = 'event:session:id' + if group_sessions: + stream_id += '+' + SESSION_ID + async with api.data_pull_connect(stream_id, **kw) as ws: with contextlib.ExitStack() as stack: files = {} pbars = {} + current_session = api.session.id() + print("current session:", current_session) while True: for sid, ts, data in await ws.recv_data(): + if group_sessions and sid == SESSION_ID: + print("Changing session", sid) + stack.pop_all().close() + files.clear() + current_session = data.decode('utf-8') + if group_sessions and not current_session: + continue if sid not in files: - files[sid] = stack.enter_context(open(os.path.join(out_dir, f'{sid}.txt'), 'w')) + sid_file = os.path.join(out_dir, current_session or 'unknown-session' if group_sessions else '', f'{sid}.txt') + print("Opening file", sid_file) + os.makedirs(os.path.dirname(sid_file), exist_ok=True) + files[sid] = stack.enter_context(open(sid_file, 'w')) + if sid not in pbars: pbars[sid] = tqdm.tqdm(desc=sid) files[sid].write(f"{f'{ts}:' if include_timestamps else ''}{data.decode('utf-8')}\n") pbars[sid].update() diff --git a/ptgctl/tools/mock.py b/ptgctl/tools/mock.py index 336b0a9..6d7cf7c 100644 --- a/ptgctl/tools/mock.py +++ b/ptgctl/tools/mock.py @@ -32,10 +32,12 @@ async def video_loop(api, src=0, pos=0, **kw): DIVS = 4 @util.async2sync -async def video(api, src=0, pos=0, width=0.3, shape=None, fps=15, speed=1, stepbystep=False, prefix=None): +async def video(api, src=0, pos=0, width=0.3, shape=None, fps=15, speed=1, stepbystep=False, prefix=None, skill=None): '''Send video (by default your webcam) to the API.''' sid = CAM_POS_SIDS[pos] sid = f'{prefix or ""}{sid}' + if skill: + api.session.start_recipe(skill) async with api.data_push_connect(sid, batch=True) as ws: async for im in _video_feed(src, fps, shape, speed=speed): if pos: