diff --git a/agents-api/agents_api/activities/task_steps/raise_complete_async.py b/agents-api/agents_api/activities/task_steps/raise_complete_async.py index 904b298d3..adae15b7e 100644 --- a/agents-api/agents_api/activities/task_steps/raise_complete_async.py +++ b/agents-api/agents_api/activities/task_steps/raise_complete_async.py @@ -1,13 +1,11 @@ import base64 +from typing import Any from beartype import beartype from temporalio import activity from ...autogen.openapi_model import CreateTransitionRequest -from ...common.protocol.tasks import ( - StepContext, - StepOutcome, -) +from ...common.protocol.tasks import StepContext from ...common.storage_handler import auto_blob_store from .transition_step import original_transition_step @@ -15,7 +13,7 @@ @activity.defn @auto_blob_store @beartype -async def raise_complete_async(context: StepContext, output: StepOutcome) -> None: +async def raise_complete_async(context: StepContext, output: Any) -> None: activity_info = activity.info() captured_token = base64.b64encode(activity_info.task_token).decode("ascii") diff --git a/agents-api/agents_api/clients/temporal.py b/agents-api/agents_api/clients/temporal.py index 4bb25cbc9..deb4809f1 100644 --- a/agents-api/agents_api/clients/temporal.py +++ b/agents-api/agents_api/clients/temporal.py @@ -2,6 +2,11 @@ from uuid import UUID from temporalio.client import Client, TLSConfig +from temporalio.common import ( + SearchAttributeKey, + SearchAttributePair, + TypedSearchAttributes, +) from ..autogen.openapi_model import TransitionTarget from ..common.protocol.tasks import ExecutionInput @@ -48,6 +53,7 @@ async def run_task_execution_workflow( from ..workflows.task_execution import TaskExecutionWorkflow client = client or (await get_client()) + execution_id_key = SearchAttributeKey.for_keyword("CustomStringField") return await client.start_workflow( TaskExecutionWorkflow.run, @@ -56,7 +62,13 @@ async def run_task_execution_workflow( id=str(job_id), run_timeout=timedelta(days=31), retry_policy=DEFAULT_RETRY_POLICY, - # TODO: Should add search_attributes for queryability + search_attributes=TypedSearchAttributes( + [ + SearchAttributePair( + execution_id_key, str(execution_input.execution.id) + ), + ] + ), ) diff --git a/agents-api/agents_api/common/utils/yaml.py b/agents-api/agents_api/common/utils/yaml.py index c6b15a59f..1cde64aa4 100644 --- a/agents-api/agents_api/common/utils/yaml.py +++ b/agents-api/agents_api/common/utils/yaml.py @@ -1,4 +1,3 @@ -from io import StringIO from typing import Any import yaml diff --git a/agents-api/agents_api/env.py b/agents-api/agents_api/env.py index 43cfc45d0..c69eae8f2 100644 --- a/agents-api/agents_api/env.py +++ b/agents-api/agents_api/env.py @@ -25,6 +25,7 @@ # ----- task_max_parallelism: int = env.int("AGENTS_API_TASK_MAX_PARALLELISM", default=100) + # Blob Store # ---------- use_blob_store_for_temporal: bool = env.bool( @@ -37,6 +38,7 @@ s3_access_key: str | None = env.str("S3_ACCESS_KEY", default=None) s3_secret_key: str | None = env.str("S3_SECRET_KEY", default=None) + # Debug # ----- debug: bool = env.bool("AGENTS_API_DEBUG", default=False) diff --git a/agents-api/agents_api/models/execution/constants.py b/agents-api/agents_api/models/execution/constants.py new file mode 100644 index 000000000..8d4568ba2 --- /dev/null +++ b/agents-api/agents_api/models/execution/constants.py @@ -0,0 +1,5 @@ +########## +# Consts # +########## + +OUTPUT_UNNEST_KEY = "$$e7w_unnest$$" diff --git a/agents-api/agents_api/models/execution/create_execution.py b/agents-api/agents_api/models/execution/create_execution.py index 13108fe15..2b509cda4 100644 --- a/agents-api/agents_api/models/execution/create_execution.py +++ b/agents-api/agents_api/models/execution/create_execution.py @@ -18,6 +18,7 @@ verify_developer_owns_resource_query, wrap_in_class, ) +from .constants import OUTPUT_UNNEST_KEY ModelT = TypeVar("ModelT", bound=Any) T = TypeVar("T") @@ -59,6 +60,9 @@ def create_execution( data["metadata"] = data.get("metadata", {}) execution_data = data + if execution_data["output"] is not None and not isinstance(execution_data["output"], dict): + execution_data["output"] = {OUTPUT_UNNEST_KEY: execution_data["output"]} + columns, values = cozo_process_mutate_data( { **execution_data, diff --git a/agents-api/agents_api/models/execution/get_execution.py b/agents-api/agents_api/models/execution/get_execution.py index 263ce9c66..11eb1e107 100644 --- a/agents-api/agents_api/models/execution/get_execution.py +++ b/agents-api/agents_api/models/execution/get_execution.py @@ -13,6 +13,7 @@ rewrap_exceptions, wrap_in_class, ) +from .constants import OUTPUT_UNNEST_KEY ModelT = TypeVar("ModelT", bound=Any) T = TypeVar("T") @@ -26,7 +27,14 @@ TypeError: partialclass(HTTPException, status_code=400), } ) -@wrap_in_class(Execution, one=True) +@wrap_in_class( + Execution, + one=True, + transform=lambda d: { + **d, + "output": d["output"][OUTPUT_UNNEST_KEY] if OUTPUT_UNNEST_KEY in d["output"] else d["output"], + }, +) @cozo_query @beartype def get_execution( diff --git a/agents-api/agents_api/models/execution/list_executions.py b/agents-api/agents_api/models/execution/list_executions.py index 09194cdbd..6bd7f5303 100644 --- a/agents-api/agents_api/models/execution/list_executions.py +++ b/agents-api/agents_api/models/execution/list_executions.py @@ -15,6 +15,7 @@ verify_developer_owns_resource_query, wrap_in_class, ) +from .constants import OUTPUT_UNNEST_KEY ModelT = TypeVar("ModelT", bound=Any) T = TypeVar("T") @@ -27,7 +28,15 @@ TypeError: partialclass(HTTPException, status_code=400), } ) -@wrap_in_class(Execution) +@wrap_in_class( + Execution, + transform=lambda d: { + **d, + "output": d["output"][OUTPUT_UNNEST_KEY] + if OUTPUT_UNNEST_KEY in d["output"] + else d["output"], + }, +) @cozo_query @beartype def list_executions( diff --git a/agents-api/agents_api/models/execution/update_execution.py b/agents-api/agents_api/models/execution/update_execution.py index 5b578c08e..35deab259 100644 --- a/agents-api/agents_api/models/execution/update_execution.py +++ b/agents-api/agents_api/models/execution/update_execution.py @@ -23,6 +23,7 @@ verify_developer_owns_resource_query, wrap_in_class, ) +from .constants import OUTPUT_UNNEST_KEY ModelT = TypeVar("ModelT", bound=Any) T = TypeVar("T") @@ -50,7 +51,7 @@ def update_execution( task_id: UUID, execution_id: UUID, data: UpdateExecutionRequest, - output: dict | None = None, + output: dict | Any | None = None, error: str | None = None, ) -> tuple[list[str], dict]: developer_id = str(developer_id) @@ -63,6 +64,9 @@ def update_execution( execution_data: dict = data.model_dump(exclude_none=True) + if output is not None and not isinstance(output, dict): + output: dict = {OUTPUT_UNNEST_KEY: output} + columns, values = cozo_process_mutate_data( { **execution_data,