[NHUB-534] Implement async cacheable service (superdesk#2688)
* Implement `AsyncCacheableService`

This service is an implementation based on `superdesk.services.CacheableService`

NHUB-534

* Update cache backend to avoid app context error

This prevents "app context not available" errors when running cached async coroutines

NHUB-534

* Fix line removed by black

* Add docstrings

NHUB-534

* Update documentation and fix broken reference

NHUB-534
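
For orientation, a minimal sketch of how the new service might be wired up and used, mirroring the test added in this commit (the `TopicsService` class and the `topics` resource name are illustrative, not part of the change):

```python
from superdesk.core.module import Module
from superdesk.core.resources import AsyncCacheableService, ResourceConfig, ResourceModel


class TopicsService(AsyncCacheableService[ResourceModel]):
    # hypothetical resource; any change to it invalidates the cached list
    resource_name = "topics"


topics_config = ResourceConfig(name="topics", data_class=ResourceModel, service=TopicsService)
module = Module(name="my_project.topics", resources=[topics_config])


async def example() -> None:
    service = TopicsService()
    all_topics = await service.get_cached()            # cached list of dicts
    one_topic = await service.get_cached_by_id("abc")  # falls back to the DB on a cache miss
```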
eos87 authored Sep 10, 2024
1 parent 938774b commit b0b3f11
Showing 7 changed files with 183 additions and 18 deletions.
2 changes: 1 addition & 1 deletion docs/conf.py
@@ -140,7 +140,7 @@
# documentation.
#
html_theme_options = {
"description": "Superdesk server reference v1.4",
"description": "Superdesk v3.0.dev.0",
}

# Add any paths that contain custom themes here, relative to this directory.
8 changes: 6 additions & 2 deletions docs/core/resource_management.rst
@@ -3,8 +3,8 @@
Resource Management
===================

Resource Services:
------------------
Resource Services
-----------------

The management of resources is performed using the :class:`AsyncResourceService <superdesk.core.resources.service.AsyncResourceService>` class
instances. This is similar to how it is done in Superdesk < v3.0, with some slight improvements.
@@ -73,3 +73,7 @@ Search References
.. autoclass:: superdesk.core.resources.cursor.MongoResourceCursorAsync
:member-order: bysource
:members:

.. autoclass:: superdesk.core.resources.service.AsyncCacheableService
:member-order: bysource
:members:
File renamed without changes.
25 changes: 18 additions & 7 deletions superdesk/cache.py
@@ -8,6 +8,7 @@

from superdesk import json_utils
from superdesk.logging import logger
from superdesk.flask import Flask


class SuperdeskMangler(hermes.Mangler):
@@ -37,7 +38,12 @@ class SuperdeskCacheBackend(hermes.backend.AbstractBackend):
or memcached.
"""

    app: Flask

    def init_app(self, app):
        # set app instance for later usage in `_backend`
        self.app = app

        if not hasattr(app, "extensions"):
            app.extensions = {}

@@ -73,13 +79,18 @@ def init_app(self, app):

    @property
    def _backend(self):
        from superdesk.core import get_current_app

        current_app = get_current_app().as_any()
        if not current_app:
            raise RuntimeError("You can only use cache within app context.")
        self.init_app(current_app)
        return current_app.extensions["superdesk_cache"]
        # TODO-ASYNC: Figure out how to properly fix this as it throws an error
        # of missing app context when run from async coroutines. For the time being,
        # using the app instance that is set in the `init_app` method does the trick

        # from superdesk.core import get_current_app
        # current_app = get_current_app().as_any()
        # if not current_app:
        #     raise RuntimeError("You can only use cache within app context.")
        # self.init_app(current_app)
        # return current_app.extensions["superdesk_cache"]

        return self.app.extensions["superdesk_cache"]

    def lock(self, key):
        return self._backend.lock(key)
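For context, the `cache` decorator that this backend serves can now be applied to async coroutines as well; a minimal sketch mirroring the `_get_cached_from_db` helper added in `service.py` below (the function name, tag, and key are illustrative):

```python
import random

from superdesk.cache import cache


@cache(
    ttl=3600 + random.randrange(1, 300),   # jittered TTL, same pattern as in service.py
    tags=("topics",),                      # hypothetical resource tag
    key=lambda fn: "_cache_mixin:topics",  # hypothetical cache key
)
async def fetch_topics():
    ...  # would query the database; the cached result is reused until the TTL expires
```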
3 changes: 2 additions & 1 deletion superdesk/core/resources/__init__.py
@@ -10,7 +10,7 @@

from .model import Resources, ResourceModel, ResourceModelWithObjectId, ResourceConfig, dataclass
from .resource_rest_endpoints import RestEndpointConfig
from .service import AsyncResourceService
from .service import AsyncResourceService, AsyncCacheableService
from ..mongo import MongoResourceConfig, MongoIndexOptions
from ..elastic.resources import ElasticResourceConfig

@@ -23,6 +23,7 @@
"fields",
"RestEndpointConfig",
"AsyncResourceService",
"AsyncCacheableService",
"MongoResourceConfig",
"MongoIndexOptions",
"ElasticResourceConfig",
87 changes: 80 additions & 7 deletions superdesk/core/resources/service.py
@@ -8,6 +8,7 @@
# AUTHORS and LICENSE files distributed with this source code, or
# at https://www.sourcefabric.org/superdesk/license

import random
from typing import (
Optional,
Generic,
@@ -25,18 +26,20 @@
)
import logging
import ast
import simplejson as json
from copy import deepcopy
from hashlib import sha1

from bson import ObjectId, UuidRepresentation
from bson.json_util import dumps, DEFAULT_JSON_OPTIONS
import simplejson as json
from motor.motor_asyncio import AsyncIOMotorCursor

from superdesk.errors import SuperdeskApiError
from superdesk.flask import g
from superdesk.utc import utcnow
from superdesk.cache import cache
from superdesk.errors import SuperdeskApiError
from superdesk.json_utils import SuperdeskJSONEncoder
from superdesk.core.types import SearchRequest, SortListParam, SortParam
from superdesk.core.types import SearchRequest, SortParam

from ..app import SuperdeskAsyncApp, get_current_async_app
from .fields import ObjectId as ObjectIdField
@@ -417,14 +420,12 @@ async def get_all_batch(self, size=500, max_iterations=10000, lookup=None) -> As
logger.warning(f"Not enough iterations for resource {self.resource_name}")

@overload
async def find(self, req: SearchRequest) -> ResourceCursorAsync[ResourceModelType]:
...
async def find(self, req: SearchRequest) -> ResourceCursorAsync[ResourceModelType]: ...

@overload
async def find(
self, req: dict, page: int = 1, max_results: int = 25, sort: SortParam | None = None
) -> ResourceCursorAsync[ResourceModelType]:
...
) -> ResourceCursorAsync[ResourceModelType]: ...

async def find(
self,
@@ -564,4 +565,76 @@ def filter_ignore_fields(d, fields):
return h.hexdigest()


class AsyncCacheableService(AsyncResourceService[ResourceModelType]):
    """
    Handles caching for the resource, will invalidate on any changes to the resource.

    Attributes:
        resource_name (str): The name of the resource this service handles.
        cache_lookup (dict): A dictionary to specify custom lookup parameters for caching.
    """

    cache_lookup = {}

    @property
    def cache_key(self) -> str:
        return "cached:{}".format(self.resource_name)

    def get_cache(self) -> Any:
        """
        Retrieve the cached value from Flask's `g` object if available.
        """
        return getattr(g, self.cache_key, None)

    def set_cache(self, value: Any) -> None:
        """
        Set the cached value in Flask's `g` object.
        """
        setattr(g, self.cache_key, value)

    async def get_cached(self) -> List[Dict[str, Any]]:
        """
        Retrieve cached data for the resource. If the cache is empty, fetches data from the database
        and sets it in the cache. The cache is automatically refreshed with a time-to-live (TTL).

        Returns:
            List[Dict[str, Any]]: A list of dictionaries containing the cached data.
        """

        @cache(
            ttl=3600 + random.randrange(1, 300),
            tags=(self.resource_name,),
            key=lambda fn: f"_cache_mixin:{self.resource_name}",
        )
        async def _get_cached_from_db():
            cursor = await self.search(lookup=self.cache_lookup, use_mongo=True)
            return await cursor.to_list_raw()

        cached_data = self.get_cache()

        if cached_data is None:
            cached_data = await _get_cached_from_db()
            self.set_cache(cached_data)

        return cached_data

    async def get_cached_by_id(self, _id: str):
        """
        Retrieve a specific resource by its ID from the cached data. If the resource is not found in
        the cache, fetches it directly from the database.

        Args:
            _id (string): The ID of the resource to retrieve.

        Returns:
            Optional[Dict[str, Any]]: A dictionary representing the resource if found, otherwise None.
        """
        cached = await self.get_cached()
        for item in cached:
            if item.get("_id") == _id:
                return item
        logger.warning("Could not find item in cache resource=%s id=%s", self.resource_name, _id)
        return await self.find_by_id(_id)


from .model import ResourceConfig, ResourceModel # noqa: E402
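
A minimal sketch of how a concrete subclass might narrow what gets cached by overriding `cache_lookup`, which `get_cached()` passes straight to `search()` (the resource name and filter are hypothetical):

```python
from superdesk.core.resources import AsyncCacheableService, ResourceModel


class ActiveTopicsService(AsyncCacheableService[ResourceModel]):
    resource_name = "topics"            # hypothetical resource
    cache_lookup = {"is_active": True}  # only matching documents are loaded into the cache
```

The jittered TTL (`3600 + random.randrange(1, 300)`) appears intended to stagger cache expiry so concurrent workers do not all refill from Mongo at the same moment.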
76 changes: 76 additions & 0 deletions tests/core/cacheable_service_test.py
@@ -0,0 +1,76 @@
from unittest.mock import AsyncMock, patch
from superdesk.tests import AsyncTestCase

from superdesk.core.module import Module
from superdesk.core.resources import ResourceModel, ResourceConfig, AsyncCacheableService


class TestAsyncCacheableService(AsyncCacheableService[ResourceModel]):
    resource_name = "tests_cacheable"


model_config = ResourceConfig(name="tests_cacheable", data_class=ResourceModel, service=TestAsyncCacheableService)
module = Module(name="tests.cacheable", resources=[model_config])


class AsyncCacheableServiceTestCase(AsyncTestCase):
    app_config = {"MONGO_DBNAME": "sptests", "MODULES": ["tests.core.cacheable_service_test"]}

    async def asyncSetUp(self):
        await super().asyncSetUp()
        self.service = TestAsyncCacheableService()

        self.patcher1 = patch("superdesk.core.resources.service.cache")
        self.patcher2 = patch.object(TestAsyncCacheableService, "get_cache")
        self.patcher3 = patch.object(TestAsyncCacheableService, "set_cache")
        self.patcher4 = patch.object(TestAsyncCacheableService, "search")

        self.mock_cache = self.patcher1.start()
        self.mock_get_cache = self.patcher2.start()
        self.mock_set_cache = self.patcher3.start()
        self.mock_search = self.patcher4.start()

        # make the cache decorator return the wrapped function unchanged
        self.mock_cache.side_effect = lambda *args, **kwargs: lambda fn: fn

    async def asyncTearDown(self):
        self.addCleanup(self.patcher1.stop)
        self.addCleanup(self.patcher2.stop)
        self.addCleanup(self.patcher3.stop)
        self.addCleanup(self.patcher4.stop)

        await super().asyncTearDown()

    async def test_get_cached_from_db(self):
        mock_cursor = AsyncMock()
        mock_cursor.to_list_raw = AsyncMock(return_value=[{"_id": "1", "data": "test"}])
        self.mock_search.return_value = mock_cursor
        self.mock_get_cache.return_value = None

        result = await self.service.get_cached()

        assert result == [{"_id": "1", "data": "test"}]
        self.mock_get_cache.assert_called_once()
        self.mock_set_cache.assert_called_once_with([{"_id": "1", "data": "test"}])
        self.mock_search.assert_called_once()

    async def test_get_cached_by_id_found_in_cache(self):
        self.mock_get_cache.return_value = [{"_id": "1", "data": "test"}]

        result = await self.service.get_cached_by_id("1")

        assert result == {"_id": "1", "data": "test"}
        self.mock_get_cache.assert_called_once()
        self.mock_set_cache.assert_not_called()
        self.mock_search.assert_not_called()

    @patch.object(TestAsyncCacheableService, "find_by_id", return_value={"_id": "1", "data": "from_db"})
    async def test_get_cached_by_id_NOT_found_in_cache(self, mock_find_by_id):
        self.mock_get_cache.return_value = [{"_id": "2", "data": "test"}]

        result = await self.service.get_cached_by_id("1")
        assert result == {"_id": "1", "data": "from_db"}

        self.mock_get_cache.assert_called_once()
        self.mock_set_cache.assert_not_called()
        mock_find_by_id.assert_called_once_with("1")
