Skip to content

Commit

Permalink
♻️ do not send heartbeat as soon as the service is started (#5168)
Browse files Browse the repository at this point in the history
Co-authored-by: Andrei Neagu <[email protected]>
  • Loading branch information
GitHK and Andrei Neagu authored Dec 15, 2023
1 parent 4f38ab4 commit bda5bc6
Show file tree
Hide file tree
Showing 6 changed files with 70 additions and 23 deletions.
8 changes: 7 additions & 1 deletion packages/service-library/src/servicelib/background_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
from tenacity.stop import stop_after_attempt
from tenacity.wait import wait_fixed

from .decorators import async_delayed

logger = logging.getLogger(__name__)


Expand Down Expand Up @@ -48,13 +50,17 @@ def start_periodic_task(
*,
interval: datetime.timedelta,
task_name: str,
wait_before_running: datetime.timedelta = datetime.timedelta(0),
**kwargs,
) -> asyncio.Task:
with log_context(
logger, logging.DEBUG, msg=f"create periodic background task '{task_name}'"
):
delayed_periodic_scheduled_task = async_delayed(wait_before_running)(
_periodic_scheduled_task
)
return asyncio.create_task(
_periodic_scheduled_task(
delayed_periodic_scheduled_task(
task,
interval=interval,
task_name=task_name,
Expand Down
31 changes: 24 additions & 7 deletions packages/service-library/src/servicelib/decorators.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,32 +4,49 @@
I order to avoid cyclic dependences, please
DO NOT IMPORT ANYTHING from .
"""
import asyncio
import datetime
import logging
from collections.abc import Callable, Coroutine
from copy import deepcopy
from functools import wraps
from typing import Any

log = logging.getLogger(__name__)
_logger = logging.getLogger(__name__)


def safe_return(if_fails_return=False, catch=None, logger=None):
def safe_return(if_fails_return=False, catch=None, logger=None): # noqa: FBT002
# defaults
if catch is None:
catch = (RuntimeError,)
if logger is None:
logger = log
logger = _logger

def decorate(func):
@wraps(func)
def safe_func(*args, **kargs):
def safe_func(*args, **kwargs):
try:
res = func(*args, **kargs)
return res
return func(*args, **kwargs)
except catch as err:
logger.info("%s failed: %s", func.__name__, str(err))
except Exception: # pylint: disable=broad-except
logger.info("%s failed unexpectedly", func.__name__, exc_info=True)
return deepcopy(if_fails_return) # avoid issues with default mutables
return deepcopy(if_fails_return) # avoid issues with default mutable

return safe_func

return decorate


def async_delayed(
interval: datetime.timedelta,
) -> Callable[..., Callable[..., Coroutine]]:
def decorator(func) -> Callable[..., Coroutine]:
@wraps(func)
async def wrapper(*args, **kwargs) -> Any:
await asyncio.sleep(interval.total_seconds())
return await func(*args, **kwargs)

return wrapper

return decorator
34 changes: 27 additions & 7 deletions packages/service-library/tests/test_decorators.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,26 +2,46 @@
# pylint:disable=unused-argument
# pylint:disable=redefined-outer-name

from servicelib.decorators import safe_return
from datetime import timedelta

from servicelib.decorators import async_delayed, safe_return


def test_safe_return_decorator():
class MyException(Exception):
class AnError(Exception):
pass

@safe_return(if_fails_return=False, catch=(MyException,), logger=None)
@safe_return(if_fails_return=False, catch=(AnError,), logger=None)
def raise_my_exception():
raise MyException()
raise AnError

assert not raise_my_exception()


def test_safe_return_mutables():
some_mutable_return = ["some", "defaults"]

@safe_return(if_fails_return=some_mutable_return)
@safe_return(if_fails_return=some_mutable_return) # type: ignore
def return_mutable():
raise RuntimeError("Runtime is default")
msg = "Runtime is default"
raise RuntimeError(msg)

assert return_mutable() == some_mutable_return # contains the same
assert not (return_mutable() is some_mutable_return) # but is not the same
assert return_mutable() is not some_mutable_return # but is not the same


async def test_async_delayed():
@async_delayed(timedelta(seconds=0.2))
async def decorated_awaitable() -> int:
return 42

assert await decorated_awaitable() == 42

async def another_awaitable() -> int:
return 42

decorated_another_awaitable = async_delayed(timedelta(seconds=0.2))(
another_awaitable
)

assert await decorated_another_awaitable() == 42
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,15 @@

import pytest
from httpx import (
ConnectError,
HTTPError,
PoolTimeout,
Request,
RequestError,
Response,
TransportError,
codes,
)
from pydantic import AnyHttpUrl, parse_obj_as
from pytest import LogCaptureFixture
from respx import MockRouter
from simcore_service_director_v2.modules.dynamic_sidecar.api_client._base import (
BaseThinClient,
Expand Down Expand Up @@ -79,13 +78,13 @@ async def test_connection_error(
await thick_client.get_provided_url(test_url)

assert isinstance(exe_info.value, ClientHttpError)
assert isinstance(exe_info.value.error, ConnectError)
assert isinstance(exe_info.value.error, TransportError)


async def test_retry_on_errors(
request_timeout: int,
test_url: AnyHttpUrl,
caplog_info_level: LogCaptureFixture,
caplog_info_level: pytest.LogCaptureFixture,
) -> None:
client = FakeThickClient(request_timeout=request_timeout)

Expand All @@ -95,10 +94,10 @@ async def test_retry_on_errors(
_assert_messages(caplog_info_level.messages)


@pytest.mark.parametrize("error_class", [ConnectError, PoolTimeout])
@pytest.mark.parametrize("error_class", [TransportError, PoolTimeout])
async def test_retry_on_errors_by_error_type(
error_class: type[RequestError],
caplog_info_level: LogCaptureFixture,
caplog_info_level: pytest.LogCaptureFixture,
request_timeout: int,
test_url: AnyHttpUrl,
) -> None:
Expand All @@ -107,7 +106,7 @@ class ATestClient(BaseThinClient):
@retry_on_errors
async def raises_request_error(self) -> Response:
raise error_class(
"mock_connect_error",
"mock_connect_error", # noqa: EM101
request=Request(method="GET", url=test_url),
)

Expand All @@ -134,7 +133,8 @@ class ATestClient(BaseThinClient):
# pylint: disable=no-self-use
@retry_on_errors
async def raises_http_error(self) -> Response:
raise HTTPError("mock_http_error")
msg = "mock_http_error"
raise HTTPError(msg)

client = ATestClient(request_timeout=request_timeout)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ async def _start_heart_beat_task(app: FastAPI) -> None:
app=app,
interval=resource_tracking_settings.RESOURCE_TRACKING_HEARTBEAT_INTERVAL,
task_name="resource_tracking_heart_beat",
wait_before_running=resource_tracking_settings.RESOURCE_TRACKING_HEARTBEAT_INTERVAL,
)


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -300,6 +300,9 @@ async def test_user_services_fail_to_stop_or_save_data(

await _wait_for_containers_to_be_running(app)

# let a few heartbeats pass
await asyncio.sleep(_BASE_HEART_BEAT_INTERVAL * 2)

# in case of manual intervention multiple stops will be sent
_EXPECTED_STOP_MESSAGES = 4
for _ in range(_EXPECTED_STOP_MESSAGES):
Expand Down

0 comments on commit bda5bc6

Please sign in to comment.