diff --git a/docs/components/ai_application.md b/docs/components/ai_application.md index 2a5a6691d..f34b10751 100644 --- a/docs/components/ai_application.md +++ b/docs/components/ai_application.md @@ -10,6 +10,7 @@ You can think of state as a JSON object that the `AIApplication` will update as ```python from marvin.beta.applications import AIApplication +# define any functions that you want the AI to be able to call def read_gcal() -> list[dict]: return [ { diff --git a/src/marvin/kv/__init__.py b/src/marvin/kv/__init__.py deleted file mode 100644 index e69de29bb..000000000 diff --git a/src/marvin/kv/base.py b/src/marvin/kv/base.py deleted file mode 100644 index fa8d10ebb..000000000 --- a/src/marvin/kv/base.py +++ /dev/null @@ -1,48 +0,0 @@ -from abc import ABC, abstractmethod -from typing import Generic, List, Mapping, Optional, TypeVar - -from pydantic import BaseModel -from typing_extensions import ParamSpec - -K = TypeVar("K") # Key type -V = TypeVar("V") # Value type -R = TypeVar("R") # Return type for write/delete operations -P = ParamSpec("P") # Additional parameters - - -class StorageInterface(BaseModel, Generic[K, V, R], ABC): - """An abstract key-value store interface. - - Example: - ```python - store = SomeStorageInterface() - - store.write("foo", "bar") - store.write("baz", "qux") - assert store.read("foo") == "bar" - assert store.read_all() == {"foo": "bar", "baz": "qux"} - assert store.list_keys() == ["foo", "baz"] - store.delete("foo") - assert store.read("foo") is None - assert store.read_all() == {"baz": "qux"} - """ - - @abstractmethod - def write(self, key: K, value: V, *args: P.args, **kwargs: P.kwargs) -> Optional[R]: - pass - - @abstractmethod - def read(self, key: K, *args: P.args, **kwargs: P.kwargs) -> Optional[V]: - pass - - @abstractmethod - def read_all(self, *args: P.args, **kwargs: P.kwargs) -> Mapping[K, V]: - pass - - @abstractmethod - def delete(self, key: K, *args: P.args, **kwargs: P.kwargs) -> Optional[R]: - pass - - @abstractmethod - def list_keys(self, *args: P.args, **kwargs: P.kwargs) -> List[K]: - pass diff --git a/src/marvin/kv/disk.py b/src/marvin/kv/disk.py deleted file mode 100644 index 2a4af3021..000000000 --- a/src/marvin/kv/disk.py +++ /dev/null @@ -1,84 +0,0 @@ -import json -import os -import pickle -from pathlib import Path -from typing import Optional, TypeVar, Union - -from pydantic import Field, field_validator -from typing_extensions import Literal - -from marvin.kv.base import StorageInterface - -K = TypeVar("K", bound=str) -V = TypeVar("V") - - -class DiskKV(StorageInterface[K, V, str]): - """ - A key-value store that stores values on disk. - - Example: - ```python - from marvin.kv.disk_based import DiskBasedKV - store = DiskBasedKV(storage_path="/path/to/storage") - store.write("key", "value") - assert store.read("key") == "value" - ``` - """ - - storage_path: Path = Field(...) - serializer: Literal["json", "pickle"] = Field("json") - - @field_validator("storage_path") - def _validate_storage_path(cls, v: Union[str, Path]) -> Path: - expanded_path = Path(v).expanduser().resolve() - if not expanded_path.exists(): - expanded_path.mkdir(parents=True, exist_ok=True) - return expanded_path - - def _get_file_path(self, key: K) -> Path: - file_extension = ".json" if self.serializer == "json" else ".pkl" - return self.storage_path / f"{key}{file_extension}" - - def _serialize(self, value: V) -> bytes: - if self.serializer == "json": - return json.dumps(value).encode() - else: - return pickle.dumps(value) - - def _deserialize(self, data: bytes) -> V: - if self.serializer == "json": - return json.loads(data) - else: - return pickle.loads(data) - - def write(self, key: K, value: V) -> str: - file_path = self._get_file_path(key) - serialized_value = self._serialize(value) - with open(file_path, "wb") as file: - file.write(serialized_value) - return f"Stored {key}= {value}" - - def delete(self, key: K) -> str: - file_path = self._get_file_path(key) - try: - os.remove(file_path) - return f"Deleted {key}" - except FileNotFoundError: - return f"Key {key} not found" - - def read(self, key: K) -> Optional[V]: - file_path = self._get_file_path(key) - try: - with open(file_path, "rb") as file: - serialized_value = file.read() - return self._deserialize(serialized_value) - except FileNotFoundError: - return None - - def read_all(self, limit: Optional[int] = None) -> dict[K, V]: - files = os.listdir(self.storage_path)[:limit] - return {file.split(".")[0]: self.read(file.split(".")[0]) for file in files} - - def list_keys(self) -> list[K]: - return [file.split(".")[0] for file in os.listdir(self.storage_path)] diff --git a/src/marvin/kv/in_memory.py b/src/marvin/kv/in_memory.py deleted file mode 100644 index 1619b7da3..000000000 --- a/src/marvin/kv/in_memory.py +++ /dev/null @@ -1,42 +0,0 @@ -from typing import Optional, TypeVar - -from pydantic import Field - -from marvin.kv.base import StorageInterface - -K = TypeVar("K", bound=str) -V = TypeVar("V") - - -class InMemoryKV(StorageInterface[K, V, str]): - """An in-memory key-value store. - - Example: - ```python - from marvin.kv.in_memory import InMemoryKV - store = InMemoryKV() - store.write("key", "value") - assert store.read("key") == "value" - ``` - """ - - store: dict[K, V] = Field(default_factory=dict) - - def write(self, key: K, value: V) -> str: - self.store[key] = value - return f"Stored {key}= {value}" - - def delete(self, key: K) -> str: - v = self.store.pop(key, None) - return f"Deleted {key}= {v}" - - def read(self, key: K) -> Optional[V]: - return self.store.get(key) - - def read_all(self, limit: Optional[int] = None) -> dict[K, V]: - if limit is None: - return self.store - return dict(list(self.store.items())[:limit]) - - def list_keys(self) -> list[K]: - return list(self.store.keys()) diff --git a/src/marvin/kv/json_block.py b/src/marvin/kv/json_block.py deleted file mode 100644 index d7b424b0c..000000000 --- a/src/marvin/kv/json_block.py +++ /dev/null @@ -1,69 +0,0 @@ -from typing import Mapping, Optional, TypeVar - -try: - from prefect.blocks.system import JSON - from prefect.exceptions import ObjectNotFound -except ImportError: - raise ModuleNotFoundError( - "The `prefect` package is required to use the JSONBlockKV class." - " You can install it with `pip install prefect` or `pip install marvin[prefect]`." - ) -from pydantic import Field, PrivateAttr, model_validator - -from marvin.kv.base import StorageInterface -from marvin.utilities.asyncio import run_sync, run_sync_if_awaitable - -K = TypeVar("K", bound=str) -V = TypeVar("V") - - -async def load_json_block(block_name: str) -> JSON: - try: - return await JSON.load(name=block_name) - except Exception as exc: - if "Unable to find block document" in str(exc): - json_block = JSON(value={}) - await json_block.save(name=block_name) - return json_block - raise ObjectNotFound(f"Unable to load JSON block {block_name}") from exc - - -class JSONBlockKV(StorageInterface): - block_name: str = Field(default="marvin-kv") - _state: dict[K, Mapping] = PrivateAttr(default_factory=dict) - - @model_validator(mode="after") - def load_state(self) -> "JSONBlockKV": - json_block = run_sync(load_json_block(self.block_name)) - self._state = json_block.value or {} - return self - - def write(self, key: K, value: V) -> str: - self._state[key] = value - json_block = run_sync(load_json_block(self.block_name)) - json_block.value = self._state - run_sync_if_awaitable(json_block.save(name=self.block_name, overwrite=True)) - return f"Stored {key}= {value}" - - def delete(self, key: K) -> str: - if key in self._state: - self._state.pop(key, None) - json_block = run_sync(load_json_block(self.block_name)) - if key in json_block.value: - json_block.value = self._state - run_sync_if_awaitable(json_block.save(name=self.block_name, overwrite=True)) - return f"Deleted {key}" - - def read(self, key: K) -> Optional[V]: - json_block = run_sync(load_json_block(self.block_name)) - return json_block.value.get(key) - - def read_all(self, limit: Optional[int] = None) -> dict[K, V]: - json_block = run_sync(load_json_block(self.block_name)) - - limited_items = dict(list(json_block.value.items())[:limit]) - return limited_items - - def list_keys(self) -> list[K]: - json_block = run_sync(load_json_block(self.block_name)) - return list(json_block.value.keys())