Skip to content

Commit

Permalink
debug
Browse files Browse the repository at this point in the history
  • Loading branch information
enyst committed Jan 9, 2025
1 parent 382f94c commit a6501b7
Show file tree
Hide file tree
Showing 4 changed files with 17 additions and 2 deletions.
8 changes: 8 additions & 0 deletions openhands/controller/agent_controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,12 @@ def should_step(self, event: Event) -> bool:
if isinstance(event, Action):
if isinstance(event, MessageAction) and event.source == EventSource.USER:
return True
if (
isinstance(event, MessageAction)
and self.get_agent_state() != AgentState.AWAITING_USER_INPUT
):
# TODO: this is fragile, but how else to check if eligible?
return True
if isinstance(event, AgentDelegateAction):
return True
return False
Expand All @@ -251,6 +257,8 @@ def on_event(self, event: Event) -> None:
event (Event): The incoming event to process.
"""

print(f'CONTROLLER{self.id}:on_event: {event.__class__.__name__}')

# If we have a delegate that is not finished or errored, forward events to it
if self.delegate is not None:
delegate_state = self.delegate.get_agent_state()
Expand Down
1 change: 1 addition & 0 deletions openhands/core/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,7 @@ async def run_controller(
event_stream.add_event(initial_user_action, EventSource.USER)

def on_event(event: Event):
print(f'MAIN:on_event: {event.__class__.__name__}')
if isinstance(event, AgentStateChangedObservation):
if event.agent_state == AgentState.AWAITING_USER_INPUT:
if exit_on_message:
Expand Down
8 changes: 6 additions & 2 deletions openhands/events/stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,15 +65,16 @@ class EventStream:
_queue: queue.Queue[Event]
_queue_thread: threading.Thread
_queue_loop: asyncio.AbstractEventLoop | None
_thread_pools: dict[str, dict[str, ThreadPoolExecutor]]
_thread_loops: dict[str, dict[str, asyncio.AbstractEventLoop]]

def __init__(self, sid: str, file_store: FileStore):
self.sid = sid
self.file_store = file_store
self._stop_flag = threading.Event()
self._queue: queue.Queue[Event] = queue.Queue()
self._thread_pools: dict[str, dict[str, ThreadPoolExecutor]] = {}
self._thread_loops: dict[str, dict[str, asyncio.AbstractEventLoop]] = {}
self._thread_pools = {}
self._thread_loops = {}
self._queue_loop = None
self._queue_thread = threading.Thread(target=self._run_queue_loop)
self._queue_thread.daemon = True
Expand Down Expand Up @@ -268,6 +269,7 @@ def add_event(self, event: Event, source: EventSource):
data = event_to_dict(event)
if event.id is not None:
self.file_store.write(self._get_filename_for_id(event.id), json.dumps(data))
print(f'EVENTSTREAM:add_event: {event.__class__.__name__}')
self._queue.put(event)

def _run_queue_loop(self):
Expand All @@ -285,6 +287,8 @@ async def _process_queue(self):
event = self._queue.get(timeout=0.1)
except queue.Empty:
continue

# pass each event to each callback in order
for key in sorted(self._subscribers.keys()):
callbacks = self._subscribers[key]
for callback_id in callbacks:
Expand Down
2 changes: 2 additions & 0 deletions openhands/runtime/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,7 @@ def add_env_vars(self, env_vars: dict[str, str]) -> None:
)

def on_event(self, event: Event) -> None:
print(f'RUNTIME:on_event: {event.__class__.__name__}')
if isinstance(event, Action):
asyncio.get_event_loop().run_until_complete(self._handle_action(event))

Expand All @@ -184,6 +185,7 @@ async def _handle_action(self, event: Action) -> None:
event.timeout = self.config.sandbox.timeout
assert event.timeout is not None
try:
print(f'ASYNC RUNTIME:on_event: {event.__class__.__name__}')
observation: Observation = await call_sync_from_async(
self.run_action, event
)
Expand Down

0 comments on commit a6501b7

Please sign in to comment.