♻️Maintenance: Refactoring of redis client structure (#7015)
sanderegg authored Jan 8, 2025
1 parent 324c053 commit 897cfca
Showing 61 changed files with 504 additions and 361 deletions.
31 changes: 11 additions & 20 deletions .github/workflows/ci-arm-build.yml
@@ -1,19 +1,21 @@
name: CI ARM64 Build and Push

on:
# push:
# branches:
# - "master"
# tags-ignore:
# - "*"
# pull_request:
# branches-ignore:
# - "*"
push:
branches:
- "master"
tags-ignore:
- "*"

workflow_dispatch:

concurrency:
group: ${{ github.workflow }}-${{ github.ref }}
cancel-in-progress: true

jobs:
build-and-push-arm64:
if: github.event_name == 'workflow_dispatch'
timeout-minutes: 60 # intentionally long to allow for slow builds
runs-on: ubuntu-latest
strategy:
matrix:
@@ -54,14 +56,3 @@ jobs:
export DOCKER_IMAGE_TAG="$TAG_PREFIX-latest-arm64"
export DOCKER_TARGET_PLATFORMS=linux/arm64
make build push=true
- name: build & push images for specific tag
run: |
export DOCKER_IMAGE_TAG=$(exec ci/helpers/build_docker_image_tag.bash)-arm64
export DOCKER_TARGET_PLATFORMS=linux/arm64
make build push=true
- name: fuse images in the registry for latest tag
run: |
export DOCKER_IMAGE_TAG="$TAG_PREFIX-latest"
make docker-image-fuse SUFFIX=arm64
- name: set git tag
run: echo "GIT_TAG=${GITHUB_REF##*/}" >> $GITHUB_ENV
58 changes: 58 additions & 0 deletions .github/workflows/ci-multi-architecture-fusing.yml
@@ -0,0 +1,58 @@
name: CI Multi-Architecture Fusing

on:
workflow_run:
workflows: ["CI ARM64 Build and Push", "CI"]
types:
- completed
branches:
- "master"


concurrency:
group: ${{ github.workflow }}-${{ github.ref }}
cancel-in-progress: true

jobs:
multi-architecture-fusing:
if: ${{ github.event.workflow_run.conclusion == 'success' }}
timeout-minutes: 60 # intentionally long to allow for slow builds
runs-on: ubuntu-latest
strategy:
matrix:
os: [ubuntu-22.04]
python: ["3.11"]
env:
# secrets can be set in settings/secrets on github
DOCKER_REGISTRY: ${{ secrets.DOCKER_REGISTRY }}
steps:
- uses: actions/checkout@v4
- name: setup QEMU
uses: docker/setup-qemu-action@v3
- name: setup docker buildx
id: buildx
uses: docker/setup-buildx-action@v3
with:
driver: docker-container
- name: expose github runtime for buildx
uses: crazy-max/ghaction-github-runtime@v3
- name: show system environs
run: ./ci/helpers/show_system_versions.bash
- name: login to Dockerhub
uses: docker/login-action@v2
with:
username: ${{ secrets.DOCKER_USERNAME }}
password: ${{ secrets.DOCKER_PASSWORD }}
- name: Set deployment variables
run: |
if [ "${GITHUB_REF}" == "refs/heads/master" ]; then
echo "TAG_PREFIX=master-github" >> $GITHUB_ENV
elif [[ "${GITHUB_REF}" == refs/heads/hotfix_v* ]]; then
echo "TAG_PREFIX=hotfix-github" >> $GITHUB_ENV
elif [[ "${GITHUB_REF}" == refs/heads/hotfix_staging_* ]]; then
echo "TAG_PREFIX=hotfix-staging-github" >> $GITHUB_ENV
fi
- name: fuse images in the registry for latest tag
run: |
export DOCKER_IMAGE_TAG="$TAG_PREFIX-latest"
make docker-image-fuse SUFFIX=arm64
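
The `make docker-image-fuse SUFFIX=arm64` step above calls a Makefile target that is not part of this diff. As a hedged sketch only — assuming the target merges the amd64 tag pushed by the regular CI with the matching arm64 tag into a single multi-arch manifest — the equivalent operation could look roughly like this in Python (the helper and all registry/image/tag names below are made-up examples, not the repository's actual implementation):

# Hypothetical helper, not from the repository: combines per-architecture tags
# (e.g. "<image>:<tag>" for amd64 and "<image>:<tag>-arm64") into one multi-arch
# manifest, which is what the "fuse images in the registry" step appears to do.
import subprocess

def fuse_images(registry: str, image: str, tag: str, suffix: str = "arm64") -> None:
    fused = f"{registry}/{image}:{tag}"
    arch_specific = f"{fused}-{suffix}"
    # `docker buildx imagetools create` pushes a manifest list referencing both
    # source images, so pulling `fused` resolves to the matching architecture.
    subprocess.run(
        ["docker", "buildx", "imagetools", "create", "-t", fused, fused, arch_specific],
        check=True,
    )

fuse_images("itisfoundation", "webserver", "master-github-latest")  # example values only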
2 changes: 1 addition & 1 deletion .pre-commit-config.yaml
@@ -30,7 +30,7 @@ repos:
name: upgrade code
exclude: ^scripts/maintenance/computational-clusters/autoscaled_monitor/cli\.py$ # Optional get replaced and typer does not like it
- repo: https://github.com/hadialqattan/pycln
rev: v2.1.4
rev: v2.5.0
hooks:
- id: pycln
args: [--all, --expand-stars]
2 changes: 1 addition & 1 deletion packages/aws-library/requirements/_base.txt
@@ -294,7 +294,7 @@ pyyaml==6.0.2
# -c requirements/../../../packages/settings-library/requirements/../../../requirements/constraints.txt
# -c requirements/../../../requirements/constraints.txt
# -r requirements/../../../packages/service-library/requirements/_base.in
redis==5.0.4
redis==5.2.1
# via
# -c requirements/../../../packages/common-library/requirements/../../../requirements/constraints.txt
# -c requirements/../../../packages/models-library/requirements/../../../packages/common-library/requirements/../../../requirements/constraints.txt
5 changes: 2 additions & 3 deletions packages/pytest-simcore/src/pytest_simcore/redis_service.py
@@ -10,6 +10,7 @@
import tenacity
from pytest_mock import MockerFixture
from redis.asyncio import Redis, from_url
from servicelib.redis import _constants as redis_constants
from settings_library.basic_types import PortInt
from settings_library.redis import RedisDatabase, RedisSettings
from tenacity.before_sleep import before_sleep_log
@@ -118,6 +119,4 @@ async def wait_till_redis_responsive(redis_url: URL | str) -> None:
@pytest.fixture
def mock_redis_socket_timeout(mocker: MockerFixture) -> None:
# lowered to allow CI to properly shutdown RedisClientSDK instances
from servicelib import redis

mocker.patch.object(redis, "_DEFAULT_SOCKET_TIMEOUT", timedelta(seconds=1))
mocker.patch.object(redis_constants, "DEFAULT_SOCKET_TIMEOUT", timedelta(seconds=1))
2 changes: 1 addition & 1 deletion packages/service-library/requirements/_base.txt
@@ -210,7 +210,7 @@ pyyaml==6.0.2
# -c requirements/../../../packages/settings-library/requirements/../../../requirements/constraints.txt
# -c requirements/../../../requirements/constraints.txt
# -r requirements/_base.in
redis==5.0.4
redis==5.2.1
# via
# -c requirements/../../../packages/common-library/requirements/../../../requirements/constraints.txt
# -c requirements/../../../packages/models-library/requirements/../../../packages/common-library/requirements/../../../requirements/constraints.txt
25 changes: 25 additions & 0 deletions packages/service-library/src/servicelib/redis/__init__.py
@@ -0,0 +1,25 @@
from ._client import RedisClientSDK
from ._clients_manager import RedisClientsManager
from ._decorators import exclusive
from ._distributed_locks_utils import start_exclusive_periodic_task
from ._errors import (
CouldNotAcquireLockError,
CouldNotConnectToRedisError,
LockLostError,
)
from ._models import RedisManagerDBConfig
from ._utils import handle_redis_returns_union_types

__all__: tuple[str, ...] = (
"CouldNotAcquireLockError",
"CouldNotConnectToRedisError",
"exclusive",
"handle_redis_returns_union_types",
"LockLostError",
"RedisClientSDK",
"RedisClientsManager",
"RedisManagerDBConfig",
"start_exclusive_periodic_task",
)

# nopycln: file
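
Since the new package re-exports its public API from this `__init__.py`, a minimal usage sketch could look like the following. The constructor arguments are inferred from the `RedisClientsManager` dataclass shown further down in this diff; the database choice and client name are example values, and `RedisSettings()` is assumed to pick up its configuration from the environment.

import asyncio

from servicelib.redis import RedisClientsManager, RedisManagerDBConfig
from settings_library.redis import RedisDatabase, RedisSettings

async def main() -> None:
    # assumption: RedisSettings reads its REDIS_* values from the environment
    settings = RedisSettings()
    async with RedisClientsManager(
        databases_configs={RedisManagerDBConfig(database=RedisDatabase.LOCKS)},
        settings=settings,
        client_name="example-service",
    ) as manager:
        client = manager.client(RedisDatabase.LOCKS)
        # RedisClientSDK exposes the underlying redis.asyncio client via `.redis`
        await client.redis.set("example-key", "example-value")

asyncio.run(main())

Re-exporting everything through `__init__.py` (together with the `# nopycln: file` directive, which keeps pycln from stripping the seemingly unused imports) lets callers depend on `servicelib.redis` without reaching into the private `_client`, `_decorators`, or `_errors` submodules.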
@@ -5,65 +5,38 @@
from asyncio import Task
from collections.abc import AsyncIterator
from dataclasses import dataclass, field
from typing import Final
from uuid import uuid4

import redis.asyncio as aioredis
import redis.exceptions
from common_library.errors_classes import OsparcErrorMixin
from pydantic import NonNegativeFloat, NonNegativeInt
from pydantic import NonNegativeFloat
from redis.asyncio.lock import Lock
from redis.asyncio.retry import Retry
from redis.backoff import ExponentialBackoff
from settings_library.redis import RedisDatabase, RedisSettings
from tenacity import retry
from yarl import URL

from .background_task import periodic_task
from .logging_utils import log_catch, log_context
from .retry_policies import RedisRetryPolicyUponInitialization

_DEFAULT_LOCK_TTL: Final[datetime.timedelta] = datetime.timedelta(seconds=10)
_DEFAULT_SOCKET_TIMEOUT: Final[datetime.timedelta] = datetime.timedelta(seconds=30)


_DEFAULT_DECODE_RESPONSES: Final[bool] = True
_DEFAULT_HEALTH_CHECK_INTERVAL: Final[datetime.timedelta] = datetime.timedelta(
seconds=5
from ..background_task import periodic_task
from ..logging_utils import log_catch
from ..retry_policies import RedisRetryPolicyUponInitialization
from ._constants import (
DEFAULT_DECODE_RESPONSES,
DEFAULT_HEALTH_CHECK_INTERVAL,
DEFAULT_LOCK_TTL,
DEFAULT_SOCKET_TIMEOUT,
)
_SHUTDOWN_TIMEOUT_S: Final[NonNegativeInt] = 5

from ._errors import CouldNotAcquireLockError, CouldNotConnectToRedisError
from ._utils import auto_extend_lock, cancel_or_warn

_logger = logging.getLogger(__name__)


class BaseRedisError(OsparcErrorMixin, RuntimeError):
...


class CouldNotAcquireLockError(BaseRedisError):
msg_template: str = "Lock {lock.name} could not be acquired!"


class CouldNotConnectToRedisError(BaseRedisError):
msg_template: str = "Connection to '{dsn}' failed"


async def _cancel_or_warn(task: Task) -> None:
if not task.cancelled():
task.cancel()
_, pending = await asyncio.wait((task,), timeout=_SHUTDOWN_TIMEOUT_S)
if pending:
task_name = task.get_name()
_logger.warning("Could not cancel task_name=%s pending=%s", task_name, pending)


@dataclass
class RedisClientSDK:
redis_dsn: str
client_name: str
decode_responses: bool = _DEFAULT_DECODE_RESPONSES
health_check_interval: datetime.timedelta = _DEFAULT_HEALTH_CHECK_INTERVAL
decode_responses: bool = DEFAULT_DECODE_RESPONSES
health_check_interval: datetime.timedelta = DEFAULT_HEALTH_CHECK_INTERVAL

_client: aioredis.Redis = field(init=False)
_health_check_task: Task | None = None
@@ -74,7 +47,7 @@ class RedisClientSDK:
def redis(self) -> aioredis.Redis:
return self._client

def __post_init__(self):
def __post_init__(self) -> None:
self._client = aioredis.from_url(
self.redis_dsn,
# Run 3 retries with exponential backoff strategy source: https://redis.readthedocs.io/en/stable/backoff.html
Expand All @@ -84,8 +57,8 @@ def __post_init__(self):
redis.exceptions.ConnectionError,
redis.exceptions.TimeoutError,
],
socket_timeout=_DEFAULT_SOCKET_TIMEOUT.total_seconds(),
socket_connect_timeout=_DEFAULT_SOCKET_TIMEOUT.total_seconds(),
socket_timeout=DEFAULT_SOCKET_TIMEOUT.total_seconds(),
socket_connect_timeout=DEFAULT_SOCKET_TIMEOUT.total_seconds(),
encoding="utf-8",
decode_responses=self.decode_responses,
client_name=self.client_name,
@@ -113,7 +86,7 @@ async def setup(self) -> None:
async def shutdown(self) -> None:
if self._health_check_task:
self._continue_health_checking = False
await _cancel_or_warn(self._health_check_task)
await cancel_or_warn(self._health_check_task)
self._health_check_task = None

await self._client.aclose(close_connection_pool=True)
@@ -165,7 +138,7 @@ async def lock_context(
2. `blocking==True` timeouts out while waiting for lock to be free (another entity holds the lock)
"""

total_lock_duration: datetime.timedelta = _DEFAULT_LOCK_TTL
total_lock_duration: datetime.timedelta = DEFAULT_LOCK_TTL
lock_unique_id = f"lock_extender_{lock_key}_{uuid4()}"

ttl_lock: Lock = self._client.lock(
@@ -178,15 +151,9 @@
if not await ttl_lock.acquire(token=lock_value):
raise CouldNotAcquireLockError(lock=ttl_lock)

async def _extend_lock(lock: Lock) -> None:
with log_context(
_logger, logging.DEBUG, f"Extending lock {lock_unique_id}"
), log_catch(_logger, reraise=False):
await lock.reacquire()

try:
async with periodic_task(
_extend_lock,
auto_extend_lock,
interval=total_lock_duration / 2,
task_name=lock_unique_id,
lock=ttl_lock,
@@ -224,51 +191,3 @@ async def _extend_lock(lock: Lock) -> None:
async def lock_value(self, lock_name: str) -> str | None:
output: str | None = await self._client.get(lock_name)
return output


@dataclass(frozen=True)
class RedisManagerDBConfig:
database: RedisDatabase
decode_responses: bool = _DEFAULT_DECODE_RESPONSES
health_check_interval: datetime.timedelta = _DEFAULT_HEALTH_CHECK_INTERVAL


@dataclass
class RedisClientsManager:
"""
Manages the lifetime of redis client sdk connections
"""

databases_configs: set[RedisManagerDBConfig]
settings: RedisSettings
client_name: str

_client_sdks: dict[RedisDatabase, RedisClientSDK] = field(default_factory=dict)

async def setup(self) -> None:
for config in self.databases_configs:
self._client_sdks[config.database] = RedisClientSDK(
redis_dsn=self.settings.build_redis_dsn(config.database),
decode_responses=config.decode_responses,
health_check_interval=config.health_check_interval,
client_name=f"{self.client_name}",
)

for client in self._client_sdks.values():
await client.setup()

async def shutdown(self) -> None:
# NOTE: somehow using logged_gather is not an option
# doing so will make the shutdown procedure hang
for client in self._client_sdks.values():
await client.shutdown()

def client(self, database: RedisDatabase) -> RedisClientSDK:
return self._client_sdks[database]

async def __aenter__(self) -> "RedisClientsManager":
await self.setup()
return self

async def __aexit__(self, *args):
await self.shutdown()
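
The new `_constants` and `_errors` submodules imported above are not included in this diff. Reconstructed from the definitions deleted from the old module, they plausibly look roughly as follows; `LockLostError` is only known from the `__init__.py` export, so its message template is a guess.

# Plausible reconstruction of servicelib/redis/_constants.py and _errors.py,
# based on the constants and error classes deleted from the old monolithic module.
import datetime
from typing import Final

from common_library.errors_classes import OsparcErrorMixin

# --- _constants.py (values copied from the removed module-level constants) ---
DEFAULT_LOCK_TTL: Final[datetime.timedelta] = datetime.timedelta(seconds=10)
DEFAULT_SOCKET_TIMEOUT: Final[datetime.timedelta] = datetime.timedelta(seconds=30)
DEFAULT_DECODE_RESPONSES: Final[bool] = True
DEFAULT_HEALTH_CHECK_INTERVAL: Final[datetime.timedelta] = datetime.timedelta(seconds=5)

# --- _errors.py (classes moved out of the old module; LockLostError is assumed) ---
class BaseRedisError(OsparcErrorMixin, RuntimeError):
    ...

class CouldNotAcquireLockError(BaseRedisError):
    msg_template: str = "Lock {lock.name} could not be acquired!"

class CouldNotConnectToRedisError(BaseRedisError):
    msg_template: str = "Connection to '{dsn}' failed"

class LockLostError(BaseRedisError):
    # hypothetical message; the actual template is not visible in this diff
    msg_template: str = "Lock {lock.name} was lost"

Splitting the constants into their own module is what lets the updated `mock_redis_socket_timeout` fixture above patch `redis_constants.DEFAULT_SOCKET_TIMEOUT` directly instead of reaching into the old `servicelib.redis` module for the private `_DEFAULT_SOCKET_TIMEOUT`.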