From c90f21b46949b86df57e9db182cce824ecc86a1d Mon Sep 17 00:00:00 2001
From: Helmy Giacoman
Date: Wed, 4 Sep 2024 09:37:40 +0200
Subject: [PATCH 1/4] [SDESK-7284] Fix async commands to build a Superdesk instance (#2680)

* Implement new async_cli module

This module includes a custom Blueprint, AppGroup and a helper to easily
create a command blueprint ready to register commands based on async
functions. The registered async commands are run synchronously using
`asgiref.sync.async_to_sync`.

SDESK-7284

* Rework `app:initialize_data` command

The command was converted from the old class-based commands into a
function-based one using `Click` and our new `cli.register_async_command`
decorator, in order to use our new async services in the synchronous
context of Quart's command line interface.

SDESK-7284

* Update docker-compose.yml

* Fix type and docstrings

* Rework `users:create` command

Command converted from the old class-based commands into a function-based
one using `Click` and our new `cli.register_async_command` decorator.

SDESK-7284

* Rework `schema:migrate` command

SDESK-7284

* Update app_initialize.py

* Rework `data:upgrade` and `data:downgrade` commands

SDESK-7284

* Remove unneeded false return

* Fix async commands to use proper app context

Async commands fail when using the app context, as they are executed in
different threads or outside of the normal request context lifecycle. To
fix that, we set the current app instance on the AsyncAppGroup so that it
can later be passed optionally to the command at the moment of its
execution.

SDESK-7284

* Fix `schema:migrate` command

SDESK-7284

* Small improvement to use of app context in async commands

SDESK-7284
---
 apps/auth/db/__init__.py                     |   2 +-
 apps/auth/db/commands.py                     |  82 ++---
 apps/prepopulate/__init__.py                 |   2 +-
 apps/prepopulate/app_initialize.py           | 314 +++++++++----------
 docker-compose.yml                           |   2 +-
 setup.py                                     |   1 +
 superdesk/commands/__init__.py               |  10 +
 superdesk/commands/async_cli.py              | 107 +++++++
 superdesk/commands/data_manipulation.py      |  13 +-
 superdesk/commands/data_updates.py           | 285 ++++++++---------
 superdesk/commands/schema.py                 |  40 +--
 superdesk/factory/app.py                     |   7 +-
 superdesk/flask.py                           |   2 +
 superdesk/tests/environment.py               |   5 +-
 superdesk/tests/steps.py                     |  11 +-
 tests/auth/create_user_command_test.py       |  13 +-
 tests/commands/data_updates_test.py          |  24 +-
 tests/prepopulate/app_initialization_test.py |   6 +-
 18 files changed, 534 insertions(+), 392 deletions(-)
 create mode 100644 superdesk/commands/async_cli.py

diff --git a/apps/auth/db/__init__.py b/apps/auth/db/__init__.py
index 63173988ae..97bc6b6bb9 100644
--- a/apps/auth/db/__init__.py
+++ b/apps/auth/db/__init__.py
@@ -13,7 +13,7 @@
 from .reset_password import ResetPasswordService, ResetPasswordResource, ActiveTokensResource
 import superdesk
 from .db import DbAuthService
-from .commands import CreateUserCommand, HashUserPasswordsCommand  # noqa
+from .commands import create_user_command, HashUserPasswordsCommand  # noqa
 from superdesk.services import BaseService
 from apps.auth.db.change_password import ChangePasswordService, ChangePasswordResource
 
diff --git a/apps/auth/db/commands.py b/apps/auth/db/commands.py
index ac107d1236..23428cfa89 100644
--- a/apps/auth/db/commands.py
+++ b/apps/auth/db/commands.py
@@ -8,13 +8,19 @@
 # AUTHORS and LICENSE files distributed with this source code, or
 # at https://www.sourcefabric.org/superdesk/license
 
-import logging
-import json
 import csv
+import json
+import click
+import logging
+
+import superdesk
+
 from pathlib import Path
 from base64 import b64encode
 from
superdesk.core import get_app_config, get_current_app -import superdesk + +from superdesk.flask import Flask +from superdesk.commands import cli from superdesk.utils import get_hash, is_hashed @@ -22,7 +28,13 @@ USER_FIELDS_NAMES = {"username", "email", "password", "first_name", "last_name", "sign_off", "role"} -class CreateUserCommand(superdesk.Command): +@cli.register_async_command("users:create", with_appcontext=True) +@click.option("--username", "-u", required=True, help="Username for the new user.") +@click.option("--password", "-p", required=True, help="Password for the new user.") +@click.option("--email", "-e", required=True, help="Email address for the new user.") +@click.option("--admin", "-a", is_flag=True, help="Specify if the user is an administrator.") +@click.option("--support", "-s", is_flag=True, help="Specify if the user is a support user.") +async def create_user_command(*args, **kwargs): """Create a user with given username, password and email. If user with given username exists it's noop. @@ -33,46 +45,37 @@ class CreateUserCommand(superdesk.Command): $ python manage.py users:create -u admin -p admin -e 'admin@example.com' --admin """ + return await create_user_command_handler(*args, **kwargs) - option_list = [ - superdesk.Option("--username", "-u", dest="username", required=True), - superdesk.Option("--password", "-p", dest="password", required=True), - superdesk.Option("--email", "-e", dest="email", required=True), - superdesk.Option("--admin", "-a", dest="admin", required=False, action="store_true"), - superdesk.Option("--support", "-s", dest="support", required=False, action="store_true"), - ] - async def run(self, username, password, email, admin=False, support=False): - # force type conversion to boolean - user_type = "administrator" if admin else "user" - - userdata = { - "username": username, - "password": password, - "email": email, - "user_type": user_type, - "is_active": admin, - "is_support": support, - "needs_activation": not admin, - } - - app = get_current_app().as_any() - async with app.test_request_context("/users", method="POST"): - if userdata.get("password", None) and not is_hashed(userdata.get("password")): - userdata["password"] = get_hash( - userdata.get("password"), get_app_config("BCRYPT_GENSALT_WORK_FACTOR", 12) - ) +async def create_user_command_handler(username: str, password: str, email: str, admin=False, support=False): + user_type = "administrator" if admin else "user" + userdata = { + "username": username, + "password": password, + "email": email, + "user_type": user_type, + "is_active": admin, + "is_support": support, + "needs_activation": not admin, + } + + app = get_current_app().as_any() - user = superdesk.get_resource_service("users").find_one(username=userdata.get("username"), req=None) + async with app.test_request_context("/users", method="POST"): + if userdata.get("password", None) and not is_hashed(userdata.get("password")): + userdata["password"] = get_hash(userdata.get("password"), get_app_config("BCRYPT_GENSALT_WORK_FACTOR", 12)) - if user: - logger.info("user already exists %s" % (userdata)) - else: - logger.info("creating user %s" % (userdata)) - superdesk.get_resource_service("users").post([userdata]) - logger.info("user saved %s" % (userdata)) + user = superdesk.get_resource_service("users").find_one(username=userdata.get("username"), req=None) + + if user: + logger.info("user already exists %s" % (userdata)) + else: + logger.info("creating user %s" % (userdata)) + superdesk.get_resource_service("users").post([userdata]) 
+ logger.info("user saved %s" % (userdata)) - return userdata + return userdata class ImportUsersCommand(superdesk.Command): @@ -279,7 +282,6 @@ def run(self, username, password): return encoded_token -superdesk.command("users:create", CreateUserCommand()) superdesk.command("users:import", ImportUsersCommand()) superdesk.command("users:hash_passwords", HashUserPasswordsCommand()) superdesk.command("users:get_auth_token", GetAuthTokenCommand()) diff --git a/apps/prepopulate/__init__.py b/apps/prepopulate/__init__.py index 66ca13d8c5..23a7bcf77e 100644 --- a/apps/prepopulate/__init__.py +++ b/apps/prepopulate/__init__.py @@ -12,7 +12,7 @@ from .app_prepopulate import PrepopulateService, PrepopulateResource from .app_populate import AppPopulateCommand # NOQA -from .app_initialize import AppInitializeWithDataCommand # NOQA +from .app_initialize import app_initialize_data_command # NOQA from .app_scaffold_data import AppScaffoldDataCommand # NOQA diff --git a/apps/prepopulate/app_initialize.py b/apps/prepopulate/app_initialize.py index 536599d8c0..b999afbaba 100644 --- a/apps/prepopulate/app_initialize.py +++ b/apps/prepopulate/app_initialize.py @@ -1,15 +1,17 @@ -import json -import logging import os import re +import json +import click +import logging +import pymongo +import superdesk import elasticsearch.exceptions -from collections import OrderedDict from pathlib import Path +from collections import OrderedDict -import superdesk -import pymongo - +from superdesk.flask import Flask +from superdesk.commands import cli from superdesk.core import get_current_app, get_app_config from superdesk.resource_fields import ETAG from superdesk.commands.flush_elastic_index import FlushElasticIndex @@ -216,7 +218,13 @@ def get_filepath(filename, path=None): return filepath -class AppInitializeWithDataCommand(superdesk.Command): +@cli.register_async_command("app:initialize_data", with_appcontext=True) +@click.option("--entity-name", "-n", multiple=True, help="Entity(ies) to initialize") +@click.option("--full-path", "-p", help="Path of the file to import") +@click.option("--sample-data", is_flag=True, help="Use sample data") +@click.option("--force", "-f", is_flag=True, help="Update item even if modified by user") +@click.option("--init-index-only", "-i", is_flag=True, help="Initialize index only") +async def app_initialize_data_command(*args, **kwargs): """Initialize application with predefined data for various entities. Loads predefined data (vocabularies, desks, etc..) for instance. @@ -227,17 +235,15 @@ class AppInitializeWithDataCommand(superdesk.Command): Supported entities: :: - roles, users, desks, stages, vocabularies, validators, - content_templates, content_types, published, activity, - archive, archive_versions, ingest, publish_queue, archived, - legal_archive, legal_archive_versions, legal_publish_queue, - dictionaries, ingest_providers, search_providers, products, - subscribers, workspaces, item_comments, audit, contacts, - planning_types + roles, users, desks, stages, vocabularies, validators, content_templates, + content_types, published, activity, archive, archive_versions, ingest, + publish_queue, archived, legal_archive, legal_archive_versions, legal_publish_queue, + dictionaries, ingest_providers, search_providers, products, subscribers, + workspaces, item_comments, audit, contacts, planning_types If no --entity-name parameter is supplied, all the entities are inserted. 
- The entities: + The entities: :: vocabularies, validators, content_types, dictionaries, ingest_providers, @@ -247,6 +253,13 @@ class AppInitializeWithDataCommand(superdesk.Command): will be updated with the predefined data if it already exists, no action will be taken for the other entities. + Args: + entity_name (str | list | None, optional): Entity(ies) to initialize. + path (str | None, optional): Path of the file to import. + sample_data (bool, optional): If True, sample data will be used. + force (bool, optional): If True, update items even if they have been modified by the user. + init_index_only (bool, optional): If True, only the indexes are initialized. + Example: :: @@ -255,150 +268,138 @@ class AppInitializeWithDataCommand(superdesk.Command): $ python manage.py app:initialize_data --entity-name=content_types """ + return await app_initialize_data_handler(*args, **kwargs) + + +async def app_initialize_data_handler( + entity_name=None, full_path=None, sample_data=False, force=False, init_index_only=False +): + logger.info("Starting data initialization") + logger.info("Config: %s", get_app_config("APP_ABSPATH")) + + # create indexes in mongo + # We can safely ignore duplicate key errors as this only affects performance + # As we want the rest of this command to still execute + app = get_current_app() + app.init_indexes(ignore_duplicate_keys=True) + + rebuild_elastic_on_init_data_error = get_app_config("REBUILD_ELASTIC_ON_INIT_DATA_ERROR") + + # put mapping to elastic + try: + await app.data.init_elastic(app, raise_on_mapping_error=rebuild_elastic_on_init_data_error) + except elasticsearch.exceptions.TransportError as err: + logger.error(err) + if rebuild_elastic_on_init_data_error: + logger.warning("Can't update the mapping, running app:flush_elastic_index command now.") + FlushElasticIndex().run(sd_index=True, capi_index=True) + else: + logger.warning("Can't update the mapping, please run app:flush_elastic_index command.") - name = "app:initialize_data" - - option_list = [ - superdesk.Option("--entity-name", "-n", action="append"), - superdesk.Option("--full-path", "-p", dest="path"), - superdesk.Option("--sample-data", action="store_true"), - superdesk.Option("--force", "-f", action="store_true"), - superdesk.Option("--init-index-only", "-i", action="store_true"), - ] - - async def run(self, entity_name=None, path=None, sample_data=False, force=False, init_index_only=False, **kwargs): - """Run the initialization - - :param str,list,NoneType entity_name: entity(ies) to initialize - :param str,NoneType path: path of the file to import - :param bool sample_data: True if sample data need to be used - :param bool force: if True, update item even if it has been modified by user - :param bool init_index_only: if True, it only initializes index only - """ - logger.info("Starting data initialization") - logger.info("Config: %s", get_app_config("APP_ABSPATH")) - - # create indexes in mongo - # We can safely ignore duplicate key errors as this only affects performance - # As we want the rest of this command to still execute - app = get_current_app() - app.init_indexes(ignore_duplicate_keys=True) - - rebuild_elastic_on_init_data_error = get_app_config("REBUILD_ELASTIC_ON_INIT_DATA_ERROR") - - # put mapping to elastic - try: - await app.data.init_elastic(app, raise_on_mapping_error=rebuild_elastic_on_init_data_error) - except elasticsearch.exceptions.TransportError as err: - logger.error(err) - if rebuild_elastic_on_init_data_error: - logger.warning("Can't update the mapping, running 
app:flush_elastic_index command now.") - FlushElasticIndex().run(sd_index=True, capi_index=True) - else: - logger.warning("Can't update the mapping, please run app:flush_elastic_index command.") - - if init_index_only: - logger.info("Only indexes initialized.") - return 0 - - if sample_data: - if not path: - path = INIT_DATA_PATH.parent / "data_sample" - else: - raise ValueError("path and sample_data should not be set at the same time") - - if entity_name: - if isinstance(entity_name, str): - entity_name = [entity_name] - for name in entity_name: - (file_name, index_params, do_patch) = __entities__[name] - self.import_file(name, path, file_name, index_params, do_patch, force) - return 0 - - for name, (file_name, index_params, do_patch) in __entities__.items(): - try: - self.import_file(name, path, file_name, index_params, do_patch, force) - except KeyError: - continue - except Exception as ex: - logger.exception(ex) - logger.info("Exception loading entity {} from {}".format(name, file_name)) - - logger.info("Data import finished") + if init_index_only: + logger.info("Only indexes initialized.") return 0 - def import_file(self, entity_name, path, file_name, index_params, do_patch=False, force=False): - """Imports seed data based on the entity_name (resource name) from the file_name specified. - - index_params use to create index for that entity/resource - - :param str entity_name: name of the resource - :param str file_name: file name that contains seed data - :param list index_params: list of indexes that is created on that entity. - For example: - [[("first_name", pymongo.ASCENDING), ("last_name", pymongo.ASCENDING)], "username"] will create two indexes - - composite index of "first_name", "last_name" field. - - index on username field. - Alternatively index param can be specified as - [[("first_name", pymongo.ASCENDING), ("last_name", pymongo.ASCENDING)], [("username", pymongo.ASCENDING)]] - Refer to pymongo create_index documentation for more information. - http://api.mongodb.org/python/current/api/pymongo/collection.html - :param bool do_patch: if True then patch the document else don't patch. 
- """ - logger.info("Process %r", entity_name) - file_path = file_name and get_filepath(file_name, path) - app = get_current_app() - - if not file_path: - pass - elif not file_path.exists(): - logger.info(" - file not exists: %s", file_path) + path = full_path + if sample_data: + if not path: + path = INIT_DATA_PATH.parent / "data_sample" else: - logger.info(" - got file path: %s", file_path) - with file_path.open("rt", encoding="utf-8") as app_prepopulation: - service = superdesk.get_resource_service(entity_name) - json_data = json.loads(app_prepopulation.read()) - data = [fillEnvironmentVariables(item) for item in json_data] - data = [app.data.mongo._mongotize(item, service.datasource) for item in data if item] - existing_data = [] - existing = service.get_from_mongo(None, {}) - update_data = True - if not do_patch and existing.count() > 0: - logger.info(" - data already exists none will be loaded") - update_data = False - elif do_patch and existing.count() > 0: - logger.info(" - data already exists it will be updated") - - if update_data: - if do_patch: - for item in existing: - for loaded_item in data: - if "_id" in loaded_item and loaded_item["_id"] == item["_id"]: - data.remove(loaded_item) - if force or item.get("init_version", 0) < loaded_item.get("init_version", 0): - existing_data.append(loaded_item) - - if data: - for item in data: - if not item.get(ETAG): - item.setdefault(ETAG, "init") - service.post(data) - - if existing_data and do_patch: - for item in existing_data: - item["_etag"] = "init" - service.update(item["_id"], item, service.find_one(None, _id=item["_id"])) - - logger.info(" - file imported successfully: %s", file_name) - - if index_params: - for index in index_params: - crt_index = list(index) if isinstance(index, list) else index - options = crt_index.pop() if isinstance(crt_index[-1], dict) and isinstance(index, list) else {} - collection = app.data.mongo.pymongo(resource=entity_name).db[entity_name] - options.setdefault("background", True) - index_name = collection.create_index(crt_index, **options) - logger.info(" - index: %s for collection %s created successfully.", index_name, entity_name) + raise ValueError("path and sample_data should not be set at the same time") + + if entity_name: + if isinstance(entity_name, str): + entity_name = [entity_name] + for name in entity_name: + (file_name, index_params, do_patch) = __entities__[name] + import_file(name, path, file_name, index_params, do_patch, force) + return 0 + + for name, (file_name, index_params, do_patch) in __entities__.items(): + try: + import_file(name, path, file_name, index_params, do_patch, force) + except KeyError: + continue + except Exception as ex: + logger.exception(ex) + logger.info("Exception loading entity {} from {}".format(name, file_name)) + + logger.info("Data import finished") + return 0 + + +def import_file(entity_name, path, file_name, index_params, do_patch=False, force=False): + """Imports seed data based on the entity_name (resource name) from the file_name specified. + + index_params use to create index for that entity/resource + + :param str entity_name: name of the resource + :param str file_name: file name that contains seed data + :param list index_params: list of indexes that is created on that entity. + For example: + [[("first_name", pymongo.ASCENDING), ("last_name", pymongo.ASCENDING)], "username"] will create two indexes + - composite index of "first_name", "last_name" field. + - index on username field. 
+    Alternatively index param can be specified as
+    [[("first_name", pymongo.ASCENDING), ("last_name", pymongo.ASCENDING)], [("username", pymongo.ASCENDING)]]
+    Refer to pymongo create_index documentation for more information.
+    http://api.mongodb.org/python/current/api/pymongo/collection.html
+    :param bool do_patch: if True, patch the document, else don't patch.
+    """
+    logger.info("Process %r", entity_name)
+    file_path = file_name and get_filepath(file_name, path)
+    app = get_current_app()
+
+    if not file_path:
+        pass
+    elif not file_path.exists():
+        logger.info(" - file does not exist: %s", file_path)
+    else:
+        logger.info(" - got file path: %s", file_path)
+        with file_path.open("rt", encoding="utf-8") as app_prepopulation:
+            service = superdesk.get_resource_service(entity_name)
+            json_data = json.loads(app_prepopulation.read())
+            data = [fillEnvironmentVariables(item) for item in json_data]
+            data = [app.data.mongo._mongotize(item, service.datasource) for item in data if item]
+            existing_data = []
+            existing = service.get_from_mongo(None, {})
+            update_data = True
+            if not do_patch and existing.count() > 0:
+                logger.info(" - data already exists, none will be loaded")
+                update_data = False
+            elif do_patch and existing.count() > 0:
+                logger.info(" - data already exists, it will be updated")
+
+            if update_data:
+                if do_patch:
+                    for item in existing:
+                        for loaded_item in data:
+                            if "_id" in loaded_item and loaded_item["_id"] == item["_id"]:
+                                data.remove(loaded_item)
+                                if force or item.get("init_version", 0) < loaded_item.get("init_version", 0):
+                                    existing_data.append(loaded_item)
+
+                if data:
+                    for item in data:
+                        if not item.get(ETAG):
+                            item.setdefault(ETAG, "init")
+                    service.post(data)
+
+                if existing_data and do_patch:
+                    for item in existing_data:
+                        item["_etag"] = "init"
+                        service.update(item["_id"], item, service.find_one(None, _id=item["_id"]))
+
+            logger.info(" - file imported successfully: %s", file_name)
+
+    if index_params:
+        for index in index_params:
+            crt_index = list(index) if isinstance(index, list) else index
+            options = crt_index.pop() if isinstance(crt_index[-1], dict) and isinstance(index, list) else {}
+            collection = app.data.mongo.pymongo(resource=entity_name).db[entity_name]
+            options.setdefault("background", True)
+            index_name = collection.create_index(crt_index, **options)
+            logger.info(" - index: %s for collection %s created successfully.", index_name, entity_name)
 
 
 def fillEnvironmentVariables(item):
@@ -416,6 +417,3 @@ def fillEnvironmentVariables(item):
             text = text.replace("#ENV_%s#" % name, variables[name])
 
     return json.loads(text)
-
-
-superdesk.register_command(AppInitializeWithDataCommand)
diff --git a/docker-compose.yml b/docker-compose.yml
index ee115e434b..07db3dcfc2 100644
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@ -3,7 +3,7 @@ version: "3.2"
 
 # Note: this file is mainly used for tests.
If you want to run Superdesk or its # services from docker, please use docker-compose.yml found in main Superdesk # repository (https://github.com/superdesk/superdesk) - +name: superdesk-core-services services: elastic: image: docker.elastic.co/elasticsearch/elasticsearch:7.17.22 diff --git a/setup.py b/setup.py index a585bffc48..ad91164b31 100644 --- a/setup.py +++ b/setup.py @@ -65,6 +65,7 @@ "eve-elastic @ git+https://github.com/MarkLark86/eve-elastic@use-quart", "quart @ git+https://github.com/MarkLark86/quart@fix-test-client-with-utf8-url", "quart_babel @ git+https://github.com/MarkLark86/quart-babel@fix-get-format", + "asgiref>=3.8.1", # Patch Quart, Asyncio to work with Flask extensions # TODO-ASYNC: Remove this with our own flask patch (as quart-flask-patch also patches asyncio) "quart-flask-patch>=0.3.0,<0.4", diff --git a/superdesk/commands/__init__.py b/superdesk/commands/__init__.py index 49e4bd6e9f..ed110a54c0 100644 --- a/superdesk/commands/__init__.py +++ b/superdesk/commands/__init__.py @@ -10,6 +10,7 @@ from .generate_vocabularies import GenerateVocabularies # noqa from . import data_manipulation # noqa from . import schema # noqa +from .async_cli import cli, commands_blueprint # noqa import superdesk @@ -28,3 +29,12 @@ def init_app(app) -> None: data_manipulation.RestoreRecordResource(endpoint_name, app=app, service=service) superdesk.intrinsic_privilege(resource_name=endpoint_name, method=["POST"]) + + +def configure_cli(app) -> None: + """ + Sets the current app instance into the `AsyncAppGroup` to later be passed as context of the commands. + It also registers the commands blueprint + """ + cli.set_current_app(app) + app.register_blueprint(commands_blueprint) diff --git a/superdesk/commands/async_cli.py b/superdesk/commands/async_cli.py new file mode 100644 index 0000000000..c634e7f450 --- /dev/null +++ b/superdesk/commands/async_cli.py @@ -0,0 +1,107 @@ +from functools import wraps +from typing import Any, Callable, Optional, Tuple, cast +from asgiref.sync import async_to_sync + +from ..flask import Blueprint, AppGroup, Flask + + +class AsyncAppGroup(AppGroup): + """ + An extension of Quart's AppGroup to support registration of asynchronous command handlers. + + This class provides a mechanism to register asynchronous functions as command line commands, + which are automatically handled synchronously using `asgiref.sync.async_to_sync` conversion. + """ + + app: Flask + + def register_async_command( + self, name: Optional[str] = None, with_appcontext=False, **click_kwargs: dict[str, Any] + ) -> Callable[..., Any]: + """ + A decorator to register an asynchronous command within the Quart CLI app group. + If your command needs to use the app context, pass `with_appcontext=True` to ensure it will be run + wrapped with current app context. Otherwise getting current app context will fail in asynchronous + command contexts due to execution in different threads or outside of the normal request context lifecycle. + + Args: + name (str, optional): The name of the command. Defaults to the function's name if None. + with_appcontext (bool): If True, it will wrap the command function in the current_app's context. + Note that `set_current_app` method has to be called first. + **click_kwargs: Additional keyword arguments that are passed to the `AppGroup.command` decorator. + + Example: + Use the decorator to register a command that requires app context: + + .. 
code-block:: python + + from superdesk.commands import cli + + @cli.register_async_command("example-command", with_appcontext=True) + async def my_command(): + app = get_current_app() + with app.app_context(): + # your command logic here + """ + + def decorator(func: Callable[..., Any]) -> Callable[..., Any]: + @wraps(func) + # This wrapper will convert the async function to a sync function + # using async_to_sync when the command is actually called. + def sync_wrapper(*args, **kwargs) -> Any: + if with_appcontext: + return async_to_sync(self.run_with_appcontext)(func, *args, **kwargs) + return async_to_sync(func)(*args, **kwargs) + + return self.command(name, **click_kwargs)(sync_wrapper) + + return decorator + + async def run_with_appcontext(self, func: Callable[..., Any], *args, **kwargs): + current_app = self.get_current_app() + if current_app is None: + raise RuntimeError("No app instance available. Make sure to run `set_current_app(app_instance)` first.") + + async with current_app.app_context(): + return await func(*args, **kwargs) + + def set_current_app(self, app: Flask): + """ + Sets current app instance so it can be passed down to commands if needed + """ + self.app = app + + def get_current_app(self) -> Flask: + return self.app + + +class CommandsBlueprint(Blueprint): + """ + Custom Blueprint that integrates AsyncAppGroup to register CLI commands that are asynchronous, + allowing them to be used in a synchronous context by Quart's command line interface. + """ + + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + + self.cli = AsyncAppGroup() + + +def create_commands_blueprint(blueprint_name: str) -> Tuple[CommandsBlueprint, AsyncAppGroup]: + """ + Create a Blueprint to organize all superdesk commands. + + By setting cli_group=None, any new CLI commands added to this blueprint + will still be compatible with the existing `python manage.py ` + format. + + Returns: + Tuple[Blueprint, AppGroup]: A tuple containing the configured Blueprint + object and its associated CLI AppGroup for command registration. 
+ """ + blueprint = CommandsBlueprint(blueprint_name, __name__, cli_group=None) + + return blueprint, cast(AsyncAppGroup, blueprint.cli) + + +commands_blueprint, cli = create_commands_blueprint("superdesk") diff --git a/superdesk/commands/data_manipulation.py b/superdesk/commands/data_manipulation.py index fefb34466a..2e4e169ef2 100644 --- a/superdesk/commands/data_manipulation.py +++ b/superdesk/commands/data_manipulation.py @@ -18,9 +18,10 @@ import shutil import bz2 import pymongo.database +import multiprocessing.synchronize +from asgiref.sync import async_to_sync from multiprocessing import Process, Lock -import multiprocessing.synchronize from contextlib import contextmanager from datetime import datetime from enum import IntEnum @@ -841,8 +842,11 @@ def do_migration(self, ori_dump: str): record_process.start() # we have to wait for the recording to be actually started lock.acquire() + # recording is started, we can launch the migration scripts - data_updates.Upgrade().run() + # TODO-ASYNC: remove usage of async_to_sync once this command is migrated to async + async_to_sync(data_updates.upgrade_command_handler)() + # migration is done, we stop the recording if record_process.pid is None: logger.error("Process ID should available!") @@ -891,7 +895,10 @@ def do_migration(self, ori_dump: str): metadata = get_dump_metadata(p) StorageRestore().run(keep_existing=False, dump_path=p) print(f"{INFO}Applying data migration scripts") - data_updates.Upgrade().run() + + # TODO-ASYNC: remove usage of async_to_sync once this command is migrated to async + async_to_sync(data_updates.upgrade_command_handler)() + print(f"{INFO}Updating dump") if p.is_dir(): shutil.rmtree(p) diff --git a/superdesk/commands/data_updates.py b/superdesk/commands/data_updates.py index 3bd08d38b6..8b0726e3d5 100644 --- a/superdesk/commands/data_updates.py +++ b/superdesk/commands/data_updates.py @@ -8,19 +8,26 @@ # AUTHORS and LICENSE files distributed with this source code, or # at https://www.sourcefabric.org/superdesk/license - -from string import Template -from types import ModuleType -from superdesk.core import get_app_config, get_current_app -from superdesk.services import BaseService -import superdesk -import getpass +from functools import wraps import os import re import time +import click +import getpass +import superdesk + +from string import Template +from types import ModuleType from typing import Optional, Tuple + from eve.utils import ParsedRequest +from superdesk.flask import Flask +from superdesk.services import BaseService +from superdesk.core import get_app_config, get_current_app + +from .async_cli import cli + DEFAULT_DATA_UPDATE_DIR_NAME = "data_updates" MAIN_DATA_UPDATES_DIR = os.path.abspath( @@ -102,70 +109,60 @@ def get_applied_updates(data_updates_service: Optional[BaseService] = None) -> T return tuple(data_updates_service.get(req=req, lookup={})) # type: ignore -class DataUpdateCommand(superdesk.Command): - """Parent class for Upgrade and Downgrade commands. +async def initialize_data_updates(fake, dry): + data_updates_service = superdesk.get_resource_service("data_updates") + data_updates_files = get_data_updates_files(strip_file_extension=True) + data_updates_applied = get_applied_updates(data_updates_service) + last_data_update = data_updates_applied and data_updates_applied[-1] or None - It defines options and initialize some variables in `run` method. 
- """ + if last_data_update and last_data_update["name"] not in data_updates_files: + print( + "A data update previously applied to this database (%s) can't be found in %s" + % (last_data_update["name"], ", ".join(get_dirs())) + ) + return data_updates_service, data_updates_files, last_data_update - option_list = [ - # superdesk.Option( - # "--id", - # "-i", - # dest="data_update_id", - # required=False, - # help="Data update id to run last", - # ), - # superdesk.Option( - # "--fake-init", - # dest="fake", - # required=False, - # action="store_true", - # help="Mark data updates as run without actually running them", - # ), - # superdesk.Option( - # "--dry-run", - # dest="dry", - # required=False, - # action="store_true", - # help="Does not mark data updates as done. This can be useful for development.", - # ), - ] - def run(self, data_update_id=None, fake=False, dry=False): - self.data_updates_service = superdesk.get_resource_service("data_updates") - self.data_updates_files = get_data_updates_files(strip_file_extension=True) - # retrieve existing data updates in database - data_updates_applied = get_applied_updates(self.data_updates_service) - self.last_data_update = data_updates_applied and data_updates_applied[-1] or None - if self.last_data_update: - if self.last_data_update["name"] not in self.data_updates_files: - print( - "A data update previously applied to this database (%s) can't be found in %s" - % (self.last_data_update["name"], ", ".join(get_dirs())) - ) - - def compile_update_in_module(self, data_update_name): - date_update_script_file = None - for folder in get_dirs(): - date_update_script_file = os.path.join(folder, "%s.py" % (data_update_name)) - if os.path.exists(date_update_script_file): - break - assert date_update_script_file is not None, "File %s has not been found" % (data_update_name) - # create a module instance to use as scope for our data update - module = ModuleType("data_update_module") - with open(date_update_script_file) as f: - # compile data update script file - script = compile(f.read(), date_update_script_file, "exec") - # excecute the script in the module - exec(script, module.__dict__) - return module - - def in_db(self, update): - return update in map(lambda _: _["name"], get_applied_updates(self.data_updates_service)) - - -class Upgrade(DataUpdateCommand): +def compile_update_in_module(data_update_name): + date_update_script_file = None + for folder in get_dirs(): + date_update_script_file = os.path.join(folder, "%s.py" % (data_update_name)) + if os.path.exists(date_update_script_file): + break + assert date_update_script_file is not None, "File %s has not been found" % (data_update_name) + + # create a module instance to use as scope for our data update + module = ModuleType("data_update_module") + with open(date_update_script_file) as f: + # compile data update script file + script = compile(f.read(), date_update_script_file, "exec") + # excecute the script in the module + exec(script, module.__dict__) + + return module + + +def in_db(update, data_updates_service): + applied_updates = get_applied_updates(data_updates_service) + return update in map(lambda _: _["name"], applied_updates) + + +def common_options(func): + @click.option("--data-update-id", "-i", type=str, required=False, help="Data update id to run last") + @click.option("--fake-init", "fake", is_flag=True, help="Mark data updates as run without actually running them") + @click.option( + "--dry-run", "dry", is_flag=True, help="Does not mark data updates as done. 
+
+
+@cli.register_async_command("data:upgrade", with_appcontext=True)
+@common_options
+async def upgrade_command(*args, **kwargs):
     """Runs all the new data updates available.
 
     If ``data_update_id`` is given, runs new data updates until the given one.
@@ -176,39 +173,42 @@
 
     $ python manage.py data:upgrade
     """
+    return await upgrade_command_handler(*args, **kwargs)
+
 
-    def run(self, data_update_id=None, fake=False, dry=False):
-        if data_update_id and data_update_id not in get_data_updates_files(strip_file_extension=True):
-            print(
-                "Error argument --id/-i: invalid choice: '{}'"
-                " (choose from {})".format(data_update_id, get_data_updates_files(strip_file_extension=True))
-            )
-            return
-        super().run(data_update_id, fake, dry)
-        data_updates_files = self.data_updates_files
-        # drops updates that already have been applied
-        data_updates_files = [update for update in data_updates_files if not self.in_db(update)]
-        # drop versions after given one
-        if data_update_id:
-            if data_update_id not in data_updates_files:
-                print("Given data update id not found in available updates. It may have been already applied")
-                return False
-            data_updates_files = data_updates_files[: data_updates_files.index(data_update_id) + 1]
-        # apply data updates
-        for data_update_name in data_updates_files:
-            print("data update %s running forward..." % (data_update_name))
-            module_scope = self.compile_update_in_module(data_update_name)
-            # run the data update forward
-            if not fake:
-                module_scope.DataUpdate().apply("forwards")
-            if not dry:
-                # store the applied data update in the database
-                self.data_updates_service.create([{"name": data_update_name}])
-        if not data_updates_files:
-            print("No data update to apply.")
-
-
-class Downgrade(DataUpdateCommand):
+async def upgrade_command_handler(data_update_id=None, fake=False, dry=False):
+    data_updates_service, data_updates_files, _ = await initialize_data_updates(fake, dry)
+
+    if data_update_id and data_update_id not in data_updates_files:
+        print(
+            "Error argument --id/-i: invalid choice: '{}' (choose from {})".format(data_update_id, data_updates_files)
+        )
+        return
+
+    # drop updates that have already been applied
+    data_updates_files = [update for update in data_updates_files if not in_db(update, data_updates_service)]
+    if data_update_id:
+        if data_update_id not in data_updates_files:
+            print("Given data update id not found in available updates. It may have been already applied")
+            return False
+        data_updates_files = data_updates_files[: data_updates_files.index(data_update_id) + 1]
+
+    # apply data updates
+    for data_update_name in data_updates_files:
+        print(f"data update {data_update_name} running forward...")
+        module_scope = compile_update_in_module(data_update_name)
+        if not fake:
+            module_scope.DataUpdate().apply("forwards")
+        if not dry:
+            data_updates_service.create([{"name": data_update_name}])
+
+    if not data_updates_files:
+        print("No data update to apply.")
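The reworked command keeps its old invocation style; assuming the registration above, typical runs look like this (the update id shown is illustrative)::

    $ python manage.py data:upgrade                    # apply every pending update
    $ python manage.py data:upgrade -i 00042_example   # stop after a specific update (hypothetical id)
    $ python manage.py data:upgrade --fake-init        # record updates as applied without running them
    $ python manage.py data:upgrade --dry-run          # run updates without recording them as done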
+
+
+@cli.register_async_command("data:downgrade", with_appcontext=True)
+@common_options
+async def downgrade_command(*args, **kwargs):
     """Runs the latest data update backward.
 
     If ``data_update_id`` is given, runs all the data updates backward until the given one.
@@ -220,47 +220,50 @@
 
     """
 
-    def run(self, data_update_id=None, fake=False, dry=False):
-        if data_update_id and data_update_id not in get_data_updates_files(strip_file_extension=True):
-            print(
-                "Error argument --id/-i: invalid choice: '{}'"
-                " (choose from {})".format(data_update_id, get_data_updates_files(strip_file_extension=True))
-            )
-            return
-        super().run(data_update_id, fake, dry)
-        data_updates_files = self.data_updates_files
-        # check if there is something to downgrade
-        if not self.last_data_update:
-            print("No data update has been already applied")
+    return await downgrade_command_handler(*args, **kwargs)
+
+
+async def downgrade_command_handler(data_update_id=None, fake=False, dry=False):
+    data_updates_service, data_updates_files, last_data_update = await initialize_data_updates(fake, dry)
+
+    if data_update_id and data_update_id not in data_updates_files:
+        print(
+            "Error argument --id/-i: invalid choice: '{}'"
+            " (choose from {})".format(data_update_id, get_data_updates_files(strip_file_extension=True))
+        )
+        return
+
+    # check if there is something to downgrade
+    if not last_data_update:
+        print("No data update has been already applied")
+        return False
+
+    # drops updates which have not been already made (this is rollback mode)
+    data_updates_files = [update for update in data_updates_files if in_db(update, data_updates_service)]
+
+    # if data_update_id is given, go until this update (drop previous updates)
+    if data_update_id:
+        if data_update_id not in data_updates_files:
+            print(f"Update {data_update_id} can't be found. It may have been already downgraded")
             return False
-        # drops updates which have not been already made (this is rollback mode)
-        data_updates_files = [update for update in data_updates_files if self.in_db(update)]
-
-        # if data_update_id is given, go until this update (drop previous updates)
-        if data_update_id:
-            if data_update_id not in data_updates_files:
-                print("Update %s can't be find. It may have been already downgraded" % (data_update_id))
-                return False
-            data_updates_files = data_updates_files[data_updates_files.index(data_update_id) :]
-        # otherwise, just rollback one update
-        else:
-            print(
-                "No data update id has been provided. Dowgrading to previous version: %s"
-                % self.last_data_update["name"]
-            )
-            data_updates_files = data_updates_files[len(data_updates_files) - 1 :]
-        # apply data updates, in the opposite direction
-        for data_update_name in reversed(data_updates_files):
-            print("data update %s running backward..." % (data_update_name))
-            module_scope = self.compile_update_in_module(data_update_name)
-            # run the data update backward
-            if not fake:
-                module_scope.DataUpdate().apply("backwards")
-            if not dry:
-                # remove the applied data update from the database
-                self.data_updates_service.delete({"name": data_update_name})
-        if not data_updates_files:
-            print("No data update to apply.")
+        data_updates_files = data_updates_files[data_updates_files.index(data_update_id) :]
+    # otherwise, just rollback one update
+    else:
+        print(f"No data update id has been provided. Downgrading to previous version: {last_data_update['name']}")
+        data_updates_files = data_updates_files[len(data_updates_files) - 1 :]
+
+    # apply data updates, in the opposite direction
+    for data_update_name in reversed(data_updates_files):
+        print(f"data update {data_update_name} running backward...")
+        module_scope = compile_update_in_module(data_update_name)
+        # run the data update backward
+        if not fake:
+            module_scope.DataUpdate().apply("backwards")
+        if not dry:
+            # remove the applied data update from the database
+            data_updates_service.delete({"name": data_update_name})
+    if not data_updates_files:
+        print("No data update to apply.")
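Both handlers compile a data-update script into a throwaway module and call its `DataUpdate().apply(...)`. A minimal script of the shape they expect, based on the `BaseDataUpdate` class kept below (the file name, resource and field are illustrative)::

    # data_updates/00042_20240904-093740_archive.py (hypothetical file)
    from superdesk.commands.data_updates import BaseDataUpdate

    class DataUpdate(BaseDataUpdate):
        resource = "archive"

        def forwards(self, mongodb_collection, mongodb_database):
            # run by `data:upgrade`
            mongodb_collection.update_many({}, {"$set": {"migrated": True}})

        def backwards(self, mongodb_collection, mongodb_database):
            # run by `data:downgrade`
            mongodb_collection.update_many({}, {"$unset": {"migrated": 1}})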
 
 
 class GenerateUpdate(superdesk.Command):
@@ -315,8 +318,6 @@ def run(self, resource_name, global_update=False):
 
 
 superdesk.command("data:generate_update", GenerateUpdate())
-superdesk.command("data:upgrade", Upgrade())
-superdesk.command("data:downgrade", Downgrade())
 
 
 class BaseDataUpdate:
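With the class-based registrations removed, decorating a function is now what wires it into the CLI: `register_async_command` adds it to the blueprint's `AsyncAppGroup`, and `configure_cli(app)` (called from the app factory later in this patch) registers that blueprint on the app. A rough sketch of the flow, with a hypothetical command name::

    from superdesk.commands import cli

    @cli.register_async_command("data:noop", with_appcontext=True)  # hypothetical
    async def noop_command():
        ...  # async service calls run inside the app context here

    # at startup (superdesk/factory/app.py in this patch):
    #   configure_cli(app)  ->  cli.set_current_app(app)
    #                           app.register_blueprint(commands_blueprint)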
diff --git a/superdesk/commands/schema.py b/superdesk/commands/schema.py
index 97aeaf5022..dfe4f83186 100644
--- a/superdesk/commands/schema.py
+++ b/superdesk/commands/schema.py
@@ -12,10 +12,13 @@
 
 import superdesk
 
-from superdesk.core import get_app_config, get_current_app
+from superdesk.flask import Flask
 from superdesk.lock import lock, unlock
+from superdesk.core import get_app_config, get_current_app
 from superdesk.commands.rebuild_elastic_index import RebuildElasticIndex
 
+from .async_cli import cli
+
 VERSION_ID = "schema_version"
 
 
@@ -46,7 +49,8 @@ def update_schema():
     RebuildElasticIndex().run()
 
 
-class SchemaMigrateCommand(superdesk.Command):
+@cli.register_async_command("schema:migrate", with_appcontext=True)
+async def schema_migrate_command():
     """Migrate elastic schema if needed, should be triggered on every deploy.
 
     It compares version set in code (latest) to one stored in db and only updates
@@ -62,23 +66,23 @@
 
     """
 
-    def run(self):
-        lock_name = "schema:migrate"
+    return await schema_migrate_command_handler()
 
-        if not lock(lock_name, expire=1800):
-            return
 
-        try:
-            app_schema_version = get_schema_version()
-            superdesk_schema_version = get_app_config("SCHEMA_VERSION", superdesk.SCHEMA_VERSION)
-            if app_schema_version < superdesk_schema_version:
-                print("Updating schema from version {} to {}.".format(app_schema_version, superdesk_schema_version))
-                update_schema()
-                set_schema_version(superdesk_schema_version)
-            else:
-                print("App already at version ({}).".format(app_schema_version))
-        finally:
-            unlock(lock_name)
 
+async def schema_migrate_command_handler():
+    lock_name = "schema:migrate"
 
+    if not lock(lock_name, expire=1800):
+        return
 
-superdesk.command("schema:migrate", SchemaMigrateCommand())
+    try:
+        app_schema_version = get_schema_version()
+        superdesk_schema_version = get_app_config("SCHEMA_VERSION", superdesk.SCHEMA_VERSION)
+        if app_schema_version < superdesk_schema_version:
+            print("Updating schema from version {} to {}.".format(app_schema_version, superdesk_schema_version))
+            update_schema()
+            set_schema_version(superdesk_schema_version)
+        else:
+            print("App already at version ({}).".format(app_schema_version))
+    finally:
+        unlock(lock_name)
diff --git a/superdesk/factory/app.py b/superdesk/factory/app.py
index 98ea23a972..0b0a638079 100644
--- a/superdesk/factory/app.py
+++ b/superdesk/factory/app.py
@@ -28,7 +28,7 @@
 from babel import parse_locale
 from pymongo.errors import DuplicateKeyError
 
-from quart import Quart
+from superdesk.commands import configure_cli
 from superdesk.flask import g, url_for, Config, Request as FlaskRequest, abort, Blueprint, request as flask_request
 from superdesk.celery_app import init_celery
 from superdesk.datalayer import SuperdeskDataLayer  # noqa
@@ -39,6 +39,7 @@
 from superdesk.validator import SuperdeskValidator
 from superdesk.json_utils import SuperdeskFlaskJSONProvider, SuperdeskJSONEncoder
 from superdesk.cache import cache_backend
+
 from .elastic_apm import setup_apm
 from superdesk.core.app import SuperdeskAsyncApp
 from superdesk.core.web import Endpoint, Request, EndpointGroup, HTTP_METHOD, Response
@@ -400,6 +401,10 @@
 
     configure_logging(app.config["LOG_CONFIG_FILE"])
 
+    # configure the CLI only after modules and apps have been loaded
+    # to make sure all commands are registered
+    configure_cli(app)
+
     return app
 
 
diff --git a/superdesk/flask.py b/superdesk/flask.py
index bb1daeaa35..795fcad226 100644
--- a/superdesk/flask.py
+++ b/superdesk/flask.py
@@ -36,6 +36,7 @@
     abort,
     send_file,
 )
+from quart.cli import AppGroup
 from quart.json.provider import DefaultJSONProvider
 
 
@@ -60,4 +61,5 @@
     "Request",
     "abort",
     "send_file",
+    "AppGroup",
 ]
diff --git a/superdesk/tests/environment.py b/superdesk/tests/environment.py
index 14ef12e1a2..c7875bbd1a 100644
--- a/superdesk/tests/environment.py
+++ b/superdesk/tests/environment.py
@@ -15,7 +15,7 @@
 from superdesk.core import json
 from apps.prepopulate.app_populate import AppPopulateCommand
-from apps.prepopulate.app_initialize import AppInitializeWithDataCommand
+from apps.prepopulate.app_initialize import app_initialize_data_handler
 from superdesk import tests
 from superdesk.factory.app import get_app
 from superdesk.tests import setup_auth_user
@@ -84,8 +84,7 @@ async def setup_before_scenario(context, scenario, config, app_factory):
     if
scenario.status != "skipped" and "app_init" in scenario.tags: async with context.app.app_context(): - command = AppInitializeWithDataCommand() - command.run() + await app_initialize_data_handler() def before_all(context): diff --git a/superdesk/tests/steps.py b/superdesk/tests/steps.py index eb762a74ee..cf26adfbe1 100644 --- a/superdesk/tests/steps.py +++ b/superdesk/tests/steps.py @@ -54,7 +54,7 @@ from apps.dictionaries.resource import DICTIONARY_FILE from superdesk.filemeta import get_filemeta from apps.preferences import enhance_document_with_default_prefs -from apps.prepopulate.app_initialize import AppInitializeWithDataCommand +from apps.prepopulate.app_initialize import app_initialize_data_handler # for auth server from authlib.jose import jwt @@ -712,9 +712,10 @@ def retrieve_and_parse_side_effect(ftp, config, filename, provider, registered_p ("ninjs2.json", "20181111123456"), ("ninjs3.json", "20181111123456"), ) - with mock.patch.object( - ftp.FTPFeedingService, "_retrieve_and_parse" - ) as retrieve_and_parse_mock, mock.patch.object(ftp.FTPFeedingService, "_is_empty") as empty_file_mock: + with ( + mock.patch.object(ftp.FTPFeedingService, "_retrieve_and_parse") as retrieve_and_parse_mock, + mock.patch.object(ftp.FTPFeedingService, "_is_empty") as empty_file_mock, + ): retrieve_and_parse_mock.side_effect = retrieve_and_parse_side_effect empty_file_mock.return_value = False # run command @@ -3045,7 +3046,7 @@ async def step_impl_then_we_dont_get_access_token(context): @async_run_until_complete async def setp_impl_when_we_init_data(context, entity): async with context.app.app_context(): - AppInitializeWithDataCommand().run(entity) + await app_initialize_data_handler(entity) @when('we run task "{name}"') diff --git a/tests/auth/create_user_command_test.py b/tests/auth/create_user_command_test.py index c381915b14..cd22cdb1ed 100644 --- a/tests/auth/create_user_command_test.py +++ b/tests/auth/create_user_command_test.py @@ -12,19 +12,18 @@ from os.path import dirname, join from superdesk.tests import TestCase, markers from superdesk.utc import utcnow -from apps.auth.db.commands import CreateUserCommand, ImportUsersCommand +from apps.auth.db.commands import create_user_command_handler, ImportUsersCommand class CreateUserCommandTestCase(TestCase): async def test_create_user_command(self): if not self.app.config.get("LDAP_SERVER"): user = {"username": "foo", "password": "bar", "email": "baz", "password_changed_on": utcnow()} - cmd = CreateUserCommand() - await cmd.run(user["username"], user["password"], user["email"], admin=True) + await create_user_command_handler(user["username"], user["password"], user["email"], admin=True) auth_user = get_resource_service("auth_db").authenticate(user) self.assertEquals(auth_user["username"], user["username"]) - await cmd.run(user["username"], user["password"], user["email"], admin=True) + await create_user_command_handler(user["username"], user["password"], user["email"], admin=True) auth_user2 = get_resource_service("auth_db").authenticate(user) self.assertEquals(auth_user2["username"], user["username"]) self.assertEquals(auth_user2["_id"], auth_user["_id"]) @@ -32,9 +31,9 @@ async def test_create_user_command(self): async def test_create_user_command_no_update(self): if not self.app.config.get("LDAP_SERVER"): user = {"username": "foo", "password": "bar", "email": "baz", "password_changed_on": utcnow()} - cmd = CreateUserCommand() - await cmd.run(user["username"], user["password"], user["email"], admin=True) - await cmd.run(user["username"], 
"new_password", user["email"], admin=True) + cmd = create_user_command_handler + await cmd(user["username"], user["password"], user["email"], admin=True) + await cmd(user["username"], "new_password", user["email"], admin=True) get_resource_service("auth_db").authenticate(user) async def test_import_users(self): diff --git a/tests/commands/data_updates_test.py b/tests/commands/data_updates_test.py index 31799f3e07..b0c1165a6c 100644 --- a/tests/commands/data_updates_test.py +++ b/tests/commands/data_updates_test.py @@ -4,7 +4,13 @@ import superdesk.commands.data_updates from superdesk import get_resource_service -from superdesk.commands.data_updates import get_data_updates_files, GenerateUpdate, Upgrade, Downgrade, get_dirs +from superdesk.commands.data_updates import ( + get_data_updates_files, + GenerateUpdate, + get_dirs, + upgrade_command_handler, + downgrade_command_handler, +) from superdesk.tests import TestCase # change the folder where to store updates for test purpose @@ -62,7 +68,7 @@ async def test_dry_data_update(self): """ self.assertEqual(self.number_of_data_updates_applied(), 0) GenerateUpdate().run(resource_name="data_updates") - Upgrade().run(dry=True) + await upgrade_command_handler(dry=True) self.assertEqual(self.number_of_data_updates_applied(), 0) async def test_fake_data_update(self): @@ -71,9 +77,9 @@ async def test_fake_data_update(self): superdesk.commands.data_updates.DEFAULT_DATA_UPDATE_BW_IMPLEMENTATION = "raise Exception()" GenerateUpdate().run(resource_name="data_updates") self.assertEqual(self.number_of_data_updates_applied(), 0) - Upgrade().run(fake=True) + await upgrade_command_handler(fake=True) self.assertEqual(self.number_of_data_updates_applied(), 1) - Downgrade().run(fake=True) + await downgrade_command_handler(fake=True) self.assertEqual(self.number_of_data_updates_applied(), 0) async def test_data_update(self): @@ -98,15 +104,15 @@ async def test_data_update(self): GenerateUpdate().run(resource_name="data_updates") assert self.number_of_data_updates_applied() == 0 thirdieth_update = get_data_updates_files(True)[29] - Upgrade().run(data_update_id=thirdieth_update) + await upgrade_command_handler(data_update_id=thirdieth_update) assert self.number_of_data_updates_applied() == 30 - Upgrade().run() + await upgrade_command_handler() assert self.number_of_data_updates_applied() == 40 - Downgrade().run() + await downgrade_command_handler() assert self.number_of_data_updates_applied() == 39 - Downgrade().run(data_update_id=thirdieth_update) + await downgrade_command_handler(data_update_id=thirdieth_update) assert self.number_of_data_updates_applied() == 29 - Upgrade().run() + await upgrade_command_handler() def test_multiple_dirs(self): with mock.patch.dict(self.app.config, {"APPS_DATA_UPDATES_PATHS": ["/tmp/foo", "tmp/bar"]}): diff --git a/tests/prepopulate/app_initialization_test.py b/tests/prepopulate/app_initialization_test.py index 54345e7b5e..79b3383ac3 100644 --- a/tests/prepopulate/app_initialization_test.py +++ b/tests/prepopulate/app_initialization_test.py @@ -4,7 +4,7 @@ from unittest.mock import patch from superdesk.core import json -from apps.prepopulate.app_initialize import AppInitializeWithDataCommand +from apps.prepopulate.app_initialize import app_initialize_data_handler from apps.prepopulate.app_scaffold_data import AppScaffoldDataCommand from apps.prepopulate.app_initialize import fillEnvironmentVariables from superdesk import get_resource_service @@ -13,8 +13,8 @@ class AppInitializeWithDataCommandTestCase(TestCase): async def _run(self, 
*a, **kw):
-        command = AppInitializeWithDataCommand()
-        return await command.run(*a, **kw)
+        command = app_initialize_data_handler
+        return await command(*a, **kw)
 
     async def test_app_initialization(self):
         result = await self._run()

From d3de447edeb5840b29091604872dc4416090d8bf Mon Sep 17 00:00:00 2001
From: Helmy Giacoman
Date: Wed, 4 Sep 2024 10:20:13 +0200
Subject: [PATCH 2/4] [SDESK-7284] Launch superdesk test instance on Github Actions (#2684)

* Implement new async_cli module

This module includes a custom Blueprint, AppGroup and a helper to easily
create a command blueprint ready to register commands based on async
functions. The registered async commands are run synchronously using
`asgiref.sync.async_to_sync`.

SDESK-7284

* Rework `app:initialize_data` command

The command was converted from the old class-based commands into a
function-based one using `Click` and our new `cli.register_async_command`
decorator, in order to use our new async services in the synchronous
context of Quart's command line interface.

SDESK-7284

* Update docker-compose.yml

* Fix type and docstrings

* Rework `users:create` command

Command converted from the old class-based commands into a function-based
one using `Click` and our new `cli.register_async_command` decorator.

SDESK-7284

* Rework `schema:migrate` command

SDESK-7284

* Update app_initialize.py

* Rework `data:upgrade` and `data:downgrade` commands

SDESK-7284

* Remove unneeded false return

* Fix async commands to use proper app context

Async commands fail when using the app context, as they are executed in
different threads or outside of the normal request context lifecycle. To
fix that, we set the current app instance on the AsyncAppGroup so that it
can later be passed optionally to the command at the moment of its
execution.

SDESK-7284

* Fix `schema:migrate` command

SDESK-7284

* Small improvement to use of app context in async commands

SDESK-7284

* Implement basic Github Actions test instance

SDESK-7284

* Removing unneeded settings

SDESK-7284
---
 .github/workflows/test-instance.yml         | 50 +++++++++++++++++++++
 tests/test_instance/README.md               |  3 ++
 tests/test_instance/server/app.py           | 10 +++++
 tests/test_instance/server/manage.py        | 15 +++++++
 tests/test_instance/server/requirements.txt |  4 ++
 5 files changed, 82 insertions(+)
 create mode 100644 .github/workflows/test-instance.yml
 create mode 100644 tests/test_instance/README.md
 create mode 100644 tests/test_instance/server/app.py
 create mode 100644 tests/test_instance/server/manage.py
 create mode 100644 tests/test_instance/server/requirements.txt

diff --git a/.github/workflows/test-instance.yml b/.github/workflows/test-instance.yml
new file mode 100644
index 0000000000..bbe3540db9
--- /dev/null
+++ b/.github/workflows/test-instance.yml
@@ -0,0 +1,50 @@
+name: "Spawn Test Instance"
+
+on: [push, pull_request]
+
+jobs:
+  pytest:
+    runs-on: ubuntu-latest
+
+    strategy:
+      matrix:
+        python-version: ['3.10']
+
+    services:
+      mongodb:
+        image: mongo:6
+        ports:
+          - 27017:27017
+      redis:
+        image: redis:alpine
+        ports:
+          - 6379:6379
+      elastic:
+        image: docker.elastic.co/elasticsearch/elasticsearch:7.17.22
+        ports:
+          - 9200:9200
+        env:
+          discovery.type: single-node
+
+    steps:
+      - uses: actions/checkout@v3
+      - uses: actions/setup-python@v4
+        with:
+          python-version: ${{ matrix.python-version }}
+      - run: ./scripts/tests_setup
+
+      - name: Install dependencies
+        working-directory: ./tests/test_instance/server
+        run: |
+          pip install -U pip wheel setuptools
+          pip install -r requirements.txt
+
+      - name: Prepare instance
+        working-directory:
./tests/test_instance/server
+        run: |
+          python manage.py app:initialize_data
+          python manage.py users:create -u admin -p admin -e admin@example.com --admin
+          # python manage.py data:upgrade # until we decide what to do
+          python manage.py schema:migrate
+
+
diff --git a/tests/test_instance/README.md b/tests/test_instance/README.md
new file mode 100644
index 0000000000..83a2e0b655
--- /dev/null
+++ b/tests/test_instance/README.md
@@ -0,0 +1,3 @@
+### Superdesk Test Instance
+
+This is intended to be run from GitHub Actions. It contains a basic Superdesk server setup used to launch a test instance.
\ No newline at end of file
diff --git a/tests/test_instance/server/app.py b/tests/test_instance/server/app.py
new file mode 100644
index 0000000000..2aeae18386
--- /dev/null
+++ b/tests/test_instance/server/app.py
@@ -0,0 +1,10 @@
+# -*- coding: utf-8; -*-
+#
+# This file is part of Superdesk.
+#
+# Copyright 2013 to present Sourcefabric z.u. and contributors.
+#
+# For the full copyright and license information, please see the
+# AUTHORS and LICENSE files distributed with this source code
+
+from superdesk.factory import get_app as create_app  # noqa
diff --git a/tests/test_instance/server/manage.py b/tests/test_instance/server/manage.py
new file mode 100644
index 0000000000..e25ae579ea
--- /dev/null
+++ b/tests/test_instance/server/manage.py
@@ -0,0 +1,15 @@
+# -*- coding: utf-8; -*-
+#
+# This file is part of Superdesk.
+#
+# Copyright 2013 to present Sourcefabric z.u. and contributors.
+#
+# For the full copyright and license information, please see the
+# AUTHORS and LICENSE files distributed with this source code
+
+"""Superdesk Manager"""
+
+from quart.cli import main
+
+if __name__ == "__main__":
+    main()
diff --git a/tests/test_instance/server/requirements.txt b/tests/test_instance/server/requirements.txt
new file mode 100644
index 0000000000..0edc55c1be
--- /dev/null
+++ b/tests/test_instance/server/requirements.txt
@@ -0,0 +1,4 @@
+-e ../../../.
+
+-e git+https://github.com/superdesk/superdesk-planning.git@async#egg=superdesk_planning
+-e git+https://github.com/superdesk/sams.git@develop#egg=sams_client&subdirectory=src/clients/python/
\ No newline at end of file

From 938774b48981097d478bb120022bf5bcaecb8b6e Mon Sep 17 00:00:00 2001
From: MarkLark86
Date: Thu, 5 Sep 2024 16:38:52 +1000
Subject: [PATCH 3/4] [SDESK-7373] Support default sort config in async resources (#2685)

* Move common types to superdesk.core.types
* support default sort resource config
* improve: add `to_list` to ResourceCursorAsync
* improve: allow kwargs to AsyncResourceService.find
* improve: support web endpoint with empty args
* add/fix tests
* update docs
* fix mypy error
* Remove Literal import
---
 docs/core/resource_management.rst             | 10 ---
 docs/core/types.rst                           | 20 +++++
 docs/index.rst                                |  1 +
 superdesk/core/elastic/async_client.py        |  2 +-
 superdesk/core/elastic/base_client.py         | 13 +--
 superdesk/core/elastic/common.py              |  4 +-
 superdesk/core/elastic/sync_client.py         |  2 +-
 superdesk/core/resources/cursor.py            | 75 ++--------
 superdesk/core/resources/model.py             |  6 ++
 .../core/resources/resource_rest_endpoints.py |  2 +-
 superdesk/core/resources/service.py           | 45 +++++++++-
 superdesk/core/types.py                       | 90 +++++++++++++++
 superdesk/core/web/rest_endpoints.py          |  2 +-
 superdesk/core/web/types.py                   | 11 ++-
 superdesk/tests/__init__.py                   |  3 +
 tests/core/elastic_async_test.py              |  2 +-
 tests/core/elastic_sync_test.py               |  2 +-
 tests/core/modules/users/types.py             |  6 +-
 tests/core/resource_model_test.py             |  6 +-
 tests/core/resource_service_test.py           | 74 ++++++++++++++-
 20 files changed, 275 insertions(+), 101 deletions(-)
 create mode 100644 docs/core/types.rst
 create mode 100644 superdesk/core/types.py

diff --git a/docs/core/resource_management.rst b/docs/core/resource_management.rst
index 466e9a6c5b..ec4ec445a9 100644
--- a/docs/core/resource_management.rst
+++ b/docs/core/resource_management.rst
@@ -62,14 +62,6 @@ Search References
    :member-order: bysource
    :members:
 
-.. autoclass:: superdesk.core.resources.cursor.SearchArgs
-   :member-order: bysource
-   :members:
-
-.. autoclass:: superdesk.core.resources.cursor.SearchRequest
-   :member-order: bysource
-   :members:
-
 .. autoclass:: superdesk.core.resources.cursor.ResourceCursorAsync
    :member-order: bysource
    :members:
@@ -81,5 +73,3 @@ Search References
 .. autoclass:: superdesk.core.resources.cursor.MongoResourceCursorAsync
    :member-order: bysource
    :members:
-
-.. autodata:: superdesk.core.resources.cursor.ProjectedFieldArg
diff --git a/docs/core/types.rst b/docs/core/types.rst
new file mode 100644
index 0000000000..b5312d676c
--- /dev/null
+++ b/docs/core/types.rst
@@ -0,0 +1,20 @@
+.. _core_types:
+
+Types
+=====
+
+.. autodata:: superdesk.core.types.ProjectedFieldArg
+
+.. autodata:: superdesk.core.types.SortListParam
+
+.. autodata:: superdesk.core.types.SortParam
+
+.. autodata:: superdesk.core.types.VersionParam
+
+.. autoclass:: superdesk.core.types.SearchArgs
+   :member-order: bysource
+   :members:
+
+.. autoclass:: superdesk.core.types.SearchRequest
+   :member-order: bysource
+   :members:
diff --git a/docs/index.rst b/docs/index.rst
index 913c8932a5..e3761e06f9 100644
--- a/docs/index.rst
+++ b/docs/index.rst
@@ -51,6 +51,7 @@ existing code to the new framework.
    core/mongo
    core/elastic
    core/storage
+   core/types
 
 ..
_reference: diff --git a/superdesk/core/elastic/async_client.py b/superdesk/core/elastic/async_client.py index 033918edc3..641f784561 100644 --- a/superdesk/core/elastic/async_client.py +++ b/superdesk/core/elastic/async_client.py @@ -14,7 +14,7 @@ from elasticsearch.exceptions import NotFoundError, TransportError, RequestError from elasticsearch.helpers import async_bulk -from ..resources.cursor import SearchRequest +from superdesk.core.types import SearchRequest from .base_client import BaseElasticResourceClient, ElasticCursor, InvalidSearchString diff --git a/superdesk/core/elastic/base_client.py b/superdesk/core/elastic/base_client.py index 3cd787c85f..7154e052a5 100644 --- a/superdesk/core/elastic/base_client.py +++ b/superdesk/core/elastic/base_client.py @@ -15,7 +15,7 @@ from eve.io.mongo.parser import parse from superdesk.errors import SuperdeskApiError -from ..resources.cursor import ProjectedFieldArg, SearchRequest +from superdesk.core.types import ProjectedFieldArg, SearchRequest, SortParam from .common import ElasticClientConfig, ElasticResourceConfig @@ -172,8 +172,7 @@ def _get_find_args( if "sort" not in query: if req.sort: - sort = ast.literal_eval(req.sort) - _set_sort(query, sort) + _set_sort(query, req.sort) elif self.resource_config.default_sort: _set_sort(query, self.resource_config.default_sort) @@ -282,9 +281,13 @@ def _format_doc(self, hit: Dict[str, Any]): return doc -def _set_sort(query, sort): +def _set_sort(query: dict, sort: SortParam | None) -> None: + if sort is None: + return + query["sort"] = [] - for key, sortdir in sort: + sort_list = ast.literal_eval(sort) if isinstance(sort, str) else sort + for key, sortdir in sort_list: sort_dict = dict([(key, "asc" if sortdir > 0 else "desc")]) query["sort"].append(sort_dict) diff --git a/superdesk/core/elastic/common.py b/superdesk/core/elastic/common.py index 95295bf6ce..b37f489e5f 100644 --- a/superdesk/core/elastic/common.py +++ b/superdesk/core/elastic/common.py @@ -14,7 +14,7 @@ from uuid import uuid4 from ..config import ConfigModel -from ..resources.cursor import SearchRequest +from superdesk.core.types import SearchRequest, SortParam @dataclass @@ -25,7 +25,7 @@ class ElasticResourceConfig: prefix: str = "ELASTICSEARCH" #: The default sort - default_sort: Optional[str] = None + default_sort: SortParam | None = None #: The default maximum number of documents to be returned default_max_results: Optional[int] = None diff --git a/superdesk/core/elastic/sync_client.py b/superdesk/core/elastic/sync_client.py index 6dfc110a5d..b49cc89311 100644 --- a/superdesk/core/elastic/sync_client.py +++ b/superdesk/core/elastic/sync_client.py @@ -14,7 +14,7 @@ from elasticsearch.exceptions import NotFoundError, TransportError, RequestError from elasticsearch.helpers import bulk -from ..resources.cursor import SearchRequest +from superdesk.core.types import SearchRequest from .base_client import BaseElasticResourceClient, ElasticCursor, InvalidSearchString diff --git a/superdesk/core/resources/cursor.py b/superdesk/core/resources/cursor.py index 7d894997ee..fbbafa5b5b 100644 --- a/superdesk/core/resources/cursor.py +++ b/superdesk/core/resources/cursor.py @@ -8,76 +8,11 @@ # AUTHORS and LICENSE files distributed with this source code, or # at https://www.sourcefabric.org/superdesk/license -from typing import Dict, Any, Generic, TypeVar, Type, Optional, List, Union, Literal -from typing_extensions import TypedDict +from typing import Dict, Any, Generic, TypeVar, Type, Optional, List -from pydantic import BaseModel, ConfigDict 
from motor.motor_asyncio import AsyncIOMotorCollection, AsyncIOMotorCursor -#: The data type for projections, either a list of field names, or a dictionary containing -#: the field and enable/disable state -ProjectedFieldArg = Union[List[str], Dict[str, Literal[0]], Dict[str, Literal[1]]] - - -class SearchArgs(TypedDict, total=False): - """Dictionary containing Elasticsearch search arguments - - This is for use with the `.find` methods in elastic clients - """ - - #: A JSON string containing an elasticsearch query - source: str - - #: A query string - q: str - - #: Default field, for use with the query string - df: str - - #: Default operator, for use with the query string (defaults to "AND") - default_operator: str - - #: A JSON string containing bool query filters, to be applied to the elastic query - filter: str - - #: A list of dictionaries containing bool query filters, to be applied to the elastic query - filters: List[Dict[str, Any]] - - #: A JSON string containing the field projections to filter out the returned fields - projections: str - - -class SearchRequest(BaseModel): - """Dataclass containing Elasticsearch request arguments""" - - model_config = ConfigDict(extra="allow") - - #: Argument for the search filters - args: Optional[SearchArgs] = None - - #: Sorting to be used - sort: Optional[str] = None - - #: Maximum number of documents to be returned - max_results: int = 25 - - #: The page number to be returned - page: int = 1 - - #: A JSON string containing an Elasticsearch where query - where: Optional[Union[str, Dict]] = None - - #: If `True`, will include aggregations with the result - aggregations: bool = False - - #: If `True`, will include highlights with the result - highlight: bool = False - - #: The field projections to be applied - projection: Optional[ProjectedFieldArg] = None - - ResourceModelType = TypeVar("ResourceModelType", bound="ResourceModel") @@ -94,6 +29,14 @@ async def __anext__(self) -> ResourceModelType: async def next_raw(self) -> Optional[Dict[str, Any]]: raise NotImplementedError() + async def to_list(self) -> List[ResourceModelType]: + items: List[ResourceModelType] = [] + item = await self.next_raw() + while item is not None: + items.append(self.get_model_instance(item)) + item = await self.next_raw() + return items + async def to_list_raw(self) -> List[Dict[str, Any]]: items: List[Dict[str, Any]] = [] item = await self.next_raw() diff --git a/superdesk/core/resources/model.py b/superdesk/core/resources/model.py index fdc5d3a0af..a6580b3fb7 100644 --- a/superdesk/core/resources/model.py +++ b/superdesk/core/resources/model.py @@ -28,6 +28,7 @@ from pydantic_core import InitErrorDetails, PydanticCustomError from pydantic.dataclasses import dataclass as pydataclass +from superdesk.core.types import SortListParam from .fields import ObjectId @@ -201,6 +202,9 @@ class ResourceConfig: #: Optional list of resource fields to ignore when generating the etag etag_ignore_fields: Optional[list[str]] = None + #: Optional sorting for this resource + default_sort: SortListParam | None = None + class Resources: """A high level resource class used to manage all resources in the system""" @@ -239,6 +243,8 @@ def register(self, config: ResourceConfig): ) if config.elastic is not None: + if config.default_sort: + config.elastic.default_sort = config.default_sort self.app.elastic.register_resource_config( config.name, config.elastic, diff --git a/superdesk/core/resources/resource_rest_endpoints.py b/superdesk/core/resources/resource_rest_endpoints.py index 
9fc5fc0f01..ad1b94044f 100644 --- a/superdesk/core/resources/resource_rest_endpoints.py +++ b/superdesk/core/resources/resource_rest_endpoints.py @@ -18,12 +18,12 @@ from superdesk.core.app import get_current_async_app from superdesk.errors import SuperdeskApiError +from superdesk.core.types import SearchRequest, SearchArgs from ..web.types import HTTP_METHOD, Request, Response, RestGetResponse from ..web.rest_endpoints import RestEndpoints, ItemRequestViewArgs from .model import ResourceConfig, ResourceModel -from .cursor import SearchRequest, SearchArgs from .validators import convert_pydantic_validation_error_for_response from .utils import resource_uses_objectid_for_id diff --git a/superdesk/core/resources/service.py b/superdesk/core/resources/service.py index 7a739d9622..dbd1947e26 100644 --- a/superdesk/core/resources/service.py +++ b/superdesk/core/resources/service.py @@ -21,6 +21,7 @@ cast, Tuple, Literal, + overload, ) import logging import ast @@ -35,10 +36,11 @@ from superdesk.errors import SuperdeskApiError from superdesk.utc import utcnow from superdesk.json_utils import SuperdeskJSONEncoder +from superdesk.core.types import SearchRequest, SortListParam, SortParam from ..app import SuperdeskAsyncApp, get_current_async_app from .fields import ObjectId as ObjectIdField -from .cursor import ElasticsearchResourceCursorAsync, MongoResourceCursorAsync, ResourceCursorAsync, SearchRequest +from .cursor import ElasticsearchResourceCursorAsync, MongoResourceCursorAsync, ResourceCursorAsync from .utils import resource_uses_objectid_for_id logger = logging.getLogger(__name__) @@ -414,19 +416,52 @@ async def get_all_batch(self, size=500, max_iterations=10000, lookup=None) -> As else: logger.warning(f"Not enough iterations for resource {self.resource_name}") + @overload async def find(self, req: SearchRequest) -> ResourceCursorAsync[ResourceModelType]: + ... + + @overload + async def find( + self, req: dict, page: int = 1, max_results: int = 25, sort: SortParam | None = None + ) -> ResourceCursorAsync[ResourceModelType]: + ... 
+
+    async def find(
+        self,
+        req: SearchRequest | dict,
+        page: int = 1,
+        max_results: int = 25,
+        sort: SortParam | None = None,
+    ) -> ResourceCursorAsync[ResourceModelType]:
         """Find items from the resource using Elasticsearch
 
-        :param req: A SearchRequest instance with the search params to be used
+        :param req: SearchRequest instance, or a lookup dictionary, for the search params to be used
+        :param page: The page number to retrieve (defaults to 1)
+        :param max_results: The maximum number of results to retrieve per page (defaults to 25)
+        :param sort: The sort order to use (defaults to the resource's default sort, or no sorting applied)
         :return: An async iterable with ``ResourceModel`` instances
         :raises SuperdeskApiError.notFoundError: If Elasticsearch is not configured
         """
 
+        search_request = (
+            req
+            if isinstance(req, SearchRequest)
+            else SearchRequest(
+                where=req if req else None,
+                page=page,
+                max_results=max_results,
+                sort=sort,
+            )
+        )
+
+        if search_request.sort is None:
+            search_request.sort = self.config.default_sort
+
         try:
-            cursor, count = await self.elastic.find(req)
+            cursor, count = await self.elastic.find(search_request)
             return ElasticsearchResourceCursorAsync(self.config.data_class, cursor.hits)
         except KeyError:
-            return await self._mongo_find(req)
+            return await self._mongo_find(search_request)
 
     async def _mongo_find(self, req: SearchRequest) -> MongoResourceCursorAsync:
         args: Dict[str, Any] = {}
@@ -450,6 +485,8 @@ async def _mongo_find(self, req: SearchRequest) -> MongoResourceCursorAsync:
     def _convert_req_to_mongo_sort(self, req: SearchRequest) -> List[Tuple[str, Literal[1, -1]]]:
         if not req.sort:
             return []
+        elif isinstance(req.sort, list):
+            return req.sort
 
         client_sort: List[Tuple[str, Literal[1, -1]]] = []
         try:
diff --git a/superdesk/core/types.py b/superdesk/core/types.py
new file mode 100644
index 0000000000..73ad09fe24
--- /dev/null
+++ b/superdesk/core/types.py
@@ -0,0 +1,90 @@
+# -*- coding: utf-8; -*-
+#
+# This file is part of Superdesk.
+#
+# Copyright 2024 Sourcefabric z.u. and contributors.
+#
+# For the full copyright and license information, please see the
+# AUTHORS and LICENSE files distributed with this source code, or
+# at https://www.sourcefabric.org/superdesk/license
+
+from typing import Dict, Any, Optional, List, Union, Literal
+from typing_extensions import TypedDict
+
+from pydantic import BaseModel, ConfigDict, NonNegativeInt
+
+
+#: The data type for projections, either a list of field names, or a dictionary containing
+#: the field and enable/disable state
+ProjectedFieldArg = Union[List[str], Dict[str, Literal[0]], Dict[str, Literal[1]]]
+
+#: Type used to provide a list of sort params to be used
+SortListParam = list[tuple[str, Literal[1, -1]]]
+
+#: Type used for the sort param in service requests
+#: Can be a string, which will be converted to a :attr:`SortListParam` type
+SortParam = str | SortListParam
+
+#: Type used for the version param in service requests
+#: Can be either ``"all"`` or an int ``>= 0``
+VersionParam = Literal["all"] | NonNegativeInt
+
+
+class SearchArgs(TypedDict, total=False):
+    """Dictionary containing Elasticsearch search arguments
+
+    This is for use with the `.find` methods in elastic clients
+    """
+
+    #: A JSON string containing an elasticsearch query
+    source: str
+
+    #: A query string
+    q: str
+
+    #: Default field, for use with the query string
+    df: str
+
+    #: Default operator, for use with the query string (defaults to "AND")
+    default_operator: str
+
+    #: A JSON string containing bool query filters, to be applied to the elastic query
+    filter: str
+
+    #: A list of dictionaries containing bool query filters, to be applied to the elastic query
+    filters: List[Dict[str, Any]]
+
+    #: A JSON string containing the field projections to filter out the returned fields
+    projections: str
+
+    version: VersionParam | None
+
+
+class SearchRequest(BaseModel):
+    """Dataclass containing Elasticsearch request arguments"""
+
+    model_config = ConfigDict(extra="allow")
+
+    #: Argument for the search filters
+    args: Optional[SearchArgs] = None
+
+    #: Sorting to be used
+    sort: SortParam | None = None
+
+    #: Maximum number of documents to be returned
+    max_results: int = 25
+
+    #: The page number to be returned
+    page: int = 1
+
+    #: A JSON string containing an Elasticsearch where query
+    where: str | dict | None = None
+
+    #: If `True`, will include aggregations with the result
+    aggregations: bool = False
+
+    #: If `True`, will include highlights with the result
+    highlight: bool = False
+
+    #: The field projections to be applied
+    projection: Optional[ProjectedFieldArg] = None
diff --git a/superdesk/core/web/rest_endpoints.py b/superdesk/core/web/rest_endpoints.py
index c925473bc4..6bdb4e306e 100644
--- a/superdesk/core/web/rest_endpoints.py
+++ b/superdesk/core/web/rest_endpoints.py
@@ -12,8 +12,8 @@
 
 from pydantic import BaseModel
 
+from superdesk.core.types import SearchRequest
 from .types import Endpoint, EndpointGroup, HTTP_METHOD, Request, Response
-from ..resources.cursor import SearchRequest
 
 
 class ItemRequestViewArgs(BaseModel):
diff --git a/superdesk/core/web/types.py b/superdesk/core/web/types.py
index 52e081d146..edb7bbf200 100644
--- a/superdesk/core/web/types.py
+++ b/superdesk/core/web/types.py
@@ -53,6 +53,9 @@ class Response:
 #:
 #: Supported endpoint signatures::
 #:
+#:     # Response only
+#:     async def test() -> Response
+#:
 #:     # Request Only
 #:     async def test1(request: Request) -> Response
 #:
@@ -77,6 +80,10 @@ class Response:
 #:     request: Request
 #: ) -> Response
 EndpointFunction = Union[
+    Callable[
+        [],
+        Awaitable[Response],
+    ],
Callable[ ["Request"], Awaitable[Response], @@ -129,7 +136,9 @@ def __init__( def __call__(self, args: Dict[str, Any], params: Dict[str, Any], request: "Request"): func_params = signature(self.func).parameters - if "args" not in func_params and "params" not in func_params: + if not len(func_params): + return self.func() # type: ignore[call-arg,arg-type] + elif "args" not in func_params and "params" not in func_params: return self.func(request) # type: ignore[call-arg,arg-type] arg_type = func_params["args"] if "args" in func_params else None diff --git a/superdesk/tests/__init__.py b/superdesk/tests/__init__.py index 54b7f816b6..da9b2b8435 100644 --- a/superdesk/tests/__init__.py +++ b/superdesk/tests/__init__.py @@ -565,6 +565,9 @@ def get_fixture_path(self, filename): rootpath = os.path.abspath(os.path.dirname(os.path.dirname(__file__))) return os.path.join(rootpath, "features", "steps", "fixtures", filename) + def assertDictContains(self, source: dict, contains: dict): + self.assertDictEqual({key: val for key, val in source.items() if key in contains}, contains) + class TestClient(QuartClient): def model_instance_to_json(self, model_instance: ResourceModel): diff --git a/tests/core/elastic_async_test.py b/tests/core/elastic_async_test.py index 4060c2898b..6601af5428 100644 --- a/tests/core/elastic_async_test.py +++ b/tests/core/elastic_async_test.py @@ -1,7 +1,7 @@ import simplejson as json from superdesk.tests import AsyncTestCase -from superdesk.core.resources.cursor import SearchRequest +from superdesk.core.types import SearchRequest from .modules.users import User from .fixtures.users import john_doe diff --git a/tests/core/elastic_sync_test.py b/tests/core/elastic_sync_test.py index 804444bb39..7c2c21d706 100644 --- a/tests/core/elastic_sync_test.py +++ b/tests/core/elastic_sync_test.py @@ -1,7 +1,7 @@ import simplejson as json from superdesk.tests import AsyncTestCase -from superdesk.core.resources.cursor import SearchRequest +from superdesk.core.types import SearchRequest from .modules.users import User from .fixtures.users import john_doe diff --git a/tests/core/modules/users/types.py b/tests/core/modules/users/types.py index 130569411f..dacffaf37b 100644 --- a/tests/core/modules/users/types.py +++ b/tests/core/modules/users/types.py @@ -31,10 +31,10 @@ class MyCustomString(str, fields.CustomStringField): class User(ResourceModel): - first_name: str - last_name: str + first_name: fields.TextWithKeyword + last_name: fields.TextWithKeyword email: Annotated[ - Optional[str], + Optional[fields.TextWithKeyword], validators.validate_email(), validators.validate_iunique_value_async(resource_name="users_async", field_name="email"), ] = None diff --git a/tests/core/resource_model_test.py b/tests/core/resource_model_test.py index 499d7fb5c4..0ad0f8ef41 100644 --- a/tests/core/resource_model_test.py +++ b/tests/core/resource_model_test.py @@ -62,9 +62,9 @@ def test_elastic_mapping(self): "_created": {"type": "date"}, "_updated": {"type": "date"}, "_etag": {"type": "text"}, - "first_name": {"type": "text"}, - "last_name": {"type": "text"}, - "email": {"type": "text"}, + "first_name": {"type": "text", "fields": {"keyword": {"type": "keyword"}}}, + "last_name": {"type": "text", "fields": {"keyword": {"type": "keyword"}}}, + "email": {"type": "text", "fields": {"keyword": {"type": "keyword"}}}, "name": {"type": "text", "fields": {"keyword": {"type": "keyword"}}}, "username": {"type": "text"}, "code": {"type": "keyword"}, diff --git a/tests/core/resource_service_test.py 
b/tests/core/resource_service_test.py index 70987e4349..9f0903192f 100644 --- a/tests/core/resource_service_test.py +++ b/tests/core/resource_service_test.py @@ -4,12 +4,14 @@ import simplejson as json from bson import ObjectId -from superdesk.core.resources.cursor import SearchRequest +from superdesk.core.types import SearchRequest +from superdesk.core.elastic.base_client import ElasticCursor from superdesk.utc import utcnow from superdesk.utils import format_time from superdesk.tests import AsyncTestCase + from .modules.users import UserResourceService from .fixtures.users import all_users, john_doe @@ -338,3 +340,73 @@ async def test_elastic_find(self): req = SearchRequest(args={"source": json.dumps(find_query)}) cursor = await self.service.find(req) self.assertEqual(await cursor.count(), 2) + + async def test_sort_param(self): + users = all_users() + await self.service.create(users) + + items = await (await self.service.find({})).to_list() + self.assertEqual(len(items), 3) + self.assertEqual(items[0].id, users[0].id) + self.assertEqual(items[1].id, users[1].id) + self.assertEqual(items[2].id, users[2].id) + + items = await ( + await self.service.find({}, sort=[("last_name.keyword", 1), ("first_name.keyword", 1)]) + ).to_list_raw() + self.assertEqual(len(items), 3) + self.assertDictContains(items[0], dict(first_name="Foo", last_name="Bar")) + self.assertDictContains(items[1], dict(first_name="Jane", last_name="Doe")) + self.assertDictContains(items[2], dict(first_name="John", last_name="Doe")) + + items = await ( + await self.service.find({}, sort=[("last_name.keyword", -1), ("first_name.keyword", -1)]) + ).to_list_raw() + self.assertEqual(len(items), 3) + self.assertDictContains(items[0], dict(first_name="John", last_name="Doe")) + self.assertDictContains(items[1], dict(first_name="Jane", last_name="Doe")) + self.assertDictContains(items[2], dict(first_name="Foo", last_name="Bar")) + + async def test_find_overloads(self): + await self.service.create(all_users()) + + async def assert_es_find_called_with(*args, **kwargs): + expected = kwargs.pop("expected") + self.service.elastic.find = mock.AsyncMock(return_value=(ElasticCursor(), 0)) + await self.service.find(*args, **kwargs) + self.service.elastic.find.assert_called_once_with(expected) + + # Test without any arguments + await assert_es_find_called_with( + SearchRequest(), expected=SearchRequest(where=None, page=1, max_results=25, sort=None) + ) + expected = SearchRequest() + await assert_es_find_called_with(SearchRequest(), expected=expected) + await assert_es_find_called_with({}, expected=expected) + + sort_query = [("last_name.keyword", 1), ("first_name.keyword", 1)] + expected = SearchRequest(sort=sort_query) + await assert_es_find_called_with(SearchRequest(sort=sort_query), expected=expected) + await assert_es_find_called_with({}, sort=sort_query, expected=expected) + + kwargs = dict( + page=2, + max_results=5, + sort=sort_query, + ) + expected = SearchRequest(**kwargs) + await assert_es_find_called_with(SearchRequest(**kwargs), expected=expected) + await assert_es_find_called_with({}, **kwargs, expected=expected) + + # Test with default sort in the resource config + sort_query = [("email.keyword", 1)] + self.service.config.default_sort = sort_query + expected = SearchRequest(sort=sort_query) + await assert_es_find_called_with(SearchRequest(), expected=expected) + await assert_es_find_called_with({}, expected=expected) + + # Test passing in sort param with default sort configured + custom_sort_query = [("scores", 1)] + expected = 
SearchRequest(sort=custom_sort_query) + await assert_es_find_called_with(SearchRequest(sort=custom_sort_query), expected=expected) + await assert_es_find_called_with({}, sort=custom_sort_query, expected=expected) From b0b3f11687bbdc7f9b1c94b152c9571d74eb849f Mon Sep 17 00:00:00 2001 From: Helmy Giacoman Date: Tue, 10 Sep 2024 11:59:41 +0200 Subject: [PATCH 4/4] [NHUB-534] Implement async cacheable service (#2688) * Implement `AsyncCacheableService` This service is an implementation based on `superdesk.services.CacheableService` NHUB-534 * Update cache backend to avoid app context error This prevents errors of app context not available when running cached async coroutines NHUB-534 * Fix black removed line * Add docstrings NHUB-534 * Update documentation and broken reference NHUB-534 --- docs/conf.py | 2 +- docs/core/resource_management.rst | 8 +- ...e recording.rst => database_recording.rst} | 0 superdesk/cache.py | 25 ++++-- superdesk/core/resources/__init__.py | 3 +- superdesk/core/resources/service.py | 87 +++++++++++++++++-- tests/core/cacheable_service_test.py | 76 ++++++++++++++++ 7 files changed, 183 insertions(+), 18 deletions(-) rename docs/{database recording.rst => database_recording.rst} (100%) create mode 100644 tests/core/cacheable_service_test.py diff --git a/docs/conf.py b/docs/conf.py index 8f9c80ef0e..8f72441e16 100644 --- a/docs/conf.py +++ b/docs/conf.py @@ -140,7 +140,7 @@ # documentation. # html_theme_options = { - "description": "Superdesk server reference v1.4", + "description": "Superdesk v3.0.dev.0", } # Add any paths that contain custom themes here, relative to this directory. diff --git a/docs/core/resource_management.rst b/docs/core/resource_management.rst index ec4ec445a9..7556246231 100644 --- a/docs/core/resource_management.rst +++ b/docs/core/resource_management.rst @@ -3,8 +3,8 @@ Resource Management =================== -Resource Services: ------------------- +Resource Services +----------------- The management of resources is performed using the :class:`AsyncResourceService ` class instances. This is similar to how it is done in Superdesk < v3.0, with some slight improvements. @@ -73,3 +73,7 @@ Search References .. autoclass:: superdesk.core.resources.cursor.MongoResourceCursorAsync :member-order: bysource :members: + +.. autoclass:: superdesk.core.resources.service.AsyncCacheableService + :member-order: bysource + :members: diff --git a/docs/database recording.rst b/docs/database_recording.rst similarity index 100% rename from docs/database recording.rst rename to docs/database_recording.rst diff --git a/superdesk/cache.py b/superdesk/cache.py index 523951a4af..6f2374cde1 100644 --- a/superdesk/cache.py +++ b/superdesk/cache.py @@ -8,6 +8,7 @@ from superdesk import json_utils from superdesk.logging import logger +from superdesk.flask import Flask class SuperdeskMangler(hermes.Mangler): @@ -37,7 +38,12 @@ class SuperdeskCacheBackend(hermes.backend.AbstractBackend): or memcached. 
""" + app: Flask + def init_app(self, app): + # set app instance for later usage in `_backend` + self.app = app + if not hasattr(app, "extensions"): app.extensions = {} @@ -73,13 +79,18 @@ def init_app(self, app): @property def _backend(self): - from superdesk.core import get_current_app - - current_app = get_current_app().as_any() - if not current_app: - raise RuntimeError("You can only use cache within app context.") - self.init_app(current_app) - return current_app.extensions["superdesk_cache"] + # TODO-ASYNC: Figure out how to properly fix this as it throws an error + # of missing app context when run from async coroutines. For the time being, + # using the app instance that is set in the `init_app` method does the trick + + # from superdesk.core import get_current_app + # current_app = get_current_app().as_any() + # if not current_app: + # raise RuntimeError("You can only use cache within app context.") + # self.init_app(current_app) + # return current_app.extensions["superdesk_cache"] + + return self.app.extensions["superdesk_cache"] def lock(self, key): return self._backend.lock(key) diff --git a/superdesk/core/resources/__init__.py b/superdesk/core/resources/__init__.py index d9a05f451c..d26c6e11c9 100644 --- a/superdesk/core/resources/__init__.py +++ b/superdesk/core/resources/__init__.py @@ -10,7 +10,7 @@ from .model import Resources, ResourceModel, ResourceModelWithObjectId, ResourceConfig, dataclass from .resource_rest_endpoints import RestEndpointConfig -from .service import AsyncResourceService +from .service import AsyncResourceService, AsyncCacheableService from ..mongo import MongoResourceConfig, MongoIndexOptions from ..elastic.resources import ElasticResourceConfig @@ -23,6 +23,7 @@ "fields", "RestEndpointConfig", "AsyncResourceService", + "AsyncCacheableService", "MongoResourceConfig", "MongoIndexOptions", "ElasticResourceConfig", diff --git a/superdesk/core/resources/service.py b/superdesk/core/resources/service.py index dbd1947e26..d6cf051221 100644 --- a/superdesk/core/resources/service.py +++ b/superdesk/core/resources/service.py @@ -8,6 +8,7 @@ # AUTHORS and LICENSE files distributed with this source code, or # at https://www.sourcefabric.org/superdesk/license +import random from typing import ( Optional, Generic, @@ -25,18 +26,20 @@ ) import logging import ast +import simplejson as json from copy import deepcopy from hashlib import sha1 from bson import ObjectId, UuidRepresentation from bson.json_util import dumps, DEFAULT_JSON_OPTIONS -import simplejson as json from motor.motor_asyncio import AsyncIOMotorCursor -from superdesk.errors import SuperdeskApiError +from superdesk.flask import g from superdesk.utc import utcnow +from superdesk.cache import cache +from superdesk.errors import SuperdeskApiError from superdesk.json_utils import SuperdeskJSONEncoder -from superdesk.core.types import SearchRequest, SortListParam, SortParam +from superdesk.core.types import SearchRequest, SortParam from ..app import SuperdeskAsyncApp, get_current_async_app from .fields import ObjectId as ObjectIdField @@ -417,14 +420,12 @@ async def get_all_batch(self, size=500, max_iterations=10000, lookup=None) -> As logger.warning(f"Not enough iterations for resource {self.resource_name}") @overload - async def find(self, req: SearchRequest) -> ResourceCursorAsync[ResourceModelType]: - ... + async def find(self, req: SearchRequest) -> ResourceCursorAsync[ResourceModelType]: ... 
    @overload
     async def find(
         self, req: dict, page: int = 1, max_results: int = 25, sort: SortParam | None = None
-    ) -> ResourceCursorAsync[ResourceModelType]:
-        ...
+    ) -> ResourceCursorAsync[ResourceModelType]: ...
 
     async def find(
         self,
@@ -564,4 +565,76 @@ def filter_ignore_fields(d, fields):
     return h.hexdigest()
 
 
+class AsyncCacheableService(AsyncResourceService[ResourceModelType]):
+    """
+    Handles caching for the resource and invalidates the cache on any changes to the resource.
+
+    Attributes:
+        resource_name (str): The name of the resource this service handles.
+        cache_lookup (dict): A dictionary to specify custom lookup parameters for caching.
+    """
+
+    cache_lookup = {}
+
+    @property
+    def cache_key(self) -> str:
+        return "cached:{}".format(self.resource_name)
+
+    def get_cache(self) -> Any:
+        """
+        Retrieve the cached value from Flask's `g` object if available.
+        """
+        return getattr(g, self.cache_key, None)
+
+    def set_cache(self, value: Any) -> None:
+        """
+        Set the cached value in Flask's `g` object.
+        """
+        setattr(g, self.cache_key, value)
+
+    async def get_cached(self) -> List[Dict[str, Any]]:
+        """
+        Retrieve cached data for the resource. If the cache is empty, fetches data from the database
+        and sets it in the cache. The cache is automatically refreshed with a time-to-live (TTL).
+
+        Returns:
+            List[Dict[str, Any]]: A list of dictionaries containing the cached data.
+        """
+
+        @cache(
+            ttl=3600 + random.randrange(1, 300),
+            tags=(self.resource_name,),
+            key=lambda fn: f"_cache_mixin:{self.resource_name}",
+        )
+        async def _get_cached_from_db():
+            cursor = await self.search(lookup=self.cache_lookup, use_mongo=True)
+            return await cursor.to_list_raw()
+
+        cached_data = self.get_cache()
+
+        if cached_data is None:
+            cached_data = await _get_cached_from_db()
+            self.set_cache(cached_data)
+
+        return cached_data
+
+    async def get_cached_by_id(self, _id: str):
+        """
+        Retrieve a specific resource by its ID from the cached data. If the resource is not found in
+        the cache, fetches it directly from the database.
+
+        Args:
+            _id (str): The ID of the resource to retrieve.
+
+        Returns:
+            Optional[Dict[str, Any]]: A dictionary representing the resource if found, otherwise None.
+        """
+        cached = await self.get_cached()
+        for item in cached:
+            if item.get("_id") == _id:
+                return item
+        logger.warning("Could not find item in cache resource=%s id=%s", self.resource_name, _id)
+        return await self.find_by_id(_id)
+
+
 from .model import ResourceConfig, ResourceModel  # noqa: E402
diff --git a/tests/core/cacheable_service_test.py b/tests/core/cacheable_service_test.py
new file mode 100644
index 0000000000..9a3dbec985
--- /dev/null
+++ b/tests/core/cacheable_service_test.py
@@ -0,0 +1,76 @@
+from unittest.mock import AsyncMock, patch
+from superdesk.tests import AsyncTestCase
+
+from superdesk.core.module import Module
+from superdesk.core.resources import ResourceModel, ResourceConfig, AsyncCacheableService
+
+
+class TestAsyncCacheableService(AsyncCacheableService[ResourceModel]):
+    resource_name = "tests_cacheable"
+
+
+model_config = ResourceConfig(name="tests_cacheable", data_class=ResourceModel, service=TestAsyncCacheableService)
+module = Module(name="tests.cacheable", resources=[model_config])
+
+
+class AsyncCacheableServiceTestCase(AsyncTestCase):
+    app_config = {"MONGO_DBNAME": "sptests", "MODULES": ["tests.core.cacheable_service_test"]}
+
+    async def asyncSetUp(self):
+        await super().asyncSetUp()
+        self.service = TestAsyncCacheableService()
+
+        self.patcher1 = patch("superdesk.core.resources.service.cache")
+        self.patcher2 = patch.object(TestAsyncCacheableService, "get_cache")
+        self.patcher3 = patch.object(TestAsyncCacheableService, "set_cache")
+        self.patcher4 = patch.object(TestAsyncCacheableService, "search")
+
+        self.mock_cache = self.patcher1.start()
+        self.mock_get_cache = self.patcher2.start()
+        self.mock_set_cache = self.patcher3.start()
+        self.mock_search = self.patcher4.start()
+
+        # make the cache decorator return the decorated function directly
+        self.mock_cache.side_effect = lambda *args, **kwargs: lambda fn: fn
+
+    async def asyncTearDown(self):
+        self.addCleanup(self.patcher1.stop)
+        self.addCleanup(self.patcher2.stop)
+        self.addCleanup(self.patcher3.stop)
+        self.addCleanup(self.patcher4.stop)
+
+        await super().asyncTearDown()
+
+    async def test_get_cached_from_db(self):
+        mock_cursor = AsyncMock()
+        mock_cursor.to_list_raw = AsyncMock(return_value=[{"_id": "1", "data": "test"}])
+        self.mock_search.return_value = mock_cursor
+        self.mock_get_cache.return_value = None
+
+        result = await self.service.get_cached()
+
+        assert result == [{"_id": "1", "data": "test"}]
+        self.mock_get_cache.assert_called_once()
+        self.mock_set_cache.assert_called_once_with([{"_id": "1", "data": "test"}])
+        self.mock_search.assert_called_once()
+
+    async def test_get_cached_by_id_found_in_cache(self):
+        self.mock_get_cache.return_value = [{"_id": "1", "data": "test"}]
+
+        result = await self.service.get_cached_by_id("1")
+
+        assert result == {"_id": "1", "data": "test"}
+        self.mock_get_cache.assert_called_once()
+        self.mock_set_cache.assert_not_called()
+        self.mock_search.assert_not_called()
+
+    @patch.object(TestAsyncCacheableService, "find_by_id", return_value={"_id": "1", "data": "from_db"})
+    async def test_get_cached_by_id_NOT_found_in_cache(self, mock_find_by_id):
+        self.mock_get_cache.return_value = [{"_id": "2", "data": "test"}]
+
+        result = await self.service.get_cached_by_id("1")
+        assert result == {"_id": "1", "data": "from_db"}
+
+        self.mock_get_cache.assert_called_once()
+        self.mock_set_cache.assert_not_called()
+        mock_find_by_id.assert_called_once_with("1")
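
A note on the mechanism behind the first two patches in this series: `cli.register_async_command` lets Click drive coroutines from its synchronous CLI. The sketch below is illustrative only, a standalone stand-in rather than the actual `superdesk.commands.async_cli` implementation; it shows the `asgiref.sync.async_to_sync` bridging the commit messages describe, but omits the app-context handling they mention.

    import functools

    import click
    from asgiref.sync import async_to_sync


    def register_async_command(name):
        """Hypothetical stand-in for the real `cli.register_async_command` helper."""

        def decorator(async_fn):
            @click.command(name=name)
            @functools.wraps(async_fn)
            def sync_wrapper(*args, **kwargs):
                # Click invokes commands synchronously; async_to_sync runs the
                # coroutine to completion on an event loop before returning.
                # functools.wraps also carries over any __click_params__ set by
                # @click.option on the wrapped coroutine.
                return async_to_sync(async_fn)(*args, **kwargs)

            return sync_wrapper

        return decorator


    @register_async_command("greet")
    @click.option("--name", "-n", default="world", help="Name to greet.")
    async def greet(name):
        click.echo(f"hello {name}")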
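The reworked `AsyncResourceService.find` from the third patch accepts either a lookup dict plus keyword arguments or an explicit `SearchRequest`, as exercised in `test_find_overloads` above. A minimal usage sketch, assuming `service` is any `AsyncResourceService` instance and the field names are illustrative:

    from superdesk.core.types import SearchRequest


    async def list_users(service):
        # dict lookup plus keyword arguments; when sort is omitted the service
        # falls back to the resource's configured default_sort, if any
        cursor = await service.find({}, page=1, max_results=25, sort=[("last_name.keyword", 1)])
        users = await cursor.to_list()

        # the equivalent explicit SearchRequest form
        cursor = await service.find(SearchRequest(page=1, max_results=25, sort=[("last_name.keyword", 1)]))
        return users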
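Subclassing the fourth patch's `AsyncCacheableService` follows the same pattern as the test module above. A minimal sketch, with a hypothetical `languages` resource standing in for a real one:

    from superdesk.core.resources import AsyncCacheableService, ResourceConfig, ResourceModel


    class LanguagesService(AsyncCacheableService[ResourceModel]):
        resource_name = "languages"  # hypothetical resource name
        cache_lookup = {"active": True}  # optional custom lookup for the cached query


    languages_config = ResourceConfig(name="languages", data_class=ResourceModel, service=LanguagesService)


    async def get_languages(service: LanguagesService):
        # answered from Flask's `g` / the hermes cache when warm, from the database otherwise
        return await service.get_cached()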