diff --git a/.github/workflows/ci-arm-build.yml b/.github/workflows/ci-arm-build.yml index 0dca219a690..fcced45470a 100644 --- a/.github/workflows/ci-arm-build.yml +++ b/.github/workflows/ci-arm-build.yml @@ -1,19 +1,21 @@ name: CI ARM64 Build and Push on: - # push: - # branches: - # - "master" - # tags-ignore: - # - "*" - # pull_request: - # branches-ignore: - # - "*" + push: + branches: + - "master" + tags-ignore: + - "*" + workflow_dispatch: +concurrency: + group: ${{ github.workflow }}-${{ github.ref }} + cancel-in-progress: true + jobs: build-and-push-arm64: - if: github.event_name == 'workflow_dispatch' + timeout-minutes: 60 # intentionally long to allow for slow builds runs-on: ubuntu-latest strategy: matrix: @@ -54,14 +56,3 @@ jobs: export DOCKER_IMAGE_TAG="$TAG_PREFIX-latest-arm64" export DOCKER_TARGET_PLATFORMS=linux/arm64 make build push=true - - name: build & push images for specific tag - run: | - export DOCKER_IMAGE_TAG=$(exec ci/helpers/build_docker_image_tag.bash)-arm64 - export DOCKER_TARGET_PLATFORMS=linux/arm64 - make build push=true - - name: fuse images in the registry for latest tag - run: | - export DOCKER_IMAGE_TAG="$TAG_PREFIX-latest" - make docker-image-fuse SUFFIX=arm64 - - name: set git tag - run: echo "GIT_TAG=${GITHUB_REF##*/}" >> $GITHUB_ENV diff --git a/.github/workflows/ci-multi-architecture-fusing.yml b/.github/workflows/ci-multi-architecture-fusing.yml new file mode 100644 index 00000000000..bd825b0d247 --- /dev/null +++ b/.github/workflows/ci-multi-architecture-fusing.yml @@ -0,0 +1,58 @@ +name: CI Multi-Architecture Fusing + +on: + workflow_run: + workflows: ["CI ARM64 Build and Push", "CI"] + types: + - completed + branches: + - "master" + + +concurrency: + group: ${{ github.workflow }}-${{ github.ref }} + cancel-in-progress: true + +jobs: + multi-architecture-fusing: + if: ${{ github.event.workflow_run.conclusion == 'success' }} + timeout-minutes: 60 # intentionally long to allow for slow builds + runs-on: ubuntu-latest + strategy: + matrix: + os: [ubuntu-22.04] + python: ["3.11"] + env: + # secrets can be set in settings/secrets on github + DOCKER_REGISTRY: ${{ secrets.DOCKER_REGISTRY }} + steps: + - uses: actions/checkout@v4 + - name: setup QEMU + uses: docker/setup-qemu-action@v3 + - name: setup docker buildx + id: buildx + uses: docker/setup-buildx-action@v3 + with: + driver: docker-container + - name: expose github runtime for buildx + uses: crazy-max/ghaction-github-runtime@v3 + - name: show system environs + run: ./ci/helpers/show_system_versions.bash + - name: login to Dockerhub + uses: docker/login-action@v2 + with: + username: ${{ secrets.DOCKER_USERNAME }} + password: ${{ secrets.DOCKER_PASSWORD }} + - name: Set deployment variables + run: | + if [ "${GITHUB_REF}" == "refs/heads/master" ]; then + echo "TAG_PREFIX=master-github" >> $GITHUB_ENV + elif [[ "${GITHUB_REF}" == refs/heads/hotfix_v* ]]; then + echo "TAG_PREFIX=hotfix-github" >> $GITHUB_ENV + elif [[ "${GITHUB_REF}" == refs/heads/hotfix_staging_* ]]; then + echo "TAG_PREFIX=hotfix-staging-github" >> $GITHUB_ENV + fi + - name: fuse images in the registry for latest tag + run: | + export DOCKER_IMAGE_TAG="$TAG_PREFIX-latest" + make docker-image-fuse SUFFIX=arm64 diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 6aad7478949..b6260cc42a4 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -30,7 +30,7 @@ repos: name: upgrade code exclude: ^scripts/maintenance/computational-clusters/autoscaled_monitor/cli\.py$ # Optional get replaced and typer does not 
like it - repo: https://github.com/hadialqattan/pycln - rev: v2.1.4 + rev: v2.5.0 hooks: - id: pycln args: [--all, --expand-stars] diff --git a/packages/aws-library/requirements/_base.txt b/packages/aws-library/requirements/_base.txt index c4c997b5983..240c9304b36 100644 --- a/packages/aws-library/requirements/_base.txt +++ b/packages/aws-library/requirements/_base.txt @@ -294,7 +294,7 @@ pyyaml==6.0.2 # -c requirements/../../../packages/settings-library/requirements/../../../requirements/constraints.txt # -c requirements/../../../requirements/constraints.txt # -r requirements/../../../packages/service-library/requirements/_base.in -redis==5.0.4 +redis==5.2.1 # via # -c requirements/../../../packages/common-library/requirements/../../../requirements/constraints.txt # -c requirements/../../../packages/models-library/requirements/../../../packages/common-library/requirements/../../../requirements/constraints.txt diff --git a/packages/pytest-simcore/src/pytest_simcore/redis_service.py b/packages/pytest-simcore/src/pytest_simcore/redis_service.py index 98cf03a595b..7793228d3c4 100644 --- a/packages/pytest-simcore/src/pytest_simcore/redis_service.py +++ b/packages/pytest-simcore/src/pytest_simcore/redis_service.py @@ -10,6 +10,7 @@ import tenacity from pytest_mock import MockerFixture from redis.asyncio import Redis, from_url +from servicelib.redis import _constants as redis_constants from settings_library.basic_types import PortInt from settings_library.redis import RedisDatabase, RedisSettings from tenacity.before_sleep import before_sleep_log @@ -118,6 +119,4 @@ async def wait_till_redis_responsive(redis_url: URL | str) -> None: @pytest.fixture def mock_redis_socket_timeout(mocker: MockerFixture) -> None: # lowered to allow CI to properly shutdown RedisClientSDK instances - from servicelib import redis - - mocker.patch.object(redis, "_DEFAULT_SOCKET_TIMEOUT", timedelta(seconds=1)) + mocker.patch.object(redis_constants, "DEFAULT_SOCKET_TIMEOUT", timedelta(seconds=1)) diff --git a/packages/service-library/requirements/_base.txt b/packages/service-library/requirements/_base.txt index 6ace2b77f35..c11d7e06099 100644 --- a/packages/service-library/requirements/_base.txt +++ b/packages/service-library/requirements/_base.txt @@ -210,7 +210,7 @@ pyyaml==6.0.2 # -c requirements/../../../packages/settings-library/requirements/../../../requirements/constraints.txt # -c requirements/../../../requirements/constraints.txt # -r requirements/_base.in -redis==5.0.4 +redis==5.2.1 # via # -c requirements/../../../packages/common-library/requirements/../../../requirements/constraints.txt # -c requirements/../../../packages/models-library/requirements/../../../packages/common-library/requirements/../../../requirements/constraints.txt diff --git a/packages/service-library/src/servicelib/redis/__init__.py b/packages/service-library/src/servicelib/redis/__init__.py new file mode 100644 index 00000000000..fe2455409d5 --- /dev/null +++ b/packages/service-library/src/servicelib/redis/__init__.py @@ -0,0 +1,25 @@ +from ._client import RedisClientSDK +from ._clients_manager import RedisClientsManager +from ._decorators import exclusive +from ._distributed_locks_utils import start_exclusive_periodic_task +from ._errors import ( + CouldNotAcquireLockError, + CouldNotConnectToRedisError, + LockLostError, +) +from ._models import RedisManagerDBConfig +from ._utils import handle_redis_returns_union_types + +__all__: tuple[str, ...] 
= ( + "CouldNotAcquireLockError", + "CouldNotConnectToRedisError", + "exclusive", + "handle_redis_returns_union_types", + "LockLostError", + "RedisClientSDK", + "RedisClientsManager", + "RedisManagerDBConfig", + "start_exclusive_periodic_task", +) + +# nopycln: file diff --git a/packages/service-library/src/servicelib/redis.py b/packages/service-library/src/servicelib/redis/_client.py similarity index 64% rename from packages/service-library/src/servicelib/redis.py rename to packages/service-library/src/servicelib/redis/_client.py index 00ed169e68d..6e87d122cce 100644 --- a/packages/service-library/src/servicelib/redis.py +++ b/packages/service-library/src/servicelib/redis/_client.py @@ -5,65 +5,38 @@ from asyncio import Task from collections.abc import AsyncIterator from dataclasses import dataclass, field -from typing import Final from uuid import uuid4 import redis.asyncio as aioredis import redis.exceptions -from common_library.errors_classes import OsparcErrorMixin -from pydantic import NonNegativeFloat, NonNegativeInt +from pydantic import NonNegativeFloat from redis.asyncio.lock import Lock from redis.asyncio.retry import Retry from redis.backoff import ExponentialBackoff -from settings_library.redis import RedisDatabase, RedisSettings from tenacity import retry from yarl import URL -from .background_task import periodic_task -from .logging_utils import log_catch, log_context -from .retry_policies import RedisRetryPolicyUponInitialization - -_DEFAULT_LOCK_TTL: Final[datetime.timedelta] = datetime.timedelta(seconds=10) -_DEFAULT_SOCKET_TIMEOUT: Final[datetime.timedelta] = datetime.timedelta(seconds=30) - - -_DEFAULT_DECODE_RESPONSES: Final[bool] = True -_DEFAULT_HEALTH_CHECK_INTERVAL: Final[datetime.timedelta] = datetime.timedelta( - seconds=5 +from ..background_task import periodic_task +from ..logging_utils import log_catch +from ..retry_policies import RedisRetryPolicyUponInitialization +from ._constants import ( + DEFAULT_DECODE_RESPONSES, + DEFAULT_HEALTH_CHECK_INTERVAL, + DEFAULT_LOCK_TTL, + DEFAULT_SOCKET_TIMEOUT, ) -_SHUTDOWN_TIMEOUT_S: Final[NonNegativeInt] = 5 - +from ._errors import CouldNotAcquireLockError, CouldNotConnectToRedisError +from ._utils import auto_extend_lock, cancel_or_warn _logger = logging.getLogger(__name__) -class BaseRedisError(OsparcErrorMixin, RuntimeError): - ... - - -class CouldNotAcquireLockError(BaseRedisError): - msg_template: str = "Lock {lock.name} could not be acquired!" 
- - -class CouldNotConnectToRedisError(BaseRedisError): - msg_template: str = "Connection to '{dsn}' failed" - - -async def _cancel_or_warn(task: Task) -> None: - if not task.cancelled(): - task.cancel() - _, pending = await asyncio.wait((task,), timeout=_SHUTDOWN_TIMEOUT_S) - if pending: - task_name = task.get_name() - _logger.warning("Could not cancel task_name=%s pending=%s", task_name, pending) - - @dataclass class RedisClientSDK: redis_dsn: str client_name: str - decode_responses: bool = _DEFAULT_DECODE_RESPONSES - health_check_interval: datetime.timedelta = _DEFAULT_HEALTH_CHECK_INTERVAL + decode_responses: bool = DEFAULT_DECODE_RESPONSES + health_check_interval: datetime.timedelta = DEFAULT_HEALTH_CHECK_INTERVAL _client: aioredis.Redis = field(init=False) _health_check_task: Task | None = None @@ -74,7 +47,7 @@ class RedisClientSDK: def redis(self) -> aioredis.Redis: return self._client - def __post_init__(self): + def __post_init__(self) -> None: self._client = aioredis.from_url( self.redis_dsn, # Run 3 retries with exponential backoff strategy source: https://redis.readthedocs.io/en/stable/backoff.html @@ -84,8 +57,8 @@ def __post_init__(self): redis.exceptions.ConnectionError, redis.exceptions.TimeoutError, ], - socket_timeout=_DEFAULT_SOCKET_TIMEOUT.total_seconds(), - socket_connect_timeout=_DEFAULT_SOCKET_TIMEOUT.total_seconds(), + socket_timeout=DEFAULT_SOCKET_TIMEOUT.total_seconds(), + socket_connect_timeout=DEFAULT_SOCKET_TIMEOUT.total_seconds(), encoding="utf-8", decode_responses=self.decode_responses, client_name=self.client_name, @@ -113,7 +86,7 @@ async def setup(self) -> None: async def shutdown(self) -> None: if self._health_check_task: self._continue_health_checking = False - await _cancel_or_warn(self._health_check_task) + await cancel_or_warn(self._health_check_task) self._health_check_task = None await self._client.aclose(close_connection_pool=True) @@ -165,7 +138,7 @@ async def lock_context( 2. 
`blocking==True` times out while waiting for lock to be free (another entity holds the lock) """ - total_lock_duration: datetime.timedelta = _DEFAULT_LOCK_TTL + total_lock_duration: datetime.timedelta = DEFAULT_LOCK_TTL lock_unique_id = f"lock_extender_{lock_key}_{uuid4()}" ttl_lock: Lock = self._client.lock( @@ -178,15 +151,9 @@ if not await ttl_lock.acquire(token=lock_value): raise CouldNotAcquireLockError(lock=ttl_lock) - async def _extend_lock(lock: Lock) -> None: - with log_context( - _logger, logging.DEBUG, f"Extending lock {lock_unique_id}" - ), log_catch(_logger, reraise=False): - await lock.reacquire() - try: async with periodic_task( - _extend_lock, + auto_extend_lock, interval=total_lock_duration / 2, task_name=lock_unique_id, lock=ttl_lock, @@ -224,51 +191,3 @@ async def _extend_lock(lock: Lock) -> None: async def lock_value(self, lock_name: str) -> str | None: output: str | None = await self._client.get(lock_name) return output - - -@dataclass(frozen=True) -class RedisManagerDBConfig: - database: RedisDatabase - decode_responses: bool = _DEFAULT_DECODE_RESPONSES - health_check_interval: datetime.timedelta = _DEFAULT_HEALTH_CHECK_INTERVAL - - -@dataclass -class RedisClientsManager: - """ - Manages the lifetime of redis client sdk connections - """ - - databases_configs: set[RedisManagerDBConfig] - settings: RedisSettings - client_name: str - - _client_sdks: dict[RedisDatabase, RedisClientSDK] = field(default_factory=dict) - - async def setup(self) -> None: - for config in self.databases_configs: - self._client_sdks[config.database] = RedisClientSDK( - redis_dsn=self.settings.build_redis_dsn(config.database), - decode_responses=config.decode_responses, - health_check_interval=config.health_check_interval, - client_name=f"{self.client_name}", - ) - - for client in self._client_sdks.values(): - await client.setup() - - async def shutdown(self) -> None: - # NOTE: somehow using logged_gather is not an option - # doing so will make the shutdown procedure hang - for client in self._client_sdks.values(): - await client.shutdown() - - def client(self, database: RedisDatabase) -> RedisClientSDK: - return self._client_sdks[database] - - async def __aenter__(self) -> "RedisClientsManager": - await self.setup() - return self - - async def __aexit__(self, *args): - await self.shutdown() diff --git a/packages/service-library/src/servicelib/redis/_clients_manager.py b/packages/service-library/src/servicelib/redis/_clients_manager.py new file mode 100644 index 00000000000..01d34781cf2 --- /dev/null +++ b/packages/service-library/src/servicelib/redis/_clients_manager.py @@ -0,0 +1,47 @@ +from dataclasses import dataclass, field + +from settings_library.redis import RedisDatabase, RedisSettings + +from ._client import RedisClientSDK +from ._models import RedisManagerDBConfig + + +@dataclass +class RedisClientsManager: + """ + Manages the lifetime of redis client sdk connections + """ + + databases_configs: set[RedisManagerDBConfig] + settings: RedisSettings + client_name: str + + _client_sdks: dict[RedisDatabase, RedisClientSDK] = field(default_factory=dict) + + async def setup(self) -> None: + for config in self.databases_configs: + self._client_sdks[config.database] = RedisClientSDK( + redis_dsn=self.settings.build_redis_dsn(config.database), + decode_responses=config.decode_responses, + health_check_interval=config.health_check_interval, + client_name=f"{self.client_name}", + ) + + for client in self._client_sdks.values(): + await client.setup() + + async def
shutdown(self) -> None: + # NOTE: somehow using logged_gather is not an option + # doing so will make the shutdown procedure hang + for client in self._client_sdks.values(): + await client.shutdown() + + def client(self, database: RedisDatabase) -> RedisClientSDK: + return self._client_sdks[database] + + async def __aenter__(self) -> "RedisClientsManager": + await self.setup() + return self + + async def __aexit__(self, *args) -> None: + await self.shutdown() diff --git a/packages/service-library/src/servicelib/redis/_constants.py b/packages/service-library/src/servicelib/redis/_constants.py new file mode 100644 index 00000000000..6a10c6b75b0 --- /dev/null +++ b/packages/service-library/src/servicelib/redis/_constants.py @@ -0,0 +1,12 @@ +import datetime +from typing import Final + +from pydantic import NonNegativeInt + +DEFAULT_LOCK_TTL: Final[datetime.timedelta] = datetime.timedelta(seconds=10) +DEFAULT_SOCKET_TIMEOUT: Final[datetime.timedelta] = datetime.timedelta(seconds=30) + + +DEFAULT_DECODE_RESPONSES: Final[bool] = True +DEFAULT_HEALTH_CHECK_INTERVAL: Final[datetime.timedelta] = datetime.timedelta(seconds=5) +SHUTDOWN_TIMEOUT_S: Final[NonNegativeInt] = 5 diff --git a/packages/service-library/src/servicelib/redis/_decorators.py b/packages/service-library/src/servicelib/redis/_decorators.py new file mode 100644 index 00000000000..53c952b3991 --- /dev/null +++ b/packages/service-library/src/servicelib/redis/_decorators.py @@ -0,0 +1,56 @@ +import functools +import logging +from collections.abc import Awaitable, Callable +from typing import ParamSpec, TypeVar + +from ._client import RedisClientSDK + +_logger = logging.getLogger(__file__) + +P = ParamSpec("P") +R = TypeVar("R") + + +def exclusive( + redis: RedisClientSDK | Callable[..., RedisClientSDK], + *, + lock_key: str | Callable[..., str], + lock_value: bytes | str | None = None, +) -> Callable[[Callable[P, Awaitable[R]]], Callable[P, Awaitable[R]]]: + """ + Define a method to run exclusively across + processes by leveraging a Redis Lock. + + parameters: + redis: the redis client SDK + lock_key: a string as the name of the lock (good practice: app_name:lock_name) + lock_value: some additional data that can be retrieved by another client + + Raises: + - ValueError if used incorrectly + - CouldNotAcquireLockError if the lock could not be acquired + """ + + if not lock_key: + msg = "lock_key cannot be empty string!" 
+ raise ValueError(msg) + + def decorator(func: Callable[P, Awaitable[R]]) -> Callable[P, Awaitable[R]]: + @functools.wraps(func) + async def wrapper(*args: P.args, **kwargs: P.kwargs) -> R: + redis_lock_key = ( + lock_key(*args, **kwargs) if callable(lock_key) else lock_key + ) + assert isinstance(redis_lock_key, str) # nosec + + redis_client = redis(*args, **kwargs) if callable(redis) else redis + assert isinstance(redis_client, RedisClientSDK) # nosec + + async with redis_client.lock_context( + lock_key=redis_lock_key, lock_value=lock_value + ): + return await func(*args, **kwargs) + + return wrapper + + return decorator diff --git a/packages/service-library/src/servicelib/redis/_distributed_locks_utils.py b/packages/service-library/src/servicelib/redis/_distributed_locks_utils.py new file mode 100644 index 00000000000..2560e88c41f --- /dev/null +++ b/packages/service-library/src/servicelib/redis/_distributed_locks_utils.py @@ -0,0 +1,79 @@ +import asyncio +import datetime +import logging +from collections.abc import Awaitable, Callable + +import arrow +from servicelib.background_task import start_periodic_task + +from ._client import RedisClientSDK +from ._decorators import exclusive +from ._errors import CouldNotAcquireLockError + +_logger = logging.getLogger(__name__) + + +async def _exclusive_task_starter( + client: RedisClientSDK, + usr_tsk_task: Callable[..., Awaitable[None]], + *, + usr_tsk_interval: datetime.timedelta, + usr_tsk_task_name: str, + **kwargs, +) -> None: + lock_key = f"lock:exclusive_task_starter:{usr_tsk_task_name}" + lock_value = f"locked since {arrow.utcnow().format()}" + + try: + await exclusive(client, lock_key=lock_key, lock_value=lock_value)( + start_periodic_task + )( + usr_tsk_task, + interval=usr_tsk_interval, + task_name=usr_tsk_task_name, + **kwargs, + ) + except CouldNotAcquireLockError: + _logger.debug( + "Could not acquire lock '%s' with value '%s'", lock_key, lock_value + ) + except Exception as e: + _logger.exception(e) # noqa: TRY401 + raise + + +def start_exclusive_periodic_task( + client: RedisClientSDK, + task: Callable[..., Awaitable[None]], + *, + task_period: datetime.timedelta, + retry_after: datetime.timedelta = datetime.timedelta(seconds=1), + task_name: str, + **kwargs, +) -> asyncio.Task: + """ + Ensures that at most one process runs ``task`` periodically at any given time. + If one process dies, another process will run the ``task``. + + Creates a background task that periodically tries to start the user ``task``. + Before the ``task`` is scheduled for periodic background execution, it acquires a lock. + Subsequent calls to ``start_exclusive_periodic_task`` will not allow the same ``task`` + to start since the lock will prevent the scheduling. + + Q&A: + - Why is `_exclusive_task_starter` run as a task? + This is usually used at setup time and cannot block the setup process forever + - Why is the `_exclusive_task_starter` task itself periodic?
+ If Redis connectivity is lost, the periodic `_exclusive_task_starter` ensures the lock is + reacquired + """ + return start_periodic_task( + _exclusive_task_starter, + interval=retry_after, + task_name=f"exclusive_task_starter_{task_name}", + client=client, + usr_tsk_task=task, + usr_tsk_interval=task_period, + usr_tsk_task_name=task_name, + **kwargs, + ) diff --git a/packages/service-library/src/servicelib/redis/_errors.py b/packages/service-library/src/servicelib/redis/_errors.py new file mode 100644 index 00000000000..1a62f8f2e5a --- /dev/null +++ b/packages/service-library/src/servicelib/redis/_errors.py @@ -0,0 +1,17 @@ +from common_library.errors_classes import OsparcErrorMixin + + +class BaseRedisError(OsparcErrorMixin, RuntimeError): + ... + + +class CouldNotAcquireLockError(BaseRedisError): + msg_template: str = "Lock {lock.name} could not be acquired!" + + +class CouldNotConnectToRedisError(BaseRedisError): + msg_template: str = "Connection to '{dsn}' failed" + + +class LockLostError(BaseRedisError): + msg_template: str = "Lock {lock.name} has been lost" diff --git a/packages/service-library/src/servicelib/redis/_models.py b/packages/service-library/src/servicelib/redis/_models.py new file mode 100644 index 00000000000..6e2db864c09 --- /dev/null +++ b/packages/service-library/src/servicelib/redis/_models.py @@ -0,0 +1,13 @@ +import datetime +from dataclasses import dataclass + +from settings_library.redis import RedisDatabase + +from ._constants import DEFAULT_DECODE_RESPONSES, DEFAULT_HEALTH_CHECK_INTERVAL + + +@dataclass(frozen=True, kw_only=True) +class RedisManagerDBConfig: + database: RedisDatabase + decode_responses: bool = DEFAULT_DECODE_RESPONSES + health_check_interval: datetime.timedelta = DEFAULT_HEALTH_CHECK_INTERVAL diff --git a/packages/service-library/src/servicelib/redis/_utils.py b/packages/service-library/src/servicelib/redis/_utils.py new file mode 100644 index 00000000000..76fe12cb10e --- /dev/null +++ b/packages/service-library/src/servicelib/redis/_utils.py @@ -0,0 +1,37 @@ +import asyncio +import logging +from collections.abc import Awaitable +from typing import Any + +import redis.exceptions +from redis.asyncio.lock import Lock + +from ..logging_utils import log_context +from ._constants import SHUTDOWN_TIMEOUT_S +from ._errors import LockLostError + +_logger = logging.getLogger(__name__) + + +async def cancel_or_warn(task: asyncio.Task) -> None: + if not task.cancelled(): + task.cancel() + _, pending = await asyncio.wait((task,), timeout=SHUTDOWN_TIMEOUT_S) + if pending: + task_name = task.get_name() + _logger.warning("Could not cancel task_name=%s pending=%s", task_name, pending) + + +async def auto_extend_lock(lock: Lock) -> None: + try: + with log_context(_logger, logging.DEBUG, f"Autoextend lock {lock.name!r}"): + await lock.reacquire() + except redis.exceptions.LockNotOwnedError as exc: + raise LockLostError(lock=lock) from exc + + +async def handle_redis_returns_union_types(result: Any | Awaitable[Any]) -> Any: + """Used to handle mypy issues with redis 5.x return types""" + if isinstance(result, Awaitable): + return await result + return result diff --git a/packages/service-library/src/servicelib/redis_utils.py b/packages/service-library/src/servicelib/redis_utils.py deleted file mode 100644 index 559349cbb0d..00000000000 --- a/packages/service-library/src/servicelib/redis_utils.py +++ /dev/null @@ -1,134 +0,0 @@ -import asyncio -import functools -import logging -from collections.abc import Awaitable, Callable -from datetime import timedelta 
-from typing import Any, ParamSpec, TypeVar - -import arrow - -from .background_task import start_periodic_task -from .redis import CouldNotAcquireLockError, RedisClientSDK - -_logger = logging.getLogger(__file__) - -P = ParamSpec("P") -R = TypeVar("R") - - -def exclusive( - redis: RedisClientSDK | Callable[..., RedisClientSDK], - *, - lock_key: str | Callable[..., str], - lock_value: bytes | str | None = None, -) -> Callable[[Callable[P, Awaitable[R]]], Callable[P, Awaitable[R]]]: - """ - Define a method to run exclusively across - processes by leveraging a Redis Lock. - - parameters: - redis: the redis client SDK - lock_key: a string as the name of the lock (good practice: app_name:lock_name) - lock_value: some additional data that can be retrieved by another client - - Raises: - - ValueError if used incorrectly - - CouldNotAcquireLockError if the lock could not be acquired - """ - - if not lock_key: - msg = "lock_key cannot be empty string!" - raise ValueError(msg) - - def decorator(func: Callable[P, Awaitable[R]]) -> Callable[P, Awaitable[R]]: - @functools.wraps(func) - async def wrapper(*args: P.args, **kwargs: P.kwargs) -> R: - redis_lock_key = ( - lock_key(*args, **kwargs) if callable(lock_key) else lock_key - ) - assert isinstance(redis_lock_key, str) # nosec - - redis_client = redis(*args, **kwargs) if callable(redis) else redis - assert isinstance(redis_client, RedisClientSDK) # nosec - - async with redis_client.lock_context( - lock_key=redis_lock_key, lock_value=lock_value - ): - return await func(*args, **kwargs) - - return wrapper - - return decorator - - -async def _exclusive_task_starter( - redis: RedisClientSDK, - usr_tsk_task: Callable[..., Awaitable[None]], - *, - usr_tsk_interval: timedelta, - usr_tsk_task_name: str, - **kwargs, -) -> None: - lock_key = f"lock:exclusive_task_starter:{usr_tsk_task_name}" - lock_value = f"locked since {arrow.utcnow().format()}" - - try: - await exclusive(redis, lock_key=lock_key, lock_value=lock_value)( - start_periodic_task - )( - usr_tsk_task, - interval=usr_tsk_interval, - task_name=usr_tsk_task_name, - **kwargs, - ) - except CouldNotAcquireLockError: - _logger.debug( - "Could not acquire lock '%s' with value '%s'", lock_key, lock_value - ) - except Exception as e: - _logger.exception(e) # noqa: TRY401 - raise - - -def start_exclusive_periodic_task( - redis: RedisClientSDK, - task: Callable[..., Awaitable[None]], - *, - task_period: timedelta, - retry_after: timedelta = timedelta(seconds=1), - task_name: str, - **kwargs, -) -> asyncio.Task: - """ - Ensures that only 1 process periodically ever runs ``task`` at all times. - If one process dies, another process will run the ``task``. - - Creates a background task that periodically tries to start the user ``task``. - Before the ``task`` is scheduled for periodic background execution, it acquires a lock. - Subsequent calls to ``start_exclusive_periodic_task`` will not allow the same ``task`` - to start since the lock will prevent the scheduling. - - Q&A: - - Why is `_exclusive_task_starter` run as a task? - This is usually used at setup time and cannot block the setup process forever - - Why is `_exclusive_task_starter` task a periodic task? 
- If Redis connectivity is lost, the periodic `_exclusive_task_starter` ensures the lock is - reacquired - """ - return start_periodic_task( - _exclusive_task_starter, - interval=retry_after, - task_name=f"exclusive_task_starter_{task_name}", - redis=redis, - usr_tsk_task=task, - usr_tsk_interval=task_period, - usr_tsk_task_name=task_name, - **kwargs, - ) - - -async def handle_redis_returns_union_types(result: Any | Awaitable[Any]) -> Any: - """Used to handle mypy issues with redis 5.x return types""" - if isinstance(result, Awaitable): - return await result - return result diff --git a/packages/service-library/tests/conftest.py b/packages/service-library/tests/conftest.py index 4f05756fa16..b6af04f6b78 100644 --- a/packages/service-library/tests/conftest.py +++ b/packages/service-library/tests/conftest.py @@ -77,7 +77,8 @@ async def get_redis_client_sdk( ]: @asynccontextmanager async def _( - database: RedisDatabase, decode_response: bool = True # noqa: FBT002 + database: RedisDatabase, + decode_response: bool = True, # noqa: FBT002 ) -> AsyncIterator[RedisClientSDK]: redis_resources_dns = redis_service.build_redis_dsn(database) client = RedisClientSDK( @@ -97,7 +98,7 @@ async def _cleanup_redis_data(clients_manager: RedisClientsManager) -> None: await clients_manager.client(db).redis.flushall() async with RedisClientsManager( - {RedisManagerDBConfig(db) for db in RedisDatabase}, + {RedisManagerDBConfig(database=db) for db in RedisDatabase}, redis_service, client_name="pytest", ) as clients_manager: diff --git a/packages/service-library/tests/deferred_tasks/test_deferred_tasks.py b/packages/service-library/tests/deferred_tasks/test_deferred_tasks.py index 6dbc5d3d764..8f09e436885 100644 --- a/packages/service-library/tests/deferred_tasks/test_deferred_tasks.py +++ b/packages/service-library/tests/deferred_tasks/test_deferred_tasks.py @@ -20,9 +20,9 @@ from common_library.serialization import model_dump_with_secrets from pydantic import NonNegativeFloat, NonNegativeInt from pytest_mock import MockerFixture -from servicelib import redis as servicelib_redis from servicelib.rabbitmq import RabbitMQClient from servicelib.redis import RedisClientSDK +from servicelib.redis import _constants as redis_client_constants from servicelib.sequences_utils import partition_gen from settings_library.rabbit import RabbitSettings from settings_library.redis import RedisSettings @@ -353,7 +353,6 @@ def __init__( async def _pause_container( self, container_name: str, client: ClientWithPingProtocol ) -> AsyncIterator[None]: - async with self.paused_container(container_name): async for attempt in AsyncRetrying( wait=wait_fixed(0.1), @@ -391,7 +390,9 @@ async def pause_redis(self) -> AsyncIterator[None]: @pytest.fixture def mock_default_socket_timeout(mocker: MockerFixture) -> None: mocker.patch.object( - servicelib_redis, "_DEFAULT_SOCKET_TIMEOUT", datetime.timedelta(seconds=0.25) + redis_client_constants, + "DEFAULT_SOCKET_TIMEOUT", + datetime.timedelta(seconds=0.25), ) @@ -420,7 +421,6 @@ async def test_workflow_with_third_party_services_outages( redis_service, max_workers, ) as manager: - # start all in parallel await asyncio.gather( *[manager.start_task(0.1, i) for i in range(deferred_tasks_to_start)] diff --git a/packages/service-library/tests/test_redis.py b/packages/service-library/tests/redis/test_redis.py similarity index 89% rename from packages/service-library/tests/test_redis.py rename to packages/service-library/tests/redis/test_redis.py index c120f85d344..6a88641f844 100644 --- 
a/packages/service-library/tests/test_redis.py +++ b/packages/service-library/tests/redis/test_redis.py @@ -14,13 +14,14 @@ from faker import Faker from pytest_mock import MockerFixture from redis.exceptions import LockError, LockNotOwnedError -from servicelib import redis as servicelib_redis from servicelib.redis import ( CouldNotAcquireLockError, + LockLostError, RedisClientSDK, RedisClientsManager, RedisManagerDBConfig, ) +from servicelib.redis import _constants as redis_constants from servicelib.utils import limited_gather from settings_library.redis import RedisDatabase, RedisSettings from tenacity import ( @@ -35,7 +36,7 @@ ] pytest_simcore_ops_services_selection = [ - # "redis-commander", + "redis-commander", ] @@ -48,7 +49,7 @@ async def _is_locked(redis_client_sdk: RedisClientSDK, lock_name: str) -> bool: async def redis_client_sdk( get_redis_client_sdk: Callable[ [RedisDatabase], AbstractAsyncContextManager[RedisClientSDK] - ] + ], ) -> AsyncIterator[RedisClientSDK]: async with get_redis_client_sdk(RedisDatabase.RESOURCES) as client: yield client @@ -62,7 +63,7 @@ def lock_timeout() -> datetime.timedelta: @pytest.fixture def mock_default_lock_ttl(mocker: MockerFixture) -> None: mocker.patch.object( - servicelib_redis, "_DEFAULT_LOCK_TTL", datetime.timedelta(seconds=0.25) + redis_constants, "DEFAULT_LOCK_TTL", datetime.timedelta(seconds=0.25) ) @@ -165,6 +166,21 @@ async def test_lock_context( assert await ttl_lock.owned() is False +@pytest.mark.xfail(reason="This test shows an issue that will be fixed in the next PR") +async def test_lock_context_raises_if_lock_is_lost( + redis_client_sdk: RedisClientSDK, faker: Faker +): + lock_name = faker.pystr() + with pytest.raises(LockLostError): # noqa: PT012 + async with redis_client_sdk.lock_context(lock_name) as ttl_lock: + assert await _is_locked(redis_client_sdk, lock_name) is True + assert await ttl_lock.owned() is True + # let's simulate a lost lock by forcefully deleting it + await redis_client_sdk._client.delete(lock_name) # noqa: SLF001 + # now we wait for the exception to be raised + await asyncio.sleep(20) + + async def test_lock_context_with_already_locked_lock_raises( redis_client_sdk: RedisClientSDK, faker: Faker ): @@ -197,7 +213,7 @@ async def test_lock_context_with_data(redis_client_sdk: RedisClientSDK, faker: F lock_name = faker.pystr() assert await _is_locked(redis_client_sdk, lock_name) is False assert await redis_client_sdk.lock_value(lock_name) is None - async with redis_client_sdk.lock_context(lock_name, lock_value=lock_data) as lock: + async with redis_client_sdk.lock_context(lock_name, lock_value=lock_data): assert await _is_locked(redis_client_sdk, lock_name) is True assert await redis_client_sdk.lock_value(lock_name) == lock_data assert await _is_locked(redis_client_sdk, lock_name) is False @@ -238,18 +254,14 @@ async def race_condition_increase(self, by: int) -> None: current_value = self.value current_value += by # most likely situation which creates issues - await asyncio.sleep( - servicelib_redis._DEFAULT_LOCK_TTL.total_seconds() / 2 # noqa: SLF001 - ) + await asyncio.sleep(redis_constants.DEFAULT_LOCK_TTL.total_seconds() / 2) self.value = current_value counter = RaceConditionCounter() lock_name: str = faker.pystr() # ensures it does not time out before acquiring the lock time_for_all_inc_counter_calls_to_finish_s: float = ( - servicelib_redis._DEFAULT_LOCK_TTL.total_seconds() # noqa: SLF001 - * INCREASE_OPERATIONS - * 10 + redis_constants.DEFAULT_LOCK_TTL.total_seconds() * INCREASE_OPERATIONS * 10 ) async
def _inc_counter() -> None: @@ -272,9 +284,8 @@ async def _inc_counter() -> None: async def test_redis_client_sdks_manager( mock_redis_socket_timeout: None, redis_service: RedisSettings ): - all_redis_configs: set[RedisManagerDBConfig] = { - RedisManagerDBConfig(db) for db in RedisDatabase + RedisManagerDBConfig(database=db) for db in RedisDatabase } manager = RedisClientsManager( databases_configs=all_redis_configs, @@ -320,7 +331,7 @@ async def test_redis_client_sdk_setup_shutdown( @pytest.fixture def mock_default_socket_timeout(mocker: MockerFixture) -> None: mocker.patch.object( - servicelib_redis, "_DEFAULT_SOCKET_TIMEOUT", datetime.timedelta(seconds=0.25) + redis_constants, "DEFAULT_SOCKET_TIMEOUT", datetime.timedelta(seconds=0.25) ) diff --git a/packages/service-library/tests/test_redis__reconection.py b/packages/service-library/tests/redis/test_redis__reconection.py similarity index 100% rename from packages/service-library/tests/test_redis__reconection.py rename to packages/service-library/tests/redis/test_redis__reconection.py diff --git a/packages/service-library/tests/test_redis_utils.py b/packages/service-library/tests/redis/test_redis_utils.py similarity index 97% rename from packages/service-library/tests/test_redis_utils.py rename to packages/service-library/tests/redis/test_redis_utils.py index 26f749cd894..5df925ad2ca 100644 --- a/packages/service-library/tests/test_redis_utils.py +++ b/packages/service-library/tests/redis/test_redis_utils.py @@ -12,8 +12,12 @@ import pytest from faker import Faker from servicelib.background_task import stop_periodic_task -from servicelib.redis import CouldNotAcquireLockError, RedisClientSDK -from servicelib.redis_utils import exclusive, start_exclusive_periodic_task +from servicelib.redis import ( + CouldNotAcquireLockError, + RedisClientSDK, + exclusive, + start_exclusive_periodic_task, +) from servicelib.utils import logged_gather from settings_library.redis import RedisDatabase from tenacity.asyncio import AsyncRetrying @@ -67,7 +71,6 @@ async def test_exclusive_decorator( lock_name: str, sleep_duration: float, ): - async with get_redis_client_sdk(RedisDatabase.RESOURCES) as redis_client: for _ in range(3): assert ( @@ -132,7 +135,6 @@ async def _acquire_lock_and_exclusively_sleep( ) -> None: async with get_redis_client_sdk(RedisDatabase.RESOURCES) as redis_client_sdk: redis_lock_name = lock_name() if callable(lock_name) else lock_name - assert not await _is_locked(redis_client_sdk, redis_lock_name) @exclusive(redis_client_sdk, lock_key=lock_name) async def _() -> float: @@ -156,7 +158,7 @@ async def test_exclusive_parallel_lock_is_released_and_reacquired( results = await logged_gather( *[ _acquire_lock_and_exclusively_sleep( - get_redis_client_sdk, lock_name, sleep_duration=0.1 + get_redis_client_sdk, lock_name, sleep_duration=1 ) for _ in range(parallel_tasks) ], @@ -220,7 +222,7 @@ async def _assert_task_completes_once( async def test_start_exclusive_periodic_task_single( get_redis_client_sdk: Callable[ [RedisDatabase], AbstractAsyncContextManager[RedisClientSDK] - ] + ], ): await _assert_task_completes_once(get_redis_client_sdk, stop_after=2) @@ -241,7 +243,7 @@ def test__check_elements_lower(): async def test_start_exclusive_periodic_task_parallel_all_finish( get_redis_client_sdk: Callable[ [RedisDatabase], AbstractAsyncContextManager[RedisClientSDK] - ] + ], ): parallel_tasks = 10 results: list[tuple[float, float]] = await logged_gather( diff --git a/packages/simcore-sdk/requirements/_base.txt 
b/packages/simcore-sdk/requirements/_base.txt index c13586c4bd1..24b82746808 100644 --- a/packages/simcore-sdk/requirements/_base.txt +++ b/packages/simcore-sdk/requirements/_base.txt @@ -337,7 +337,7 @@ pyyaml==6.0.2 # -c requirements/../../../packages/settings-library/requirements/../../../requirements/constraints.txt # -c requirements/../../../requirements/constraints.txt # -r requirements/../../../packages/service-library/requirements/_base.in -redis==5.0.4 +redis==5.2.1 # via # -c requirements/../../../packages/common-library/requirements/../../../requirements/constraints.txt # -c requirements/../../../packages/models-library/requirements/../../../packages/common-library/requirements/../../../requirements/constraints.txt diff --git a/services/agent/requirements/_base.txt b/services/agent/requirements/_base.txt index 7e4a6e2b4db..30d644e3a42 100644 --- a/services/agent/requirements/_base.txt +++ b/services/agent/requirements/_base.txt @@ -332,7 +332,7 @@ pyyaml==6.0.2 # -c requirements/../../../packages/settings-library/requirements/../../../requirements/constraints.txt # -c requirements/../../../requirements/constraints.txt # -r requirements/../../../packages/service-library/requirements/_base.in -redis==5.2.0 +redis==5.2.1 # via # -c requirements/../../../packages/common-library/requirements/../../../requirements/constraints.txt # -c requirements/../../../packages/models-library/requirements/../../../packages/common-library/requirements/../../../requirements/constraints.txt diff --git a/services/api-server/requirements/_base.txt b/services/api-server/requirements/_base.txt index ccc9b001a80..2539c60c7fe 100644 --- a/services/api-server/requirements/_base.txt +++ b/services/api-server/requirements/_base.txt @@ -675,7 +675,7 @@ pyyaml==6.0.2 # -r requirements/_base.in # fastapi # uvicorn -redis==5.2.0 +redis==5.2.1 # via # -c requirements/../../../packages/common-library/requirements/../../../requirements/constraints.txt # -c requirements/../../../packages/models-library/requirements/../../../packages/common-library/requirements/../../../requirements/constraints.txt diff --git a/services/autoscaling/requirements/_base.txt b/services/autoscaling/requirements/_base.txt index 497bcc4794f..0f13c0e7eaf 100644 --- a/services/autoscaling/requirements/_base.txt +++ b/services/autoscaling/requirements/_base.txt @@ -189,7 +189,7 @@ h11==0.14.0 # uvicorn httpcore==1.0.7 # via httpx -httpx==0.28.0 +httpx==0.28.1 # via # -c requirements/../../../packages/aws-library/requirements/../../../packages/common-library/requirements/../../../requirements/constraints.txt # -c requirements/../../../packages/aws-library/requirements/../../../packages/models-library/requirements/../../../packages/common-library/requirements/../../../requirements/constraints.txt diff --git a/services/autoscaling/requirements/_test.txt b/services/autoscaling/requirements/_test.txt index 28a6d7a45e9..a5de3c064b3 100644 --- a/services/autoscaling/requirements/_test.txt +++ b/services/autoscaling/requirements/_test.txt @@ -93,7 +93,7 @@ httpcore==1.0.7 # via # -c requirements/_base.txt # httpx -httpx==0.28.0 +httpx==0.28.1 # via # -c requirements/../../../requirements/constraints.txt # -c requirements/_base.txt diff --git a/services/autoscaling/src/simcore_service_autoscaling/modules/auto_scaling_task.py b/services/autoscaling/src/simcore_service_autoscaling/modules/auto_scaling_task.py index 418d8f2afa8..28a37953a52 100644 --- a/services/autoscaling/src/simcore_service_autoscaling/modules/auto_scaling_task.py +++ 
b/services/autoscaling/src/simcore_service_autoscaling/modules/auto_scaling_task.py @@ -4,7 +4,7 @@ from fastapi import FastAPI from servicelib.background_task import start_periodic_task, stop_periodic_task -from servicelib.redis_utils import exclusive +from servicelib.redis import exclusive from ..core.settings import ApplicationSettings from ..utils.redis import create_lock_key_and_value diff --git a/services/autoscaling/src/simcore_service_autoscaling/modules/buffer_machines_pool_task.py b/services/autoscaling/src/simcore_service_autoscaling/modules/buffer_machines_pool_task.py index 2ac31a21343..a4a0eefe11d 100644 --- a/services/autoscaling/src/simcore_service_autoscaling/modules/buffer_machines_pool_task.py +++ b/services/autoscaling/src/simcore_service_autoscaling/modules/buffer_machines_pool_task.py @@ -4,7 +4,7 @@ from fastapi import FastAPI from servicelib.background_task import start_periodic_task, stop_periodic_task -from servicelib.redis_utils import exclusive +from servicelib.redis import exclusive from ..core.settings import ApplicationSettings from ..utils.redis import create_lock_key_and_value diff --git a/services/clusters-keeper/requirements/_base.txt b/services/clusters-keeper/requirements/_base.txt index 20112ddbea5..9284af99ada 100644 --- a/services/clusters-keeper/requirements/_base.txt +++ b/services/clusters-keeper/requirements/_base.txt @@ -187,7 +187,7 @@ h11==0.14.0 # uvicorn httpcore==1.0.7 # via httpx -httpx==0.28.0 +httpx==0.28.1 # via # -c requirements/../../../packages/aws-library/requirements/../../../packages/common-library/requirements/../../../requirements/constraints.txt # -c requirements/../../../packages/aws-library/requirements/../../../packages/models-library/requirements/../../../packages/common-library/requirements/../../../requirements/constraints.txt diff --git a/services/clusters-keeper/requirements/_test.txt b/services/clusters-keeper/requirements/_test.txt index 1f80746a22a..c3fc6f9e590 100644 --- a/services/clusters-keeper/requirements/_test.txt +++ b/services/clusters-keeper/requirements/_test.txt @@ -112,7 +112,7 @@ httpcore==1.0.7 # via # -c requirements/_base.txt # httpx -httpx==0.28.0 +httpx==0.28.1 # via # -c requirements/../../../requirements/constraints.txt # -c requirements/_base.txt diff --git a/services/clusters-keeper/src/simcore_service_clusters_keeper/modules/clusters_management_task.py b/services/clusters-keeper/src/simcore_service_clusters_keeper/modules/clusters_management_task.py index d2e8f6e4c6f..cb5caa71c21 100644 --- a/services/clusters-keeper/src/simcore_service_clusters_keeper/modules/clusters_management_task.py +++ b/services/clusters-keeper/src/simcore_service_clusters_keeper/modules/clusters_management_task.py @@ -4,7 +4,7 @@ from fastapi import FastAPI from servicelib.background_task import start_periodic_task, stop_periodic_task -from servicelib.redis_utils import exclusive +from servicelib.redis import exclusive from .._meta import APP_NAME from ..core.settings import ApplicationSettings diff --git a/services/dask-sidecar/requirements/_base.txt b/services/dask-sidecar/requirements/_base.txt index c564683d8c3..7ba73695252 100644 --- a/services/dask-sidecar/requirements/_base.txt +++ b/services/dask-sidecar/requirements/_base.txt @@ -419,7 +419,7 @@ pyyaml==6.0.2 # bokeh # dask # distributed -redis==5.2.0 +redis==5.2.1 # via # -c requirements/../../../packages/common-library/requirements/../../../requirements/constraints.txt # -c 
requirements/../../../packages/dask-task-models-library/requirements/../../../packages/common-library/requirements/../../../requirements/constraints.txt diff --git a/services/datcore-adapter/requirements/_base.txt b/services/datcore-adapter/requirements/_base.txt index a3bfb49c0e7..eef071269ca 100644 --- a/services/datcore-adapter/requirements/_base.txt +++ b/services/datcore-adapter/requirements/_base.txt @@ -352,7 +352,7 @@ pyyaml==6.0.1 # -c requirements/../../../requirements/constraints.txt # -r requirements/../../../packages/service-library/requirements/_base.in # uvicorn -redis==5.0.4 +redis==5.2.1 # via # -c requirements/../../../packages/common-library/requirements/../../../requirements/constraints.txt # -c requirements/../../../packages/models-library/requirements/../../../packages/common-library/requirements/../../../requirements/constraints.txt diff --git a/services/director-v2/requirements/_base.txt b/services/director-v2/requirements/_base.txt index 407e941fb3a..4f4bfb4730a 100644 --- a/services/director-v2/requirements/_base.txt +++ b/services/director-v2/requirements/_base.txt @@ -760,7 +760,7 @@ pyyaml==6.0.2 # distributed # fastapi # uvicorn -redis==5.0.4 +redis==5.2.1 # via # -c requirements/../../../packages/common-library/requirements/../../../requirements/constraints.txt # -c requirements/../../../packages/dask-task-models-library/requirements/../../../packages/common-library/requirements/../../../requirements/constraints.txt diff --git a/services/director-v2/src/simcore_service_director_v2/modules/comp_scheduler/_manager.py b/services/director-v2/src/simcore_service_director_v2/modules/comp_scheduler/_manager.py index 57308eb27c9..87fd24fe8ad 100644 --- a/services/director-v2/src/simcore_service_director_v2/modules/comp_scheduler/_manager.py +++ b/services/director-v2/src/simcore_service_director_v2/modules/comp_scheduler/_manager.py @@ -9,8 +9,7 @@ from servicelib.background_task import start_periodic_task, stop_periodic_task from servicelib.exception_utils import silence_exceptions from servicelib.logging_utils import log_context -from servicelib.redis import CouldNotAcquireLockError -from servicelib.redis_utils import exclusive +from servicelib.redis import CouldNotAcquireLockError, exclusive from servicelib.utils import limited_gather from ...models.comp_runs import RunMetadataDict diff --git a/services/director-v2/src/simcore_service_director_v2/modules/comp_scheduler/_worker.py b/services/director-v2/src/simcore_service_director_v2/modules/comp_scheduler/_worker.py index 397b68db0c9..2bfd2f2f5a4 100644 --- a/services/director-v2/src/simcore_service_director_v2/modules/comp_scheduler/_worker.py +++ b/services/director-v2/src/simcore_service_director_v2/modules/comp_scheduler/_worker.py @@ -8,8 +8,7 @@ from models_library.projects import ProjectID from models_library.users import UserID from servicelib.logging_utils import log_context -from servicelib.redis import CouldNotAcquireLockError -from servicelib.redis_utils import exclusive +from servicelib.redis import CouldNotAcquireLockError, exclusive from ...core.settings import get_application_settings from ...models.comp_runs import Iteration @@ -50,7 +49,6 @@ async def _exclusively_schedule_pipeline( async def _handle_apply_distributed_schedule(app: FastAPI, data: bytes) -> bool: - with log_context(_logger, logging.DEBUG, msg="handling scheduling"): to_schedule_pipeline = SchedulePipelineRabbitMessage.model_validate_json(data) with contextlib.suppress(CouldNotAcquireLockError): diff --git 
a/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/scheduler/_core/_scheduler.py b/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/scheduler/_core/_scheduler.py index 99fa3517130..a65c2cd84ff 100644 --- a/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/scheduler/_core/_scheduler.py +++ b/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/scheduler/_core/_scheduler.py @@ -48,8 +48,7 @@ ) from servicelib.fastapi.long_running_tasks.client import ProgressCallback from servicelib.fastapi.long_running_tasks.server import TaskProgress -from servicelib.redis import RedisClientsManager -from servicelib.redis_utils import exclusive +from servicelib.redis import RedisClientsManager, exclusive from settings_library.redis import RedisDatabase from .....core.dynamic_services_settings.scheduler import ( @@ -158,7 +157,7 @@ async def shutdown(self) -> None: "Following observation tasks completed with an unexpected error:%s", f"{bad_results}", ) - except asyncio.TimeoutError: + except TimeoutError: logger.exception( "Timed-out waiting for %s to complete. Action: Check why this is blocking", f"{running_tasks=}", diff --git a/services/director-v2/src/simcore_service_director_v2/modules/redis.py b/services/director-v2/src/simcore_service_director_v2/modules/redis.py index 273061cb188..5928cc78e97 100644 --- a/services/director-v2/src/simcore_service_director_v2/modules/redis.py +++ b/services/director-v2/src/simcore_service_director_v2/modules/redis.py @@ -14,7 +14,7 @@ async def on_startup() -> None: app.state.redis_clients_manager = redis_clients_manager = RedisClientsManager( databases_configs={ - RedisManagerDBConfig(db) + RedisManagerDBConfig(database=db) for db in ( RedisDatabase.LOCKS, RedisDatabase.DISTRIBUTED_IDENTIFIERS, diff --git a/services/director/requirements/_base.txt b/services/director/requirements/_base.txt index fbef4ce0fc9..b151fdf4639 100644 --- a/services/director/requirements/_base.txt +++ b/services/director/requirements/_base.txt @@ -375,7 +375,7 @@ pyyaml==6.0.2 # -r requirements/../../../packages/service-library/requirements/_base.in # fastapi # uvicorn -redis==5.2.0 +redis==5.2.1 # via # -c requirements/../../../packages/common-library/requirements/../../../requirements/constraints.txt # -c requirements/../../../packages/models-library/requirements/../../../packages/common-library/requirements/../../../requirements/constraints.txt diff --git a/services/dynamic-scheduler/requirements/_base.txt b/services/dynamic-scheduler/requirements/_base.txt index 2a490843db1..b9827f8175e 100644 --- a/services/dynamic-scheduler/requirements/_base.txt +++ b/services/dynamic-scheduler/requirements/_base.txt @@ -437,7 +437,7 @@ pyyaml==6.0.2 # -c requirements/../../../requirements/constraints.txt # -r requirements/../../../packages/service-library/requirements/_base.in # uvicorn -redis==5.2.0 +redis==5.2.1 # via # -c requirements/../../../packages/common-library/requirements/../../../requirements/constraints.txt # -c requirements/../../../packages/models-library/requirements/../../../packages/common-library/requirements/../../../requirements/constraints.txt diff --git a/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/redis.py b/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/redis.py index ff7d53920bf..c6f98d9e49e 100644 --- a/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/redis.py +++ 
b/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/redis.py @@ -23,8 +23,14 @@ def setup_redis(app: FastAPI) -> None: async def on_startup() -> None: app.state.redis_clients_manager = manager = RedisClientsManager( - {RedisManagerDBConfig(x, decode_responses=False) for x in _BINARY_DBS} - | {RedisManagerDBConfig(x, decode_responses=True) for x in _DECODE_DBS}, + { + RedisManagerDBConfig(database=x, decode_responses=False) + for x in _BINARY_DBS + } + | { + RedisManagerDBConfig(database=x, decode_responses=True) + for x in _DECODE_DBS + }, settings, client_name=APP_NAME, ) diff --git a/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/status_monitor/_monitor.py b/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/status_monitor/_monitor.py index 432cf8896d8..c404535c1e4 100644 --- a/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/status_monitor/_monitor.py +++ b/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/status_monitor/_monitor.py @@ -8,7 +8,7 @@ from models_library.projects_nodes_io import NodeID from pydantic import NonNegativeFloat, NonNegativeInt from servicelib.background_task import stop_periodic_task -from servicelib.redis_utils import start_exclusive_periodic_task +from servicelib.redis import start_exclusive_periodic_task from servicelib.utils import limited_gather from settings_library.redis import RedisDatabase @@ -34,7 +34,6 @@ async def _start_get_status_deferred( def _can_be_removed(model: TrackedServiceModel) -> bool: - # requested **as** `STOPPED` # service **reports** `IDLE` if ( diff --git a/services/dynamic-scheduler/tests/conftest.py b/services/dynamic-scheduler/tests/conftest.py index 1071b9a103e..1c4760a1659 100644 --- a/services/dynamic-scheduler/tests/conftest.py +++ b/services/dynamic-scheduler/tests/conftest.py @@ -135,7 +135,7 @@ async def app( @pytest.fixture async def remove_redis_data(redis_service: RedisSettings) -> None: async with RedisClientsManager( - {RedisManagerDBConfig(x) for x in RedisDatabase}, + {RedisManagerDBConfig(database=x) for x in RedisDatabase}, redis_service, client_name="pytest", ) as manager: diff --git a/services/dynamic-sidecar/requirements/_base.txt b/services/dynamic-sidecar/requirements/_base.txt index 45c1909c42c..f1ff8805767 100644 --- a/services/dynamic-sidecar/requirements/_base.txt +++ b/services/dynamic-sidecar/requirements/_base.txt @@ -592,7 +592,7 @@ pyyaml==6.0.2 # -r requirements/../../../packages/service-library/requirements/_base.in # -r requirements/../../../packages/simcore-sdk/requirements/../../../packages/service-library/requirements/_base.in # -r requirements/_base.in -redis==5.2.0 +redis==5.2.1 # via # -c requirements/../../../packages/common-library/requirements/../../../requirements/constraints.txt # -c requirements/../../../packages/models-library/requirements/../../../packages/common-library/requirements/../../../requirements/constraints.txt diff --git a/services/efs-guardian/requirements/_base.txt b/services/efs-guardian/requirements/_base.txt index 906e7e2581e..4d9bd5f7fc3 100644 --- a/services/efs-guardian/requirements/_base.txt +++ b/services/efs-guardian/requirements/_base.txt @@ -554,7 +554,7 @@ pyyaml==6.0.2 # -c requirements/../../../requirements/constraints.txt # -r requirements/../../../packages/aws-library/requirements/../../../packages/service-library/requirements/_base.in # -r requirements/../../../packages/service-library/requirements/_base.in -redis==5.1.1 
+redis==5.2.1 # via # -c requirements/../../../packages/aws-library/requirements/../../../packages/common-library/requirements/../../../requirements/constraints.txt # -c requirements/../../../packages/aws-library/requirements/../../../packages/models-library/requirements/../../../packages/common-library/requirements/../../../requirements/constraints.txt diff --git a/services/efs-guardian/requirements/_test.txt b/services/efs-guardian/requirements/_test.txt index 364a52d87ae..12345c0ecfd 100644 --- a/services/efs-guardian/requirements/_test.txt +++ b/services/efs-guardian/requirements/_test.txt @@ -258,7 +258,7 @@ pyyaml==6.0.2 # jsonschema-path # moto # responses -redis==5.1.1 +redis==5.2.1 # via # -c requirements/../../../requirements/constraints.txt # -c requirements/_base.txt diff --git a/services/efs-guardian/src/simcore_service_efs_guardian/services/background_tasks_setup.py b/services/efs-guardian/src/simcore_service_efs_guardian/services/background_tasks_setup.py index 5874de42b0c..58218a79cb1 100644 --- a/services/efs-guardian/src/simcore_service_efs_guardian/services/background_tasks_setup.py +++ b/services/efs-guardian/src/simcore_service_efs_guardian/services/background_tasks_setup.py @@ -7,7 +7,7 @@ from fastapi import FastAPI from servicelib.background_task import stop_periodic_task from servicelib.logging_utils import log_catch, log_context -from servicelib.redis_utils import start_exclusive_periodic_task +from servicelib.redis import start_exclusive_periodic_task from .background_tasks import removal_policy_task from .modules.redis import get_redis_lock_client @@ -33,9 +33,10 @@ class EfsGuardianBackgroundTask(TypedDict): def _on_app_startup(app: FastAPI) -> Callable[[], Awaitable[None]]: async def _startup() -> None: - with log_context( - _logger, logging.INFO, msg="Efs Guardian startup.." - ), log_catch(_logger, reraise=False): + with ( + log_context(_logger, logging.INFO, msg="Efs Guardian startup.."), + log_catch(_logger, reraise=False), + ): app.state.efs_guardian_background_tasks = [] # Setup periodic tasks @@ -57,9 +58,10 @@ def _on_app_shutdown( _app: FastAPI, ) -> Callable[[], Awaitable[None]]: async def _stop() -> None: - with log_context( - _logger, logging.INFO, msg="Efs Guardian shutdown.." 
diff --git a/services/invitations/requirements/_base.txt b/services/invitations/requirements/_base.txt
index c9528eb09f8..515fe9c9567 100644
--- a/services/invitations/requirements/_base.txt
+++ b/services/invitations/requirements/_base.txt
@@ -354,7 +354,7 @@ pyyaml==6.0.2
     #   -c requirements/../../../requirements/constraints.txt
     #   -r requirements/../../../packages/service-library/requirements/_base.in
     #   uvicorn
-redis==5.2.0
+redis==5.2.1
     # via
     #   -c requirements/../../../packages/common-library/requirements/../../../requirements/constraints.txt
     #   -c requirements/../../../packages/models-library/requirements/../../../packages/common-library/requirements/../../../requirements/constraints.txt
diff --git a/services/payments/requirements/_base.txt b/services/payments/requirements/_base.txt
index b8fd65acb18..a3a8b15b9d7 100644
--- a/services/payments/requirements/_base.txt
+++ b/services/payments/requirements/_base.txt
@@ -443,7 +443,7 @@ pyyaml==6.0.2
     #   -c requirements/../../../requirements/constraints.txt
     #   -r requirements/../../../packages/service-library/requirements/_base.in
     #   uvicorn
-redis==5.2.0
+redis==5.2.1
     # via
     #   -c requirements/../../../packages/common-library/requirements/../../../requirements/constraints.txt
     #   -c requirements/../../../packages/models-library/requirements/../../../packages/common-library/requirements/../../../requirements/constraints.txt
diff --git a/services/resource-usage-tracker/requirements/_base.txt b/services/resource-usage-tracker/requirements/_base.txt
index b1071e5b5f0..b20058f6992 100644
--- a/services/resource-usage-tracker/requirements/_base.txt
+++ b/services/resource-usage-tracker/requirements/_base.txt
@@ -594,7 +594,7 @@ pyyaml==6.0.1
     #   -r requirements/../../../packages/aws-library/requirements/../../../packages/service-library/requirements/_base.in
     #   -r requirements/../../../packages/service-library/requirements/_base.in
     #   uvicorn
-redis==5.0.4
+redis==5.2.1
     # via
     #   -c requirements/../../../packages/aws-library/requirements/../../../packages/common-library/requirements/../../../requirements/constraints.txt
     #   -c requirements/../../../packages/aws-library/requirements/../../../packages/models-library/requirements/../../../packages/common-library/requirements/../../../requirements/constraints.txt
diff --git a/services/resource-usage-tracker/requirements/_test.txt b/services/resource-usage-tracker/requirements/_test.txt
index 2e7590de3f6..2a9b72d235c 100644
--- a/services/resource-usage-tracker/requirements/_test.txt
+++ b/services/resource-usage-tracker/requirements/_test.txt
@@ -237,7 +237,7 @@ pyyaml==6.0.1
     #   jsonschema-path
     #   moto
     #   responses
-redis==5.0.4
+redis==5.2.1
    # via
     #   -c requirements/../../../requirements/constraints.txt
     #   -c requirements/_base.txt
diff --git a/services/resource-usage-tracker/src/simcore_service_resource_usage_tracker/services/background_task_periodic_heartbeat_check_setup.py b/services/resource-usage-tracker/src/simcore_service_resource_usage_tracker/services/background_task_periodic_heartbeat_check_setup.py
index 9f022e863e5..c1f407a7765 100644
--- a/services/resource-usage-tracker/src/simcore_service_resource_usage_tracker/services/background_task_periodic_heartbeat_check_setup.py
+++ b/services/resource-usage-tracker/src/simcore_service_resource_usage_tracker/services/background_task_periodic_heartbeat_check_setup.py
@@ -5,7 +5,7 @@
 from fastapi import FastAPI
 from servicelib.background_task import stop_periodic_task
 from servicelib.logging_utils import log_catch, log_context
-from servicelib.redis_utils import start_exclusive_periodic_task
+from servicelib.redis import start_exclusive_periodic_task
 
 from ..core.settings import ApplicationSettings
 from .background_task_periodic_heartbeat_check import (
@@ -26,11 +26,14 @@ class RutBackgroundTask(TypedDict):
 
 def _on_app_startup(app: FastAPI) -> Callable[[], Awaitable[None]]:
     async def _startup() -> None:
-        with log_context(
-            _logger,
-            logging.INFO,
-            msg="RUT background task Periodic check of running services startup..",
-        ), log_catch(_logger, reraise=False):
+        with (
+            log_context(
+                _logger,
+                logging.INFO,
+                msg="RUT background task Periodic check of running services startup..",
+            ),
+            log_catch(_logger, reraise=False),
+        ):
             app_settings: ApplicationSettings = app.state.settings
             app.state.rut_background_task__periodic_check_of_running_services = None
 
@@ -55,11 +58,14 @@ def _on_app_shutdown(
     _app: FastAPI,
 ) -> Callable[[], Awaitable[None]]:
     async def _stop() -> None:
-        with log_context(
-            _logger,
-            logging.INFO,
-            msg="RUT background tasks Periodic check of running services shutdown..",
-        ), log_catch(_logger, reraise=False):
+        with (
+            log_context(
+                _logger,
+                logging.INFO,
+                msg="RUT background tasks Periodic check of running services shutdown..",
+            ),
+            log_catch(_logger, reraise=False),
+        ):
             assert _app  # nosec
             if _app.state.rut_background_task__periodic_check_of_running_services:
                 await stop_periodic_task(
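The import move above (`servicelib.redis_utils` to `servicelib.redis`) does not change behavior: `start_exclusive_periodic_task` still ensures that only one replica runs a periodic job at a time by coordinating through a Redis lock. The sketch below illustrates that idea only; it is not the servicelib implementation, and the lock-per-tick scheme is an assumption:

```python
# Illustration of an "exclusive" periodic task: a Redis lock ensures at most
# one replica runs the body on each tick. Not the servicelib implementation.
import asyncio

import redis.asyncio as aioredis


async def exclusive_periodic(
    client: aioredis.Redis, lock_key: str, interval_s: float
) -> None:
    while True:
        # timeout auto-releases the lock if this replica dies mid-tick;
        # blocking=False means replicas that lose the race skip the tick
        lock = client.lock(lock_key, timeout=interval_s, blocking=False)
        if await lock.acquire():
            try:
                print("doing periodic work")  # the task body goes here
            finally:
                await lock.release()
        await asyncio.sleep(interval_s)
```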
diff --git a/services/storage/requirements/_base.txt b/services/storage/requirements/_base.txt
index 18c2388a7dd..df1f0513a0e 100644
--- a/services/storage/requirements/_base.txt
+++ b/services/storage/requirements/_base.txt
@@ -552,7 +552,7 @@ pyyaml==6.0.1
     #   -r requirements/../../../packages/aws-library/requirements/../../../packages/service-library/requirements/_base.in
     #   -r requirements/../../../packages/service-library/requirements/_base.in
     #   aiohttp-swagger
-redis==5.0.4
+redis==5.2.1
     # via
     #   -c requirements/../../../packages/aws-library/requirements/../../../packages/common-library/requirements/../../../requirements/constraints.txt
     #   -c requirements/../../../packages/aws-library/requirements/../../../packages/models-library/requirements/../../../packages/common-library/requirements/../../../requirements/constraints.txt
diff --git a/services/storage/requirements/_test.txt b/services/storage/requirements/_test.txt
index 6b7dbd281bb..bdef9179ec7 100644
--- a/services/storage/requirements/_test.txt
+++ b/services/storage/requirements/_test.txt
@@ -252,7 +252,7 @@ pyyaml==6.0.1
     #   jsonschema-path
     #   moto
     #   responses
-redis==5.0.4
+redis==5.2.1
     # via
     #   -c requirements/../../../requirements/constraints.txt
     #   -c requirements/_base.txt
diff --git a/services/storage/src/simcore_service_storage/dsm_cleaner.py b/services/storage/src/simcore_service_storage/dsm_cleaner.py
index b11ade3d256..67d859a4032 100644
--- a/services/storage/src/simcore_service_storage/dsm_cleaner.py
+++ b/services/storage/src/simcore_service_storage/dsm_cleaner.py
@@ -1,4 +1,4 @@
-""" backround task that cleans the DSM pending/expired uploads
+"""background task that cleans the DSM pending/expired uploads
 
 # Rationale:
   - for each upload an entry is created in the file_meta_data table in the database
@@ -26,7 +26,7 @@
 from aiohttp import web
 from servicelib.background_task import stop_periodic_task
 from servicelib.logging_utils import log_catch, log_context
-from servicelib.redis_utils import start_exclusive_periodic_task
+from servicelib.redis import start_exclusive_periodic_task
 
 from .constants import APP_CONFIG_KEY, APP_DSM_KEY
 from .dsm_factory import DataManagerProvider
@@ -57,8 +57,9 @@ async def dsm_cleaner_task(app: web.Application) -> None:
 
 def setup_dsm_cleaner(app: web.Application):
     async def _setup(app: web.Application):
-        with log_context(_logger, logging.INFO, msg="setup dsm cleaner"), log_catch(
-            _logger, reraise=False
+        with (
+            log_context(_logger, logging.INFO, msg="setup dsm cleaner"),
+            log_catch(_logger, reraise=False),
         ):
             cfg: Settings = app[APP_CONFIG_KEY]
             assert cfg.STORAGE_CLEANER_INTERVAL_S  # nosec
diff --git a/services/web/server/requirements/_base.txt b/services/web/server/requirements/_base.txt
index 62a83465800..f35f5ab6212 100644
--- a/services/web/server/requirements/_base.txt
+++ b/services/web/server/requirements/_base.txt
@@ -650,7 +650,7 @@ pyyaml==6.0.1
     #   -r requirements/../../../../packages/service-library/requirements/_base.in
     #   -r requirements/../../../../packages/simcore-sdk/requirements/../../../packages/service-library/requirements/_base.in
     #   swagger-ui-py
-redis==5.0.4
+redis==5.2.1
     # via
     #   -c requirements/../../../../packages/common-library/requirements/../../../requirements/constraints.txt
     #   -c requirements/../../../../packages/models-library/requirements/../../../packages/common-library/requirements/../../../requirements/constraints.txt
diff --git a/services/web/server/requirements/_test.txt b/services/web/server/requirements/_test.txt
index a62c8fcff02..3af248db46e 100644
--- a/services/web/server/requirements/_test.txt
+++ b/services/web/server/requirements/_test.txt
@@ -177,7 +177,7 @@ pyyaml==6.0.1
     #   -c requirements/../../../../requirements/constraints.txt
     #   -c requirements/_base.txt
     #   openapi-spec-validator
-redis==5.0.4
+redis==5.2.1
     # via
     #   -c requirements/../../../../requirements/constraints.txt
     #   -c requirements/_base.txt
diff --git a/services/web/server/src/simcore_service_webserver/redis.py b/services/web/server/src/simcore_service_webserver/redis.py
index 1a1427cc09c..5caebe02c53 100644
--- a/services/web/server/src/simcore_service_webserver/redis.py
+++ b/services/web/server/src/simcore_service_webserver/redis.py
@@ -34,7 +34,7 @@ async def setup_redis_client(app: web.Application):
     redis_settings: RedisSettings = get_plugin_settings(app)
     app[_APP_REDIS_CLIENTS_MANAGER] = manager = RedisClientsManager(
         databases_configs={
-            RedisManagerDBConfig(db)
+            RedisManagerDBConfig(database=db)
             for db in (
                 RedisDatabase.RESOURCES,
                 RedisDatabase.LOCKS,
{"user_id":"a_user_id, "some_other_id":123} will - create a hash named "user_id=a_user_id:some_other_id=123:resources") - resources are tuples (resource_name, resource_value) that are stored with a key as Redis fields. - A same key can have a lot of fields provided they have a different name. +key hashes are generated from a dictionary (e.g. {"user_id":"a_user_id, "some_other_id":123} will +create a hash named "user_id=a_user_id:some_other_id=123:resources") +resources are tuples (resource_name, resource_value) that are stored with a key as Redis fields. +A same key can have a lot of fields provided they have a different name. - A key can be set as "alive". This creates a secondary key (e.g. "user_id=a_user_id:some_other_id=123:alive"). - This key can have a timeout value. When the key times out then the key disappears from Redis automatically. +A key can be set as "alive". This creates a secondary key (e.g. "user_id=a_user_id:some_other_id=123:alive"). +This key can have a timeout value. When the key times out then the key disappears from Redis automatically. """ @@ -18,7 +18,7 @@ import redis.asyncio as aioredis from aiohttp import web from models_library.basic_types import UUIDStr -from servicelib.redis_utils import handle_redis_returns_union_types +from servicelib.redis import handle_redis_returns_union_types from typing_extensions import ( # https://docs.pydantic.dev/latest/api/standard_library_types/#typeddict TypedDict, ) diff --git a/services/web/server/src/simcore_service_webserver/users/_notifications_rest.py b/services/web/server/src/simcore_service_webserver/users/_notifications_rest.py index e9f3b1788e9..fb7e02b08da 100644 --- a/services/web/server/src/simcore_service_webserver/users/_notifications_rest.py +++ b/services/web/server/src/simcore_service_webserver/users/_notifications_rest.py @@ -11,7 +11,7 @@ parse_request_body_as, parse_request_path_parameters_as, ) -from servicelib.redis_utils import handle_redis_returns_union_types +from servicelib.redis import handle_redis_returns_union_types from .._meta import API_VTAG from ..login.decorators import login_required diff --git a/tests/swarm-deploy/requirements/_test.txt b/tests/swarm-deploy/requirements/_test.txt index 1dedde81a5d..4563e823bbc 100644 --- a/tests/swarm-deploy/requirements/_test.txt +++ b/tests/swarm-deploy/requirements/_test.txt @@ -540,7 +540,7 @@ pyyaml==6.0.2 # -r requirements/../../../packages/service-library/requirements/_base.in # -r requirements/../../../packages/simcore-sdk/requirements/../../../packages/service-library/requirements/_base.in # -r requirements/_test.in -redis==5.0.4 +redis==5.2.1 # via # -c requirements/../../../packages/models-library/requirements/../../../packages/common-library/requirements/../../../requirements/constraints.txt # -c requirements/../../../packages/models-library/requirements/../../../requirements/constraints.txt