Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(agents-api): Fix transitioning to error #1027

Draft
wants to merge 9 commits into
base: dev
Choose a base branch
from
Prev Previous commit
Next Next commit
fix(agents-api): Misc fixes for transitioning to error state
Ahmad-mtos committed Jan 8, 2025
commit d2b81f4d0a6ee1a29eaefd6d25164b9215dcdbf9
37 changes: 13 additions & 24 deletions agents-api/agents_api/workflows/task_execution/__init__.py
Original file line number Diff line number Diff line change
@@ -220,7 +220,6 @@ async def run(
# Handle errors (activity returns None)
case step, StepOutcome(error=error) if error is not None:
workflow.logger.error(f"Error in step {context.cursor.step}: {error}")
await transition(context, type="error", output=error)
msg = f"Step {type(step).__name__} threw error: {error}"
raise ApplicationError(msg)

@@ -345,11 +344,6 @@ async def run(
workflow.logger.error(f"Error step: {error}")

state = PartialTransition(type="error", output=error)
await transition(
context,
state,
last_error=self.last_error,
)

msg = f"Error raised by ErrorWorkflowStep: {error}"
raise ApplicationError(msg)
@@ -644,11 +638,6 @@ async def run(
f"Unhandled step type: {type(context.current_step).__name__}"
)
state = PartialTransition(type="error", output="Not implemented")
await transition(
context,
state,
last_error=self.last_error,
)

msg = "Not implemented"
raise ApplicationError(msg)
@@ -679,7 +668,7 @@ async def run(
if not final_state.next:
msg = "No next step"
raise ApplicationError(msg)

workflow.logger.info(
f"Continuing to next step: {final_state.next.workflow}.{final_state.next.step}"
)
@@ -694,19 +683,19 @@ async def run(
retry_policy=DEFAULT_RETRY_POLICY,
heartbeat_timeout=timedelta(seconds=temporal_heartbeat_timeout),
)

except Exception as e:
workflow.logger.error(f"Unhandled error: {e!s}")
await transition(context, type="error", output=str(e), last_error=self.last_error)
raise ApplicationError("Workflow encountered an error") from e

previous_inputs.append(final_output)
previous_inputs.append(final_output)

# Continue as a child workflow
return await continue_as_child(
context.execution_input,
start=final_state.next,
previous_inputs=previous_inputs,
user_state=state.user_state,
# Continue as a child workflow
return await continue_as_child(
context.execution_input,
start=final_state.next,
previous_inputs=previous_inputs,
user_state=state.user_state,
)

except Exception as e:
workflow.logger.error(f"Unhandled error: {e!s}")
await transition(context, type="error", output=str(e))
msg = "Workflow encountered an error"
raise ApplicationError(msg) from e
8 changes: 0 additions & 8 deletions agents-api/agents_api/workflows/task_execution/helpers.py
Original file line number Diff line number Diff line change
@@ -26,14 +26,6 @@
T = TypeVar("T")


async def handle_error(context: StepContext, error: BaseException):
workflow.logger.error(f"Error in workflow: {error!s}")
workflow.logger.error(f"Error in step {context.cursor.step}: {error}")
await transition(context, type="error", output=error)
msg = f"Step {type(context.current_step).__name__} threw error: {error}"
raise ApplicationError(msg) from error


def validate_execution_input(execution_input: ExecutionInput) -> TaskSpecDef:
"""Validates and returns the task from execution input.
18 changes: 9 additions & 9 deletions agents-api/agents_api/workflows/task_execution/transition.py
Original file line number Diff line number Diff line change
@@ -31,16 +31,16 @@ async def transition(
if state.type is not None and state.type == "error":
error_type = "error"

match context.is_last_step, context.cursor and not error_type:
case (True, TransitionTarget(workflow="main")):
state.type = "finish"
case (True, _):
state.type = "finish_branch"
case _, _:
state.type = "step"

if error_type:
if error_type and error_type == "error":
state.type = "error"
else:
match context.is_last_step, context.cursor:
case (True, TransitionTarget(workflow="main")):
state.type = "finish"
case (True, _):
state.type = "finish_branch"
case _, _:
state.type = "step"

transition_request = CreateTransitionRequest(
current=context.cursor,