Move worker cache to shared (#380)
We want to cache some of our GitHub requests to reduce GitHub API usage for very large customers.
While doing that, we noticed that we already have a very nice cache implemented in [worker](https://github.com/codecov/worker/blob/48ff3f11d78b35b806d5d24a96780e22210f66e8/helpers/cache.py).
We think that this cache would be useful in this particular use case, and possibly in the api as well,
so we are moving it to shared, making it easier to reuse.
giovanni-guidini authored Jun 6, 2023
1 parent 21bc2a9 commit 20f2646
Showing 3 changed files with 382 additions and 0 deletions.
1 change: 1 addition & 0 deletions setup.py
@@ -35,5 +35,6 @@
"analytics-python==1.3.0b1",
"httpx>=0.23.0",
"oauthlib",
"redis",
],
)
230 changes: 230 additions & 0 deletions shared/helpers/cache.py
@@ -0,0 +1,230 @@
import asyncio
import base64
import hashlib
import logging
import pickle
from functools import wraps
from typing import Any, Callable, Hashable

from redis import Redis, RedisError

from shared.metrics import metrics

log = logging.getLogger(__name__)

NO_VALUE = object()

DEFAULT_TTL = 120


def make_hash_sha256(o: Any) -> str:
"""Provides a machine-independent, consistent hash value for any object
Args:
o (Any): Any object we want
Returns:
str: a sha256-based hash that is always the same for the same object
"""
hasher = hashlib.sha256()
hasher.update(repr(make_hashable(o)).encode())
return base64.b64encode(hasher.digest()).decode()


def make_hashable(o: Any) -> Hashable:
"""
Converts any object into an object that will have a consistent hash
"""
if isinstance(o, (tuple, list)):
return tuple((make_hashable(e) for e in o))
if isinstance(o, dict):
return tuple(sorted((k, make_hashable(v)) for k, v in o.items()))
if isinstance(o, (set, frozenset)):
return tuple(sorted(make_hashable(e) for e in o))
return o
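
# For illustration: make_hashable converts nested containers into deterministic
# tuples. For example, make_hashable({"b": 2, "a": [1, {3}]}) evaluates to
# (("a", (1, (3,))), ("b", 2)), so the repr() fed into make_hash_sha256 is
# stable across processes and machines.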


class BaseBackend(object):
"""
This is the interface a class needs to honor in order to work as a backend.
The only two needed functions are `get` and `set`, which will fetch information from the
cache and send information to it, respectively.
    How the cache works internally is the backend's own choice. It only needs to be able to
    `set` and `get` without raising any exceptions.
"""

def get(self, key: str) -> Any:
"""Returns a cached value from the cache, or NO_VALUE, if no cache is set for that key
Args:
            key (str): The key that represents the object
Returns:
Any: The object that is possibly cached, or NO_VALUE, if no cache was there
"""
raise NotImplementedError()

def set(self, key: str, ttl: int, value: Any):
raise NotImplementedError()


class NullBackend(BaseBackend):
"""
    This is the default BaseBackend implementation.
    Its `get` behaves as if nothing is cached, and its `set` does not store anything when asked
    to.
    This makes the cache virtually transparent: it acts as if no cache were there.
"""

def get(self, key: str) -> Any:
return NO_VALUE

def set(self, key: str, ttl: int, value: Any):
pass


class RedisBackend(BaseBackend):

current_protocol = pickle.DEFAULT_PROTOCOL

def __init__(self, redis_connection: Redis):
self.redis_connection = redis_connection

def get(self, key: str) -> Any:
try:
serialized_value = self.redis_connection.get(key)
except RedisError:
log.warning("Unable to fetch from cache on redis", exc_info=True)
return NO_VALUE
if serialized_value is None:
return NO_VALUE
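        # pickle.loads raises ValueError when the payload was written with an
        # unsupported (newer) pickle protocol; treat that as a cache miss.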
try:
return pickle.loads(serialized_value)
except ValueError:
return NO_VALUE

def set(self, key: str, ttl: int, value: Any):
serialized_value = pickle.dumps(value, self.current_protocol)
try:
self.redis_connection.setex(key, ttl, serialized_value)
except RedisError:
log.warning("Unable to set cache on redis", exc_info=True)


class OurOwnCache(object):
"""
    This is Codecov's distributed cache implementation.
    The TL;DR on how to use it: given a function f,
    ```
    from shared.helpers.cache import cache

    @cache.cache_function()
    def f(...):
        ...
    ```
Now to explain its internal workings.
    This is a configurable-at-runtime cache. Its whole design rests on the fact that it needs
    no information at import time. This lets us use it transparently without having to change
    tests, for example: all tests run as if the cache were not there.
All that is needed to configure the backend is to do
```
cache.configure(any_backend)
```
    which the worker currently does at `worker_process_init` time with a RedisBackend instance.
    Other backends can be plugged in easily once needed. A backend is any implementation
    of `BaseBackend`, whose interface is described in its docstring.
    When `cache.cache_function()` is called, a `FunctionCacher` is returned. It does the heavy
    lifting of actually decorating the function properly, dealing with both sync and async contexts.
"""

def __init__(self):
self._backend = NullBackend()
self._app = "not_configured"

def configure(self, backend: BaseBackend, app: str = "shared"):
self._backend = backend
self._app = app

def get_backend(self) -> BaseBackend:
return self._backend

def cache_function(self, ttl: int = DEFAULT_TTL) -> "FunctionCacher":
"""Creates a FunctionCacher with all the needed configuration to cache a function
Args:
ttl (int, optional): The time-to-live of the cache
Returns:
FunctionCacher: A FunctionCacher that can decorate any callable
"""
return FunctionCacher(self, ttl)


class FunctionCacher(object):
def __init__(self, cache_instance: OurOwnCache, ttl: int):
self.cache_instance = cache_instance
self.ttl = ttl

def __call__(self, func) -> Callable:
if asyncio.iscoroutinefunction(func):
return self.cache_async_function(func)
return self.cache_synchronous_function(func)
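
    # Both wrappers below emit metrics named
    # "<app>.caches.<function name>.hits", ".misses" and ".runtime"
    # through shared.metrics.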

def cache_synchronous_function(self, func: Callable) -> Callable:
@wraps(func)
def wrapped(*args, **kwargs):
key = self.generate_key(func, args, kwargs)
value = self.cache_instance.get_backend().get(key)
if value is not NO_VALUE:
metrics.incr(f"{self.cache_instance._app}.caches.{func.__name__}.hits")
return value
metrics.incr(f"{self.cache_instance._app}.caches.{func.__name__}.misses")
with metrics.timer(
f"{self.cache_instance._app}.caches.{func.__name__}.runtime"
):
result = func(*args, **kwargs)
self.cache_instance.get_backend().set(key, self.ttl, result)
return result

return wrapped

def generate_key(self, func, args, kwargs) -> str:
func_name = make_hash_sha256(func.__name__)
tupled_args = make_hash_sha256(args)
frozen_kwargs = make_hash_sha256(kwargs)
return ":".join(["cache", func_name, tupled_args, frozen_kwargs])

def cache_async_function(self, func: Callable) -> Callable:
@wraps(func)
async def wrapped(*args, **kwargs):
key = self.generate_key(func, args, kwargs)
value = self.cache_instance.get_backend().get(key)
if value is not NO_VALUE:
metrics.incr(f"{self.cache_instance._app}.caches.{func.__name__}.hits")
return value
metrics.incr(f"{self.cache_instance._app}.caches.{func.__name__}.misses")
with metrics.timer(
f"{self.cache_instance._app}.caches.{func.__name__}.runtime"
):
result = await func(*args, **kwargs)
self.cache_instance.get_backend().set(key, self.ttl, result)
return result

return wrapped


cache = OurOwnCache()
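
For reference, here is a minimal sketch of how a service could wire the cache up at start-up (the Redis address and the `app` label are illustrative, not part of this commit):

```python
from redis import Redis

from shared.helpers.cache import RedisBackend, cache

# Replace the default NullBackend with a real Redis-backed one.
# Until configure() is called, every lookup is a cache miss.
cache.configure(RedisBackend(Redis(host="localhost", port=6379)), app="api")


@cache.cache_function(ttl=300)
def fetch_repo_data(repo_id: int) -> dict:
    # Hypothetical expensive call (e.g. to the GitHub API) whose result
    # is safe to reuse for five minutes.
    ...
```

Because the decorated function is keyed on its name plus hashed arguments, `fetch_repo_data(1)` and `fetch_repo_data(2)` are cached independently.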
151 changes: 151 additions & 0 deletions tests/unit/helpers/test_cache.py
@@ -0,0 +1,151 @@
import pickle

import pytest
from redis.exceptions import TimeoutError

from shared.helpers.cache import (
NO_VALUE,
BaseBackend,
OurOwnCache,
RedisBackend,
make_hash_sha256,
)


class RandomCounter(object):
def __init__(self):
self.value = 0

def call_function(self):
self.value += 1
return self.value

async def async_call_function(self):
self.value += 2
self.value *= 4
return self.value


class FakeBackend(BaseBackend):
def __init__(self):
self.all_keys = {}

def get(self, key):
possible_values = self.all_keys.get(key, {})
for ttl, val in possible_values.items():
return val
return NO_VALUE

def set(self, key, ttl, value):
if key not in self.all_keys:
self.all_keys[key] = {}
self.all_keys[key][ttl] = value


class FakeRedis(object):
def __init__(self):
self.all_keys = {}

def get(self, key):
return self.all_keys.get(key)

def setex(self, key, expire, value):
self.all_keys[key] = value


class FakeRedisWithIssues(object):
def get(self, key):
raise TimeoutError()

def setex(self, key, expire, value):
raise TimeoutError()


class TestRedisBackend(object):
def test_simple_redis_call(self):
redis_backend = RedisBackend(FakeRedis())
assert redis_backend.get("normal_key") == NO_VALUE
redis_backend.set("normal_key", 120, {"value_1": set("ascdefgh"), 1: [1, 3]})
assert redis_backend.get("normal_key") == {
"value_1": set("ascdefgh"),
1: [1, 3],
}

def test_simple_redis_call_invalid_pickle_version(self):
redis_instance = FakeRedis()
# PICKLE HERE WILL BE SET TO VERSION 9 (\x09 in the second byte of the value)
# IF THIS STOPS FAILING WITH ValueError, CHANGE THE SECOND BYTE TO SOMETHING HIGHER
redis_instance.setex("key", 120, b"\x80\x09X\x05\x00\x00\x00valueq\x00.")
redis_backend = RedisBackend(redis_instance)
assert redis_backend.get("key") == NO_VALUE

def test_simple_redis_call_exception(self):
redis_backend = RedisBackend(FakeRedisWithIssues())
assert redis_backend.get("normal_key") == NO_VALUE
redis_backend.set("normal_key", 120, {"value_1": set("ascdefgh"), 1: [1, 3]})
assert redis_backend.get("normal_key") == NO_VALUE


class TestCache(object):
def test_simple_caching_no_backend_no_params(self, mocker):
cache = OurOwnCache()
sample_function = RandomCounter().call_function
cached_function = cache.cache_function()(sample_function)
assert cached_function() == 1
assert cached_function() == 2
assert cached_function() == 3

def test_simple_caching_no_backend_no_params_with_ttl(self, mocker):
cache = OurOwnCache()
sample_function = RandomCounter().call_function
cached_function = cache.cache_function(ttl=300)(sample_function)
assert cached_function() == 1
assert cached_function() == 2
assert cached_function() == 3

@pytest.mark.asyncio
async def test_simple_caching_no_backend_async_no_params(self, mocker):
cache = OurOwnCache()
assert cache._app == "not_configured"
sample_function = RandomCounter().async_call_function
cached_function = cache.cache_function()(sample_function)
assert (await cached_function()) == 8
assert (await cached_function()) == 40
assert (await cached_function()) == 168

def test_simple_caching_fake_backend_no_params(self, mocker):
cache = OurOwnCache()
cache.configure(FakeBackend())
assert cache._app == "shared"
sample_function = RandomCounter().call_function
cached_function = cache.cache_function()(sample_function)
assert cached_function() == 1
assert cached_function() == 1
assert cached_function() == 1

@pytest.mark.asyncio
async def test_simple_caching_fake_backend_async_no_params(self, mocker):
cache = OurOwnCache()
cache.configure(FakeBackend(), app="worker")
assert cache._app == "worker"
sample_function = RandomCounter().async_call_function
cached_function = cache.cache_function()(sample_function)
assert (await cached_function()) == 8
assert (await cached_function()) == 8
assert (await cached_function()) == 8

@pytest.mark.asyncio
async def test_make_hash_sha256(self):
assert make_hash_sha256(1) == "a4ayc/80/OGda4BO/1o/V0etpOqiLx1JwB5S3beHW0s="
assert (
make_hash_sha256("somestring")
== "l5nfZJ7iQAll9QGKjGm4wPuSgUoikOMrdpOw/36GLyw="
)
this_set = set(["1", "something", "True", "another_string_of_values"])
assert (
make_hash_sha256(this_set) == "siFp5vd4+aI5SxlURDMV3Z5Yfn5qnpSbCctIewE6m44="
)
this_set.add("ooops")
assert (
make_hash_sha256(this_set) == "aoU2Of3YNk0/iW1hqfSkXPbhIAzGMHCSCoxsiLI2b8U="
)
