Skip to content

Commit

Permalink
fix message passing, flow
Browse files Browse the repository at this point in the history
  • Loading branch information
enyst committed Jan 9, 2025
1 parent 5281bb3 commit 419f568
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 25 deletions.
7 changes: 7 additions & 0 deletions openhands/agenthub/micro/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,12 @@ def history_to_json(self, history: list[Event], max_events: int = 20, **kwargs):
# history is in reverse order, let's fix it
processed_history.reverse()

# everything starts with a message
# the first message is already in the prompt as the task
# so we don't need to include it in the history
if event_count < max_events:
processed_history.pop(0)

return json.dumps(processed_history, **kwargs)

def __init__(self, llm: LLM, config: AgentConfig):
Expand All @@ -62,6 +68,7 @@ def __init__(self, llm: LLM, config: AgentConfig):

def step(self, state: State) -> Action:
last_user_message, last_image_urls = state.get_current_user_intent()
print(f'MICROAGENT:step: {last_user_message}')
prompt = self.prompt_template.render(
state=state,
instructions=instructions,
Expand Down
46 changes: 22 additions & 24 deletions openhands/controller/agent_controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,11 @@ async def close(self) -> None:
)

# unsubscribe from the event stream
self.event_stream.unsubscribe(EventStreamSubscriber.AGENT_CONTROLLER, self.id)
# only the root parent controller subscribes to the event stream
if not self.is_delegate:
self.event_stream.unsubscribe(
EventStreamSubscriber.AGENT_CONTROLLER, self.id
)
self._closed = True

def log(self, level: str, message: str, extra: dict | None = None) -> None:
Expand Down Expand Up @@ -230,6 +234,10 @@ async def _step_with_exception_handling(self):
await self._react_to_exception(reported)

def should_step(self, event: Event) -> bool:
# it might be the delegate's day in the sun
if self.delegate is not None:
return False

if isinstance(event, Action):
if isinstance(event, MessageAction) and event.source == EventSource.USER:
return True
Expand Down Expand Up @@ -262,7 +270,7 @@ def on_event(self, event: Event) -> None:
# If we have a delegate that is not finished or errored, forward events to it
if self.delegate is not None:
delegate_state = self.delegate.get_agent_state()
if self.should_step(event) and delegate_state not in (
if delegate_state not in (
AgentState.FINISHED,
AgentState.ERROR,
AgentState.REJECTED,
Expand All @@ -272,7 +280,7 @@ def on_event(self, event: Event) -> None:
self.delegate._on_event(event)
)
return
elif self.should_step(event):
else:
# delegate is done
self.end_delegate()
return
Expand All @@ -297,17 +305,22 @@ async def _on_event(self, event: Event) -> None:
self.step()

async def _handle_action(self, action: Action) -> None:
"""Handles actions from the event stream.
Args:
action (Action): The action to handle.
"""
"""Handles an Action from the agent or delegate."""
if isinstance(action, ChangeAgentStateAction):
await self.set_agent_state_to(action.agent_state) # type: ignore
elif isinstance(action, MessageAction):
await self._handle_message_action(action)
elif isinstance(action, AgentDelegateAction):
await self.start_delegate(action)
assert self.delegate is not None
# Post a MessageAction with the task for the delegate
if 'task' in action.inputs:
self.event_stream.add_event(
MessageAction(content='TASK: ' + action.inputs['task']),
EventSource.USER,
)
await self.delegate.set_agent_state_to(AgentState.RUNNING)
return

elif isinstance(action, AgentFinishAction):
self.state.outputs = action.outputs
Expand Down Expand Up @@ -470,6 +483,7 @@ async def set_agent_state_to(self, new_state: AgentState) -> None:
self._pending_action._id = None # type: ignore[attr-defined]
self.event_stream.add_event(self._pending_action, EventSource.AGENT)

print(f'CONTROLLER {self.id}:set_agent_state_to: {new_state}')
self.state.agent_state = new_state
self.event_stream.add_event(
AgentStateChangedObservation('', self.state.agent_state),
Expand Down Expand Up @@ -539,8 +553,6 @@ async def start_delegate(self, action: AgentDelegateAction) -> None:
headless_mode=self.headless_mode,
)

await self.delegate.set_agent_state_to(AgentState.RUNNING)

def end_delegate(self) -> None:
print(f'CONTROLLER {self.id}:end_delegate')
if self.delegate is None:
Expand Down Expand Up @@ -600,14 +612,6 @@ async def _step(self) -> None:
if self._pending_action:
return

if self.delegate is not None:
assert self.delegate != self
# TODO this conditional will always be false, because the parent controllers are unsubscribed
# remove if it's still useless when delegation is reworked
if self.delegate.get_agent_state() != AgentState.PAUSED:
await self._delegate_step()
return

self.log(
'info',
f'LEVEL {self.state.delegate_level} LOCAL STEP {self.state.local_iteration} GLOBAL STEP {self.state.iteration}',
Expand Down Expand Up @@ -697,12 +701,6 @@ async def _step(self) -> None:
log_level = 'info' if LOG_ALL_EVENTS else 'debug'
self.log(log_level, str(action), extra={'msg_type': 'ACTION'})

async def _delegate_step(self) -> None:
"""Executes a single step of the delegate agent."""
await self.delegate._step() # type: ignore[union-attr]

return

async def _handle_traffic_control(
self, limit_type: str, current_value: float, max_value: float
) -> bool:
Expand Down
2 changes: 1 addition & 1 deletion openhands/runtime/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ def add_env_vars(self, env_vars: dict[str, str]) -> None:
)

def on_event(self, event: Event) -> None:
print(f'RUNTIME:on_event: {event.__class__.__name__}({event.id})')
print(f'RUNTIME:on_event: {event.__class__.__name__}({event.id})\n')
if isinstance(event, Action):
asyncio.get_event_loop().run_until_complete(self._handle_action(event))

Expand Down

0 comments on commit 419f568

Please sign in to comment.