From 460ccc26d7703750787286b3d6b4f1034d1da12a Mon Sep 17 00:00:00 2001
From: BespalovSergey <50576059+BespalovSergey@users.noreply.github.com>
Date: Mon, 27 May 2024 12:50:51 +0300
Subject: [PATCH] Isolate caching module (#32)

* add http cache unit tests

* add logger to caching module

* add app_id argument to _update_lunary_event method

* add _can_update_lunary_event method in BaseHttpCache class

* switching to the motleycache package

* remove motleycrew caching package

* update unit test cache files

* remove unit test cache

---------

Co-authored-by: User
---
 examples/Caching and observability.ipynb    |   4 +-
 examples/_test_single_openai_tools_react.py |   2 +-
 motleycrew/caching/__init__.py              |   9 -
 motleycrew/caching/caching.py               |  64 ----
 motleycrew/caching/http_cache.py            | 397 --------------------
 motleycrew/caching/utils.py                 |  58 ---
 poetry.lock                                 |  25 +-
 pyproject.toml                              |   1 +
 tests/run_integration_tests.py              |   2 +-
 tests/test_caching/__init__.py              |   0
 tests/test_caching/test_http_cache.py       |  45 ---
 11 files changed, 29 insertions(+), 578 deletions(-)
 delete mode 100644 motleycrew/caching/__init__.py
 delete mode 100644 motleycrew/caching/caching.py
 delete mode 100644 motleycrew/caching/http_cache.py
 delete mode 100644 motleycrew/caching/utils.py
 delete mode 100644 tests/test_caching/__init__.py
 delete mode 100644 tests/test_caching/test_http_cache.py
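Reviewer note (this sits between the diffstat and the first diff header, a
region git am ignores): the user-facing API is unchanged by this move; only
the import path changes from motleycrew.caching to motleycache. A minimal
before/after sketch, assuming motleycache 0.0.1 is installed; the function
names and the whitelist pattern are taken from the diff below:

    # Before (module bundled with motleycrew, removed by this patch):
    #   from motleycrew.caching import enable_cache, set_cache_whitelist
    # After (standalone motleycache package):
    from motleycache import enable_cache, set_cache_whitelist

    set_cache_whitelist(["*//api.openai.com/*"])  # only cache OpenAI API requests
    enable_cache()  # patches requests, httpx, and curl_cffi request methods
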
whitelist""" - BaseHttpCache.cache_whitelist = whitelist - BaseHttpCache.cache_blacklist = [] - - -def set_cache_blacklist(blacklist: list[str]): - """Set the cache blacklist""" - BaseHttpCache.cache_blacklist = blacklist - BaseHttpCache.cache_whitelist = [] - - -def set_strong_cache(val: bool): - """Enable or disable the strict-caching option""" - BaseHttpCache.strong_cache = bool(val) - - -def set_update_cache_if_exists(val: bool): - """Enable or disable cache updates""" - BaseHttpCache.update_cache_if_exists = bool(val) - - -def set_cache_location(location: str) -> str: - """Set the caching root directory, return the absolute path of the directory""" - BaseHttpCache.root_cache_dir = location - return os.path.abspath(BaseHttpCache.root_cache_dir) - - -def enable_cache(): - """Enable global caching""" - global is_caching - for http_cache in caching_http_library_list: - http_cache.enable() - is_caching = True - - -def disable_cache(): - """Disable global caching""" - global is_caching - for http_cache in caching_http_library_list: - http_cache.disable() - is_caching = False - - -def check_is_caching(): - """Checking caching""" - return all([http_cache.is_caching for http_cache in caching_http_library_list]) diff --git a/motleycrew/caching/http_cache.py b/motleycrew/caching/http_cache.py deleted file mode 100644 index 92258764..00000000 --- a/motleycrew/caching/http_cache.py +++ /dev/null @@ -1,397 +0,0 @@ -import os -from pathlib import Path -from abc import ABC, abstractmethod -from typing import List, Callable, Any, Union -from urllib.parse import urlparse -import logging -import inspect -import fnmatch -import cloudpickle -import platformdirs -import traceback - -import requests -from requests.structures import CaseInsensitiveDict -from httpx import ( - Client as HTTPX__Client, - AsyncClient as HTTPX_AsyncClient, - Headers as HTTPX__Headers, -) -from curl_cffi.requests import AsyncSession as CurlCFFI__AsyncSession -from curl_cffi.requests import Headers as CurlCFFI__Headers - - -try: - from lunary import track_event, run_ctx, event_queue_ctx - - do_update_lunary_event = True -except ImportError: - track_event = None - run_ctx = None - event_queue_ctx = None - do_update_lunary_event = False - - -from motleycrew.common.enums import LunaryEventName, LunaryRunType -from .utils import recursive_hash, shorten_filename, FakeRLock - - -FORCED_CACHE_BLACKLIST = [ - "*//api.lunary.ai/*", -] - -CACHE_FILENAME_LENGTH_LIMIT = 64 - - -class CacheException(Exception): - """Exception for caching process""" - - -class StrongCacheException(BaseException): - """Exception use of cache only""" - - -def file_cache(http_cache: "BaseHttpCache", updating_parameters: dict = {}): - """Decorator to cache function output based on its inputs, ignoring specified parameters.""" - - def decorator(func): - def wrapper(*args, **kwargs): - kwargs.update(updating_parameters) - return http_cache.get_response(func, *args, **kwargs) - - return wrapper - - return decorator - - -def afile_cache(http_cache: "BaseHttpCache", updating_parameters: dict = {}): - """Async decorator to cache function output based on its inputs, ignoring specified parameters.""" - - def decorator(func): - async def wrapper(*args, **kwargs): - kwargs.update(updating_parameters) - return await http_cache.aget_response(func, *args, **kwargs) - - return wrapper - - return decorator - - -class BaseHttpCache(ABC): - """Basic abstract class for replacing http library methods""" - - ignore_params: List[str] = [] # ignore params names for create hash file name 
diff --git a/motleycrew/caching/http_cache.py b/motleycrew/caching/http_cache.py
deleted file mode 100644
index 92258764..00000000
--- a/motleycrew/caching/http_cache.py
+++ /dev/null
@@ -1,397 +0,0 @@
-import os
-from pathlib import Path
-from abc import ABC, abstractmethod
-from typing import List, Callable, Any, Union
-from urllib.parse import urlparse
-import logging
-import inspect
-import fnmatch
-import cloudpickle
-import platformdirs
-import traceback
-
-import requests
-from requests.structures import CaseInsensitiveDict
-from httpx import (
-    Client as HTTPX__Client,
-    AsyncClient as HTTPX_AsyncClient,
-    Headers as HTTPX__Headers,
-)
-from curl_cffi.requests import AsyncSession as CurlCFFI__AsyncSession
-from curl_cffi.requests import Headers as CurlCFFI__Headers
-
-
-try:
-    from lunary import track_event, run_ctx, event_queue_ctx
-
-    do_update_lunary_event = True
-except ImportError:
-    track_event = None
-    run_ctx = None
-    event_queue_ctx = None
-    do_update_lunary_event = False
-
-
-from motleycrew.common.enums import LunaryEventName, LunaryRunType
-from .utils import recursive_hash, shorten_filename, FakeRLock
-
-
-FORCED_CACHE_BLACKLIST = [
-    "*//api.lunary.ai/*",
-]
-
-CACHE_FILENAME_LENGTH_LIMIT = 64
-
-
-class CacheException(Exception):
-    """Exception for caching process"""
-
-
-class StrongCacheException(BaseException):
-    """Exception use of cache only"""
-
-
-def file_cache(http_cache: "BaseHttpCache", updating_parameters: dict = {}):
-    """Decorator to cache function output based on its inputs, ignoring specified parameters."""
-
-    def decorator(func):
-        def wrapper(*args, **kwargs):
-            kwargs.update(updating_parameters)
-            return http_cache.get_response(func, *args, **kwargs)
-
-        return wrapper
-
-    return decorator
-
-
-def afile_cache(http_cache: "BaseHttpCache", updating_parameters: dict = {}):
-    """Async decorator to cache function output based on its inputs, ignoring specified parameters."""
-
-    def decorator(func):
-        async def wrapper(*args, **kwargs):
-            kwargs.update(updating_parameters)
-            return await http_cache.aget_response(func, *args, **kwargs)
-
-        return wrapper
-
-    return decorator
-
-
-class BaseHttpCache(ABC):
-    """Basic abstract class for replacing http library methods"""
-
-    ignore_params: List[str] = []  # ignore params names for create hash file name
-    library_name: str = ""
-    app_name = os.environ.get("APP_NAME") or "motleycrew"
-    root_cache_dir = platformdirs.user_cache_dir(app_name)
-    strong_cache: bool = False
-    update_cache_if_exists: bool = False
-    cache_blacklist: List[str] = []
-    cache_whitelist: List[str] = []
-
-    def __init__(self, *args, **kwargs):
-        self.is_caching = False
-
-    @abstractmethod
-    def get_url(self, *args, **kwargs) -> str:
-        """Finds the url in the arguments and returns it"""
-
-    @abstractmethod
-    def _enable(self):
-        """Replacing the original function with a caching function"""
-
-    @abstractmethod
-    def _disable(self):
-        """Replacing the caching function with the original one"""
-
-    def enable(self):
-        """Enable caching"""
-        self._enable()
-        self.is_caching = True
-
-        library_log = "for {} library.".format(self.library_name) if self.library_name else "."
-        logging.info("Enable caching {} class {}".format(self.__class__, library_log))
-
-    def disable(self):
-        """Disable caching"""
-        self._disable()
-        self.is_caching = False
-
-        library_log = "for {} library.".format(self.library_name) if self.library_name else "."
-        logging.info("Disable caching {} class {}".format(self.__class__, library_log))
-
-    def prepare_response(self, response: Any) -> Any:
-        """Preparing the response object before saving"""
-        return response
-
-    def should_cache(self, url: str) -> bool:
-        if self.match_url(url, FORCED_CACHE_BLACKLIST):
-            return False
-
-        if self.cache_whitelist and self.cache_blacklist:
-            raise CacheException(
-                "You can't use both cache whitelist and blacklist at the same time."
-            )
-        elif self.cache_whitelist:
-            return self.match_url(url, self.cache_whitelist)
-        elif self.cache_blacklist:
-            return not self.match_url(url, self.cache_blacklist)
-        return True
-
-    def get_cache_file(self, func: Callable, *args, **kwargs) -> Union[tuple, None]:
-        url = self.get_url(*args, **kwargs)
-        url_parsed = urlparse(url)
-
-        # Check valid url
-        if not self.should_cache(url):
-            logging.info("Ignore url to cache: {}".format(url))
-            return None
-
-        # check or create cache dirs
-        root_dir = Path(self.root_cache_dir)
-
-        cache_dir = (
-            root_dir
-            / shorten_filename(url_parsed.hostname, length=CACHE_FILENAME_LENGTH_LIMIT)
-            / shorten_filename(
-                url_parsed.path.strip("/").replace("/", "_"), length=CACHE_FILENAME_LENGTH_LIMIT
-            )
-        )
-        cache_dir.mkdir(parents=True, exist_ok=True)
-
-        # Convert args to a dictionary based on the function's signature
-        args_names = func.__code__.co_varnames[: func.__code__.co_argcount]
-        args_dict = dict(zip(args_names, args))
-
-        # Remove ignored params
-        kwargs_clone = kwargs.copy()
-        for param in self.ignore_params:
-            args_dict.pop(param, None)
-            kwargs_clone.pop(param, None)
-
-        # Create hash based on argument names, argument values, and function source code
-        hashing_base = [args_dict, kwargs_clone, inspect.getsource(func)]
-        call_hash = shorten_filename(
-            recursive_hash(hashing_base, ignore_params=self.ignore_params),
-            length=CACHE_FILENAME_LENGTH_LIMIT,
-        )
-
-        cache_file = cache_dir / "{}.pkl".format(call_hash)
-        return cache_file, url
-
-    def get_response(self, func: Callable, *args, **kwargs) -> Any:
-        """Returns a response from the cache if it is found, or executes the request first"""
-        cache_data = self.get_cache_file(func, *args, **kwargs)
-        if cache_data is None:
-            return func(*args, **kwargs)
-        cache_file, url = cache_data
-
-        # If cache exists, load and return it
-        result = self.load_cache_response(cache_file, url)
-        if result is not None:
-            if do_update_lunary_event:
-                self._update_lunary_event(run_ctx.get())
-            return result
-
-        # Otherwise, call the function and save its result to the cache
-        result = func(*args, **kwargs)
-
-        self.write_to_cache(result, cache_file, url)
-        return result
-
-    async def aget_response(self, func: Callable, *args, **kwargs) -> Any:
-        """Async returns a response from the cache if it is found, or executes the request first"""
-        cache_data = self.get_cache_file(func, *args, **kwargs)
-        if cache_data is None:
-            return await func(*args, **kwargs)
-        cache_file, url = cache_data
-
-        # If cache exists, load and return it
-        result = self.load_cache_response(cache_file, url)
-        if result is not None:
-            if do_update_lunary_event:
-                self._update_lunary_event(run_ctx.get())
-            return result
-
-        # Otherwise, call the function and save its result to the cache
-        result = await func(*args, **kwargs)
-
-        self.write_to_cache(result, cache_file, url)
-        return result
-
-    @staticmethod
-    def _update_lunary_event(
-        run_id: str, run_type: str = LunaryRunType.LLM, is_cache: bool = True
-    ) -> None:
-        """Updating lunary event"""
-
-        if not do_update_lunary_event:
-            return
-
-        event_params = {
-            "run_type": run_type,
-            "event_name": LunaryEventName.UPDATE,
-            "run_id": run_id,
-            "callback_queue": event_queue_ctx.get(),
-        }
-        if is_cache:
-            event_params["metadata"] = {"cache": True}
-
-        try:
-            track_event(**event_params)
-        except Exception as exc:
-            msg = "[Lunary] An error occurred while updating lunary event {}: {}\n{}".format(
-                run_id, exc, traceback.format_exc()
-            )
-            logging.warning(msg)
-            raise exc
-
-    def load_cache_response(self, cache_file: Path, url: str) -> Union[Any, None]:
-        """Loads and returns the cached response"""
-        if cache_file.exists() and not self.update_cache_if_exists:
-            return self.read_from_cache(cache_file, url)
-        elif self.strong_cache:
-            msg = "Cache file not found: {}\nthe strictly caching option is enabled.".format(
-                str(cache_file)
-            )
-            raise StrongCacheException(msg)
-
-    def read_from_cache(self, cache_file: Path, url: str = "") -> Union[Any, None]:
-        """Reads and returns a serialized object from a file"""
-        try:
-            with cache_file.open("rb") as f:
-                logging.info("Used cache for {} url from {}".format(url, cache_file))
-                result = cloudpickle.load(f)
-                return result
-        except Exception as e:
-            logging.warning("Unpickling failed for {}".format(cache_file))
-            if self.strong_cache:
-                msg = "Error reading cached file: {}\n{}".format(str(e), str(cache_file))
-                raise StrongCacheException(msg)
-        return None
-
-    def write_to_cache(self, response: Any, cache_file: Path, url: str = "") -> None:
-        """Writes the response object to a file"""
-        response = self.prepare_response(response)
-        try:
-            with cache_file.open("wb") as f:
-                cloudpickle.dump(response, f)
-                logging.info("Write cache for {} url to {}".format(url, cache_file))
-        except Exception as e:
-            logging.warning("Pickling failed for {} url: {}".format(cache_file, e))
-
-    @staticmethod
-    def match_url(url: str, patterns: List[str]) -> bool:
-        """Checking the url for a match in the list of templates"""
-        return any([fnmatch.fnmatch(url, pat) for pat in patterns])
-
-
-class RequestsHttpCaching(BaseHttpCache):
-    """Requests library caching"""
-
-    ignore_params = ["timestamp", "runId", "parentRunId"]
-    library_name = "requests"
-
-    def __init__(self, *args, **kwargs):
-        super().__init__(*args, **kwargs)
-        self.library_method = requests.api.request
-
-    def get_url(self, *args, **kwargs) -> str:
-        """Finds the url in the arguments and returns it"""
-        return args[1]
-
-    def prepare_response(self, response: Any) -> Any:
-        """Preparing the response object before saving"""
-        response.headers = CaseInsensitiveDict()
-        response.request.headers = CaseInsensitiveDict()
-        return response
-
-    def _enable(self):
-        """Replacing the original function with a caching function"""
-
-        @file_cache(self)
-        def request_func(*args, **kwargs):
-            return self.library_method(*args, **kwargs)
-
-        requests.api.request = request_func
-
-    def _disable(self):
-        """Replacing the caching function with the original one"""
-        requests.api.request = self.library_method
-
-
-class HttpxHttpCaching(BaseHttpCache):
-    """Httpx library caching"""
-
-    ignore_params = ["s", "headers", "stream", "extensions"]
-    library_name = "Httpx"
-
-    def __init__(self, *args, **kwargs):
-        super().__init__(*args, **kwargs)
-        self.library_method = HTTPX__Client.send
-        self.alibrary_method = HTTPX_AsyncClient.send
-
-    def get_url(self, *args, **kwargs) -> str:
-        """Finds the url in the arguments and returns it"""
-        return str(args[1].url)
-
-    def prepare_response(self, response: Any) -> Any:
-        """Preparing the response object before saving"""
-        response.headers = HTTPX__Headers()
-        response.request.headers = HTTPX__Headers()
-        return response
-
-    def _enable(self):
-        """Replacing the original function with a caching function"""
-
-        @file_cache(self, updating_parameters={"stream": False})
-        def request_func(s, request, *args, **kwargs):
-            return self.library_method(s, request, **kwargs)
-
-        @afile_cache(self, updating_parameters={"stream": False})
-        async def arequest_func(s, request, *args, **kwargs):
-            return await self.alibrary_method(s, request, **kwargs)
-
-        HTTPX__Client.send = request_func
-        HTTPX_AsyncClient.send = arequest_func
-
-    def _disable(self):
-        """Replacing the caching function with the original one"""
-        HTTPX__Client.send = self.library_method
-        HTTPX_AsyncClient.send = self.alibrary_method
-
-
-class CurlCffiHttpCaching(BaseHttpCache):
-    """Curl Cffi library caching"""
-
-    ignore_params = ["s"]
-    library_name = "Curl cffi"
-
-    def __init__(self, *args, **kwargs):
-        super().__init__(*args, **kwargs)
-        self.library_method = CurlCFFI__AsyncSession.request
-
-    def get_url(self, *args, **kwargs) -> str:
-        """Finds the url in the arguments and returns it"""
-        return args[2]
-
-    def prepare_response(self, response: Any) -> Any:
-        """Preparing the response object before saving"""
-        response.headers = CurlCFFI__Headers()
-        response.request.headers = CurlCFFI__Headers()
-        response.curl = None
-        response.cookies.jar._cookies_lock = FakeRLock()
-        return response
-
-    def _enable(self):
-        """Replacing the original function with a caching function"""
-
-        @afile_cache(self)
-        async def request_func(s, method, url, *args, **kwargs):
-            return await self.library_method(s, method, url, *args, **kwargs)
-
-        CurlCFFI__AsyncSession.request = request_func
-
-    def _disable(self):
-        """Replacing the caching function with the original one"""
-        CurlCFFI__AsyncSession.request = self.library_method
diff --git a/motleycrew/caching/utils.py b/motleycrew/caching/utils.py
deleted file mode 100644
index d4ada9fa..00000000
--- a/motleycrew/caching/utils.py
+++ /dev/null
@@ -1,58 +0,0 @@
-import hashlib
-
-
-MAX_DEPTH = 6
-
-
-class FakeRLock:
-    """A fake class to replace threading.RLock during serialization"""
-
-    def acquire(self):
-        pass
-
-    def release(self):
-        pass
-
-
-def recursive_hash(value, depth=0, ignore_params=[]):
-    """Hash primitives recursively with maximum depth."""
-    if depth > MAX_DEPTH:
-        return hashlib.sha256("max_depth_reached".encode()).hexdigest()
-
-    if isinstance(value, (int, float, str, bool, bytes)):
-        return hashlib.sha256(str(value).encode()).hexdigest()
-    elif isinstance(value, (list, tuple)):
-        return hashlib.sha256(
-            "".join([recursive_hash(item, depth + 1, ignore_params) for item in value]).encode()
-        ).hexdigest()
-    elif isinstance(value, dict):
-        return hashlib.sha256(
-            "".join(
-                [
-                    recursive_hash(key, depth + 1, ignore_params)
-                    + recursive_hash(val, depth + 1, ignore_params)
-                    for key, val in value.items()
-                    if key not in ignore_params
-                ]
-            ).encode()
-        ).hexdigest()
-    elif hasattr(value, "__dict__") and value.__class__.__name__ not in ignore_params:
-        return recursive_hash(value.__dict__, depth, ignore_params)
-    else:
-        return hashlib.sha256("unknown".encode()).hexdigest()
-
-
-def shorten_filename(filename, length, hash_length=16):
-    """
-    Shorten the filename to a fixed length, keeping it unique by collapsing partly into a hash.
-    Keeps the start and end of the filename for readability.
-    """
-    assert length > hash_length + 2, "Length should be greater than hash length + 2"
-    if len(filename) > length:
-        hash_part = hashlib.sha256(filename.encode()).hexdigest()[:hash_length]
-        filename = "{}_{}_{}".format(
-            filename[: length // 2 - hash_length // 2 - 1],
-            hash_part,
-            filename[-length // 2 + hash_length // 2 + 1 :],
-        )
-    return filename
diff --git a/poetry.lock b/poetry.lock
index 88daea06..822879d0 100644
--- a/poetry.lock
+++ b/poetry.lock
@@ -2834,6 +2834,29 @@ files = [
     {file = "more_itertools-10.2.0-py3-none-any.whl", hash = "sha256:686b06abe565edfab151cb8fd385a05651e1fdf8f0a14191e4439283421f8684"},
 ]
 
+[[package]]
+name = "motleycache"
+version = "0.0.1"
+description = "Package for caching http requests"
+optional = false
+python-versions = "<4.0,>=3.10"
+files = [
+    {file = "motleycache-0.0.1-py3-none-any.whl", hash = "sha256:7950f66641cb7d8fbe057b5c5fea1c9e3bfc7b2b12037000e56c1f6e32310043"},
+    {file = "motleycache-0.0.1.tar.gz", hash = "sha256:d62f7fdfe29f337ff336dad34f02374b7f5e03aa24021b4a56d28e82dc28420e"},
+]
+
+[package.dependencies]
+cloudpickle = ">=3.0.0,<4.0.0"
+curl-cffi = ">=0.6.4,<0.7.0"
+httpx = ">=0.27.0,<0.28.0"
+platformdirs = ">=4.2.1,<5.0.0"
+pytest = ">=8.0.2,<9.0.0"
+pytest-cov = ">=4.1.0,<5.0.0"
+requests = ">=2.31.0,<3.0.0"
+
+[package.extras]
+lunary = ["lunary (>=1.0.21,<2.0.0)"]
+
 [[package]]
 name = "multidict"
 version = "6.0.5"
@@ -5770,4 +5793,4 @@ llama-index = ["llama-index"]
 [metadata]
 lock-version = "2.0"
 python-versions = ">=3.10,<=3.13"
-content-hash = "a7426b0bfd64bc3c27bcacdccec54fbc47c3d616d4509a813b3b7231480477c4"
+content-hash = "d90bb9ace1e33cfdf32a2e6cfb2ef3beeb1a640c09c169dbc74441dfce5c46ec"
diff --git a/pyproject.toml b/pyproject.toml
index c97d4207..09aa5c3c 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -25,6 +25,7 @@ requests = "^2.31.0"
 curl-cffi = "^0.6.4"
 httpx = "^0.27.0"
 autogen = {version = "^1.0.16", optional = true}
+motleycache = "^0.0.1"
 
 [tool.poetry.group.dev.dependencies]
 black = "^24.2.0"
diff --git a/tests/run_integration_tests.py b/tests/run_integration_tests.py
index 593faddb..ca9ddb37 100644
--- a/tests/run_integration_tests.py
+++ b/tests/run_integration_tests.py
@@ -18,7 +18,7 @@
 from motleycrew.common.exceptions import IntegrationTestException
 from motleycrew.common.utils import configure_logging
 
-from motleycrew.caching import (
+from motleycache import (
     enable_cache,
     set_cache_location,
     set_strong_cache,
diff --git a/tests/test_caching/__init__.py b/tests/test_caching/__init__.py
deleted file mode 100644
index e69de29b..00000000
diff --git a/tests/test_caching/test_http_cache.py b/tests/test_caching/test_http_cache.py
deleted file mode 100644
index 3653b3da..00000000
--- a/tests/test_caching/test_http_cache.py
+++ /dev/null
@@ -1,45 +0,0 @@
-import pytest
-
-from motleycrew.caching.http_cache import RequestsHttpCaching, CacheException
-from motleycrew.caching import set_cache_whitelist, set_cache_blacklist
-
-
-@pytest.fixture
-def requests_cache():
-    return RequestsHttpCaching()
-
-
-@pytest.mark.parametrize(
-    "url, expected_result",
-    [
-        ("https://api.lunary.ai/v1/runs/ingest", False),
-        ("https://api.openai.com/v1/chat/completions", True),
-        ("https://duckduckgo.com/", False),
-        ("https://links.duckduckgo.com/d.j", False),
-    ],
-)
-def test_cache_whitelist(requests_cache, url, expected_result):
-    set_cache_whitelist(["*//api.openai.com/v1/*"])
-    assert requests_cache.should_cache(url) == expected_result
-
-
-@pytest.mark.parametrize(
-    "url, expected_result",
-    [
-        ("https://api.lunary.ai/v1/runs/ingest", False),
-        ("https://api.openai.com/v1/chat/completions", True),
-        ("https://duckduckgo.com/", True),
-        ("https://links.duckduckgo.com/d.j", False),
-    ],
-)
-def test_cache_blacklist(requests_cache, url, expected_result):
-    set_cache_blacklist(["*//api.lunary.ai/v1/*", "*//links.duckduckgo.com/*"])
-    assert requests_cache.should_cache(url) == expected_result
-
-
-def test_exception_if_both_lists_set(requests_cache):
-    requests_cache.cache_whitelist = ["*//links.duckduckgo.com/*"]
-    requests_cache.cache_blacklist = ["*//api.lunary.ai/v1/*"]
-    url = "https://api.openai.com/v1/chat/completions"
-    with pytest.raises(CacheException):
-        requests_cache.should_cache(url)
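
Note (outside the patch body): the whitelist/blacklist semantics exercised by
the deleted tests can be restated standalone. This is a minimal sketch
mirroring BaseHttpCache.should_cache from the removed http_cache.py; it is not
the package's code. CacheException is replaced by ValueError so the snippet
has no dependencies, and the assertions restate the expectations from the
deleted test_http_cache.py:

    import fnmatch
    from typing import List

    FORCED_CACHE_BLACKLIST = ["*//api.lunary.ai/*"]  # value from the removed module

    def should_cache(url: str, whitelist: List[str], blacklist: List[str]) -> bool:
        # Lunary telemetry endpoints are never cached, regardless of user settings.
        if any(fnmatch.fnmatch(url, pat) for pat in FORCED_CACHE_BLACKLIST):
            return False
        if whitelist and blacklist:
            # The removed module raises CacheException here.
            raise ValueError("You can't use both cache whitelist and blacklist at the same time.")
        if whitelist:
            return any(fnmatch.fnmatch(url, pat) for pat in whitelist)
        if blacklist:
            return not any(fnmatch.fnmatch(url, pat) for pat in blacklist)
        return True  # with neither list set, every URL is cached

    assert should_cache("https://api.openai.com/v1/chat/completions", ["*//api.openai.com/v1/*"], [])
    assert not should_cache("https://duckduckgo.com/", ["*//api.openai.com/v1/*"], [])
    assert not should_cache("https://api.lunary.ai/v1/runs/ingest", [], [])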