diff --git a/agents-api/agents_api/autogen/Tasks.py b/agents-api/agents_api/autogen/Tasks.py index b9212d8cb..f6bf58ddf 100644 --- a/agents-api/agents_api/autogen/Tasks.py +++ b/agents-api/agents_api/autogen/Tasks.py @@ -161,8 +161,21 @@ class CreateTaskRequest(BaseModel): model_config = ConfigDict( populate_by_name=True, ) - name: str + name: Annotated[str, Field(max_length=255, min_length=1)] + """ + The name of the task. + """ + canonical_name: Annotated[ + str | None, + Field(max_length=255, min_length=1, pattern="^[a-zA-Z][a-zA-Z0-9_]*$"), + ] = None + """ + The canonical name of the task. + """ description: str = "" + """ + The description of the task. + """ main: Annotated[ list[ EvaluateStep @@ -650,7 +663,21 @@ class PatchTaskRequest(BaseModel): model_config = ConfigDict( populate_by_name=True, ) + name: Annotated[str | None, Field(max_length=255, min_length=1)] = None + """ + The name of the task. + """ + canonical_name: Annotated[ + str | None, + Field(max_length=255, min_length=1, pattern="^[a-zA-Z][a-zA-Z0-9_]*$"), + ] = None + """ + The canonical name of the task. + """ description: str = "" + """ + The description of the task. + """ main: Annotated[ list[ EvaluateStep @@ -966,8 +993,21 @@ class Task(BaseModel): model_config = ConfigDict( populate_by_name=True, ) - name: str + name: Annotated[str, Field(max_length=255, min_length=1)] + """ + The name of the task. + """ + canonical_name: Annotated[ + str | None, + Field(max_length=255, min_length=1, pattern="^[a-zA-Z][a-zA-Z0-9_]*$"), + ] = None + """ + The canonical name of the task. + """ description: str = "" + """ + The description of the task. + """ main: Annotated[ list[ EvaluateStep @@ -1124,7 +1164,21 @@ class UpdateTaskRequest(BaseModel): model_config = ConfigDict( populate_by_name=True, ) + name: Annotated[str, Field(max_length=255, min_length=1)] + """ + The name of the task. 
+ """ + canonical_name: Annotated[ + str | None, + Field(max_length=255, min_length=1, pattern="^[a-zA-Z][a-zA-Z0-9_]*$"), + ] = None + """ + The canonical name of the task. + """ description: str = "" + """ + The description of the task. + """ main: Annotated[ list[ EvaluateStep diff --git a/agents-api/agents_api/queries/agents/create_agent.py b/agents-api/agents_api/queries/agents/create_agent.py index 76c96f46b..b5a4af75a 100644 --- a/agents-api/agents_api/queries/agents/create_agent.py +++ b/agents-api/agents_api/queries/agents/create_agent.py @@ -9,7 +9,7 @@ from sqlglot import parse_one from uuid_extensions import uuid7 -from ...autogen.openapi_model import Agent, CreateAgentRequest +from ...autogen.openapi_model import CreateAgentRequest, ResourceCreatedResponse from ...metrics.counters import increase_counter from ..utils import ( generate_canonical_name, @@ -75,9 +75,9 @@ # } # ) @wrap_in_class( - Agent, + ResourceCreatedResponse, one=True, - transform=lambda d: {"id": d["agent_id"], **d}, + transform=lambda d: {"id": d["agent_id"], "created_at": d["created_at"]}, ) @increase_counter("create_agent") @pg_query diff --git a/agents-api/agents_api/queries/agents/create_or_update_agent.py b/agents-api/agents_api/queries/agents/create_or_update_agent.py index ef3a0abe5..258badc93 100644 --- a/agents-api/agents_api/queries/agents/create_or_update_agent.py +++ b/agents-api/agents_api/queries/agents/create_or_update_agent.py @@ -18,6 +18,11 @@ # Define the raw SQL query agent_query = parse_one(""" +WITH existing_agent AS ( + SELECT canonical_name + FROM agents + WHERE developer_id = $1 AND agent_id = $2 +) INSERT INTO agents ( developer_id, agent_id, @@ -30,15 +35,18 @@ default_settings ) VALUES ( - $1, - $2, - $3, - $4, - $5, - $6, - $7, - $8, - $9 + $1, -- developer_id + $2, -- agent_id + COALESCE( -- canonical_name + (SELECT canonical_name FROM existing_agent), + $3 + ), + $4, -- name + $5, -- about + $6, -- instructions + $7, -- model + $8, -- metadata + $9 -- 
default_settings ) RETURNING *; """).sql(pretty=True) diff --git a/agents-api/agents_api/queries/developers/create_developer.py b/agents-api/agents_api/queries/developers/create_developer.py index bed6371c4..1e927397c 100644 --- a/agents-api/agents_api/queries/developers/create_developer.py +++ b/agents-api/agents_api/queries/developers/create_developer.py @@ -6,7 +6,7 @@ from sqlglot import parse_one from uuid_extensions import uuid7 -from ...common.protocol.developers import Developer +from ...autogen.openapi_model import ResourceCreatedResponse from ..utils import ( partialclass, pg_query, @@ -43,7 +43,11 @@ ) } ) -@wrap_in_class(Developer, one=True, transform=lambda d: {**d, "id": d["developer_id"]}) +@wrap_in_class( + ResourceCreatedResponse, + one=True, + transform=lambda d: {**d, "id": d["developer_id"], "created_at": d["created_at"]}, +) @pg_query @beartype async def create_developer( diff --git a/agents-api/agents_api/queries/entries/create_entries.py b/agents-api/agents_api/queries/entries/create_entries.py index 95973ad0b..ee931534d 100644 --- a/agents-api/agents_api/queries/entries/create_entries.py +++ b/agents-api/agents_api/queries/entries/create_entries.py @@ -7,7 +7,11 @@ from litellm.utils import _select_tokenizer as select_tokenizer from uuid_extensions import uuid7 -from ...autogen.openapi_model import CreateEntryRequest, Entry, Relation +from ...autogen.openapi_model import ( + CreateEntryRequest, + Relation, + ResourceCreatedResponse, +) from ...common.utils.datetime import utcnow from ...common.utils.messages import content_to_json from ...metrics.counters import increase_counter @@ -79,9 +83,10 @@ } ) @wrap_in_class( - Entry, + ResourceCreatedResponse, transform=lambda d: { "id": d.pop("entry_id"), + "created_at": d.pop("created_at"), **d, }, ) diff --git a/agents-api/agents_api/queries/entries/get_history.py b/agents-api/agents_api/queries/entries/get_history.py index e6967a6cc..ffa0746c0 100644 --- 
a/agents-api/agents_api/queries/entries/get_history.py +++ b/agents-api/agents_api/queries/entries/get_history.py @@ -1,5 +1,4 @@ import json -from typing import Any, List, Tuple from uuid import UUID import asyncpg diff --git a/agents-api/agents_api/queries/entries/list_entries.py b/agents-api/agents_api/queries/entries/list_entries.py index 89f432734..55384b633 100644 --- a/agents-api/agents_api/queries/entries/list_entries.py +++ b/agents-api/agents_api/queries/entries/list_entries.py @@ -11,14 +11,10 @@ # Query for checking if the session exists session_exists_query = """ -SELECT CASE - WHEN EXISTS ( - SELECT 1 FROM sessions - WHERE session_id = $1 AND developer_id = $2 - ) - THEN TRUE - ELSE (SELECT NULL::boolean WHERE FALSE) -- This raises a NO_DATA_FOUND error -END; +SELECT EXISTS ( + SELECT 1 FROM sessions + WHERE session_id = $1 AND developer_id = $2 +) AS exists; """ list_entries_query = """ diff --git a/agents-api/agents_api/queries/files/create_file.py b/agents-api/agents_api/queries/files/create_file.py index 48251fa5e..00d07bce7 100644 --- a/agents-api/agents_api/queries/files/create_file.py +++ b/agents-api/agents_api/queries/files/create_file.py @@ -5,18 +5,16 @@ import base64 import hashlib -from typing import Any, Literal +from typing import Literal from uuid import UUID -import asyncpg from beartype import beartype -from fastapi import HTTPException from sqlglot import parse_one from uuid_extensions import uuid7 from ...autogen.openapi_model import CreateFileRequest, File from ...metrics.counters import increase_counter -from ..utils import partialclass, pg_query, rewrap_exceptions, wrap_in_class +from ..utils import pg_query, wrap_in_class # Create file file_query = parse_one(""" diff --git a/agents-api/agents_api/queries/files/get_file.py b/agents-api/agents_api/queries/files/get_file.py index 4d5dca4c0..36bfc42c6 100644 --- a/agents-api/agents_api/queries/files/get_file.py +++ b/agents-api/agents_api/queries/files/get_file.py @@ -6,13 +6,11 @@ 
from typing import Literal from uuid import UUID -import asyncpg from beartype import beartype -from fastapi import HTTPException from sqlglot import parse_one from ...autogen.openapi_model import File -from ..utils import partialclass, pg_query, rewrap_exceptions, wrap_in_class +from ..utils import pg_query, wrap_in_class # Define the raw SQL query file_query = parse_one(""" diff --git a/agents-api/agents_api/queries/files/list_files.py b/agents-api/agents_api/queries/files/list_files.py index 2bc42f842..ee4f70d95 100644 --- a/agents-api/agents_api/queries/files/list_files.py +++ b/agents-api/agents_api/queries/files/list_files.py @@ -3,16 +3,15 @@ It constructs and executes SQL queries to fetch a list of files based on developer ID with pagination. """ -from typing import Any, Literal +from typing import Literal from uuid import UUID -import asyncpg from beartype import beartype from fastapi import HTTPException from sqlglot import parse_one from ...autogen.openapi_model import File -from ..utils import partialclass, pg_query, rewrap_exceptions, wrap_in_class +from ..utils import pg_query, wrap_in_class # Query to list all files for a developer (uses developer_id index) developer_files_query = parse_one(""" diff --git a/agents-api/agents_api/queries/sessions/create_session.py b/agents-api/agents_api/queries/sessions/create_session.py index 058462cf8..500d6fbea 100644 --- a/agents-api/agents_api/queries/sessions/create_session.py +++ b/agents-api/agents_api/queries/sessions/create_session.py @@ -7,8 +7,8 @@ from uuid_extensions import uuid7 from ...autogen.openapi_model import ( - CreateSessionRequest, - Session, + CreateSessionRequest, + ResourceCreatedResponse, ) from ...metrics.counters import increase_counter from ..utils import partialclass, pg_query, rewrap_exceptions, wrap_in_class @@ -68,11 +68,12 @@ } ) @wrap_in_class( - Session, + ResourceCreatedResponse, one=True, transform=lambda d: { **d, "id": d["session_id"], + "created_at": d["created_at"], }, )
@increase_counter("create_session") diff --git a/agents-api/agents_api/queries/tasks/__init__.py b/agents-api/agents_api/queries/tasks/__init__.py new file mode 100644 index 000000000..63b4bed22 --- /dev/null +++ b/agents-api/agents_api/queries/tasks/__init__.py @@ -0,0 +1,28 @@ +""" +The `task` module within the `queries` package provides SQL query functions for managing tasks +in the TimescaleDB database. This includes operations for: + +- Creating new tasks +- Updating existing tasks +- Retrieving task details +- Listing tasks with filtering and pagination +- Deleting tasks +""" + +from .create_or_update_task import create_or_update_task +from .create_task import create_task +from .delete_task import delete_task +from .get_task import get_task +from .list_tasks import list_tasks +from .patch_task import patch_task +from .update_task import update_task + +__all__ = [ + "create_or_update_task", + "create_task", + "delete_task", + "get_task", + "list_tasks", + "patch_task", + "update_task", +] diff --git a/agents-api/agents_api/queries/tasks/create_or_update_task.py b/agents-api/agents_api/queries/tasks/create_or_update_task.py new file mode 100644 index 000000000..1f259ac16 --- /dev/null +++ b/agents-api/agents_api/queries/tasks/create_or_update_task.py @@ -0,0 +1,242 @@ +from typing import Literal +from uuid import UUID + +import asyncpg +from beartype import beartype +from fastapi import HTTPException +from sqlglot import parse_one +from uuid_extensions import uuid7 + +from ...autogen.openapi_model import CreateOrUpdateTaskRequest, ResourceUpdatedResponse +from ...common.protocol.tasks import task_to_spec +from ...metrics.counters import increase_counter +from ..utils import ( + generate_canonical_name, + partialclass, + pg_query, + rewrap_exceptions, + wrap_in_class, +) + +# Define the raw SQL query for creating or updating a task +tools_query = parse_one(""" +WITH version AS ( + SELECT COALESCE(MAX("version"), 0) as current_version + FROM tasks + WHERE 
developer_id = $1 + AND task_id = $3 +) +INSERT INTO tools ( + task_version, + developer_id, + agent_id, + task_id, + tool_id, + type, + name, + description, + spec +) +SELECT + current_version, -- task_version + $1, -- developer_id + $2, -- agent_id + $3, -- task_id + $4, -- tool_id + $5, -- type + $6, -- name + $7, -- description + $8 -- spec +FROM version +""").sql(pretty=True) + +task_query = parse_one(""" +WITH current_version AS ( + SELECT COALESCE( + (SELECT MAX("version") + FROM tasks + WHERE developer_id = $1 + AND task_id = $4), + 0 + ) + 1 as next_version, + COALESCE( + (SELECT canonical_name + FROM tasks + WHERE developer_id = $1 AND task_id = $4 + ORDER BY version DESC + LIMIT 1), + $2 + ) as effective_canonical_name + FROM (SELECT 1) as dummy +) +INSERT INTO tasks ( + "version", + developer_id, + canonical_name, + agent_id, + task_id, + name, + description, + inherit_tools, + input_schema, + metadata +) +SELECT + next_version, -- version + $1, -- developer_id + effective_canonical_name, -- canonical_name + $3, -- agent_id + $4, -- task_id + $5, -- name + $6, -- description + $7, -- inherit_tools + $8::jsonb, -- input_schema + $9::jsonb -- metadata +FROM current_version +RETURNING *, (SELECT next_version FROM current_version) as next_version; +""").sql(pretty=True) + +# Define the raw SQL query for inserting workflows +workflows_query = parse_one(""" +WITH version AS ( + SELECT COALESCE(MAX("version"), 0) as current_version + FROM tasks + WHERE developer_id = $1 + AND task_id = $2 +) +INSERT INTO workflows ( + developer_id, + task_id, + "version", + name, + step_idx, + step_type, + step_definition +) +SELECT + $1, -- developer_id + $2, -- task_id + current_version, -- version + $3, -- name + $4, -- step_idx + $5, -- step_type + $6 -- step_definition +FROM version +""").sql(pretty=True) + + +@rewrap_exceptions( + { + asyncpg.ForeignKeyViolationError: partialclass( + HTTPException, + status_code=404, + detail="The specified developer or agent does not 
exist.", + ), + asyncpg.UniqueViolationError: partialclass( + HTTPException, + status_code=409, + detail="A task with this ID already exists for this agent.", + ), + } +) +@wrap_in_class( + ResourceUpdatedResponse, + one=True, + transform=lambda d: { + "id": d["task_id"], + "updated_at": d["updated_at"].timestamp(), + **d, + }, +) +@increase_counter("create_or_update_task") +@pg_query(return_index=0) +@beartype +async def create_or_update_task( + *, + developer_id: UUID, + agent_id: UUID, + task_id: UUID, + data: CreateOrUpdateTaskRequest, +) -> list[tuple[str, list, Literal["fetch", "fetchmany"]]]: + """ + Constructs an SQL query to create or update a task. + + Args: + developer_id (UUID): The UUID of the developer. + agent_id (UUID): The UUID of the agent. + task_id (UUID): The UUID of the task. + data (CreateOrUpdateTaskRequest): The task data to insert or update. + + Returns: + list[tuple[str, list, Literal["fetch", "fetchmany"]]]: List of SQL queries and parameters. + + Raises: + HTTPException: If developer/agent doesn't exist (404) or on unique constraint violation (409) + """ + + # Generate canonical name from task name if not provided + canonical_name = data.canonical_name or generate_canonical_name(data.name) + + # Version will be determined by the CTE + task_params = [ + developer_id, # $1 + canonical_name, # $2 + agent_id, # $3 + task_id, # $4 + data.name, # $5 + data.description, # $6 + data.inherit_tools, # $7 + data.input_schema or {}, # $8 + data.metadata or {}, # $9 + ] + + # Prepare tool parameters for the tools table + tool_params = [ + [ + developer_id, + agent_id, + task_id, + uuid7(), # tool_id + tool.type, + tool.name, + tool.description, + getattr(tool, tool.type), # spec + ] + for tool in data.tools or [] + ] + + # Generate workflows from task data using task_to_spec + workflows_spec = task_to_spec(data).model_dump(exclude_none=True, mode="json") + workflow_params = [] + for workflow in workflows_spec.get("workflows", []): + workflow_name = 
workflow.get("name") + steps = workflow.get("steps", []) + for step_idx, step in enumerate(steps): + workflow_params.append( + [ + developer_id, # $1 + task_id, # $2 + workflow_name, # $3 + step_idx, # $4 + step["kind_"], # $5 + step[step["kind_"]], # $6 + ] + ) + + return [ + ( + task_query, + task_params, + "fetch", + ), + ( + tools_query, + tool_params, + "fetchmany", + ), + ( + workflows_query, + workflow_params, + "fetchmany", + ), + ] diff --git a/agents-api/agents_api/queries/tasks/create_task.py b/agents-api/agents_api/queries/tasks/create_task.py new file mode 100644 index 000000000..58287fbbc --- /dev/null +++ b/agents-api/agents_api/queries/tasks/create_task.py @@ -0,0 +1,212 @@ +from typing import Literal +from uuid import UUID + +import asyncpg +from beartype import beartype +from fastapi import HTTPException +from sqlglot import parse_one +from uuid_extensions import uuid7 + +from ...autogen.openapi_model import CreateTaskRequest, ResourceCreatedResponse +from ...common.protocol.tasks import task_to_spec +from ...metrics.counters import increase_counter +from ..utils import ( + generate_canonical_name, + partialclass, + pg_query, + rewrap_exceptions, + wrap_in_class, +) + +# Define the raw SQL query for creating or updating a task +tools_query = parse_one(""" +INSERT INTO tools ( + task_version, + developer_id, + agent_id, + task_id, + tool_id, + type, + name, + description, + spec +) +VALUES ( + 1, -- task_version + $1, -- developer_id + $2, -- agent_id + $3, -- task_id + $4, -- tool_id + $5, -- type + $6, -- name + $7, -- description + $8 -- spec +) +""").sql(pretty=True) + +task_query = parse_one(""" +INSERT INTO tasks ( + "version", + developer_id, + agent_id, + task_id, + name, + canonical_name, + description, + inherit_tools, + input_schema, + metadata +) +VALUES ( + 1, -- version + $1, -- developer_id + $2, -- agent_id + $3, -- task_id + $4, -- name + $5, -- canonical_name + $6, -- description + $7, -- inherit_tools + $8::jsonb, -- input_schema 
+ $9::jsonb -- metadata +) +RETURNING * +""").sql(pretty=True) + +# Define the raw SQL query for inserting workflows +workflows_query = parse_one(""" +INSERT INTO workflows ( + developer_id, + task_id, + "version", + name, + step_idx, + step_type, + step_definition +) +VALUES ( + $1, -- developer_id + $2, -- task_id + $3, -- version + $4, -- name + $5, -- step_idx + $6, -- step_type + $7 -- step_definition +) +""").sql(pretty=True) + + +@rewrap_exceptions( + { + asyncpg.ForeignKeyViolationError: partialclass( + HTTPException, + status_code=404, + detail="The specified developer or agent does not exist.", + ), + asyncpg.UniqueViolationError: partialclass( + HTTPException, + status_code=409, + detail="A task with this ID already exists for this agent.", + ), + } +) +@wrap_in_class( + ResourceCreatedResponse, + one=True, + transform=lambda d: { + "id": d["task_id"], + "jobs": [], + # "updated_at": d["updated_at"].timestamp(), + **d, + }, +) +@increase_counter("create_task") +@pg_query(return_index=0) +@beartype +async def create_task( + *, + developer_id: UUID, + agent_id: UUID, + task_id: UUID | None = None, + data: CreateTaskRequest, +) -> list[tuple[str, list, Literal["fetch", "fetchmany"]]]: + """ + Constructs SQL queries to create or update a task along with its associated tools and workflows. + + Args: + developer_id (UUID): The UUID of the developer. + agent_id (UUID): The UUID of the agent. + task_id (UUID, optional): The UUID of the task. If not provided, a new UUID is generated. + data (CreateTaskRequest): The task data to insert or update. + + Returns: + tuple[str, list]: SQL query and parameters. 
+ + Raises: + HTTPException: If developer/agent doesn't exist (404) or on unique constraint violation (409) + """ + task_id = task_id or uuid7() + + # Insert parameters for the tasks table + task_params = [ + developer_id, # $1 + agent_id, # $2 + task_id, # $3 + data.name, # $4 + data.canonical_name or generate_canonical_name(data.name), # $5 + data.description, # $6 + data.inherit_tools, # $7 + data.input_schema or {}, # $8 + data.metadata or {}, # $9 + ] + + # Prepare tool parameters for the tools table + tool_params = [ + [ + developer_id, + agent_id, + task_id, + uuid7(), # tool_id + tool.type, + tool.name, + tool.description, + getattr(tool, tool.type), # spec + ] + for tool in data.tools or [] + ] + + # Generate workflows from task data using task_to_spec + workflows_spec = task_to_spec(data).model_dump(exclude_none=True, mode="json") + workflow_params = [] + for workflow in workflows_spec.get("workflows", []): + workflow_name = workflow.get("name") + steps = workflow.get("steps", []) + for step_idx, step in enumerate(steps): + workflow_params.append( + [ + developer_id, # $1 + task_id, # $2 + 1, # $3 (version) + workflow_name, # $4 + step_idx, # $5 + step["kind_"], # $6 + step[step["kind_"]], # $7 + ] + ) + + return [ + ( + task_query, + task_params, + "fetch", + ), + ( + tools_query, + tool_params, + "fetchmany", + ), + ( + workflows_query, + workflow_params, + "fetchmany", + ), + ] diff --git a/agents-api/agents_api/queries/tasks/delete_task.py b/agents-api/agents_api/queries/tasks/delete_task.py new file mode 100644 index 000000000..20e03e28a --- /dev/null +++ b/agents-api/agents_api/queries/tasks/delete_task.py @@ -0,0 +1,77 @@ +from typing import Literal +from uuid import UUID + +import asyncpg +from beartype import beartype +from fastapi import HTTPException + +from ...autogen.openapi_model import ResourceDeletedResponse +from ...common.utils.datetime import utcnow +from ...metrics.counters import increase_counter +from ..utils import partialclass, 
pg_query, rewrap_exceptions, wrap_in_class + +workflow_query = """ +DELETE FROM workflows +WHERE developer_id = $1 AND task_id = $2; +""" + +task_query = """ +DELETE FROM tasks +WHERE developer_id = $1 AND task_id = $2 +RETURNING task_id; +""" + + +@rewrap_exceptions( + { + asyncpg.ForeignKeyViolationError: partialclass( + HTTPException, + status_code=404, + detail="The specified developer or agent does not exist.", + ), + asyncpg.UniqueViolationError: partialclass( + HTTPException, + status_code=409, + detail="A task with this ID already exists for this agent.", + ), + asyncpg.NoDataFoundError: partialclass( + HTTPException, + status_code=404, + detail="Task not found", + ), + } +) +@wrap_in_class( + ResourceDeletedResponse, + one=True, + transform=lambda d: { + "id": d["task_id"], + "deleted_at": utcnow(), + }, +) +@increase_counter("delete_task") +@pg_query +@beartype +async def delete_task( + *, + developer_id: UUID, + task_id: UUID, +) -> list[tuple[str, list, Literal["fetch", "fetchmany", "fetchrow"]]]: + """ + Deletes a task by its unique identifier along with its associated workflows. + + Parameters: + developer_id (UUID): The unique identifier of the developer associated with the task. + task_id (UUID): The unique identifier of the task to delete. + + Returns: + tuple[str, list, Literal["fetch", "fetchmany", "fetchrow"]]: SQL query, parameters, and fetch method. 
+ + Raises: + HTTPException: If developer/agent doesn't exist (404) or on unique constraint violation (409) + """ + + return [ + (workflow_query, [developer_id, task_id], "fetch"), + (task_query, [developer_id, task_id], "fetchrow"), + ] diff --git a/agents-api/agents_api/queries/tasks/get_task.py b/agents-api/agents_api/queries/tasks/get_task.py new file mode 100644 index 000000000..03da91256 --- /dev/null +++ b/agents-api/agents_api/queries/tasks/get_task.py @@ -0,0 +1,92 @@ +from typing import Literal +from uuid import UUID + +import asyncpg +from beartype import beartype +from fastapi import HTTPException + +from ...common.protocol.tasks import spec_to_task +from ...metrics.counters import increase_counter +from ..utils import partialclass, pg_query, rewrap_exceptions, wrap_in_class + +get_task_query = """ +SELECT + t.*, + COALESCE( + jsonb_agg( + CASE WHEN w.name IS NOT NULL THEN + jsonb_build_object( + 'name', w.name, + 'steps', jsonb_build_array( + jsonb_build_object( + w.step_type, w.step_definition, + 'step_idx', w.step_idx -- Not sure if this is needed + ) + ) + ) + END + ) FILTER (WHERE w.name IS NOT NULL), + '[]'::jsonb + ) as workflows +FROM + tasks t +LEFT JOIN + workflows w ON t.developer_id = w.developer_id AND t.task_id = w.task_id AND t.version = w.version +WHERE + t.developer_id = $1 AND t.task_id = $2 + AND t.version = ( + SELECT MAX(version) + FROM tasks + WHERE developer_id = $1 AND task_id = $2 + ) +GROUP BY t.developer_id, t.task_id, t.canonical_name, t.agent_id, t.version; +""" + + +@rewrap_exceptions( + { + asyncpg.ForeignKeyViolationError: partialclass( + HTTPException, + status_code=404, + detail="The specified developer or agent does not exist.", + ), + asyncpg.UniqueViolationError: partialclass( + HTTPException, + status_code=409, + detail="A task with this ID already exists for this agent.", + ), + asyncpg.NoDataFoundError: partialclass( + HTTPException, + status_code=404, + detail="Task not found", + ), + } +) 
+@wrap_in_class(spec_to_task, one=True) +@increase_counter("get_task") +@pg_query +@beartype +async def get_task( + *, + developer_id: UUID, + task_id: UUID, +) -> tuple[str, list, Literal["fetch", "fetchmany", "fetchrow"]]: + """ + Retrieves a task by its unique identifier along with its associated workflows. + + Parameters: + developer_id (UUID): The unique identifier of the developer associated with the task. + task_id (UUID): The unique identifier of the task to retrieve. + + Returns: + tuple[str, list, Literal["fetch", "fetchmany", "fetchrow"]]: SQL query, parameters, and fetch method. + + Raises: + HTTPException: If developer/agent doesn't exist (404) or on unique constraint violation (409) + """ + + return ( + get_task_query, + [developer_id, task_id], + "fetchrow", + ) diff --git a/agents-api/agents_api/queries/tasks/list_tasks.py b/agents-api/agents_api/queries/tasks/list_tasks.py new file mode 100644 index 000000000..5cec7103e --- /dev/null +++ b/agents-api/agents_api/queries/tasks/list_tasks.py @@ -0,0 +1,123 @@ +from typing import Any, Literal +from uuid import UUID + +import asyncpg +from beartype import beartype +from fastapi import HTTPException + +from ...common.protocol.tasks import spec_to_task +from ...metrics.counters import increase_counter +from ..utils import partialclass, pg_query, rewrap_exceptions, wrap_in_class + +list_tasks_query = """ +SELECT + t.*, + COALESCE( + jsonb_agg( + CASE WHEN w.name IS NOT NULL THEN + jsonb_build_object( + 'name', w.name, + 'steps', jsonb_build_array( + jsonb_build_object( + w.step_type, w.step_definition, + 'step_idx', w.step_idx -- Not sure if this is needed + ) + ) + ) + END + ) FILTER (WHERE w.name IS NOT NULL), + '[]'::jsonb + ) as workflows +FROM + tasks t +LEFT JOIN + workflows w ON t.developer_id = w.developer_id AND t.task_id = w.task_id AND t.version = w.version +WHERE + t.developer_id = $1 + {metadata_filter_query} +GROUP BY t.developer_id, t.task_id, t.canonical_name, t.agent_id, t.version +ORDER 
BY + CASE WHEN $4 = 'created_at' AND $5 = 'asc' THEN t.created_at END ASC NULLS LAST, + CASE WHEN $4 = 'created_at' AND $5 = 'desc' THEN t.created_at END DESC NULLS LAST, + CASE WHEN $4 = 'updated_at' AND $5 = 'asc' THEN t.updated_at END ASC NULLS LAST, + CASE WHEN $4 = 'updated_at' AND $5 = 'desc' THEN t.updated_at END DESC NULLS LAST +LIMIT $2 OFFSET $3; +""" + + +@rewrap_exceptions( + { + asyncpg.ForeignKeyViolationError: partialclass( + HTTPException, + status_code=404, + detail="The specified developer or agent does not exist.", + ), + asyncpg.UniqueViolationError: partialclass( + HTTPException, + status_code=409, + detail="A task with this ID already exists for this agent.", + ), + asyncpg.NoDataFoundError: partialclass( + HTTPException, + status_code=404, + detail="Task not found", + ), + } +) +@wrap_in_class(spec_to_task) +@increase_counter("list_tasks") +@pg_query +@beartype +async def list_tasks( + *, + developer_id: UUID, + limit: int = 100, + offset: int = 0, + sort_by: Literal["created_at", "updated_at"] = "created_at", + direction: Literal["asc", "desc"] = "desc", + metadata_filter: dict[str, Any] = {}, +) -> tuple[str, list]: + """ + Retrieves all tasks for a given developer with pagination and sorting. + + Parameters: + developer_id (UUID): The unique identifier of the developer. + limit (int): Maximum number of records to return (default: 100) + offset (int): Number of records to skip (default: 0) + sort_by (str): Field to sort by ("created_at" or "updated_at") + direction (str): Sort direction ("asc" or "desc") + metadata_filter (dict): Optional metadata filters + + Returns: + tuple[str, list]: SQL query and parameters. 
+ + Raises: + HTTPException: If parameters are invalid or developer/agent doesn't exist + """ + if direction.lower() not in ["asc", "desc"]: + raise HTTPException(status_code=400, detail="Invalid sort direction") + + if limit > 100 or limit < 1: + raise HTTPException(status_code=400, detail="Limit must be between 1 and 100") + + if offset < 0: + raise HTTPException(status_code=400, detail="Offset must be non-negative") + + # Format query with metadata filter if needed + query = list_tasks_query.format( + metadata_filter_query="AND metadata @> $6::jsonb" if metadata_filter else "" + ) + + # Build parameters list + params = [ + developer_id, + limit, + offset, + sort_by, + direction, + ] + + if metadata_filter: + params.append(metadata_filter) + + return (query, params) diff --git a/agents-api/agents_api/queries/tasks/patch_task.py b/agents-api/agents_api/queries/tasks/patch_task.py new file mode 100644 index 000000000..2349f87c5 --- /dev/null +++ b/agents-api/agents_api/queries/tasks/patch_task.py @@ -0,0 +1,220 @@ +from typing import Literal +from uuid import UUID + +import asyncpg +from beartype import beartype +from fastapi import HTTPException +from sqlglot import parse_one + +from ...autogen.openapi_model import PatchTaskRequest, ResourceUpdatedResponse +from ...common.protocol.tasks import task_to_spec +from ...common.utils.datetime import utcnow +from ...metrics.counters import increase_counter +from ..utils import partialclass, pg_query, rewrap_exceptions, wrap_in_class + +# # Update task query using UPDATE +# update_task_query = parse_one(""" +# UPDATE tasks +# SET +# version = version + 1, +# canonical_name = $2, +# agent_id = $4, +# metadata = $5, +# name = $6, +# description = $7, +# inherit_tools = $8, +# input_schema = $9::jsonb, +# updated_at = NOW() +# WHERE +# developer_id = $1 +# AND task_id = $3 +# RETURNING *; +# """).sql(pretty=True) + +# Update task query using INSERT with version increment +patch_task_query = parse_one(""" +WITH 
current_version AS ( + SELECT MAX("version") as current_version, + canonical_name as existing_canonical_name, + metadata as existing_metadata, + name as existing_name, + description as existing_description, + inherit_tools as existing_inherit_tools, + input_schema as existing_input_schema + FROM tasks + WHERE developer_id = $1 + AND task_id = $3 + GROUP BY canonical_name, metadata, name, description, inherit_tools, input_schema + HAVING MAX("version") IS NOT NULL -- This ensures we only proceed if a version exists +) +INSERT INTO tasks ( + "version", + developer_id, -- $1 + canonical_name, -- $2 + task_id, -- $3 + agent_id, -- $4 + metadata, -- $5 + name, -- $6 + description, -- $7 + inherit_tools, -- $8 + input_schema -- $9 +) +SELECT + current_version + 1, -- version + $1, -- developer_id + COALESCE($2, existing_canonical_name), -- canonical_name + $3, -- task_id + $4, -- agent_id + COALESCE($5::jsonb, existing_metadata), -- metadata + COALESCE($6, existing_name), -- name + COALESCE($7, existing_description), -- description + COALESCE($8, existing_inherit_tools), -- inherit_tools + COALESCE($9::jsonb, existing_input_schema) -- input_schema +FROM current_version +RETURNING *; +""").sql(pretty=True) + +# When main is None - just copy existing workflows with new version +copy_workflows_query = parse_one(""" +WITH current_version AS ( + SELECT MAX(version) - 1 as current_version + FROM tasks + WHERE developer_id = $1 AND task_id = $2 +) +INSERT INTO workflows ( + developer_id, + task_id, + version, + name, + step_idx, + step_type, + step_definition +) +SELECT + developer_id, + task_id, + (SELECT current_version + 1 FROM current_version), -- new version + name, + step_idx, + step_type, + step_definition +FROM workflows +WHERE developer_id = $1 +AND task_id = $2 +AND version = (SELECT current_version FROM current_version) +""").sql(pretty=True) + +# When main is provided - create new workflows (existing query) +new_workflows_query = parse_one(""" +WITH current_version 
AS ( + SELECT COALESCE(MAX(version), 0) - 1 as next_version + FROM tasks + WHERE developer_id = $1 AND task_id = $2 +) +INSERT INTO workflows ( + developer_id, + task_id, + version, + name, + step_idx, + step_type, + step_definition +) +SELECT + $1, -- developer_id + $2, -- task_id + next_version + 1, -- version + $3, -- name + $4, -- step_idx + $5, -- step_type + $6 -- step_definition +FROM current_version +""").sql(pretty=True) + + +@rewrap_exceptions( + { + asyncpg.ForeignKeyViolationError: partialclass( + HTTPException, + status_code=404, + detail="The specified developer or agent does not exist.", + ), + asyncpg.UniqueViolationError: partialclass( + HTTPException, + status_code=409, + detail="A task with this ID already exists for this agent.", + ), + asyncpg.NoDataFoundError: partialclass( + HTTPException, + status_code=404, + detail="Task not found", + ), + } +) +@wrap_in_class( + ResourceUpdatedResponse, + one=True, + transform=lambda d: {"id": d["task_id"], "updated_at": utcnow()}, +) +@increase_counter("patch_task") +@pg_query(return_index=0) +@beartype +async def patch_task( + *, + developer_id: UUID, + task_id: UUID, + agent_id: UUID, + data: PatchTaskRequest, +) -> list[tuple[str, list, Literal["fetch", "fetchmany", "fetchrow"]]]: + """ + Updates a task and its associated workflows with version control. + Only updates the fields that are provided in the request. + + Parameters: + developer_id (UUID): The unique identifier of the developer. + task_id (UUID): The unique identifier of the task to update. + data (PatchTaskRequest): The partial update data. + agent_id (UUID): The unique identifier of the agent. + Returns: + list[tuple[str, list, Literal["fetch", "fetchmany", "fetchrow"]]]: List of queries to execute. 
+ """ + # Parameters for patching the task + + patch_task_params = [ + developer_id, # $1 + data.canonical_name, # $2 + task_id, # $3 + agent_id, # $4 + data.metadata or None, # $5 + data.name or None, # $6 + data.description or None, # $7 + data.inherit_tools, # $8 + data.input_schema, # $9 + ] + + if data.main is None: + workflow_query = copy_workflows_query + workflow_params = [[developer_id, task_id]] # Only need these params + else: + workflow_query = new_workflows_query + workflow_params = [] + workflows_spec = task_to_spec(data).model_dump(exclude_none=True, mode="json") + for workflow in workflows_spec.get("workflows", []): + workflow_name = workflow.get("name") + steps = workflow.get("steps", []) + for step_idx, step in enumerate(steps): + workflow_params.append( + [ + developer_id, # $1 + task_id, # $2 + workflow_name, # $3 + step_idx, # $4 + step["kind_"], # $5 + step[step["kind_"]], # $6 + ] + ) + + return [ + (patch_task_query, patch_task_params, "fetchrow"), + (workflow_query, workflow_params, "fetchmany"), + ] diff --git a/agents-api/agents_api/queries/tasks/update_task.py b/agents-api/agents_api/queries/tasks/update_task.py new file mode 100644 index 000000000..2199da7b0 --- /dev/null +++ b/agents-api/agents_api/queries/tasks/update_task.py @@ -0,0 +1,188 @@ +from typing import Literal +from uuid import UUID + +import asyncpg +from beartype import beartype +from fastapi import HTTPException +from sqlglot import parse_one + +from ...autogen.openapi_model import ResourceUpdatedResponse, UpdateTaskRequest +from ...common.protocol.tasks import task_to_spec +from ...common.utils.datetime import utcnow +from ...metrics.counters import increase_counter +from ..utils import partialclass, pg_query, rewrap_exceptions, wrap_in_class + +# # Update task query using UPDATE +# update_task_query = parse_one(""" +# UPDATE tasks +# SET +# version = version + 1, +# canonical_name = $2, +# agent_id = $4, +# metadata = $5, +# name = $6, +# description = $7, +# 
inherit_tools = $8, +# input_schema = $9::jsonb, +# updated_at = NOW() +# WHERE +# developer_id = $1 +# AND task_id = $3 +# RETURNING *; +# """).sql(pretty=True) + +# Update task query using INSERT with version increment +update_task_query = parse_one(""" +WITH current_version AS ( + SELECT MAX("version") as current_version, + canonical_name as existing_canonical_name + FROM tasks + WHERE developer_id = $1 + AND task_id = $3 + GROUP BY task_id, canonical_name + HAVING MAX("version") IS NOT NULL -- This ensures we only proceed if a version exists +) +INSERT INTO tasks ( + "version", + developer_id, -- $1 + canonical_name, -- $2 + task_id, -- $3 + agent_id, -- $4 + metadata, -- $5 + name, -- $6 + description, -- $7 + inherit_tools, -- $8 + input_schema -- $9 +) +SELECT + current_version + 1, -- version + $1, -- developer_id + COALESCE($2, existing_canonical_name), -- canonical_name + $3, -- task_id + $4, -- agent_id + $5::jsonb, -- metadata + $6, -- name + $7, -- description + $8, -- inherit_tools + $9::jsonb -- input_schema +FROM current_version +RETURNING *; +""").sql(pretty=True) + +# Insert workflow rows pointing at the latest task version +workflows_query = parse_one(""" +WITH version AS ( + SELECT COALESCE(MAX(version), 0) as current_version + FROM tasks + WHERE developer_id = $1 AND task_id = $2 +) +INSERT INTO workflows ( + developer_id, + task_id, + version, + name, + step_idx, + step_type, + step_definition +) +SELECT + $1, -- developer_id + $2, -- task_id + current_version, -- version (from CTE) + $3, -- name + $4, -- step_idx + $5, -- step_type + $6 -- step_definition +FROM version +""").sql(pretty=True) + + +@rewrap_exceptions( + { + asyncpg.ForeignKeyViolationError: partialclass( + HTTPException, + status_code=404, + detail="The specified developer or agent does not exist.", + ), + asyncpg.UniqueViolationError: partialclass( + HTTPException, + status_code=409, + detail="A task with this ID already exists for this agent.", + ), + asyncpg.NoDataFoundError: 
partialclass( + HTTPException, + status_code=404, + detail="Task not found", + ), + } +) +@wrap_in_class( + ResourceUpdatedResponse, + one=True, + transform=lambda d: {"id": d["task_id"], "updated_at": utcnow()}, +) +@increase_counter("update_task") +@pg_query(return_index=0) +@beartype +async def update_task( + *, + developer_id: UUID, + task_id: UUID, + agent_id: UUID, + data: UpdateTaskRequest, +) -> list[tuple[str, list, Literal["fetch", "fetchmany", "fetchrow"]]]: + """ + Updates a task and its associated workflows with version control. + + Parameters: + developer_id (UUID): The unique identifier of the developer. + task_id (UUID): The unique identifier of the task to update. + data (UpdateTaskRequest): The update data. + agent_id (UUID): The unique identifier of the agent. + Returns: + list[tuple[str, list, Literal["fetch", "fetchmany", "fetchrow"]]]: List of queries to execute. + """ + # NOTE(review): stray debug print removed + # Parameters for updating the task + update_task_params = [ + developer_id, # $1 + data.canonical_name, # $2 + task_id, # $3 + agent_id, # $4 + data.metadata or {}, # $5 + data.name, # $6 + data.description, # $7 + data.inherit_tools, # $8 + data.input_schema or {}, # $9 + ] + + # Generate workflows from task data + workflows_spec = task_to_spec(data).model_dump(exclude_none=True, mode="json") + workflow_params = [] + for workflow in workflows_spec.get("workflows", []): + workflow_name = workflow.get("name") + steps = workflow.get("steps", []) + for step_idx, step in enumerate(steps): + workflow_params.append( + [ + developer_id, # $1 + task_id, # $2 + workflow_name, # $3 + step_idx, # $4 + step["kind_"], # $5 + step[step["kind_"]], # $6 + ] + ) + + return [ + ( + update_task_query, + update_task_params, + "fetchrow", + ), + ( + workflows_query, + workflow_params, + "fetchmany", + ), + ] diff --git a/agents-api/agents_api/queries/utils.py b/agents-api/agents_api/queries/utils.py index 0d139cb91..d736a30c1 100644 --- 
a/agents-api/agents_api/queries/utils.py +++ b/agents-api/agents_api/queries/utils.py @@ -172,13 +172,20 @@ async def wrapper( results: list[Record] = await method( query, *args, timeout=timeout ) - all_results.append(results) - - if method_name == "fetchrow" and ( - len(results) == 0 or results.get("bool", True) is None - ): + if method_name == "fetchrow": + results = ( + [results] + if results is not None + and results.get("bool", False) is not None + and results.get("exists", True) is not False + else [] + ) + + if method_name == "fetchrow" and len(results) == 0: + raise asyncpg.NoDataFoundError("No data found") + all_results.append(results) + end = timeit and time.perf_counter() timeit and print( diff --git a/agents-api/tests/fixtures.py b/agents-api/tests/fixtures.py index 430a2e3c5..fa996f560 100644 --- a/agents-api/tests/fixtures.py +++ b/agents-api/tests/fixtures.py @@ -10,6 +10,7 @@ CreateAgentRequest, CreateFileRequest, CreateSessionRequest, + CreateTaskRequest, CreateUserRequest, ) from agents_api.clients.pg import create_db_pool @@ -29,8 +30,8 @@ # from agents_api.queries.files.delete_file import delete_file from agents_api.queries.sessions.create_session import create_session +from agents_api.queries.tasks.create_task import create_task -# from agents_api.queries.task.create_task import create_task # from agents_api.queries.task.delete_task import delete_task # from agents_api.queries.tools.create_tools import create_tools # from agents_api.queries.tools.delete_tool import delete_tool @@ -148,6 +149,24 @@ async def test_file(dsn=pg_dsn, developer=test_developer, user=test_user): return file +@fixture(scope="test") +async def test_task(dsn=pg_dsn, developer=test_developer, agent=test_agent): + pool = await 
create_db_pool(dsn=dsn) + task = await create_task( + developer_id=developer.id, + agent_id=agent.id, + task_id=uuid7(), + data=CreateTaskRequest( + name="test task", + description="test task about", + input_schema={"type": "object", "additionalProperties": True}, + main=[{"evaluate": {"hi": "_"}}], + ), + connection_pool=pool, + ) + return task + + @fixture(scope="test") async def random_email(): return f"{"".join([random.choice(string.ascii_lowercase) for _ in range(10)])}@mail.com" @@ -157,7 +176,7 @@ async def random_email(): async def test_new_developer(dsn=pg_dsn, email=random_email): pool = await create_db_pool(dsn=dsn) dev_id = uuid7() - developer = await create_developer( + await create_developer( email=email, active=True, tags=["tag1"], @@ -166,6 +185,11 @@ async def test_new_developer(dsn=pg_dsn, email=random_email): connection_pool=pool, ) + developer = await get_developer( + developer_id=dev_id, + connection_pool=pool, + ) + return developer diff --git a/agents-api/tests/test_developer_queries.py b/agents-api/tests/test_developer_queries.py index eedc07dd2..6d94b3209 100644 --- a/agents-api/tests/test_developer_queries.py +++ b/agents-api/tests/test_developer_queries.py @@ -3,7 +3,9 @@ from uuid_extensions import uuid7 from ward import raises, test +from agents_api.autogen.openapi_model import ResourceCreatedResponse from agents_api.clients.pg import create_db_pool +from agents_api.common.protocol.developers import Developer from agents_api.queries.developers.create_developer import create_developer from agents_api.queries.developers.get_developer import ( get_developer, @@ -32,6 +34,7 @@ async def _(dsn=pg_dsn, dev=test_new_developer): connection_pool=pool, ) + assert type(developer) == Developer assert developer.id == dev.id assert developer.email == dev.email assert developer.active @@ -52,11 +55,9 @@ async def _(dsn=pg_dsn): connection_pool=pool, ) + assert type(developer) == ResourceCreatedResponse assert developer.id == dev_id - assert 
developer.email == "m@mail.com" - assert developer.active - assert developer.tags == ["tag1"] - assert developer.settings == {"key1": "val1"} + assert developer.created_at is not None @test("query: update developer") @@ -71,10 +72,6 @@ async def _(dsn=pg_dsn, dev=test_new_developer, email=random_email): ) assert developer.id == dev.id - assert developer.email == email - assert developer.active - assert developer.tags == ["tag2"] - assert developer.settings == {"key2": "val2"} @test("query: patch developer") diff --git a/agents-api/tests/test_entry_queries.py b/agents-api/tests/test_entry_queries.py index 706185c7b..1b5618974 100644 --- a/agents-api/tests/test_entry_queries.py +++ b/agents-api/tests/test_entry_queries.py @@ -3,8 +3,6 @@ It verifies the functionality of adding, retrieving, and processing entries as defined in the schema. """ -from uuid import UUID - from fastapi import HTTPException from uuid_extensions import uuid7 from ward import raises, test @@ -48,7 +46,7 @@ async def _(dsn=pg_dsn, developer=test_developer): assert exc_info.raised.status_code == 404 -@test("query: list entries no session") +@test("query: list entries sql - no session") async def _(dsn=pg_dsn, developer=test_developer): """Test the retrieval of entries from the database.""" @@ -63,7 +61,7 @@ async def _(dsn=pg_dsn, developer=test_developer): assert exc_info.raised.status_code == 404 -@test("query: get entries") +@test("query: list entries sql - session exists") async def _(dsn=pg_dsn, developer_id=test_developer_id, session=test_session): """Test the retrieval of entries from the database.""" @@ -101,7 +99,7 @@ async def _(dsn=pg_dsn, developer_id=test_developer_id, session=test_session): assert result is not None -@test("query: get history") +@test("query: get history sql - session exists") async def _(dsn=pg_dsn, developer_id=test_developer_id, session=test_session): """Test the retrieval of entry history from the database.""" @@ -140,7 +138,7 @@ async def _(dsn=pg_dsn, 
developer_id=test_developer_id, session=test_session): assert result.entries[0].id -@test("query: delete entries") +@test("query: delete entries sql - session exists") async def _(dsn=pg_dsn, developer_id=test_developer_id, session=test_session): """Test the deletion of entries from the database.""" diff --git a/agents-api/tests/test_files_queries.py b/agents-api/tests/test_files_queries.py index 92b52d733..c83c7a6f6 100644 --- a/agents-api/tests/test_files_queries.py +++ b/agents-api/tests/test_files_queries.py @@ -1,9 +1,7 @@ # # Tests for entry queries -from fastapi import HTTPException -from uuid_extensions import uuid7 -from ward import raises, test +from ward import test from agents_api.autogen.openapi_model import CreateFileRequest from agents_api.clients.pg import create_db_pool diff --git a/agents-api/tests/test_session_queries.py b/agents-api/tests/test_session_queries.py index 4673d6fc5..a2b899a89 100644 --- a/agents-api/tests/test_session_queries.py +++ b/agents-api/tests/test_session_queries.py @@ -56,7 +56,9 @@ async def _( ) assert result is not None - assert isinstance(result, Session), f"Result is not a Session, {result}" + assert isinstance( + result, ResourceCreatedResponse + ), f"Result is not a Session, {result}" assert result.id == session_id @@ -148,8 +150,8 @@ async def _(dsn=pg_dsn, developer_id=test_developer_id, session=test_session): assert isinstance(result, list) assert len(result) >= 1 assert all( - s.situation == session.situation for s in result - ), f"Result is not a list of sessions, {result}, {session.situation}" + isinstance(s, Session) for s in result + ), f"Result is not a list of sessions, {result}" @test("query: count sessions") @@ -227,7 +229,6 @@ async def _( session_id=session.id, connection_pool=pool, ) - assert patched_session.situation == session.situation assert patched_session.metadata == {"test": "metadata"} diff --git a/agents-api/tests/test_task_queries.py b/agents-api/tests/test_task_queries.py index 
1a9fcd544..c4303bb97 100644 --- a/agents-api/tests/test_task_queries.py +++ b/agents-api/tests/test_task_queries.py @@ -1,160 +1,335 @@ -# # Tests for task queries - -# from uuid_extensions import uuid7 -# from ward import test - -# from agents_api.autogen.openapi_model import ( -# CreateTaskRequest, -# ResourceUpdatedResponse, -# Task, -# UpdateTaskRequest, -# ) -# from agents_api.queries.task.create_or_update_task import create_or_update_task -# from agents_api.queries.task.create_task import create_task -# from agents_api.queries.task.delete_task import delete_task -# from agents_api.queries.task.get_task import get_task -# from agents_api.queries.task.list_tasks import list_tasks -# from agents_api.queries.task.update_task import update_task -# from tests.fixtures import cozo_client, test_agent, test_developer_id, test_task - - -# @test("query: create task") -# def _(client=cozo_client, developer_id=test_developer_id, agent=test_agent): -# task_id = uuid7() - -# create_task( -# developer_id=developer_id, -# agent_id=agent.id, -# task_id=task_id, -# data=CreateTaskRequest( -# **{ -# "name": "test task", -# "description": "test task about", -# "input_schema": {"type": "object", "additionalProperties": True}, -# "main": [{"evaluate": {"hi": "_"}}], -# } -# ), -# client=client, -# ) - - -# @test("query: create or update task") -# def _(client=cozo_client, developer_id=test_developer_id, agent=test_agent): -# task_id = uuid7() - -# create_or_update_task( -# developer_id=developer_id, -# agent_id=agent.id, -# task_id=task_id, -# data=CreateTaskRequest( -# **{ -# "name": "test task", -# "description": "test task about", -# "input_schema": {"type": "object", "additionalProperties": True}, -# "main": [{"evaluate": {"hi": "_"}}], -# } -# ), -# client=client, -# ) - - -# @test("query: get task not exists") -# def _(client=cozo_client, developer_id=test_developer_id): -# task_id = uuid7() - -# try: -# get_task( -# developer_id=developer_id, -# task_id=task_id, -# 
client=client, -# ) -# except Exception: -# pass -# else: -# assert False, "Task should not exist" - - -# @test("query: get task exists") -# def _(client=cozo_client, developer_id=test_developer_id, task=test_task): -# result = get_task( -# developer_id=developer_id, -# task_id=task.id, -# client=client, -# ) - -# assert result is not None -# assert isinstance(result, Task) - - -# @test("query: delete task") -# def _(client=cozo_client, developer_id=test_developer_id, agent=test_agent): -# task = create_task( -# developer_id=developer_id, -# agent_id=agent.id, -# data=CreateTaskRequest( -# **{ -# "name": "test task", -# "description": "test task about", -# "input_schema": {"type": "object", "additionalProperties": True}, -# "main": [{"evaluate": {"hi": "_"}}], -# } -# ), -# client=client, -# ) - -# delete_task( -# developer_id=developer_id, -# agent_id=agent.id, -# task_id=task.id, -# client=client, -# ) - -# try: -# get_task( -# developer_id=developer_id, -# task_id=task.id, -# client=client, -# ) -# except Exception: -# pass - -# else: -# assert False, "Task should not exist" - - -# @test("query: update task") -# def _( -# client=cozo_client, developer_id=test_developer_id, agent=test_agent, task=test_task -# ): -# result = update_task( -# developer_id=developer_id, -# task_id=task.id, -# agent_id=agent.id, -# data=UpdateTaskRequest( -# **{ -# "name": "updated task", -# "description": "updated task about", -# "input_schema": {"type": "object", "additionalProperties": True}, -# "main": [{"evaluate": {"hi": "_"}}], -# } -# ), -# client=client, -# ) - -# assert result is not None -# assert isinstance(result, ResourceUpdatedResponse) - - -# @test("query: list tasks") -# def _( -# client=cozo_client, developer_id=test_developer_id, task=test_task, agent=test_agent -# ): -# result = list_tasks( -# developer_id=developer_id, -# agent_id=agent.id, -# client=client, -# ) - -# assert isinstance(result, list) -# assert len(result) > 0 -# assert all(isinstance(task, Task) 
for task in result) +# Tests for task queries + +from fastapi import HTTPException +from uuid_extensions import uuid7 +from ward import raises, test + +from agents_api.autogen.openapi_model import ( + CreateTaskRequest, + PatchTaskRequest, + ResourceUpdatedResponse, + Task, + UpdateTaskRequest, +) +from agents_api.clients.pg import create_db_pool +from agents_api.queries.tasks.create_or_update_task import create_or_update_task +from agents_api.queries.tasks.create_task import create_task +from agents_api.queries.tasks.delete_task import delete_task +from agents_api.queries.tasks.get_task import get_task +from agents_api.queries.tasks.list_tasks import list_tasks +from agents_api.queries.tasks.patch_task import patch_task +from agents_api.queries.tasks.update_task import update_task +from tests.fixtures import pg_dsn, test_agent, test_developer_id, test_task + + +@test("query: create task sql") +async def _(dsn=pg_dsn, developer_id=test_developer_id, agent=test_agent): + """Test that a task can be successfully created.""" + + pool = await create_db_pool(dsn=dsn) + await create_task( + developer_id=developer_id, + agent_id=agent.id, + task_id=uuid7(), + data=CreateTaskRequest( + name="test task", + description="test task about", + input_schema={"type": "object", "additionalProperties": True}, + main=[{"evaluate": {"hi": "_"}}], + ), + connection_pool=pool, + ) + + +@test("query: create or update task sql") +async def _(dsn=pg_dsn, developer_id=test_developer_id, agent=test_agent): + """Test that a task can be successfully created or updated.""" + + pool = await create_db_pool(dsn=dsn) + await create_or_update_task( + developer_id=developer_id, + agent_id=agent.id, + task_id=uuid7(), + data=CreateTaskRequest( + name="test task", + description="test task about", + input_schema={"type": "object", "additionalProperties": True}, + main=[{"evaluate": {"hi": "_"}}], + ), + connection_pool=pool, + ) + + +@test("query: get task sql - exists") +async def _(dsn=pg_dsn, 
developer_id=test_developer_id, task=test_task): + """Test that an existing task can be successfully retrieved.""" + + pool = await create_db_pool(dsn=dsn) + + # Then retrieve it + result = await get_task( + developer_id=developer_id, + task_id=task.id, + connection_pool=pool, + ) + assert result is not None + assert isinstance(result, Task), f"Result is not a Task, got {type(result)}" + assert result.id == task.id + assert result.name == "test task" + assert result.description == "test task about" + + +@test("query: get task sql - not exists") +async def _(dsn=pg_dsn, developer_id=test_developer_id): + """Test that attempting to retrieve a non-existent task raises an error.""" + + pool = await create_db_pool(dsn=dsn) + task_id = uuid7() + + with raises(HTTPException) as exc: + await get_task( + developer_id=developer_id, + task_id=task_id, + connection_pool=pool, + ) + + assert exc.raised.status_code == 404 + assert "Task not found" in str(exc.raised.detail) + + +@test("query: delete task sql - exists") +async def _(dsn=pg_dsn, developer_id=test_developer_id, task=test_task): + """Test that a task can be successfully deleted.""" + + pool = await create_db_pool(dsn=dsn) + + # First verify task exists + result = await get_task( + developer_id=developer_id, + task_id=task.id, + connection_pool=pool, + ) + assert result is not None + assert result.id == task.id + + # Delete the task + deleted = await delete_task( + developer_id=developer_id, + task_id=task.id, + connection_pool=pool, + ) + assert deleted is not None + assert deleted.id == task.id + + # Verify task no longer exists + with raises(HTTPException) as exc: + await get_task( + developer_id=developer_id, + task_id=task.id, + connection_pool=pool, + ) + + assert exc.raised.status_code == 404 + assert "Task not found" in str(exc.raised.detail) + + +@test("query: delete task sql - not exists") +async def _(dsn=pg_dsn, developer_id=test_developer_id): + """Test that attempting to delete a non-existent task raises 
an error.""" + + pool = await create_db_pool(dsn=dsn) + task_id = uuid7() + + with raises(HTTPException) as exc: + await delete_task( + developer_id=developer_id, + task_id=task_id, + connection_pool=pool, + ) + + assert exc.raised.status_code == 404 + assert "Task not found" in str(exc.raised.detail) + + +# Add tests for list tasks +@test("query: list tasks sql - with filters") +async def _(dsn=pg_dsn, developer_id=test_developer_id, agent=test_agent): + """Test that tasks can be successfully filtered and retrieved.""" + + pool = await create_db_pool(dsn=dsn) + result = await list_tasks( + developer_id=developer_id, + limit=10, + offset=0, + sort_by="updated_at", + direction="asc", + metadata_filter={"test": True}, + connection_pool=pool, + ) + assert result is not None + assert isinstance(result, list) + assert all(isinstance(task, Task) for task in result) + assert all(task.metadata.get("test") == True for task in result) + + +@test("query: list tasks sql - no filters") +async def _(dsn=pg_dsn, developer_id=test_developer_id, agent=test_agent): + """Test that a list of tasks can be successfully retrieved.""" + + pool = await create_db_pool(dsn=dsn) + result = await list_tasks( + developer_id=developer_id, + connection_pool=pool, + ) + assert result is not None + assert isinstance(result, list) + assert len(result) > 0 + assert all(isinstance(task, Task) for task in result) + + +@test("query: update task sql - exists") +async def _( + dsn=pg_dsn, developer_id=test_developer_id, agent=test_agent, task=test_task +): + """Test that a task can be successfully updated.""" + + pool = await create_db_pool(dsn=dsn) + updated = await update_task( + developer_id=developer_id, + task_id=task.id, + agent_id=agent.id, + data=UpdateTaskRequest( + **{ + "name": "updated task", + "canonical_name": "updated_task", + "description": "updated task description", + "input_schema": {"type": "object", "additionalProperties": True}, + "main": [{"evaluate": {"hi": "_"}}], + 
"inherit_tools": False, + "metadata": {"updated": True}, + } + ), + connection_pool=pool, + ) + + assert updated is not None + assert isinstance(updated, ResourceUpdatedResponse) + assert updated.id == task.id + + # Verify task was updated + updated_task = await get_task( + developer_id=developer_id, + task_id=task.id, + connection_pool=pool, + ) + assert updated_task.name == "updated task" + assert updated_task.description == "updated task description" + assert updated_task.metadata == {"updated": True} + + +@test("query: update task sql - not exists") +async def _(dsn=pg_dsn, developer_id=test_developer_id, agent=test_agent): + """Test that attempting to update a non-existent task raises an error.""" + + pool = await create_db_pool(dsn=dsn) + task_id = uuid7() + + with raises(HTTPException) as exc: + await update_task( + developer_id=developer_id, + task_id=task_id, + agent_id=agent.id, + data=UpdateTaskRequest( + **{ + "canonical_name": "updated_task", + "name": "updated task", + "description": "updated task description", + "input_schema": {"type": "object", "additionalProperties": True}, + "main": [{"evaluate": {"hi": "_"}}], + "inherit_tools": False, + } + ), + connection_pool=pool, + ) + + assert exc.raised.status_code == 404 + assert "Task not found" in str(exc.raised.detail) + + +@test("query: patch task sql - exists") +async def _(dsn=pg_dsn, developer_id=test_developer_id, agent=test_agent): + """Test that patching an existing task works correctly.""" + pool = await create_db_pool(dsn=dsn) + + # Create initial task + task = await create_task( + developer_id=developer_id, + agent_id=agent.id, + data=CreateTaskRequest( + **{ + "canonical_name": "test_task", + "name": "test task", + "description": "test task description", + "input_schema": {"type": "object", "additionalProperties": True}, + "main": [{"evaluate": {"hi": "_"}}], + "inherit_tools": False, + "metadata": {"initial": True}, + } + ), + connection_pool=pool, + ) + + # Patch the task + updated = 
await patch_task( + developer_id=developer_id, + task_id=task.id, + agent_id=agent.id, + data=PatchTaskRequest( + **{ + "name": "patched task", + "metadata": {"patched": True}, + } + ), + connection_pool=pool, + ) + + assert updated is not None + assert isinstance(updated, ResourceUpdatedResponse) + assert updated.id == task.id + + # Verify task was patched correctly + patched_task = await get_task( + developer_id=developer_id, + task_id=task.id, + connection_pool=pool, + ) + # Check that patched fields were updated + assert patched_task.name == "patched task" + assert patched_task.metadata == {"patched": True} + # Check that non-patched fields remain unchanged + assert patched_task.canonical_name == "test_task" + assert patched_task.description == "test task description" + + +@test("query: patch task sql - not exists") +async def _(dsn=pg_dsn, developer_id=test_developer_id, agent=test_agent): + """Test that attempting to patch a non-existent task raises an error.""" + pool = await create_db_pool(dsn=dsn) + task_id = uuid7() + + with raises(HTTPException) as exc: + await patch_task( + developer_id=developer_id, + task_id=task_id, + agent_id=agent.id, + data=PatchTaskRequest( + **{ + "name": "patched task", + "metadata": {"patched": True}, + } + ), + connection_pool=pool, + ) + + assert exc.raised.status_code == 404 + assert "Task not found" in str(exc.raised.detail) diff --git a/integrations-service/integrations/autogen/Tasks.py b/integrations-service/integrations/autogen/Tasks.py index b9212d8cb..f6bf58ddf 100644 --- a/integrations-service/integrations/autogen/Tasks.py +++ b/integrations-service/integrations/autogen/Tasks.py @@ -161,8 +161,21 @@ class CreateTaskRequest(BaseModel): model_config = ConfigDict( populate_by_name=True, ) - name: str + name: Annotated[str, Field(max_length=255, min_length=1)] + """ + The name of the task. 
+ """ + canonical_name: Annotated[ + str | None, + Field(max_length=255, min_length=1, pattern="^[a-zA-Z][a-zA-Z0-9_]*$"), + ] = None + """ + The canonical name of the task. + """ description: str = "" + """ + The description of the task. + """ main: Annotated[ list[ EvaluateStep @@ -650,7 +663,21 @@ class PatchTaskRequest(BaseModel): model_config = ConfigDict( populate_by_name=True, ) + name: Annotated[str | None, Field(max_length=255, min_length=1)] = None + """ + The name of the task. + """ + canonical_name: Annotated[ + str | None, + Field(max_length=255, min_length=1, pattern="^[a-zA-Z][a-zA-Z0-9_]*$"), + ] = None + """ + The canonical name of the task. + """ description: str = "" + """ + The description of the task. + """ main: Annotated[ list[ EvaluateStep @@ -966,8 +993,21 @@ class Task(BaseModel): model_config = ConfigDict( populate_by_name=True, ) - name: str + name: Annotated[str, Field(max_length=255, min_length=1)] + """ + The name of the task. + """ + canonical_name: Annotated[ + str | None, + Field(max_length=255, min_length=1, pattern="^[a-zA-Z][a-zA-Z0-9_]*$"), + ] = None + """ + The canonical name of the task. + """ description: str = "" + """ + The description of the task. + """ main: Annotated[ list[ EvaluateStep @@ -1124,7 +1164,21 @@ class UpdateTaskRequest(BaseModel): model_config = ConfigDict( populate_by_name=True, ) + name: Annotated[str, Field(max_length=255, min_length=1)] + """ + The name of the task. + """ + canonical_name: Annotated[ + str | None, + Field(max_length=255, min_length=1, pattern="^[a-zA-Z][a-zA-Z0-9_]*$"), + ] = None + """ + The canonical name of the task. + """ description: str = "" + """ + The description of the task. 
+ """ main: Annotated[ list[ EvaluateStep diff --git a/memory-store/migrations/000002_developers.up.sql b/memory-store/migrations/000002_developers.up.sql index 9ca9dca69..e18e42248 100644 --- a/memory-store/migrations/000002_developers.up.sql +++ b/memory-store/migrations/000002_developers.up.sql @@ -12,11 +12,21 @@ CREATE TABLE IF NOT EXISTS developers ( created_at TIMESTAMPTZ NOT NULL DEFAULT CURRENT_TIMESTAMP, updated_at TIMESTAMPTZ NOT NULL DEFAULT CURRENT_TIMESTAMP, CONSTRAINT pk_developers PRIMARY KEY (developer_id), - CONSTRAINT uq_developers_email UNIQUE (email) + CONSTRAINT uq_developers_email UNIQUE (email), + CONSTRAINT ct_settings_is_object CHECK (jsonb_typeof(settings) = 'object') ); -- Create sorted index on developer_id (optimized for UUID v7) -CREATE INDEX IF NOT EXISTS idx_developers_id_sorted ON developers (developer_id DESC); +CREATE INDEX IF NOT EXISTS idx_developers_id_sorted ON developers (developer_id DESC) INCLUDE ( + email, + active, + tags, + settings, + created_at, + updated_at +) +WHERE + active = TRUE; -- Create index on email CREATE INDEX IF NOT EXISTS idx_developers_email ON developers (email); @@ -24,11 +34,6 @@ CREATE INDEX IF NOT EXISTS idx_developers_email ON developers (email); -- Create GIN index for tags array CREATE INDEX IF NOT EXISTS idx_developers_tags ON developers USING GIN (tags); --- Create partial index for active developers -CREATE INDEX IF NOT EXISTS idx_developers_active ON developers (developer_id) -WHERE - active = TRUE; - -- Create trigger to automatically update updated_at DO $$ BEGIN diff --git a/memory-store/migrations/000004_agents.up.sql b/memory-store/migrations/000004_agents.up.sql index 32e066f71..1254cba5f 100644 --- a/memory-store/migrations/000004_agents.up.sql +++ b/memory-store/migrations/000004_agents.up.sql @@ -1,16 +1,5 @@ BEGIN; --- Drop existing objects if they exist -DROP TRIGGER IF EXISTS trg_agents_updated_at ON agents; - -DROP INDEX IF EXISTS idx_agents_metadata; - -DROP INDEX IF EXISTS 
idx_agents_developer; - -DROP INDEX IF EXISTS idx_agents_id_sorted; - -DROP TABLE IF EXISTS agents; - -- Create agents table CREATE TABLE IF NOT EXISTS agents ( developer_id UUID NOT NULL, @@ -35,7 +24,9 @@ CREATE TABLE IF NOT EXISTS agents ( default_settings JSONB NOT NULL DEFAULT '{}'::JSONB, CONSTRAINT pk_agents PRIMARY KEY (developer_id, agent_id), CONSTRAINT uq_agents_canonical_name_unique UNIQUE (developer_id, canonical_name), -- per developer - CONSTRAINT ct_agents_canonical_name_valid_identifier CHECK (canonical_name ~ '^[a-zA-Z][a-zA-Z0-9_]*$') + CONSTRAINT ct_agents_canonical_name_valid_identifier CHECK (canonical_name ~ '^[a-zA-Z][a-zA-Z0-9_]*$'), + CONSTRAINT ct_agents_metadata_is_object CHECK (jsonb_typeof(metadata) = 'object'), + CONSTRAINT ct_agents_default_settings_is_object CHECK (jsonb_typeof(default_settings) = 'object') ); -- Create sorted index on agent_id (optimized for UUID v7) diff --git a/memory-store/migrations/000005_files.up.sql b/memory-store/migrations/000005_files.up.sql index 40a2cbccf..1a851ca0b 100644 --- a/memory-store/migrations/000005_files.up.sql +++ b/memory-store/migrations/000005_files.up.sql @@ -56,6 +56,16 @@ DO $$ BEGIN END IF; END $$; +-- Create the user_files table +CREATE TABLE IF NOT EXISTS user_files ( + developer_id UUID NOT NULL, + user_id UUID NOT NULL, + file_id UUID NOT NULL, + CONSTRAINT pk_user_files PRIMARY KEY (developer_id, user_id, file_id), + CONSTRAINT fk_user_files_user FOREIGN KEY (developer_id, user_id) REFERENCES users (developer_id, user_id), + CONSTRAINT fk_user_files_file FOREIGN KEY (developer_id, file_id) REFERENCES files (developer_id, file_id) ON DELETE CASCADE +); + -- Create the file_owners table CREATE TABLE IF NOT EXISTS file_owners ( developer_id UUID NOT NULL, @@ -67,9 +77,21 @@ CREATE TABLE IF NOT EXISTS file_owners ( CONSTRAINT ct_file_owners_owner_type CHECK (owner_type IN ('user', 'agent')) ); +-- Create the agent_files table +CREATE TABLE IF NOT EXISTS agent_files ( + developer_id 
UUID NOT NULL, + agent_id UUID NOT NULL, + file_id UUID NOT NULL, + CONSTRAINT pk_agent_files PRIMARY KEY (developer_id, agent_id, file_id), + CONSTRAINT fk_agent_files_agent FOREIGN KEY (developer_id, agent_id) REFERENCES agents (developer_id, agent_id), + CONSTRAINT fk_agent_files_file FOREIGN KEY (developer_id, file_id) REFERENCES files (developer_id, file_id) ON DELETE CASCADE +); + -- Create indexes CREATE INDEX IF NOT EXISTS idx_file_owners_owner ON file_owners (developer_id, owner_type, owner_id); +CREATE INDEX IF NOT EXISTS idx_agent_files_agent ON agent_files (developer_id, agent_id); +CREATE INDEX IF NOT EXISTS idx_user_files_user ON user_files (developer_id, user_id); -- Create function to validate owner reference CREATE OR REPLACE FUNCTION validate_file_owner() diff --git a/memory-store/migrations/000006_docs.up.sql b/memory-store/migrations/000006_docs.up.sql index 193fae122..e30f42cc2 100644 --- a/memory-store/migrations/000006_docs.up.sql +++ b/memory-store/migrations/000006_docs.up.sql @@ -29,7 +29,8 @@ CREATE TABLE IF NOT EXISTS docs ( CONSTRAINT ct_docs_embedding_dimensions_positive CHECK (embedding_dimensions > 0), CONSTRAINT ct_docs_valid_modality CHECK (modality IN ('text', 'image', 'mixed')), CONSTRAINT ct_docs_index_positive CHECK (index >= 0), - CONSTRAINT ct_docs_valid_language CHECK (is_valid_language (language)) + CONSTRAINT ct_docs_valid_language CHECK (is_valid_language (language)), + CONSTRAINT ct_metadata_is_object CHECK (jsonb_typeof(metadata) = 'object') ); -- Create sorted index on doc_id if not exists @@ -78,6 +79,7 @@ CREATE TABLE IF NOT EXISTS doc_owners ( CREATE INDEX IF NOT EXISTS idx_doc_owners_owner ON doc_owners (developer_id, owner_type, owner_id); + -- Create function to validate owner reference CREATE OR REPLACE FUNCTION validate_doc_owner() RETURNS TRIGGER AS $$ @@ -107,7 +109,29 @@ BEFORE INSERT OR UPDATE ON doc_owners FOR EACH ROW EXECUTE FUNCTION validate_doc_owner(); +-- Create the user_docs table +CREATE TABLE IF 
NOT EXISTS user_docs ( + developer_id UUID NOT NULL, + user_id UUID NOT NULL, + doc_id UUID NOT NULL, + CONSTRAINT pk_user_docs PRIMARY KEY (developer_id, user_id, doc_id), + CONSTRAINT fk_user_docs_user FOREIGN KEY (developer_id, user_id) REFERENCES users (developer_id, user_id), + CONSTRAINT fk_user_docs_doc FOREIGN KEY (developer_id, doc_id) REFERENCES docs (developer_id, doc_id) ON DELETE CASCADE +); + +-- Create the agent_docs table +CREATE TABLE IF NOT EXISTS agent_docs ( + developer_id UUID NOT NULL, + agent_id UUID NOT NULL, + doc_id UUID NOT NULL, + CONSTRAINT pk_agent_docs PRIMARY KEY (developer_id, agent_id, doc_id), + CONSTRAINT fk_agent_docs_agent FOREIGN KEY (developer_id, agent_id) REFERENCES agents (developer_id, agent_id), + CONSTRAINT fk_agent_docs_doc FOREIGN KEY (developer_id, doc_id) REFERENCES docs (developer_id, doc_id) ON DELETE CASCADE +); + -- Create indexes if not exists +CREATE INDEX IF NOT EXISTS idx_user_docs_user ON user_docs (developer_id, user_id); +CREATE INDEX IF NOT EXISTS idx_agent_docs_agent ON agent_docs (developer_id, agent_id); CREATE INDEX IF NOT EXISTS idx_docs_metadata ON docs USING GIN (metadata); -- Enable necessary PostgreSQL extensions diff --git a/memory-store/migrations/000008_tools.up.sql b/memory-store/migrations/000008_tools.up.sql index 159ef3688..3318df8d8 100644 --- a/memory-store/migrations/000008_tools.up.sql +++ b/memory-store/migrations/000008_tools.up.sql @@ -22,7 +22,8 @@ CREATE TABLE IF NOT EXISTS tools ( spec JSONB NOT NULL, updated_at TIMESTAMPTZ NOT NULL DEFAULT CURRENT_TIMESTAMP, created_at TIMESTAMPTZ NOT NULL DEFAULT CURRENT_TIMESTAMP, - CONSTRAINT pk_tools PRIMARY KEY (developer_id, agent_id, tool_id, type, name) + CONSTRAINT pk_tools PRIMARY KEY (developer_id, agent_id, tool_id, type, name), + CONSTRAINT ct_spec_is_object CHECK (jsonb_typeof(spec) = 'object') ); -- Create sorted index on tool_id if it doesn't exist @@ -41,7 +42,7 @@ DO $$ BEGIN ALTER TABLE tools ADD CONSTRAINT fk_tools_agent 
FOREIGN KEY (developer_id, agent_id) - REFERENCES agents(developer_id, agent_id); + REFERENCES agents(developer_id, agent_id) ON DELETE CASCADE; END IF; END $$; diff --git a/memory-store/migrations/000009_sessions.up.sql b/memory-store/migrations/000009_sessions.up.sql index 75b5fde9a..b014017e0 100644 --- a/memory-store/migrations/000009_sessions.up.sql +++ b/memory-store/migrations/000009_sessions.up.sql @@ -16,21 +16,21 @@ CREATE TABLE IF NOT EXISTS sessions ( recall_options JSONB NOT NULL DEFAULT '{}'::JSONB, CONSTRAINT pk_sessions PRIMARY KEY (developer_id, session_id), CONSTRAINT uq_sessions_session_id UNIQUE (session_id), - CONSTRAINT chk_sessions_token_budget_positive CHECK ( + CONSTRAINT ct_sessions_token_budget_positive CHECK ( token_budget IS NULL OR token_budget > 0 ), - CONSTRAINT chk_sessions_context_overflow_valid CHECK ( + CONSTRAINT ct_sessions_context_overflow_valid CHECK ( context_overflow IS NULL OR context_overflow IN ('truncate', 'adaptive') ), - CONSTRAINT chk_sessions_system_template_not_empty CHECK (length(trim(system_template)) > 0), - CONSTRAINT chk_sessions_situation_not_empty CHECK ( + CONSTRAINT ct_sessions_system_template_not_empty CHECK (length(trim(system_template)) > 0), + CONSTRAINT ct_sessions_situation_not_empty CHECK ( situation IS NULL OR length(trim(situation)) > 0 ), - CONSTRAINT chk_sessions_metadata_valid CHECK (jsonb_typeof(metadata) = 'object'), - CONSTRAINT chk_sessions_recall_options_valid CHECK (jsonb_typeof(recall_options) = 'object') + CONSTRAINT ct_sessions_metadata_is_object CHECK (jsonb_typeof(metadata) = 'object'), + CONSTRAINT ct_sessions_recall_options_is_object CHECK (jsonb_typeof(recall_options) = 'object') ); -- Create indexes if they don't exist @@ -84,7 +84,7 @@ CREATE TABLE IF NOT EXISTS session_lookup ( participant_type, participant_id ), - FOREIGN KEY (developer_id, session_id) REFERENCES sessions (developer_id, session_id) + FOREIGN KEY (developer_id, session_id) REFERENCES sessions (developer_id, 
session_id) ON DELETE CASCADE ); -- Create indexes if they don't exist diff --git a/memory-store/migrations/000010_tasks.up.sql b/memory-store/migrations/000010_tasks.up.sql index ad27d5bdc..918a09255 100644 --- a/memory-store/migrations/000010_tasks.up.sql +++ b/memory-store/migrations/000010_tasks.up.sql @@ -30,12 +30,12 @@ CREATE TABLE IF NOT EXISTS tasks ( updated_at TIMESTAMPTZ NOT NULL DEFAULT CURRENT_TIMESTAMP, metadata JSONB DEFAULT '{}'::JSONB, CONSTRAINT pk_tasks PRIMARY KEY (developer_id, task_id, "version"), - CONSTRAINT uq_tasks_canonical_name_unique UNIQUE (developer_id, canonical_name), - CONSTRAINT fk_tasks_agent FOREIGN KEY (developer_id, agent_id) REFERENCES agents (developer_id, agent_id), + CONSTRAINT uq_tasks_canonical_name_unique UNIQUE (developer_id, canonical_name, "version"), + CONSTRAINT fk_tasks_agent FOREIGN KEY (developer_id, agent_id) REFERENCES agents (developer_id, agent_id) ON DELETE CASCADE, CONSTRAINT ct_tasks_canonical_name_valid_identifier CHECK (canonical_name ~ '^[a-zA-Z][a-zA-Z0-9_]*$'), - CONSTRAINT chk_tasks_metadata_valid CHECK (jsonb_typeof(metadata) = 'object'), - CONSTRAINT chk_tasks_input_schema_valid CHECK (jsonb_typeof(input_schema) = 'object'), - CONSTRAINT chk_tasks_version_positive CHECK ("version" > 0) + CONSTRAINT ct_tasks_metadata_is_object CHECK (jsonb_typeof(metadata) = 'object'), + CONSTRAINT ct_tasks_input_schema_is_object CHECK (jsonb_typeof(input_schema) = 'object'), + CONSTRAINT ct_tasks_version_positive CHECK ("version" > 0) ); -- Create sorted index on task_id if it doesn't exist @@ -98,20 +98,19 @@ COMMENT ON TABLE tasks IS 'Stores tasks associated with AI agents for developers CREATE TABLE IF NOT EXISTS workflows ( developer_id UUID NOT NULL, task_id UUID NOT NULL, - version INTEGER NOT NULL, - name TEXT NOT NULL CONSTRAINT chk_workflows_name_length CHECK ( - length(name) >= 1 AND length(name) <= 255 - ), - step_idx INTEGER NOT NULL CONSTRAINT chk_workflows_step_idx_positive CHECK (step_idx >= 0), - 
step_type TEXT NOT NULL CONSTRAINT chk_workflows_step_type_length CHECK ( - length(step_type) >= 1 AND length(step_type) <= 255 + "version" INTEGER NOT NULL, + name TEXT NOT NULL CONSTRAINT ct_workflows_name_length CHECK ( + length(name) >= 1 + AND length(name) <= 255 ), - step_definition JSONB NOT NULL CONSTRAINT chk_workflows_step_definition_valid CHECK ( - jsonb_typeof(step_definition) = 'object' + step_idx INTEGER NOT NULL CONSTRAINT ct_workflows_step_idx_positive CHECK (step_idx >= 0), + step_type TEXT NOT NULL CONSTRAINT ct_workflows_step_type_length CHECK ( + length(step_type) >= 1 + AND length(step_type) <= 255 ), - CONSTRAINT pk_workflows PRIMARY KEY (developer_id, task_id, version, step_idx), - CONSTRAINT fk_workflows_tasks FOREIGN KEY (developer_id, task_id, version) - REFERENCES tasks (developer_id, task_id, version) ON DELETE CASCADE + step_definition JSONB NOT NULL CONSTRAINT ct_workflows_step_definition_valid CHECK (jsonb_typeof(step_definition) = 'object'), + CONSTRAINT pk_workflows PRIMARY KEY (developer_id, task_id, "version", name, step_idx), + CONSTRAINT fk_workflows_tasks FOREIGN KEY (developer_id, task_id, "version") REFERENCES tasks (developer_id, task_id, "version") ON DELETE CASCADE ); -- Create index for 'workflows' table if it doesn't exist diff --git a/memory-store/migrations/000011_executions.up.sql b/memory-store/migrations/000011_executions.up.sql index 976ead369..5184601b2 100644 --- a/memory-store/migrations/000011_executions.up.sql +++ b/memory-store/migrations/000011_executions.up.sql @@ -16,7 +16,8 @@ CREATE TABLE IF NOT EXISTS executions ( created_at TIMESTAMPTZ NOT NULL DEFAULT CURRENT_TIMESTAMP, CONSTRAINT pk_executions PRIMARY KEY (execution_id), CONSTRAINT fk_executions_developer FOREIGN KEY (developer_id) REFERENCES developers (developer_id), - CONSTRAINT fk_executions_task FOREIGN KEY (developer_id, task_id, task_version) REFERENCES tasks (developer_id, task_id, "version") + CONSTRAINT fk_executions_task FOREIGN KEY 
(developer_id, task_id, task_version) REFERENCES tasks (developer_id, task_id, "version"), + CONSTRAINT ct_metadata_is_object CHECK (jsonb_typeof(metadata) = 'object') ); -- Create sorted index on execution_id (optimized for UUID v7) diff --git a/memory-store/migrations/000012_transitions.up.sql b/memory-store/migrations/000012_transitions.up.sql index 7bbcf2ad5..5c07172f9 100644 --- a/memory-store/migrations/000012_transitions.up.sql +++ b/memory-store/migrations/000012_transitions.up.sql @@ -49,7 +49,9 @@ CREATE TABLE IF NOT EXISTS transitions ( output JSONB, task_token TEXT DEFAULT NULL, metadata JSONB DEFAULT '{}'::JSONB, - CONSTRAINT pk_transitions PRIMARY KEY (created_at, execution_id, transition_id) + CONSTRAINT pk_transitions PRIMARY KEY (created_at, execution_id, transition_id), + CONSTRAINT ct_step_definition_is_object CHECK (jsonb_typeof(step_definition) = 'object'), + CONSTRAINT ct_metadata_is_object CHECK (jsonb_typeof(metadata) = 'object') ); -- Convert to hypertable if not already @@ -104,7 +106,8 @@ BEGIN ALTER TABLE transitions ADD CONSTRAINT fk_transitions_execution FOREIGN KEY (execution_id) - REFERENCES executions(execution_id); + REFERENCES executions(execution_id) + ON DELETE CASCADE; END IF; END $$; diff --git a/memory-store/migrations/000014_temporal_lookup.up.sql b/memory-store/migrations/000014_temporal_lookup.up.sql index 724ee1340..59c19a781 100644 --- a/memory-store/migrations/000014_temporal_lookup.up.sql +++ b/memory-store/migrations/000014_temporal_lookup.up.sql @@ -9,7 +9,7 @@ CREATE TABLE IF NOT EXISTS temporal_executions_lookup ( result_run_id TEXT, created_at TIMESTAMPTZ NOT NULL DEFAULT CURRENT_TIMESTAMP, CONSTRAINT pk_temporal_executions_lookup PRIMARY KEY (execution_id, id), - CONSTRAINT fk_temporal_executions_lookup_execution FOREIGN KEY (execution_id) REFERENCES executions (execution_id) + CONSTRAINT fk_temporal_executions_lookup_execution FOREIGN KEY (execution_id) REFERENCES executions (execution_id) ON DELETE CASCADE ); 
-- Create sorted index on execution_id (optimized for UUID v7) diff --git a/memory-store/migrations/000015_entries.down.sql b/memory-store/migrations/000015_entries.down.sql index d8afbb826..fdfd6c8dd 100644 --- a/memory-store/migrations/000015_entries.down.sql +++ b/memory-store/migrations/000015_entries.down.sql @@ -14,7 +14,10 @@ DROP INDEX IF EXISTS idx_entries_by_session; -- Drop the hypertable (this will also drop the table) DROP TABLE IF EXISTS entries; +-- Drop the function +DROP FUNCTION IF EXISTS all_jsonb_elements_are_objects; + -- Drop the enum type DROP TYPE IF EXISTS chat_role; -COMMIT; +COMMIT; \ No newline at end of file diff --git a/memory-store/migrations/000015_entries.up.sql b/memory-store/migrations/000015_entries.up.sql index 73723a8bc..8abbee4cf 100644 --- a/memory-store/migrations/000015_entries.up.sql +++ b/memory-store/migrations/000015_entries.up.sql @@ -1,9 +1,33 @@ BEGIN; -- Create chat_role enum -CREATE TYPE chat_role AS ENUM('user', 'assistant', 'tool', 'system', 'developer'); +CREATE TYPE chat_role AS ENUM( + 'user', + 'assistant', + 'tool', + 'system', + 'developer' +); + +-- Create a custom function that checks if `content` is non-empty +-- and that every JSONB element in the array is an 'object'. 
+CREATE +OR REPLACE FUNCTION all_jsonb_elements_are_objects (content jsonb[]) RETURNS boolean AS $$ +DECLARE + elem jsonb; +BEGIN + -- Check each element in the `content` array + FOREACH elem IN ARRAY content + LOOP + IF jsonb_typeof(elem) <> 'object' THEN + RETURN false; + END IF; + END LOOP; + + RETURN true; +END; +$$ LANGUAGE plpgsql IMMUTABLE; --- Create entries table CREATE TABLE IF NOT EXISTS entries ( session_id UUID NOT NULL, entry_id UUID NOT NULL, @@ -13,13 +37,15 @@ CREATE TABLE IF NOT EXISTS entries ( name TEXT, content JSONB[] NOT NULL, tool_call_id TEXT DEFAULT NULL, - tool_calls JSONB[] NOT NULL DEFAULT '{}', + tool_calls JSONB[] NOT NULL DEFAULT '{}'::JSONB[], model TEXT NOT NULL, token_count INTEGER DEFAULT NULL, tokenizer TEXT NOT NULL, created_at TIMESTAMPTZ NOT NULL DEFAULT CURRENT_TIMESTAMP, timestamp DOUBLE PRECISION NOT NULL, - CONSTRAINT pk_entries PRIMARY KEY (session_id, entry_id, created_at) + CONSTRAINT pk_entries PRIMARY KEY (session_id, entry_id, created_at), + CONSTRAINT ct_content_is_array_of_objects CHECK (all_jsonb_elements_are_objects (content)), + CONSTRAINT ct_tool_calls_is_array_of_objects CHECK (all_jsonb_elements_are_objects (tool_calls)) ); -- Convert to hypertable if not already @@ -49,7 +75,7 @@ BEGIN ALTER TABLE entries ADD CONSTRAINT fk_entries_session FOREIGN KEY (session_id) - REFERENCES sessions(session_id); + REFERENCES sessions(session_id) ON DELETE CASCADE; END IF; END $$; @@ -87,8 +113,8 @@ UPDATE ON entries FOR EACH ROW EXECUTE FUNCTION optimized_update_token_count_after (); -- Add trigger to update parent session's updated_at -CREATE OR REPLACE FUNCTION update_session_updated_at() -RETURNS TRIGGER AS $$ +CREATE +OR REPLACE FUNCTION update_session_updated_at () RETURNS TRIGGER AS $$ BEGIN UPDATE sessions SET updated_at = CURRENT_TIMESTAMP @@ -98,8 +124,9 @@ END; $$ LANGUAGE plpgsql; CREATE TRIGGER trg_update_session_updated_at -AFTER INSERT OR UPDATE ON entries -FOR EACH ROW -EXECUTE FUNCTION 
update_session_updated_at(); +AFTER INSERT +OR +UPDATE ON entries FOR EACH ROW +EXECUTE FUNCTION update_session_updated_at (); COMMIT; diff --git a/memory-store/migrations/000016_entry_relations.up.sql b/memory-store/migrations/000016_entry_relations.up.sql index bcdb7fb72..6e9af3f2a 100644 --- a/memory-store/migrations/000016_entry_relations.up.sql +++ b/memory-store/migrations/000016_entry_relations.up.sql @@ -22,7 +22,7 @@ BEGIN ALTER TABLE entry_relations ADD CONSTRAINT fk_entry_relations_session FOREIGN KEY (session_id) - REFERENCES sessions(session_id); + REFERENCES sessions(session_id) ON DELETE CASCADE; END IF; END $$; diff --git a/typespec/tasks/models.tsp b/typespec/tasks/models.tsp index c3b301bd2..ca6b72e00 100644 --- a/typespec/tasks/models.tsp +++ b/typespec/tasks/models.tsp @@ -50,9 +50,14 @@ model ToolRef { /** Object describing a Task */ model Task { - @visibility("read", "create") - name: string; + /** The name of the task. */ + @visibility("read", "create", "update") + name: displayName; + + /** The canonical name of the task. */ + canonical_name?: canonicalName; + /** The description of the task. */ description: string = ""; /** The entrypoint of the task. */ diff --git a/typespec/tsp-output/@typespec/openapi3/openapi-1.0.0.yaml b/typespec/tsp-output/@typespec/openapi3/openapi-1.0.0.yaml index d4835a695..768f27ea3 100644 --- a/typespec/tsp-output/@typespec/openapi3/openapi-1.0.0.yaml +++ b/typespec/tsp-output/@typespec/openapi3/openapi-1.0.0.yaml @@ -4574,9 +4574,16 @@ components: - inherit_tools properties: name: - type: string + allOf: + - $ref: '#/components/schemas/Common.displayName' + description: The name of the task. + canonical_name: + allOf: + - $ref: '#/components/schemas/Common.canonicalName' + description: The canonical name of the task. description: type: string + description: The description of the task. 
default: '' main: type: array @@ -5190,8 +5197,17 @@ components: Tasks.PatchTaskRequest: type: object properties: + name: + allOf: + - $ref: '#/components/schemas/Common.displayName' + description: The name of the task. + canonical_name: + allOf: + - $ref: '#/components/schemas/Common.canonicalName' + description: The canonical name of the task. description: type: string + description: The description of the task. default: '' main: type: array @@ -5986,9 +6002,16 @@ components: - updated_at properties: name: - type: string + allOf: + - $ref: '#/components/schemas/Common.displayName' + description: The name of the task. + canonical_name: + allOf: + - $ref: '#/components/schemas/Common.canonicalName' + description: The canonical name of the task. description: type: string + description: The description of the task. default: '' main: type: array @@ -6333,14 +6356,24 @@ components: Tasks.UpdateTaskRequest: type: object required: + - name - description - main - input_schema - tools - inherit_tools properties: + name: + allOf: + - $ref: '#/components/schemas/Common.displayName' + description: The name of the task. + canonical_name: + allOf: + - $ref: '#/components/schemas/Common.canonicalName' + description: The canonical name of the task. description: type: string + description: The description of the task. default: '' main: type: array