Skip to content
This repository has been archived by the owner on Nov 3, 2024. It is now read-only.

Commit

Permalink
[HN-111/HN-116] refactor: agent data storage (#12)
Browse files Browse the repository at this point in the history
  • Loading branch information
ToJen authored May 27, 2024
1 parent 8a54881 commit 590081e
Show file tree
Hide file tree
Showing 10 changed files with 528 additions and 516 deletions.
113 changes: 49 additions & 64 deletions hive_agent_client/client.py
Original file line number Diff line number Diff line change
@@ -1,21 +1,21 @@
import httpx
import logging

from typing import AsyncGenerator, Dict
from typing import Dict

from hive_agent_client.chat import send_chat_message
from hive_agent_client.entry import (
create_entry,
stream_entry,
get_entries,
get_entry_by_id,
update_entry,
delete_entry
from hive_agent_client.database import (
create_table,
insert_data,
read_data,
update_data,
delete_data
)

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


class HiveAgentClient:
"""
Client for interacting with a Hive Agent's API.
Expand Down Expand Up @@ -44,95 +44,80 @@ async def chat(self, content: str) -> str:
logger.error(f"Failed to send chat message - {content}: {e}")
raise Exception(f"Failed to send chat message: {e}")

async def create_entry(self, namespace: str, data: dict) -> Dict:
"""
Create a new entry in the specified namespace.
:param namespace: The namespace in which to create the entry.
:param data: The data to be stored in the entry.
:return: A dictionary representing the created entry.
"""
try:
return await create_entry(self.http_client, self.base_url, namespace, data)
except Exception as e:
logger.error(f"Failed to create entry {data} in {namespace}: {e}")
raise Exception(f"Failed to create entry: {e}")

async def stream_entry_data(self, namespace: str, data_stream: AsyncGenerator) -> AsyncGenerator:
async def create_table(self, table_name: str, columns: dict) -> Dict:
"""
Stream data to the entry endpoint.
Create a new table in the database.
:param namespace: The namespace in which to stream the data.
:param data_stream: The async generator providing the data to stream.
:yield: Messages received in response to the streamed data.
:param table_name: The name of the table to create.
:param columns: The columns of the table to create.
:return: A dictionary with a message about the table creation.
"""
try:
async for message in stream_entry(self.http_client, self.base_url, namespace, data_stream):
yield message
return await create_table(self.http_client, self.base_url, table_name, columns)
except Exception as e:
logger.error(f"Failed to stream entry data from {namespace}: {e}")
raise Exception(f"Failed to stream entry data: {e}")
logger.error(f"Failed to create table {table_name} with columns {columns}: {e}")
raise Exception(f"Failed to create table: {e}")

async def get_entries(self, namespace: str) -> Dict:
async def insert_data(self, table_name: str, data: dict) -> Dict:
"""
Retrieve all entries from the specified namespace.
Insert data into a table.
:param namespace: The namespace from which to retrieve entries.
:return: A dictionary containing the retrieved entries.
:param table_name: The name of the table to insert data into.
:param data: The data to insert.
:return: A dictionary with a message and the id of the inserted data.
"""
try:
return await get_entries(self.http_client, self.base_url, namespace)
return await insert_data(self.http_client, self.base_url, table_name, data)
except Exception as e:
logger.error(f"Failed to get entries in {namespace}: {e}")
raise Exception(f"Failed to get entries: {e}")
logger.error(f"Failed to insert data {data} into table {table_name}: {e}")
raise Exception(f"Failed to insert data: {e}")

async def get_entry_by_id(self, namespace: str, entry_id: str) -> Dict:
async def read_data(self, table_name: str, filters: dict = None) -> Dict:
"""
Retrieve a specific entry by its ID.
Retrieve data from a table.
:param namespace: The namespace of the entry.
:param entry_id: The ID of the entry to retrieve.
:return: A dictionary representing the retrieved entry.
:param table_name: The name of the table to read data from.
:param filters: Optional filters to apply when reading data.
:return: A list of dictionaries representing the read data.
"""
try:
return await get_entry_by_id(self.http_client, self.base_url, namespace, entry_id)
return await read_data(self.http_client, self.base_url, table_name, filters)
except Exception as e:
logger.error(f"Failed to get entry {entry_id} from {namespace}: {e}")
raise Exception(f"Failed to get entry by ID: {e}")
logger.error(f"Failed to read data from table {table_name} with filters {filters}: {e}")
raise Exception(f"Failed to read data: {e}")

async def update_entry(self, namespace: str, entry_id: str, data: dict) -> Dict:
async def update_data(self, table_name: str, row_id: int, new_data: dict) -> Dict:
"""
Update a specific entry by its ID.
Update data in a table.
:param namespace: The namespace of the entry.
:param entry_id: The ID of the entry to update.
:param data: The data to update the entry with.
:return: A dictionary representing the updated entry.
:param table_name: The name of the table to update data in.
:param row_id: The ID of the row to update.
:param new_data: The new data to update in the table.
:return: A dictionary with a message about the update.
"""
try:
return await update_entry(self.http_client, self.base_url, namespace, entry_id, data)
return await update_data(self.http_client, self.base_url, table_name, row_id, new_data)
except Exception as e:
logger.error(f"Failed to update entry {entry_id} from {namespace} with {data}: {e}")
raise Exception(f"Failed to update entry: {e}")
logger.error(f"Failed to update data in table {table_name} with id {row_id} and data {new_data}: {e}")
raise Exception(f"Failed to update data: {e}")

async def delete_entry(self, namespace: str, entry_id: str) -> Dict:
async def delete_data(self, table_name: str, row_id: int) -> Dict:
"""
Delete a specific entry by its ID.
Delete data from a table.
:param namespace: The namespace of the entry.
:param entry_id: The ID of the entry to delete.
:return: A dictionary containing the status of the deletion.
:param table_name: The name of the table to delete data from.
:param row_id: The ID of the row to delete.
:return: A dictionary with a message about the deletion.
"""
try:
return await delete_entry(self.http_client, self.base_url, namespace, entry_id)
return await delete_data(self.http_client, self.base_url, table_name, row_id)
except Exception as e:
logger.error(f"Failed to delete entry {entry_id} from {namespace}: {e}")
raise Exception(f"Failed to delete entry: {e}")
logger.error(f"Failed to delete data from table {table_name} with id {row_id}: {e}")
raise Exception(f"Failed to delete data: {e}")

async def close(self):
"""
Close the HTTP client session.
"""
logger.debug("Closing HTTP client session...")
await self.http_client.aclose()

7 changes: 7 additions & 0 deletions hive_agent_client/database/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
from .database import (
create_table,
insert_data,
read_data,
update_data,
delete_data
)
154 changes: 154 additions & 0 deletions hive_agent_client/database/database.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
import json

import httpx
import logging
import os
import sys

from typing import Optional


def get_log_level():
HIVE_AGENT_LOG_LEVEL = os.getenv('HIVE_AGENT_LOG_LEVEL', 'INFO').upper()
return getattr(logging, HIVE_AGENT_LOG_LEVEL, logging.INFO)


logging.basicConfig(stream=sys.stdout, level=get_log_level())
logging.getLogger().addHandler(logging.StreamHandler(stream=sys.stdout))

logger = logging.getLogger()
logger.setLevel(get_log_level())


async def create_table(http_client: httpx.AsyncClient, base_url: str, table_name: str, columns: dict) -> dict:
"""
Creates a table in the database.
:param http_client: An HTTP client for sending requests.
:param base_url: The base URL of the Hive Agent API.
:param table_name: The name of the table to be created.
:param columns: The columns of the table to be created.
:return: A dictionary with a message about the table creation.
:raises Exception: If the HTTP request fails or the API returns an error response.
"""
endpoint = "/database/create-table"
url = f"{base_url}{endpoint}"
data = {"table_name": table_name, "columns": columns}

try:
logger.debug(f"Creating table {table_name} at {url}")
response = await http_client.post(url, json=data)
response.raise_for_status()
logger.debug(f"Response for creating table {table_name} at {url}: {response.json()}")
return response.json()
except httpx.HTTPStatusError as e:
logging.error(f"Failed to create table {table_name}: {e}")
raise Exception(f"Failed to create table {table_name}: {e.response.text}") from e


async def insert_data(http_client: httpx.AsyncClient, base_url: str, table_name: str, data: dict) -> dict:
"""
Inserts data into a table in the database.
:param http_client: An HTTP client for sending requests.
:param base_url: The base URL of the Hive Agent API.
:param table_name: The name of the table where the data will be inserted.
:param data: The data to be inserted.
:return: A dictionary with a message and the id of the inserted data.
:raises Exception: If the HTTP request fails or the API returns an error response.
"""
endpoint = "/database/insert-data"
url = f"{base_url}{endpoint}"
payload = {"table_name": table_name, "data": data}

try:
logger.debug(f"Inserting data into {table_name} at {url}")
response = await http_client.post(url, json=payload)
response.raise_for_status()
logger.debug(f"Response for inserting data into {table_name} at {url}: {response.json()}")
return response.json()
except httpx.HTTPStatusError as e:
logging.error(f"Failed to insert data into {table_name}: {e}")
raise Exception(f"Failed to insert data into {table_name}: {e.response.text}") from e


async def read_data(http_client: httpx.AsyncClient, base_url: str, table_name: str,
filters: Optional[dict] = None) -> list:
"""
Reads data from a table in the database.
:param http_client: An HTTP client for sending requests.
:param base_url: The base URL of the Hive Agent API.
:param table_name: The name of the table from which to read data.
:param filters: Optional filters to apply when reading data.
:return: A list of dictionaries representing the read data.
:raises Exception: If the HTTP request fails or the API returns an error response.
"""
endpoint = "/database/read-data"
url = f"{base_url}{endpoint}"
payload = {"table_name": table_name, "filters": filters}

try:
logger.debug(f"Reading data from {table_name} at {url} with filters: {filters}")
response = await http_client.post(url, json=payload)
response.raise_for_status()
logger.debug(f"Response for reading data from {table_name} at {url}: {response.json()}")
return response.json()
except httpx.HTTPStatusError as e:
logging.error(f"Failed to read data from {table_name}: {e}")
raise Exception(f"Failed to read data from {table_name}: {e.response.text}") from e


async def update_data(http_client: httpx.AsyncClient, base_url: str, table_name: str, row_id: int,
new_data: dict) -> dict:
"""
Updates data in a table in the database.
:param http_client: An HTTP client for sending requests.
:param base_url: The base URL of the Hive Agent API.
:param table_name: The name of the table where the data will be updated.
:param row_id: The ID of the row to be updated.
:param new_data: The new data to update in the table.
:return: A dictionary with a message about the update.
:raises Exception: If the HTTP request fails or the API returns an error response.
"""
endpoint = "/database/update-data"
url = f"{base_url}{endpoint}"
payload = {"table_name": table_name, "id": row_id, "data": new_data}

try:
logger.debug(f"Updating data in {table_name} with id {row_id} at {url} with new data: {new_data}")
response = await http_client.put(url, json=payload)
response.raise_for_status()
logger.debug(f"Response for updating data in {table_name} with id {row_id} at {url}: {response.json()}")
return response.json()
except httpx.HTTPStatusError as e:
logging.error(f"Failed to update data in {table_name} with id {row_id}: {e}")
raise Exception(f"Failed to update data in {table_name} with id {row_id}: {e.response.text}") from e


async def delete_data(http_client: httpx.AsyncClient, base_url: str, table_name: str, row_id: int) -> dict:
"""
Deletes data from a table in the database.
:param http_client: An HTTP client for sending requests.
:param base_url: The base URL of the Hive Agent API.
:param table_name: The name of the table where the data will be deleted.
:param row_id: The ID of the row to be deleted.
:return: A dictionary with a message about the deletion.
:raises Exception: If the HTTP request fails or the API returns an error response.
"""
endpoint = "/database/delete-data"
url = f"{base_url}{endpoint}"
payload = {"table_name": table_name, "id": row_id}

try:
logger.debug(f"Deleting data from {table_name} with id {row_id} at {url}")
response = await http_client.request("DELETE", url, content=json.dumps(payload))
response.raise_for_status()
logger.debug(f"Response for deleting data from {table_name} with id {row_id} at {url}: {response.json()}")
return response.json()
except httpx.HTTPStatusError as e:
logging.error(f"Failed to delete data from {table_name} with id {row_id}: {e}")
raise Exception(f"Failed to delete data from {table_name} with id {row_id}: {e.response.text}") from e

8 changes: 0 additions & 8 deletions hive_agent_client/entry/__init__.py

This file was deleted.

Loading

0 comments on commit 590081e

Please sign in to comment.