Skip to content

Commit

Permalink
Merge branch 'f/switch-to-pg' into f/simplify-blob-store
Browse files Browse the repository at this point in the history
  • Loading branch information
creatorrr authored Dec 21, 2024
2 parents ca5f4e2 + 0e32cbe commit 76819e1
Show file tree
Hide file tree
Showing 22 changed files with 1,239 additions and 426 deletions.
38 changes: 29 additions & 9 deletions agents-api/agents_api/queries/agents/delete_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,19 +17,39 @@

# Define the raw SQL query
agent_query = parse_one("""
WITH deleted_docs AS (
WITH deleted_file_owners AS (
DELETE FROM file_owners
WHERE developer_id = $1
AND owner_type = 'agent'
AND owner_id = $2
),
deleted_doc_owners AS (
DELETE FROM doc_owners
WHERE developer_id = $1
AND owner_type = 'agent'
AND owner_id = $2
),
deleted_files AS (
DELETE FROM files
WHERE developer_id = $1
AND file_id IN (
SELECT file_id FROM file_owners
WHERE developer_id = $1
AND owner_type = 'agent'
AND owner_id = $2
)
),
deleted_docs AS (
DELETE FROM docs
WHERE developer_id = $1
AND doc_id IN (
SELECT ad.doc_id
FROM agent_docs ad
WHERE ad.agent_id = $2
AND ad.developer_id = $1
SELECT doc_id FROM doc_owners
WHERE developer_id = $1
AND owner_type = 'agent'
AND owner_id = $2
)
), deleted_agent_docs AS (
DELETE FROM agent_docs
WHERE agent_id = $2 AND developer_id = $1
), deleted_tools AS (
),
deleted_tools AS (
DELETE FROM tools
WHERE agent_id = $2 AND developer_id = $1
)
Expand Down
106 changes: 58 additions & 48 deletions agents-api/agents_api/queries/entries/create_entries.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,17 @@
from typing import Literal
from uuid import UUID

import asyncpg
from beartype import beartype
from fastapi import HTTPException
from litellm.utils import _select_tokenizer as select_tokenizer
from uuid_extensions import uuid7

from ...autogen.openapi_model import CreateEntryRequest, Entry, Relation
from ...common.utils.datetime import utcnow
from ...common.utils.messages import content_to_json
from ...metrics.counters import increase_counter
from ..utils import pg_query, wrap_in_class
from ..utils import partialclass, pg_query, rewrap_exceptions, wrap_in_class

# Query for checking if the session exists
session_exists_query = """
Expand All @@ -22,7 +25,7 @@
entry_query = """
INSERT INTO entries (
session_id,
entry_id,
entry_id,
source,
role,
event_type,
Expand All @@ -32,9 +35,10 @@
tool_calls,
model,
token_count,
tokenizer,
created_at,
timestamp
) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13)
) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14)
RETURNING *;
"""

Expand All @@ -50,34 +54,34 @@
"""


# @rewrap_exceptions(
# {
# asyncpg.ForeignKeyViolationError: partialclass(
# HTTPException,
# status_code=404,
# detail="Session not found",
# ),
# asyncpg.UniqueViolationError: partialclass(
# HTTPException,
# status_code=409,
# detail="Entry already exists",
# ),
# asyncpg.NotNullViolationError: partialclass(
# HTTPException,
# status_code=400,
# detail="Not null violation",
# ),
# asyncpg.NoDataFoundError: partialclass(
# HTTPException,
# status_code=404,
# detail="Session not found",
# ),
# }
# )
@rewrap_exceptions(
{
asyncpg.ForeignKeyViolationError: partialclass(
HTTPException,
status_code=404,
detail="Session not found",
),
asyncpg.UniqueViolationError: partialclass(
HTTPException,
status_code=409,
detail="Entry already exists",
),
asyncpg.NotNullViolationError: partialclass(
HTTPException,
status_code=400,
detail="Not null violation",
),
asyncpg.NoDataFoundError: partialclass(
HTTPException,
status_code=404,
detail="Session not found",
),
}
)
@wrap_in_class(
Entry,
transform=lambda d: {
"id": UUID(d.pop("entry_id")),
"id": d.pop("entry_id"),
**d,
},
)
Expand All @@ -89,7 +93,7 @@ async def create_entries(
developer_id: UUID,
session_id: UUID,
data: list[CreateEntryRequest],
) -> list[tuple[str, list, Literal["fetch", "fetchmany"]]]:
) -> list[tuple[str, list, Literal["fetch", "fetchmany", "fetchrow"]]]:
# Convert the data to a list of dictionaries
data_dicts = [item.model_dump(mode="json") for item in data]

Expand All @@ -100,7 +104,7 @@ async def create_entries(
params.append(
[
session_id, # $1
item.pop("id", None) or str(uuid7()), # $2
item.pop("id", None) or uuid7(), # $2
item.get("source"), # $3
item.get("role"), # $4
item.get("event_type") or "message.create", # $5
Expand All @@ -110,16 +114,17 @@ async def create_entries(
content_to_json(item.get("tool_calls") or {}), # $9
item.get("model"), # $10
item.get("token_count"), # $11
item.get("created_at") or utcnow(), # $12
utcnow(), # $13
select_tokenizer(item.get("model"))["type"], # $12
item.get("created_at") or utcnow(), # $13
utcnow().timestamp(), # $14
]
)

return [
(
session_exists_query,
[session_id, developer_id],
"fetch",
"fetchrow",
),
(
entry_query,
Expand All @@ -129,20 +134,25 @@ async def create_entries(
]


# @rewrap_exceptions(
# {
# asyncpg.ForeignKeyViolationError: partialclass(
# HTTPException,
# status_code=404,
# detail="Session not found",
# ),
# asyncpg.UniqueViolationError: partialclass(
# HTTPException,
# status_code=409,
# detail="Entry already exists",
# ),
# }
# )
@rewrap_exceptions(
{
asyncpg.ForeignKeyViolationError: partialclass(
HTTPException,
status_code=404,
detail="Session not found",
),
asyncpg.UniqueViolationError: partialclass(
HTTPException,
status_code=409,
detail="Entry already exists",
),
asyncpg.NoDataFoundError: partialclass(
HTTPException,
status_code=404,
detail="Session not found",
),
}
)
@wrap_in_class(Relation)
@increase_counter("add_entry_relations")
@pg_query
Expand All @@ -152,7 +162,7 @@ async def add_entry_relations(
developer_id: UUID,
session_id: UUID,
data: list[Relation],
) -> list[tuple[str, list, Literal["fetch", "fetchmany"]]]:
) -> list[tuple[str, list, Literal["fetch", "fetchmany", "fetchrow"]]]:
# Convert the data to a list of dictionaries
data_dicts = [item.model_dump(mode="json") for item in data]

Expand Down
90 changes: 55 additions & 35 deletions agents-api/agents_api/queries/entries/delete_entries.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
from typing import Literal
from uuid import UUID

import asyncpg
from beartype import beartype
from fastapi import HTTPException
from sqlglot import parse_one

from ...autogen.openapi_model import ResourceDeletedResponse
from ...common.utils.datetime import utcnow
from ...metrics.counters import increase_counter
from ..utils import pg_query, wrap_in_class
from ..utils import partialclass, pg_query, rewrap_exceptions, wrap_in_class

# Define the raw SQL query for deleting entries with a developer check
delete_entry_query = parse_one("""
Expand Down Expand Up @@ -55,20 +57,25 @@
"""


# @rewrap_exceptions(
# {
# asyncpg.ForeignKeyViolationError: partialclass(
# HTTPException,
# status_code=404,
# detail="The specified session or developer does not exist.",
# ),
# asyncpg.UniqueViolationError: partialclass(
# HTTPException,
# status_code=409,
# detail="The specified session has already been deleted.",
# ),
# }
# )
@rewrap_exceptions(
{
asyncpg.ForeignKeyViolationError: partialclass(
HTTPException,
status_code=404,
detail="The specified session or developer does not exist.",
),
asyncpg.UniqueViolationError: partialclass(
HTTPException,
status_code=409,
detail="The specified session has already been deleted.",
),
asyncpg.NoDataFoundError: partialclass(
HTTPException,
status_code=404,
detail="Session not found",
),
}
)
@wrap_in_class(
ResourceDeletedResponse,
one=True,
Expand All @@ -85,29 +92,34 @@ async def delete_entries_for_session(
*,
developer_id: UUID,
session_id: UUID,
) -> list[tuple[str, list, Literal["fetch", "fetchmany"]]]:
) -> list[tuple[str, list, Literal["fetch", "fetchmany", "fetchrow"]]]:
"""Delete all entries for a given session."""
return [
(session_exists_query, [session_id, developer_id], "fetch"),
(session_exists_query, [session_id, developer_id], "fetchrow"),
(delete_entry_relations_query, [session_id], "fetchmany"),
(delete_entry_query, [session_id, developer_id], "fetchmany"),
]


# @rewrap_exceptions(
# {
# asyncpg.ForeignKeyViolationError: partialclass(
# HTTPException,
# status_code=404,
# detail="The specified entries, session, or developer does not exist.",
# ),
# asyncpg.UniqueViolationError: partialclass(
# HTTPException,
# status_code=409,
# detail="One or more specified entries have already been deleted.",
# ),
# }
# )
@rewrap_exceptions(
{
asyncpg.ForeignKeyViolationError: partialclass(
HTTPException,
status_code=404,
detail="The specified entries, session, or developer does not exist.",
),
asyncpg.UniqueViolationError: partialclass(
HTTPException,
status_code=409,
detail="One or more specified entries have already been deleted.",
),
asyncpg.NoDataFoundError: partialclass(
HTTPException,
status_code=404,
detail="Session not found",
),
}
)
@wrap_in_class(
ResourceDeletedResponse,
transform=lambda d: {
Expand All @@ -121,10 +133,18 @@ async def delete_entries_for_session(
@beartype
async def delete_entries(
*, developer_id: UUID, session_id: UUID, entry_ids: list[UUID]
) -> list[tuple[str, list, Literal["fetch", "fetchmany"]]]:
) -> list[tuple[str, list, Literal["fetch", "fetchmany", "fetchrow"]]]:
"""Delete specific entries by their IDs."""
return [
(session_exists_query, [session_id, developer_id], "fetch"),
(delete_entry_relations_by_ids_query, [session_id, entry_ids], "fetchmany"),
(delete_entry_by_ids_query, [entry_ids, developer_id, session_id], "fetchmany"),
(
session_exists_query,
[session_id, developer_id],
"fetchrow",
),
(delete_entry_relations_by_ids_query, [session_id, entry_ids], "fetch"),
(
delete_entry_by_ids_query,
[entry_ids, developer_id, session_id],
"fetch",
),
]
Loading

0 comments on commit 76819e1

Please sign in to comment.