diff --git a/setup.py b/setup.py index c4961b960..fcd21ddab 100644 --- a/setup.py +++ b/setup.py @@ -36,5 +36,7 @@ "httpx>=0.23.0", "oauthlib", "redis", + "typing", + "typing_extensions", ], ) diff --git a/shared/helpers/cache.py b/shared/helpers/cache.py index ab2f7b06b..b8e73c73d 100644 --- a/shared/helpers/cache.py +++ b/shared/helpers/cache.py @@ -4,7 +4,7 @@ import logging import pickle from functools import wraps -from typing import Any, Callable, Hashable +from typing import Any, Callable, Hashable, List from redis import Redis, RedisError @@ -115,6 +115,23 @@ def set(self, key: str, ttl: int, value: Any): log.warning("Unable to set cache on redis", exc_info=True) +class LogMapping(dict): + """This let cached functions to specify what arguments they want to log. + In some cases we want to log cache hits for debugging purposes, + but some of the arguments might be dangerous to log (e.g. tokens) + """ + + @property + def args_indexes_to_log(self) -> List[int]: + """List of args from the function to be logged (if present)""" + return self.get("args_indexes_to_log", []) + + @property + def kwargs_keys_to_log(self) -> List[Any]: + """List of args from the function to be logged (if present)""" + return self.get("kwargs_keys_to_log", []) + + class OurOwnCache(object): """ This is codecov distributed cache's implementation. @@ -162,7 +179,9 @@ def configure(self, backend: BaseBackend, app: str = "shared"): def get_backend(self) -> BaseBackend: return self._backend - def cache_function(self, ttl: int = DEFAULT_TTL) -> "FunctionCacher": + def cache_function( + self, ttl: int = DEFAULT_TTL, log_hits: bool = False, log_map: LogMapping = None + ) -> "FunctionCacher": """Creates a FunctionCacher with all the needed configuration to cache a function Args: @@ -171,19 +190,39 @@ def cache_function(self, ttl: int = DEFAULT_TTL) -> "FunctionCacher": Returns: FunctionCacher: A FunctionCacher that can decorate any callable """ - return FunctionCacher(self, ttl) + return FunctionCacher( + self, ttl, log_hits, LogMapping(log_map if log_map is not None else {}) + ) class FunctionCacher(object): - def __init__(self, cache_instance: OurOwnCache, ttl: int): + def __init__( + self, cache_instance: OurOwnCache, ttl: int, log_hits: bool, log_map: LogMapping + ): self.cache_instance = cache_instance self.ttl = ttl + self.log_hits = log_hits + self.log_map = log_map def __call__(self, func) -> Callable: if asyncio.iscoroutinefunction(func): return self.cache_async_function(func) return self.cache_synchronous_function(func) + def _log_hits(self, func, args, kwargs, key) -> None: + args_to_log = [] + for idx in self.log_map.args_indexes_to_log: + if idx < len(args): + args_to_log.append(args[idx]) + kwargs_to_log = {} + for lkey in self.log_map.kwargs_keys_to_log: + if lkey in kwargs: + kwargs_to_log[lkey] = kwargs[lkey] + log.info( + "Returning cache hit", + extra=dict(func=func, args=args_to_log, kwargs=kwargs_to_log, key=key), + ) + def cache_synchronous_function(self, func: Callable) -> Callable: @wraps(func) def wrapped(*args, **kwargs): @@ -191,6 +230,8 @@ def wrapped(*args, **kwargs): value = self.cache_instance.get_backend().get(key) if value is not NO_VALUE: metrics.incr(f"{self.cache_instance._app}.caches.{func.__name__}.hits") + if self.log_hits: + self._log_hits(func, args, kwargs, key) return value metrics.incr(f"{self.cache_instance._app}.caches.{func.__name__}.misses") with metrics.timer( @@ -225,6 +266,3 @@ async def wrapped(*args, **kwargs): return result return wrapped - - -cache = OurOwnCache() diff --git a/shared/torngit/base.py b/shared/torngit/base.py index 2849793b3..563af07eb 100644 --- a/shared/torngit/base.py +++ b/shared/torngit/base.py @@ -4,6 +4,7 @@ import httpx +from shared.torngit.cache import torngit_cache from shared.torngit.enums import Endpoints from shared.typings.oauth_token_types import OauthConsumerToken, OnRefreshCallback @@ -90,8 +91,10 @@ def __init__( self._oauth = oauth_consumer_token self.data = {"owner": {}, "repo": {}} self.verify_ssl = verify_ssl - self.data.update(kwargs) + # This has the side effect of initializing the torngit_cache + # (if not yet initialized) + torngit_cache.initialize() def __repr__(self): return "<%s slug=%s ownerid=%s repoid=%s>" % ( diff --git a/shared/torngit/cache/__init__.py b/shared/torngit/cache/__init__.py new file mode 100644 index 000000000..ad1318701 --- /dev/null +++ b/shared/torngit/cache/__init__.py @@ -0,0 +1,67 @@ +from typing import Union + +from redis import Redis +from typing_extensions import Literal + +from shared.config import get_config +from shared.helpers.cache import OurOwnCache, RedisBackend + + +def get_redis_url() -> str: + url = get_config("services", "redis_url") + if url is not None: + return url + hostname = "redis" + port = 6379 + return f"redis://{hostname}:{port}" + + +CachedEndpoint = Union[Literal["check"], Literal["compare"], Literal["status"]] + + +class TorngitCache(OurOwnCache): + def __init__(self): + super().__init__() + self.ttls = {} + self._initialized = False + self._enabled = False + + def initialize(self) -> None: + """Initializes and configures the TorngitCache according to config.""" + if self.is_initialized: + return + use_cache = get_config("services", "vcs_cache", "enabled", default=False) + if use_cache: + redis = Redis.from_url(get_redis_url()) + backend = RedisBackend(redis_connection=redis) + app = get_config("services", "vcs_cache", "metrics_app", default=None) + self.configure(backend=backend, app=app) + self._enabled = True + ttls = { + "check": get_config("services", "vcs_cache", "check_duration", default=120), + "compare": get_config( + "services", "vcs_cache", "compare_duration", default=120 + ), + "status": get_config( + "services", "vcs_cache", "status_duration", default=120 + ), + } + self.ttls = ttls + self._initialized = True + + @property + def is_initialized(self) -> bool: + return self._initialized + + @property + def is_enabled(self) -> bool: + return self._enabled + + def get_ttl(self, endpoint: CachedEndpoint) -> dict: + return self.ttls.get(endpoint, 120) + + +# This instance is shared among all the Torngit instances created on the same process. +# It doesn't matter because the cache is distributed and configuration can't change +# between these instances, so it's ok to share the same cache. +torngit_cache = TorngitCache() diff --git a/shared/torngit/github.py b/shared/torngit/github.py index ce40168fe..b5f33efa7 100644 --- a/shared/torngit/github.py +++ b/shared/torngit/github.py @@ -10,6 +10,7 @@ from shared.metrics import metrics from shared.torngit.base import TokenType, TorngitBaseAdapter +from shared.torngit.cache import torngit_cache from shared.torngit.enums import Endpoints from shared.torngit.exceptions import ( TorngitClientError, @@ -23,7 +24,7 @@ TorngitUnauthorizedError, ) from shared.torngit.status import Status -from shared.utils.urls import url_concat, url_escape +from shared.utils.urls import url_concat log = logging.getLogger(__name__) @@ -663,6 +664,11 @@ async def set_commit_status( ) return res + @torngit_cache.cache_function( + torngit_cache.get_ttl("status"), + log_hits=True, + log_map={"args_indexes_to_log": [0], "kwargs_keys_to_log": ["commit"]}, + ) async def get_commit_statuses(self, commit, token=None): token = self.get_token_by_type_if_none(token, TokenType.status) page = 0 @@ -694,7 +700,6 @@ async def get_commit_statuses(self, commit, token=None): ) if len(provided_statuses) < 100: break - return Status(statuses) # Source @@ -743,6 +748,11 @@ async def get_commit_diff(self, commit, context=None, token=None): raise return self.diff_to_json(res) + @torngit_cache.cache_function( + torngit_cache.get_ttl("compare"), + log_hits=True, + log_map={"args_indexes_to_log": [0, 1], "kwargs_keys_to_log": ["base", "head"]}, + ) async def get_compare( self, base, head, context=None, with_commits=True, token=None ): @@ -1088,6 +1098,14 @@ async def create_check_run( ) return res["id"] + @torngit_cache.cache_function( + torngit_cache.get_ttl("check"), + log_hits=True, + log_map={ + "args_indexes_to_log": [0, 1, 2], + "kwargs_keys_to_log": ["check_suite_id", "head_sha", "name"], + }, + ) async def get_check_runs( self, check_suite_id=None, head_sha=None, name=None, token=None ): diff --git a/shared/validation/install.py b/shared/validation/install.py index 94404f974..0e46dee94 100644 --- a/shared/validation/install.py +++ b/shared/validation/install.py @@ -272,6 +272,16 @@ def check_task_config_key(field, value, error): }, }, }, + "vsc_cache": { + "type": "dict", + "schema": { + "enabled": {"type": "boolean"}, + "metrics_app": {"type": "string"}, + "check_duration": {"type": "integer"}, + "compare_duration": {"type": "integer"}, + "status_duration": {"type": "integer"}, + }, + }, }, }, "site": {"type": "dict", "schema": user_yaml_schema}, diff --git a/tests/unit/helpers/test_cache.py b/tests/unit/helpers/test_cache.py index 34c8277bd..c72f92cce 100644 --- a/tests/unit/helpers/test_cache.py +++ b/tests/unit/helpers/test_cache.py @@ -6,10 +6,12 @@ from shared.helpers.cache import ( NO_VALUE, BaseBackend, + LogMapping, OurOwnCache, RedisBackend, - make_hash_sha256, ) +from shared.helpers.cache import log as cache_log +from shared.helpers.cache import make_hash_sha256 class RandomCounter(object): @@ -20,6 +22,9 @@ def call_function(self): self.value += 1 return self.value + def call_function_args(self, base, head, something=None, danger=True): + return base + head + async def async_call_function(self): self.value += 2 self.value *= 4 @@ -123,6 +128,54 @@ def test_simple_caching_fake_backend_no_params(self, mocker): assert cached_function() == 1 assert cached_function() == 1 + def test_simple_caching_fake_backend_with_params(self, mocker): + mock_log = mocker.patch.object(cache_log, "info") + log_map = LogMapping( + { + "args_indexes_to_log": [0, 1], + "kwargs_keys_to_log": ["base", "head", "something"], + } + ) + cache = OurOwnCache() + cache.configure(FakeBackend()) + assert cache._app == "shared" + sample_function = RandomCounter().call_function_args + cached_function = cache.cache_function(log_hits=True, log_map=log_map)( + sample_function + ) + assert cached_function("base", "head", something="batata") == "basehead" + mock_log.assert_not_called() # not a hit, not logger + assert cached_function("base", "head", something="else") == "basehead" + mock_log.assert_not_called() # not a hit, not logger + assert cached_function("base", "head", something="else") == "basehead" + assert mock_log.call_count == 1 + args, kwargs = mock_log.call_args + assert args == ("Returning cache hit",) + assert "extra" in kwargs + extra_logged = kwargs["extra"] + extra_args = extra_logged["args"] + extra_kwargs = extra_logged["kwargs"] + assert extra_args == ["base", "head"] + assert extra_kwargs == { + "something": "else" + } # Notice that danger is not part of it + # Changing the way we call the function + assert cached_function("base", head="head", something="else") == "basehead" + assert mock_log.call_count == 1 # This is not cached because the args changed + assert cached_function("base", head="head", something="else") == "basehead" + assert mock_log.call_count == 2 + args, kwargs = mock_log.call_args + assert args == ("Returning cache hit",) + assert "extra" in kwargs + extra_logged = kwargs["extra"] + extra_args = extra_logged["args"] + extra_kwargs = extra_logged["kwargs"] + assert extra_args == ["base"] + assert extra_kwargs == { + "head": "head", + "something": "else", + } # Notice that danger is not part of it + @pytest.mark.asyncio async def test_simple_caching_fake_backend_async_no_params(self, mocker): cache = OurOwnCache() diff --git a/tests/unit/torngit/test_cache.py b/tests/unit/torngit/test_cache.py new file mode 100644 index 000000000..52e77b9f6 --- /dev/null +++ b/tests/unit/torngit/test_cache.py @@ -0,0 +1,67 @@ +from shared.helpers.cache import NullBackend, RedisBackend +from shared.torngit.cache import get_redis_url, torngit_cache + + +def test_get_redis_default(): + assert get_redis_url() == "redis://redis:6379" + + +def test_get_redis_from_url(mock_configuration): + mock_configuration.set_params( + {"services": {"redis_url": "https://my-redis-instance:6378"}} + ) + assert get_redis_url() == "https://my-redis-instance:6378" + + +class TestTorngitCacheConfig(object): + def test_initialize_active(self, mock_configuration, mocker): + # Manually return torngit cache to default state + # Because other tests might have messed with it + torngit_cache._enabled = False + torngit_cache._initialized = False + torngit_cache.configure(NullBackend()) + # Now we initialize + mock_configuration.set_params( + { + "services": { + "vcs_cache": { + "enabled": True, + "check_duration": 100, + "compare_duration": 80, + "status_duration": 60, + "metrics_app": "worker", + } + } + } + ) + torngit_cache.initialize() + assert isinstance(torngit_cache._backend, RedisBackend) + assert torngit_cache._app == "worker" + assert torngit_cache.is_initialized == True + assert torngit_cache.get_ttl("check") == 100 + assert torngit_cache.get_ttl("compare") == 80 + assert torngit_cache.get_ttl("status") == 60 + assert torngit_cache.is_enabled == True + # Now we test that by calling init_torngit_cache again + # It will do nothing since the cache is initialized + mock_get_redis = mocker.patch("shared.torngit.cache.get_redis_url") + torngit_cache.initialize() + assert torngit_cache.is_initialized == True + assert torngit_cache.is_enabled == True + mock_get_redis.assert_not_called() + + def test_initialize_not_active(self, mock_configuration): + # Manually return torngit cache to default state + # Because other tests might have messed with it + torngit_cache._enabled = False + torngit_cache._initialized = False + torngit_cache.configure(NullBackend()) + # Now we initialize it + mock_configuration.set_params({"services": {"vcs_cache": {"enabled": False}}}) + torngit_cache.initialize() + assert torngit_cache.is_initialized == True + assert torngit_cache.is_enabled == False + assert isinstance(torngit_cache._backend, NullBackend) + assert torngit_cache.get_ttl("check") == 120 + assert torngit_cache.get_ttl("compare") == 120 + assert torngit_cache.get_ttl("status") == 120 diff --git a/tests/unit/validation/test_install_validation.py b/tests/unit/validation/test_install_validation.py index f0f9be415..e484acba1 100644 --- a/tests/unit/validation/test_install_validation.py +++ b/tests/unit/validation/test_install_validation.py @@ -94,6 +94,13 @@ def test_validate_sample_production_config(mocker): }, "database_url": "pokemon01_database_url", "redis_url": "kaploft-memorystore-url", + "vsc_cache": { + "enabled": True, + "metrics_app": "shared", + "check_duration": 100, + "compare_duration": 110, + "status_duration": 90, + }, }, "site": { "codecov": {"require_ci_to_pass": True}, @@ -218,6 +225,13 @@ def test_validate_sample_production_config(mocker): }, "database_url": "pokemon01_database_url", "redis_url": "kaploft-memorystore-url", + "vsc_cache": { + "enabled": True, + "metrics_app": "shared", + "check_duration": 100, + "compare_duration": 110, + "status_duration": 90, + }, }, "site": { "codecov": {"require_ci_to_pass": True},