diff --git a/openhands/events/stream.py b/openhands/events/stream.py index c67940451423..fd5a6618d79c 100644 --- a/openhands/events/stream.py +++ b/openhands/events/stream.py @@ -1,6 +1,5 @@ import asyncio import threading -from concurrent.futures import ThreadPoolExecutor from dataclasses import field from datetime import datetime from enum import Enum @@ -67,7 +66,6 @@ def __init__(self, sid: str, file_store: FileStore, num_workers: int = 1): self.sid = sid self.file_store = file_store self._queue: Queue[Event] = Queue() - self._thread_pools: dict[str, dict[str, ThreadPoolExecutor]] = {} self._queue_thread = threading.Thread(target=self._run_queue_loop) self._queue_thread.daemon = True self._queue_thread.start() @@ -89,9 +87,6 @@ def __post_init__(self) -> None: if id >= self._cur_id: self._cur_id = id + 1 - def _init_thread_loop(self): - loop = asyncio.new_event_loop() - asyncio.set_event_loop(loop) def _get_filename_for_id(self, id: int) -> str: return get_conversation_event_filename(self.sid, id) @@ -174,10 +169,8 @@ def get_latest_event_id(self) -> int: def subscribe( self, subscriber_id: EventStreamSubscriber, callback: Callable, callback_id: str ): - pool = ThreadPoolExecutor(max_workers=1, initializer=self._init_thread_loop) if subscriber_id not in self._subscribers: self._subscribers[subscriber_id] = {} - self._thread_pools[subscriber_id] = {} if callback_id in self._subscribers[subscriber_id]: raise ValueError( @@ -185,7 +178,6 @@ def subscribe( ) self._subscribers[subscriber_id][callback_id] = callback - self._thread_pools[subscriber_id][callback_id] = pool def unsubscribe(self, subscriber_id: EventStreamSubscriber, callback_id: str): if subscriber_id not in self._subscribers: @@ -226,8 +218,9 @@ async def _process_queue(self): callbacks = self._subscribers[key] for callback_id in callbacks: callback = callbacks[callback_id] - pool = self._thread_pools[key][callback_id] - pool.submit(callback, event) + thread = threading.Thread(target=callback, args=(event,)) + thread.daemon = True + thread.start() def _callback(self, callback: Callable, event: Event): asyncio.run(callback(event))