Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove while True in AgentController #5868

Merged
merged 69 commits into from
Dec 31, 2024
Merged
Show file tree
Hide file tree
Changes from 60 commits
Commits
Show all changes
69 commits
Select commit Hold shift + click to select a range
8082558
fix for loops that hang after never getting a connection
rbren Dec 27, 2024
993b158
create_task
rbren Dec 27, 2024
17e9d1d
first pass
rbren Dec 27, 2024
91750d5
add logs
rbren Dec 27, 2024
4f601b1
Merge branch 'main' into rb/controller-loop
rbren Dec 27, 2024
a563b04
add logs
rbren Dec 27, 2024
9200520
remove debug
rbren Dec 27, 2024
e4ae68a
sleep to give control
rbren Dec 27, 2024
017b206
remove debug
rbren Dec 27, 2024
0e6d123
logspam
rbren Dec 27, 2024
7b15214
remove spam
rbren Dec 27, 2024
edda57e
logspam
rbren Dec 27, 2024
48d9f9c
Merge branch 'main' into rb/controller-loop
rbren Dec 27, 2024
b6b1977
remove threading
rbren Dec 27, 2024
8dbd05d
sync event callbacks
rbren Dec 27, 2024
cd4d588
remove print
rbren Dec 27, 2024
afe3a9a
remove sleep
rbren Dec 27, 2024
36b8b52
fix on_events
rbren Dec 27, 2024
47a2526
es queue
rbren Dec 27, 2024
99b7fef
debug info
rbren Dec 27, 2024
4e3d88f
give each subscriber its own thread
rbren Dec 27, 2024
9cfe028
everything working
rbren Dec 27, 2024
4086da6
logspam
rbren Dec 27, 2024
df8f685
should_continue
rbren Dec 27, 2024
86e5632
fix: call on_event directly in security tests
openhands-agent Dec 28, 2024
a12b544
fix tests
rbren Dec 30, 2024
c2d8251
implement should_step
rbren Dec 30, 2024
79cfd58
move fn
rbren Dec 30, 2024
8644dd3
delint
rbren Dec 30, 2024
4043464
fix error reporting
rbren Dec 30, 2024
ba75849
fix mock
rbren Dec 30, 2024
b0d75cf
Merge branch 'main' into rb/controller-loop
rbren Dec 30, 2024
22e2c51
Merge branch 'main' into rb/controller-loop
rbren Dec 30, 2024
0dbcbaa
fix remote runtime
rbren Dec 30, 2024
e79cf23
Merge branch 'rb/fix-runtime' into rb/controller-loop
rbren Dec 30, 2024
c9ecef9
refactor: replace ThreadPoolExecutor with simple Thread in EventStream
openhands-agent Dec 30, 2024
b55c787
Revert "refactor: replace ThreadPoolExecutor with simple Thread in Ev…
rbren Dec 30, 2024
2ebab02
Merge branch 'main' into rb/controller-loop
rbren Dec 30, 2024
4ff94a0
Update openhands/events/stream.py
rbren Dec 30, 2024
effa01b
remove some more events from step
rbren Dec 30, 2024
33597a0
Update openhands/events/stream.py
rbren Dec 31, 2024
1e65436
Update tests/unit/test_agent_controller.py
rbren Dec 31, 2024
a2b2611
handle auth exception
rbren Dec 31, 2024
bffbd7b
lint
rbren Dec 31, 2024
3359760
add TODO
rbren Dec 31, 2024
7a18867
Merge branch 'main' into rb/controller-loop
rbren Dec 31, 2024
19b26cd
Revert "Add resizable and collapsible panel layout (#5926)"
rbren Dec 31, 2024
df85a70
handle all litellm errors
rbren Dec 31, 2024
6995e23
Fix a few issues with settings
amanape Dec 31, 2024
12efd30
stop sending args from FE
rbren Dec 31, 2024
f4e9cda
fix fn call
rbren Dec 31, 2024
35f59f1
remove logs
rbren Dec 31, 2024
060461f
dont use defaults for stuff defined in settings
rbren Dec 31, 2024
acb3a5c
override full settings object
rbren Dec 31, 2024
98fc5b4
remove unused var
rbren Dec 31, 2024
0143c60
Bring back merge logic
amanape Dec 31, 2024
89b09e9
Update openhands/server/routes/settings.py
amanape Dec 31, 2024
2f8bdfb
fix comment
rbren Dec 31, 2024
fd51bbc
Merge branch 'fix/settings' into rb/controller-loop
rbren Dec 31, 2024
1949264
Merge branch 'main' into rb/controller-loop
rbren Dec 31, 2024
e6679f3
revert resize
rbren Dec 31, 2024
e0d94a8
Merge branch 'main' into rb/controller-loop
rbren Dec 31, 2024
9fec71e
remove logout
rbren Dec 31, 2024
05c2313
lint
rbren Dec 31, 2024
98a1aa0
Merge branch 'rb/fix-logout' into rb/controller-loop
rbren Dec 31, 2024
5bf3403
better get_gh_user
rbren Dec 31, 2024
1ac5800
add debug info
rbren Dec 31, 2024
d337405
Merge branch 'rb/fix-hanging-loops' into rb/controller-loop
rbren Dec 31, 2024
ac520c1
more debug info
rbren Dec 31, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 39 additions & 30 deletions openhands/controller/agent_controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@
)
from openhands.events.serialization.event import truncate_content
from openhands.llm.llm import LLM
from openhands.utils.shutdown_listener import should_continue

# note: RESUME is only available on web GUI
TRAFFIC_CONTROL_REMINDER = (
Expand All @@ -64,7 +63,6 @@ class AgentController:
confirmation_mode: bool
agent_to_llm_config: dict[str, LLMConfig]
agent_configs: dict[str, AgentConfig]
agent_task: asyncio.Future | None = None
parent: 'AgentController | None' = None
delegate: 'AgentController | None' = None
_pending_action: Action | None = None
Expand Down Expand Up @@ -109,7 +107,6 @@ def __init__(
headless_mode: Whether the agent is run in headless mode.
status_callback: Optional callback function to handle status updates.
"""
self._step_lock = asyncio.Lock()
self.id = sid
self.agent = agent
self.headless_mode = headless_mode
Expand Down Expand Up @@ -199,32 +196,44 @@ async def _react_to_exception(
err_id = 'STATUS$ERROR_LLM_AUTHENTICATION'
self.status_callback('error', err_id, type(e).__name__ + ': ' + str(e))

async def start_step_loop(self):
"""The main loop for the agent's step-by-step execution."""
self.log('info', 'Starting step loop...')
while True:
if not self._is_awaiting_observation() and not should_continue():
break
if self._closed:
break
try:
await self._step()
except asyncio.CancelledError:
self.log('debug', 'AgentController task was cancelled')
break
except Exception as e:
traceback.print_exc()
self.log('error', f'Error while running the agent: {e}')
await self._react_to_exception(e)
def step(self):
    """Schedule one agent step as a background task and return immediately.

    The step is run through ``_step_with_exception_handling`` on the event
    loop that is running in the calling thread.

    Raises:
        RuntimeError: If no event loop is running in the current thread
            (raised by ``asyncio.create_task``).
    """
    task = asyncio.create_task(self._step_with_exception_handling())
    # The event loop only keeps weak references to tasks. Hold a strong
    # reference until the task is done so it cannot be garbage-collected
    # while it is still executing.
    pending = getattr(self, '_step_tasks', None)
    if pending is None:
        pending = self._step_tasks = set()
    pending.add(task)
    task.add_done_callback(pending.discard)

await asyncio.sleep(0.1)
async def _step_with_exception_handling(self):
    """Run ``_step`` once and hand any failure to ``_react_to_exception``.

    The traceback is printed and the error is logged. A
    ``litellm.LLMError`` is forwarded as-is; every other exception is
    replaced by a generic ``RuntimeError`` before being reported.
    """
    try:
        await self._step()
    except Exception as exc:
        traceback.print_exc()
        self.log('error', f'Error while running the agent: {exc}')
        if isinstance(exc, litellm.LLMError):
            to_report = exc
        else:
            # Anything that is not an LLM error is reported with a generic
            # message rather than the raw exception.
            to_report = RuntimeError(
                'There was an unexpected error while running the agent.'
            )
        await self._react_to_exception(to_report)

async def on_event(self, event: Event) -> None:
def should_step(self, event: Event) -> bool:
    """Decide whether an incoming event should trigger an agent step.

    Args:
        event: The event received from the event stream.

    Returns:
        True for a ``MessageAction`` whose source is ``EventSource.USER``
        and for any ``Observation`` other than ``NullObservation`` or
        ``AgentStateChangedObservation``; False for everything else.
    """
    if isinstance(event, Action):
        # Among actions, only a user-sourced message triggers a step.
        return isinstance(event, MessageAction) and event.source == EventSource.USER
    if isinstance(event, Observation):
        return not isinstance(event, (NullObservation, AgentStateChangedObservation))
    return False

def on_event(self, event: Event) -> None:
    """Callback from the event stream. Notifies the controller of incoming events.

    Synchronous wrapper: runs ``_on_event`` to completion on the loop
    returned by ``asyncio.get_event_loop()``, blocking the caller meanwhile.

    Args:
        event (Event): The incoming event to process.
    """
    loop = asyncio.get_event_loop()
    handler = self._on_event(event)
    loop.run_until_complete(handler)

async def _on_event(self, event: Event) -> None:
if hasattr(event, 'hidden') and event.hidden:
return

Expand All @@ -237,6 +246,9 @@ async def on_event(self, event: Event) -> None:
elif isinstance(event, Observation):
await self._handle_observation(event)

if self.should_step(event):
self.step()

async def _handle_action(self, action: Action) -> None:
"""Handles actions from the event stream.

Expand Down Expand Up @@ -465,19 +477,16 @@ async def start_delegate(self, action: AgentDelegateAction) -> None:
async def _step(self) -> None:
"""Executes a single step of the parent or delegate agent. Detects stuck agents and limits on the number of iterations and the task budget."""
if self.get_agent_state() != AgentState.RUNNING:
await asyncio.sleep(1)
return

if self._pending_action:
await asyncio.sleep(1)
return

if self.delegate is not None:
assert self.delegate != self
if self.delegate.get_agent_state() == AgentState.PAUSED:
# no need to check too often
await asyncio.sleep(1)
else:
# TODO this conditional will always be false, because the parent controllers are unsubscribed
# remove if it's still useless when delegation is reworked
if self.delegate.get_agent_state() != AgentState.PAUSED:
await self._delegate_step()
return

Expand All @@ -487,7 +496,6 @@ async def _step(self) -> None:
extra={'msg_type': 'STEP'},
)

# check if agent hit the resources limit
stop_step = False
if self.state.iteration >= self.state.max_iterations:
stop_step = await self._handle_traffic_control(
Expand All @@ -500,6 +508,7 @@ async def _step(self) -> None:
'budget', current_cost, self.max_budget_per_task
)
if stop_step:
logger.warning('Stopping agent due to traffic control')
return

if self._is_stuck():
Expand Down Expand Up @@ -945,7 +954,7 @@ def __repr__(self):
return (
f'AgentController(id={self.id}, agent={self.agent!r}, '
f'event_stream={self.event_stream!r}, '
f'state={self.state!r}, agent_task={self.agent_task!r}, '
f'state={self.state!r}, '
f'delegate={self.delegate!r}, _pending_action={self._pending_action!r})'
)

Expand Down
8 changes: 0 additions & 8 deletions openhands/core/loop.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ async def run_agent_until_done(
the agent until it reaches a terminal state.
Note that runtime must be connected before being passed in here.
"""
controller.agent_task = asyncio.create_task(controller.start_step_loop())

def status_callback(msg_type, msg_id, msg):
if msg_type == 'error':
Expand All @@ -41,10 +40,3 @@ def status_callback(msg_type, msg_id, msg):

while controller.state.agent_state not in end_states:
await asyncio.sleep(1)

if not controller.agent_task.done():
controller.agent_task.cancel()
try:
await controller.agent_task
except asyncio.CancelledError:
pass
58 changes: 39 additions & 19 deletions openhands/events/stream.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
import asyncio
import threading
from dataclasses import dataclass, field
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime
from enum import Enum
from queue import Queue
from typing import Callable, Iterable

from openhands.core.logger import openhands_logger as logger
Expand Down Expand Up @@ -52,15 +53,26 @@ async def __aiter__(self):
yield await loop.run_in_executor(None, lambda e=event: e) # type: ignore


@dataclass
class EventStream:
sid: str
file_store: FileStore
# For each subscriber ID, there is a map of callback functions - useful
# when there are multiple listeners
_subscribers: dict[str, dict[str, Callable]] = field(default_factory=dict)
_subscribers: dict[str, dict[str, Callable]]
_cur_id: int = 0
_lock: threading.Lock = field(default_factory=threading.Lock)
_lock: threading.Lock

def __init__(self, sid: str, file_store: FileStore, num_workers: int = 1):
    """Create the event stream and start its dispatcher thread.

    Args:
        sid: Stream/session identifier, used to build event file names.
        file_store: Store that events are written to by ``add_event``.
        num_workers: Not used by this constructor; kept so existing
            callers that pass it keep working.
    """
    self.sid = sid
    self.file_store = file_store
    self._queue: Queue[Event] = Queue()
    self._thread_pools: dict[str, dict[str, ThreadPoolExecutor]] = {}
    self._subscribers = {}
    self._lock = threading.Lock()
    self._cur_id = 0

    # With a hand-written __init__ nothing calls __post_init__ automatically,
    # so invoke it explicitly. It advances _cur_id past the ids it finds
    # (presumably events already persisted for this sid -- confirm against
    # the full __post_init__ body).
    self.__post_init__()

    # Start the dispatcher last, once every attribute it may read exists.
    self._queue_thread = threading.Thread(target=self._run_queue_loop)
    self._queue_thread.daemon = True
    self._queue_thread.start()

def __post_init__(self) -> None:
try:
Expand All @@ -76,6 +88,10 @@ def __post_init__(self) -> None:
if id >= self._cur_id:
self._cur_id = id + 1

def _init_thread_loop(self):
    """Create a new asyncio event loop and make it current for this thread.

    Passed as the ``initializer`` of each subscriber's ``ThreadPoolExecutor``.
    """
    asyncio.set_event_loop(asyncio.new_event_loop())

def _get_filename_for_id(self, id: int) -> str:
return get_conversation_event_filename(self.sid, id)

Expand Down Expand Up @@ -157,15 +173,18 @@ def get_latest_event_id(self) -> int:
def subscribe(
    self, subscriber_id: EventStreamSubscriber, callback: Callable, callback_id: str
):
    """Register ``callback`` under ``(subscriber_id, callback_id)``.

    Each registered callback gets its own single-worker thread pool whose
    thread is initialised with ``_init_thread_loop``.

    Args:
        subscriber_id: The subscriber the callback belongs to.
        callback: Callable invoked with each event.
        callback_id: Identifier of this callback within the subscriber.

    Raises:
        ValueError: If ``callback_id`` is already registered for
            ``subscriber_id``.
    """
    callbacks = self._subscribers.setdefault(subscriber_id, {})
    if callback_id in callbacks:
        raise ValueError(
            f'Callback ID on subscriber {subscriber_id} already exists: {callback_id}'
        )

    # Create the executor only after validation so a rejected subscription
    # does not leave an orphaned pool behind.
    pools = self._thread_pools.setdefault(subscriber_id, {})
    pools[callback_id] = ThreadPoolExecutor(
        max_workers=1, initializer=self._init_thread_loop
    )
    # Publish the callback last: the dispatcher looks up the pool for every
    # callback it finds, so the pool must already be registered.
    callbacks[callback_id] = callback

def unsubscribe(self, subscriber_id: EventStreamSubscriber, callback_id: str):
if subscriber_id not in self._subscribers:
Expand All @@ -179,13 +198,6 @@ def unsubscribe(self, subscriber_id: EventStreamSubscriber, callback_id: str):
del self._subscribers[subscriber_id][callback_id]

def add_event(self, event: Event, source: EventSource):
try:
asyncio.get_running_loop().create_task(self._async_add_event(event, source))
except RuntimeError:
# No event loop running...
asyncio.run(self._async_add_event(event, source))

async def _async_add_event(self, event: Event, source: EventSource):
if hasattr(event, '_id') and event.id is not None:
raise ValueError(
'Event already has an ID. It was probably added back to the EventStream from inside a handler, trigging a loop.'
Expand All @@ -199,14 +211,22 @@ async def _async_add_event(self, event: Event, source: EventSource):
data = event_to_dict(event)
if event.id is not None:
self.file_store.write(self._get_filename_for_id(event.id), json.dumps(data))
tasks = []
for key in sorted(self._subscribers.keys()):
callbacks = self._subscribers[key]
for callback_id in callbacks:
callback = callbacks[callback_id]
tasks.append(asyncio.create_task(callback(event)))
if tasks:
await asyncio.wait(tasks)
self._queue.put(event)

def _run_queue_loop(self):
    """Thread target: run ``_process_queue`` on a dedicated event loop.

    A new loop is created and set as current for this thread, and is closed
    when ``_process_queue`` returns or raises so its resources are released.
    """
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    try:
        loop.run_until_complete(self._process_queue())
    finally:
        loop.close()

async def _process_queue(self):
    """Dispatch queued events to every subscriber callback.

    Runs until ``should_continue()`` returns a falsy value. Each event taken
    from ``self._queue`` is submitted, together with the callback, to that
    callback's own executor; subscribers are visited in sorted key order.
    """
    # Imported locally so this block does not rely on a module-level import.
    from queue import Empty

    while should_continue():
        try:
            # Wait with a timeout so the should_continue() check above is
            # re-evaluated regularly even when no events arrive.
            event = self._queue.get(timeout=0.1)
        except Empty:
            continue
        for key in sorted(self._subscribers.keys()):
            # Iterate over a snapshot: other threads may subscribe or
            # unsubscribe while events are being dispatched.
            for callback_id, callback in list(self._subscribers[key].items()):
                pool = self._thread_pools[key][callback_id]
                pool.submit(callback, event)

def _callback(self, callback: Callable, event: Event):
    """Call ``callback(event)`` and run the result to completion via ``asyncio.run``.

    Args:
        callback: Callable whose return value is handed to ``asyncio.run``.
        event: The event passed to ``callback``.
    """
    pending = callback(event)
    asyncio.run(pending)
Expand Down
65 changes: 34 additions & 31 deletions openhands/runtime/base.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import asyncio
import atexit
import copy
import json
Expand Down Expand Up @@ -167,38 +168,40 @@ def add_env_vars(self, env_vars: dict[str, str]) -> None:
f'Failed to add env vars [{env_vars}] to environment: {obs.content}'
)

async def on_event(self, event: Event) -> None:
def on_event(self, event: Event) -> None:
if isinstance(event, Action):
# set timeout to default if not set
if event.timeout is None:
event.timeout = self.config.sandbox.timeout
assert event.timeout is not None
try:
observation: Observation = await call_sync_from_async(
self.run_action, event
)
except Exception as e:
err_id = ''
if isinstance(e, ConnectionError) or isinstance(
e, AgentRuntimeDisconnectedError
):
err_id = 'STATUS$ERROR_RUNTIME_DISCONNECTED'
logger.error(
'Unexpected error while running action',
exc_info=True,
stack_info=True,
)
self.log('error', f'Problematic action: {str(event)}')
self.send_error_message(err_id, str(e))
self.close()
return

observation._cause = event.id # type: ignore[attr-defined]
observation.tool_call_metadata = event.tool_call_metadata

# this might be unnecessary, since source should be set by the event stream when we're here
source = event.source if event.source else EventSource.AGENT
self.event_stream.add_event(observation, source) # type: ignore[arg-type]
asyncio.get_event_loop().run_until_complete(self._handle_action(event))

async def _handle_action(self, event: Action) -> None:
    """Run ``event`` through ``run_action`` and publish the resulting observation.

    If the action has no timeout, the sandbox timeout from the config is
    used. When running the action raises, the error is logged, an error
    message is sent, ``self.close()`` is called and nothing is published.

    Args:
        event: The action to execute.
    """
    # set timeout to default if not set
    if event.timeout is None:
        event.timeout = self.config.sandbox.timeout
    assert event.timeout is not None

    try:
        observation: Observation = await call_sync_from_async(self.run_action, event)
    except Exception as exc:
        disconnected = isinstance(
            exc, (ConnectionError, AgentRuntimeDisconnectedError)
        )
        err_id = 'STATUS$ERROR_RUNTIME_DISCONNECTED' if disconnected else ''
        logger.error(
            'Unexpected error while running action',
            exc_info=True,
            stack_info=True,
        )
        self.log('error', f'Problematic action: {str(event)}')
        self.send_error_message(err_id, str(exc))
        self.close()
        return

    # Link the observation back to the action that produced it.
    observation._cause = event.id  # type: ignore[attr-defined]
    observation.tool_call_metadata = event.tool_call_metadata

    # this might be unnecessary, since source should be set by the event stream when we're here
    source = event.source or EventSource.AGENT
    self.event_stream.add_event(observation, source)  # type: ignore[arg-type]

def clone_repo(self, github_token: str | None, selected_repository: str | None):
if not github_token or not selected_repository:
Expand Down
Loading
Loading