Skip to content
This repository has been archived by the owner on Nov 3, 2024. It is now read-only.

[HN-111/HN-116] refactor: agent data storage #12

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
113 changes: 49 additions & 64 deletions hive_agent_client/client.py
Original file line number Diff line number Diff line change
@@ -1,21 +1,21 @@
import httpx
import logging

from typing import AsyncGenerator, Dict
from typing import Dict

from hive_agent_client.chat import send_chat_message
from hive_agent_client.entry import (
create_entry,
stream_entry,
get_entries,
get_entry_by_id,
update_entry,
delete_entry
from hive_agent_client.database import (
create_table,
insert_data,
read_data,
update_data,
delete_data
)

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


class HiveAgentClient:
"""
Client for interacting with a Hive Agent's API.
Expand Down Expand Up @@ -44,95 +44,80 @@ async def chat(self, content: str) -> str:
logger.error(f"Failed to send chat message - {content}: {e}")
raise Exception(f"Failed to send chat message: {e}")

async def create_entry(self, namespace: str, data: dict) -> Dict:
"""
Create a new entry in the specified namespace.
:param namespace: The namespace in which to create the entry.
:param data: The data to be stored in the entry.
:return: A dictionary representing the created entry.
"""
try:
return await create_entry(self.http_client, self.base_url, namespace, data)
except Exception as e:
logger.error(f"Failed to create entry {data} in {namespace}: {e}")
raise Exception(f"Failed to create entry: {e}")

async def stream_entry_data(self, namespace: str, data_stream: AsyncGenerator) -> AsyncGenerator:
async def create_table(self, table_name: str, columns: dict) -> Dict:
"""
Stream data to the entry endpoint.
Create a new table in the database.
:param namespace: The namespace in which to stream the data.
:param data_stream: The async generator providing the data to stream.
:yield: Messages received in response to the streamed data.
:param table_name: The name of the table to create.
:param columns: The columns of the table to create.
:return: A dictionary with a message about the table creation.
"""
try:
async for message in stream_entry(self.http_client, self.base_url, namespace, data_stream):
yield message
return await create_table(self.http_client, self.base_url, table_name, columns)
except Exception as e:
logger.error(f"Failed to stream entry data from {namespace}: {e}")
raise Exception(f"Failed to stream entry data: {e}")
logger.error(f"Failed to create table {table_name} with columns {columns}: {e}")
raise Exception(f"Failed to create table: {e}")

async def get_entries(self, namespace: str) -> Dict:
async def insert_data(self, table_name: str, data: dict) -> Dict:
"""
Retrieve all entries from the specified namespace.
Insert data into a table.
:param namespace: The namespace from which to retrieve entries.
:return: A dictionary containing the retrieved entries.
:param table_name: The name of the table to insert data into.
:param data: The data to insert.
:return: A dictionary with a message and the id of the inserted data.
"""
try:
return await get_entries(self.http_client, self.base_url, namespace)
return await insert_data(self.http_client, self.base_url, table_name, data)
except Exception as e:
logger.error(f"Failed to get entries in {namespace}: {e}")
raise Exception(f"Failed to get entries: {e}")
logger.error(f"Failed to insert data {data} into table {table_name}: {e}")
raise Exception(f"Failed to insert data: {e}")

async def get_entry_by_id(self, namespace: str, entry_id: str) -> Dict:
async def read_data(self, table_name: str, filters: dict = None) -> Dict:
"""
Retrieve a specific entry by its ID.
Retrieve data from a table.
:param namespace: The namespace of the entry.
:param entry_id: The ID of the entry to retrieve.
:return: A dictionary representing the retrieved entry.
:param table_name: The name of the table to read data from.
:param filters: Optional filters to apply when reading data.
:return: A list of dictionaries representing the read data.
"""
try:
return await get_entry_by_id(self.http_client, self.base_url, namespace, entry_id)
return await read_data(self.http_client, self.base_url, table_name, filters)
except Exception as e:
logger.error(f"Failed to get entry {entry_id} from {namespace}: {e}")
raise Exception(f"Failed to get entry by ID: {e}")
logger.error(f"Failed to read data from table {table_name} with filters {filters}: {e}")
raise Exception(f"Failed to read data: {e}")

async def update_entry(self, namespace: str, entry_id: str, data: dict) -> Dict:
async def update_data(self, table_name: str, row_id: int, new_data: dict) -> Dict:
"""
Update a specific entry by its ID.
Update data in a table.
:param namespace: The namespace of the entry.
:param entry_id: The ID of the entry to update.
:param data: The data to update the entry with.
:return: A dictionary representing the updated entry.
:param table_name: The name of the table to update data in.
:param row_id: The ID of the row to update.
:param new_data: The new data to update in the table.
:return: A dictionary with a message about the update.
"""
try:
return await update_entry(self.http_client, self.base_url, namespace, entry_id, data)
return await update_data(self.http_client, self.base_url, table_name, row_id, new_data)
except Exception as e:
logger.error(f"Failed to update entry {entry_id} from {namespace} with {data}: {e}")
raise Exception(f"Failed to update entry: {e}")
logger.error(f"Failed to update data in table {table_name} with id {row_id} and data {new_data}: {e}")
raise Exception(f"Failed to update data: {e}")

async def delete_entry(self, namespace: str, entry_id: str) -> Dict:
async def delete_data(self, table_name: str, row_id: int) -> Dict:
"""
Delete a specific entry by its ID.
Delete data from a table.
:param namespace: The namespace of the entry.
:param entry_id: The ID of the entry to delete.
:return: A dictionary containing the status of the deletion.
:param table_name: The name of the table to delete data from.
:param row_id: The ID of the row to delete.
:return: A dictionary with a message about the deletion.
"""
try:
return await delete_entry(self.http_client, self.base_url, namespace, entry_id)
return await delete_data(self.http_client, self.base_url, table_name, row_id)
except Exception as e:
logger.error(f"Failed to delete entry {entry_id} from {namespace}: {e}")
raise Exception(f"Failed to delete entry: {e}")
logger.error(f"Failed to delete data from table {table_name} with id {row_id}: {e}")
raise Exception(f"Failed to delete data: {e}")

async def close(self):
"""
Close the HTTP client session.
"""
logger.debug("Closing HTTP client session...")
await self.http_client.aclose()

7 changes: 7 additions & 0 deletions hive_agent_client/database/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
from .database import (
create_table,
insert_data,
read_data,
update_data,
delete_data
)
154 changes: 154 additions & 0 deletions hive_agent_client/database/database.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
import json

import httpx
import logging
import os
import sys

from typing import Optional


def get_log_level():
HIVE_AGENT_LOG_LEVEL = os.getenv('HIVE_AGENT_LOG_LEVEL', 'INFO').upper()
return getattr(logging, HIVE_AGENT_LOG_LEVEL, logging.INFO)


logging.basicConfig(stream=sys.stdout, level=get_log_level())
logging.getLogger().addHandler(logging.StreamHandler(stream=sys.stdout))

logger = logging.getLogger()
logger.setLevel(get_log_level())


async def create_table(http_client: httpx.AsyncClient, base_url: str, table_name: str, columns: dict) -> dict:
"""
Creates a table in the database.

:param http_client: An HTTP client for sending requests.
:param base_url: The base URL of the Hive Agent API.
:param table_name: The name of the table to be created.
:param columns: The columns of the table to be created.
:return: A dictionary with a message about the table creation.
:raises Exception: If the HTTP request fails or the API returns an error response.
"""
endpoint = "/database/create-table"
url = f"{base_url}{endpoint}"
data = {"table_name": table_name, "columns": columns}

try:
logger.debug(f"Creating table {table_name} at {url}")
response = await http_client.post(url, json=data)
response.raise_for_status()
logger.debug(f"Response for creating table {table_name} at {url}: {response.json()}")
return response.json()
except httpx.HTTPStatusError as e:
logging.error(f"Failed to create table {table_name}: {e}")
raise Exception(f"Failed to create table {table_name}: {e.response.text}") from e


async def insert_data(http_client: httpx.AsyncClient, base_url: str, table_name: str, data: dict) -> dict:
"""
Inserts data into a table in the database.

:param http_client: An HTTP client for sending requests.
:param base_url: The base URL of the Hive Agent API.
:param table_name: The name of the table where the data will be inserted.
:param data: The data to be inserted.
:return: A dictionary with a message and the id of the inserted data.
:raises Exception: If the HTTP request fails or the API returns an error response.
"""
endpoint = "/database/insert-data"
url = f"{base_url}{endpoint}"
payload = {"table_name": table_name, "data": data}

try:
logger.debug(f"Inserting data into {table_name} at {url}")
response = await http_client.post(url, json=payload)
response.raise_for_status()
logger.debug(f"Response for inserting data into {table_name} at {url}: {response.json()}")
return response.json()
except httpx.HTTPStatusError as e:
logging.error(f"Failed to insert data into {table_name}: {e}")
raise Exception(f"Failed to insert data into {table_name}: {e.response.text}") from e


async def read_data(http_client: httpx.AsyncClient, base_url: str, table_name: str,
filters: Optional[dict] = None) -> list:
"""
Reads data from a table in the database.

:param http_client: An HTTP client for sending requests.
:param base_url: The base URL of the Hive Agent API.
:param table_name: The name of the table from which to read data.
:param filters: Optional filters to apply when reading data.
:return: A list of dictionaries representing the read data.
:raises Exception: If the HTTP request fails or the API returns an error response.
"""
endpoint = "/database/read-data"
url = f"{base_url}{endpoint}"
payload = {"table_name": table_name, "filters": filters}

try:
logger.debug(f"Reading data from {table_name} at {url} with filters: {filters}")
response = await http_client.post(url, json=payload)
response.raise_for_status()
logger.debug(f"Response for reading data from {table_name} at {url}: {response.json()}")
return response.json()
except httpx.HTTPStatusError as e:
logging.error(f"Failed to read data from {table_name}: {e}")
raise Exception(f"Failed to read data from {table_name}: {e.response.text}") from e


async def update_data(http_client: httpx.AsyncClient, base_url: str, table_name: str, row_id: int,
new_data: dict) -> dict:
"""
Updates data in a table in the database.

:param http_client: An HTTP client for sending requests.
:param base_url: The base URL of the Hive Agent API.
:param table_name: The name of the table where the data will be updated.
:param row_id: The ID of the row to be updated.
:param new_data: The new data to update in the table.
:return: A dictionary with a message about the update.
:raises Exception: If the HTTP request fails or the API returns an error response.
"""
endpoint = "/database/update-data"
url = f"{base_url}{endpoint}"
payload = {"table_name": table_name, "id": row_id, "data": new_data}

try:
logger.debug(f"Updating data in {table_name} with id {row_id} at {url} with new data: {new_data}")
response = await http_client.put(url, json=payload)
response.raise_for_status()
logger.debug(f"Response for updating data in {table_name} with id {row_id} at {url}: {response.json()}")
return response.json()
except httpx.HTTPStatusError as e:
logging.error(f"Failed to update data in {table_name} with id {row_id}: {e}")
raise Exception(f"Failed to update data in {table_name} with id {row_id}: {e.response.text}") from e


async def delete_data(http_client: httpx.AsyncClient, base_url: str, table_name: str, row_id: int) -> dict:
"""
Deletes data from a table in the database.

:param http_client: An HTTP client for sending requests.
:param base_url: The base URL of the Hive Agent API.
:param table_name: The name of the table where the data will be deleted.
:param row_id: The ID of the row to be deleted.
:return: A dictionary with a message about the deletion.
:raises Exception: If the HTTP request fails or the API returns an error response.
"""
endpoint = "/database/delete-data"
url = f"{base_url}{endpoint}"
payload = {"table_name": table_name, "id": row_id}

try:
logger.debug(f"Deleting data from {table_name} with id {row_id} at {url}")
response = await http_client.request("DELETE", url, content=json.dumps(payload))
response.raise_for_status()
logger.debug(f"Response for deleting data from {table_name} with id {row_id} at {url}: {response.json()}")
return response.json()
except httpx.HTTPStatusError as e:
logging.error(f"Failed to delete data from {table_name} with id {row_id}: {e}")
raise Exception(f"Failed to delete data from {table_name} with id {row_id}: {e.response.text}") from e

8 changes: 0 additions & 8 deletions hive_agent_client/entry/__init__.py

This file was deleted.

Loading
Loading