diff --git a/.gitignore b/.gitignore index ba2740de7ff..681ae664847 100644 --- a/.gitignore +++ b/.gitignore @@ -47,6 +47,7 @@ htmlcov/ .cache nosetests.xml coverage.xml +cov.xml *.cover .hypothesis/ .pytest_cache/ diff --git a/packages/settings-library/src/settings_library/docker_registry.py b/packages/settings-library/src/settings_library/docker_registry.py index 4b59f662215..3043e46f54a 100644 --- a/packages/settings-library/src/settings_library/docker_registry.py +++ b/packages/settings-library/src/settings_library/docker_registry.py @@ -1,4 +1,5 @@ from functools import cached_property +from typing import Any, ClassVar from pydantic import Field, SecretStr, validator @@ -24,7 +25,7 @@ class RegistrySettings(BaseCustomSettings): @validator("REGISTRY_PATH", pre=True) @classmethod - def escape_none_string(cls, v): + def escape_none_string(cls, v) -> Any | None: return None if v == "None" else v @cached_property @@ -34,3 +35,16 @@ def resolved_registry_url(self) -> str: @cached_property def api_url(self) -> str: return f"{self.REGISTRY_URL}/v2" + + class Config(BaseCustomSettings.Config): + schema_extra: ClassVar[dict[str, Any]] = { # type: ignore[misc] + "examples": [ + { + "REGISTRY_AUTH": "True", + "REGISTRY_USER": "theregistryuser", + "REGISTRY_PW": "some_secret_value", + "REGISTRY_SSL": "True", + "REGISTRY_URL": "registry.osparc-master.speag.com", + } + ], + } diff --git a/services/autoscaling/src/simcore_service_autoscaling/core/settings.py b/services/autoscaling/src/simcore_service_autoscaling/core/settings.py index e5528c4ee61..69926eee67a 100644 --- a/services/autoscaling/src/simcore_service_autoscaling/core/settings.py +++ b/services/autoscaling/src/simcore_service_autoscaling/core/settings.py @@ -100,6 +100,11 @@ class EC2InstancesSettings(BaseCustomSettings): " (https://docs.aws.amazon.com/vpc/latest/userguide/configure-subnets.html), " "this is required to start a new EC2 instance", ) + EC2_INSTANCES_TIME_BEFORE_DRAINING: datetime.timedelta = Field( + default=datetime.timedelta(seconds=20), + description="Time after which an EC2 instance may be drained (10s<=T<=1 minutes, is automatically capped)" + "(default to seconds, or see https://pydantic-docs.helpmanual.io/usage/types/#datetime-types for string formating)", + ) EC2_INSTANCES_TIME_BEFORE_TERMINATION: datetime.timedelta = Field( default=datetime.timedelta(minutes=1), description="Time after which an EC2 instance may be terminated (0<=T<=59 minutes, is automatically capped)" @@ -111,9 +116,22 @@ class EC2InstancesSettings(BaseCustomSettings): "a tag must have a key and an optional value. see [https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/Using_Tags.html]", ) + @validator("EC2_INSTANCES_TIME_BEFORE_DRAINING") + @classmethod + def ensure_draining_delay_time_is_in_range( + cls, value: datetime.timedelta + ) -> datetime.timedelta: + if value < datetime.timedelta(seconds=10): + value = datetime.timedelta(seconds=10) + elif value > datetime.timedelta(minutes=1): + value = datetime.timedelta(minutes=1) + return value + @validator("EC2_INSTANCES_TIME_BEFORE_TERMINATION") @classmethod - def ensure_time_is_in_range(cls, value): + def ensure_termination_delay_time_is_in_range( + cls, value: datetime.timedelta + ) -> datetime.timedelta: if value < datetime.timedelta(minutes=0): value = datetime.timedelta(minutes=0) elif value > datetime.timedelta(minutes=59): diff --git a/services/autoscaling/src/simcore_service_autoscaling/models.py b/services/autoscaling/src/simcore_service_autoscaling/models.py index 387629d11d1..29085c53419 100644 --- a/services/autoscaling/src/simcore_service_autoscaling/models.py +++ b/services/autoscaling/src/simcore_service_autoscaling/models.py @@ -34,6 +34,9 @@ def __post_init__(self) -> None: if self.available_resources == Resources.create_as_empty(): object.__setattr__(self, "available_resources", self.ec2_instance.resources) + def has_assigned_tasks(self) -> bool: + return bool(self.available_resources < self.ec2_instance.resources) + @dataclass(frozen=True, kw_only=True, slots=True) class AssociatedInstance(_BaseInstance): diff --git a/services/autoscaling/src/simcore_service_autoscaling/modules/auto_scaling_core.py b/services/autoscaling/src/simcore_service_autoscaling/modules/auto_scaling_core.py index 39b26599b58..f6b1648d034 100644 --- a/services/autoscaling/src/simcore_service_autoscaling/modules/auto_scaling_core.py +++ b/services/autoscaling/src/simcore_service_autoscaling/modules/auto_scaling_core.py @@ -751,23 +751,61 @@ async def _scale_up_cluster( return cluster +async def _find_drainable_nodes( + app: FastAPI, cluster: Cluster +) -> list[AssociatedInstance]: + app_settings: ApplicationSettings = app.state.settings + assert app_settings.AUTOSCALING_EC2_INSTANCES # nosec + + if not cluster.active_nodes: + # there is nothing to drain here + return [] + + # get the corresponding ec2 instance data + drainable_nodes: list[AssociatedInstance] = [] + + for instance in cluster.active_nodes: + if instance.has_assigned_tasks(): + await utils_docker.set_node_found_empty( + get_docker_client(app), instance.node, empty=False + ) + continue + node_last_empty = await utils_docker.get_node_empty_since(instance.node) + if not node_last_empty: + await utils_docker.set_node_found_empty( + get_docker_client(app), instance.node, empty=True + ) + continue + elapsed_time_since_empty = arrow.utcnow().datetime - node_last_empty + _logger.debug("%s", f"{node_last_empty=}, {elapsed_time_since_empty=}") + if ( + elapsed_time_since_empty + > app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_TIME_BEFORE_DRAINING + ): + drainable_nodes.append(instance) + else: + _logger.info( + "%s has still %ss before being drainable", + f"{instance.ec2_instance.id=}", + f"{(app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_TIME_BEFORE_DRAINING - elapsed_time_since_empty).total_seconds():.0f}", + ) + + if drainable_nodes: + _logger.info( + "the following nodes were found to be drainable: '%s'", + f"{[instance.node.Description.Hostname for instance in drainable_nodes if instance.node.Description]}", + ) + return drainable_nodes + + async def _deactivate_empty_nodes(app: FastAPI, cluster: Cluster) -> Cluster: app_settings = get_application_settings(app) docker_client = get_docker_client(app) - active_empty_instances: list[AssociatedInstance] = [] - active_non_empty_instances: list[AssociatedInstance] = [] - for instance in cluster.active_nodes: - if instance.available_resources == instance.ec2_instance.resources: - active_empty_instances.append(instance) - else: - active_non_empty_instances.append(instance) + active_empty_instances = await _find_drainable_nodes(app, cluster) if not active_empty_instances: return cluster - _logger.info( - "following nodes will be drained: '%s'", - f"{[instance.node.Description.Hostname for instance in active_empty_instances if instance.node.Description]}", - ) + # drain this empty nodes updated_nodes: list[Node] = await asyncio.gather( *( @@ -782,7 +820,7 @@ async def _deactivate_empty_nodes(app: FastAPI, cluster: Cluster) -> Cluster: ) if updated_nodes: _logger.info( - "following nodes set to drain: '%s'", + "following nodes were set to drain: '%s'", f"{[node.Description.Hostname for node in updated_nodes if node.Description]}", ) newly_drained_instances = [ @@ -791,7 +829,9 @@ async def _deactivate_empty_nodes(app: FastAPI, cluster: Cluster) -> Cluster: ] return dataclasses.replace( cluster, - active_nodes=active_non_empty_instances, + active_nodes=[ + n for n in cluster.active_nodes if n not in active_empty_instances + ], drained_nodes=cluster.drained_nodes + newly_drained_instances, ) @@ -814,7 +854,7 @@ async def _find_terminateable_instances( elapsed_time_since_drained = ( datetime.datetime.now(datetime.timezone.utc) - node_last_updated ) - _logger.warning("%s", f"{node_last_updated=}, {elapsed_time_since_drained=}") + _logger.debug("%s", f"{node_last_updated=}, {elapsed_time_since_drained=}") if ( elapsed_time_since_drained > app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_TIME_BEFORE_TERMINATION diff --git a/services/autoscaling/src/simcore_service_autoscaling/utils/utils_docker.py b/services/autoscaling/src/simcore_service_autoscaling/utils/utils_docker.py index a70319801d6..d879dfb4406 100644 --- a/services/autoscaling/src/simcore_service_autoscaling/utils/utils_docker.py +++ b/services/autoscaling/src/simcore_service_autoscaling/utils/utils_docker.py @@ -71,6 +71,11 @@ ] +_OSPARC_NODE_EMPTY_DATETIME_LABEL_KEY: Final[DockerLabelKey] = parse_obj_as( + DockerLabelKey, "io.simcore.osparc-node-found-empty" +) + + async def get_monitored_nodes( docker_client: AutoscalingDocker, node_labels: list[DockerLabelKey] ) -> list[Node]: @@ -609,6 +614,38 @@ def get_node_last_readyness_update(node: Node) -> datetime.datetime: ) # mypy +async def set_node_found_empty( + docker_client: AutoscalingDocker, + node: Node, + *, + empty: bool, +) -> Node: + assert node.Spec # nosec + new_tags = deepcopy(cast(dict[DockerLabelKey, str], node.Spec.Labels)) + if empty: + new_tags[_OSPARC_NODE_EMPTY_DATETIME_LABEL_KEY] = arrow.utcnow().isoformat() + else: + new_tags.pop(_OSPARC_NODE_EMPTY_DATETIME_LABEL_KEY, None) + return await tag_node( + docker_client, + node, + tags=new_tags, + available=bool(node.Spec.Availability is Availability.active), + ) + + +async def get_node_empty_since(node: Node) -> datetime.datetime | None: + """returns the last time when the node was found empty or None if it was not empty""" + assert node.Spec # nosec + assert node.Spec.Labels # nosec + if _OSPARC_NODE_EMPTY_DATETIME_LABEL_KEY not in node.Spec.Labels: + return None + return cast( + datetime.datetime, + arrow.get(node.Spec.Labels[_OSPARC_NODE_EMPTY_DATETIME_LABEL_KEY]).datetime, + ) # mypy + + async def attach_node( app_settings: ApplicationSettings, docker_client: AutoscalingDocker, diff --git a/services/autoscaling/tests/manual/.env-devel b/services/autoscaling/tests/manual/.env-devel index 4ff5907de69..fd5908c9214 100644 --- a/services/autoscaling/tests/manual/.env-devel +++ b/services/autoscaling/tests/manual/.env-devel @@ -8,6 +8,7 @@ AUTOSCALING_EC2_REGION_NAME=us-east-1 AUTOSCALING_REMOTE_DEBUGGING_PORT=3000 EC2_INSTANCES_MACHINES_BUFFER=0 EC2_INSTANCES_MAX_INSTANCES=20 +EC2_INSTANCES_TIME_BEFORE_DRAINING="00:00:10" EC2_INSTANCES_TIME_BEFORE_TERMINATION="00:03:00" EC2_INSTANCES_ALLOWED_TYPES='{"t2.micro": {"ami_id": "XXXXXXXX", "custom_boot_scripts": ["whoami"], "pre_pull_images": ["ubuntu:latest"]}}' EC2_INSTANCES_KEY_NAME=XXXXXXXXXX diff --git a/services/autoscaling/tests/unit/test_core_settings.py b/services/autoscaling/tests/unit/test_core_settings.py index 81b3c8fc487..89c12efc602 100644 --- a/services/autoscaling/tests/unit/test_core_settings.py +++ b/services/autoscaling/tests/unit/test_core_settings.py @@ -88,6 +88,26 @@ def test_defining_both_computational_and_dynamic_modes_is_invalid_and_raises( ApplicationSettings.create_from_envs() +def test_invalid_EC2_INSTANCES_TIME_BEFORE_DRAINING( # noqa: N802 + app_environment: EnvVarsDict, monkeypatch: pytest.MonkeyPatch +): + setenvs_from_dict(monkeypatch, {"EC2_INSTANCES_TIME_BEFORE_DRAINING": "1:05:00"}) + settings = ApplicationSettings.create_from_envs() + assert settings.AUTOSCALING_EC2_INSTANCES + assert settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_TIME_BEFORE_DRAINING + assert ( + datetime.timedelta(minutes=1) + == settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_TIME_BEFORE_DRAINING + ) + setenvs_from_dict(monkeypatch, {"EC2_INSTANCES_TIME_BEFORE_DRAINING": "-1:05:00"}) + settings = ApplicationSettings.create_from_envs() + assert settings.AUTOSCALING_EC2_INSTANCES + assert ( + datetime.timedelta(seconds=10) + == settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_TIME_BEFORE_DRAINING + ) + + def test_invalid_EC2_INSTANCES_TIME_BEFORE_TERMINATION( # noqa: N802 app_environment: EnvVarsDict, monkeypatch: pytest.MonkeyPatch ): diff --git a/services/autoscaling/tests/unit/test_modules_auto_scaling_computational.py b/services/autoscaling/tests/unit/test_modules_auto_scaling_computational.py index f51c36e191c..20ec37b2198 100644 --- a/services/autoscaling/tests/unit/test_modules_auto_scaling_computational.py +++ b/services/autoscaling/tests/unit/test_modules_auto_scaling_computational.py @@ -44,6 +44,7 @@ from simcore_service_autoscaling.modules.docker import get_docker_client from simcore_service_autoscaling.modules.ec2 import SimcoreEC2API from simcore_service_autoscaling.utils.utils_docker import ( + _OSPARC_NODE_EMPTY_DATETIME_LABEL_KEY, _OSPARC_SERVICE_READY_LABEL_KEY, _OSPARC_SERVICES_READY_DATETIME_LABEL_KEY, ) @@ -581,7 +582,40 @@ async def test_cluster_scaling_up_and_down( # noqa: PLR0915 # 4. now scaling down, as we deleted all the tasks # del dask_future + await auto_scale_cluster(app=initialized_app, auto_scaling_mode=auto_scaling_mode) + mock_dask_is_worker_connected.assert_called_once() + mock_dask_is_worker_connected.reset_mock() + mock_dask_get_worker_has_results_in_memory.assert_called() + assert mock_dask_get_worker_has_results_in_memory.call_count == 2 + mock_dask_get_worker_has_results_in_memory.reset_mock() + mock_dask_get_worker_used_resources.assert_called() + assert mock_dask_get_worker_used_resources.call_count == 2 + mock_dask_get_worker_used_resources.reset_mock() + # the node shall be waiting before draining + mock_docker_set_node_availability.assert_not_called() + mock_docker_tag_node.assert_called_once_with( + get_docker_client(initialized_app), + fake_attached_node, + tags=fake_attached_node.Spec.Labels + | { + _OSPARC_NODE_EMPTY_DATETIME_LABEL_KEY: mock.ANY, + }, + available=True, + ) + mock_docker_tag_node.reset_mock() + + # now update the fake node to have the required label as expected + assert app_settings.AUTOSCALING_EC2_INSTANCES + fake_attached_node.Spec.Labels[_OSPARC_NODE_EMPTY_DATETIME_LABEL_KEY] = ( + arrow.utcnow() + .shift( + seconds=-app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_TIME_BEFORE_DRAINING.total_seconds() + - 1 + ) + .datetime.isoformat() + ) + # now it will drain await auto_scale_cluster(app=initialized_app, auto_scaling_mode=auto_scaling_mode) mock_dask_is_worker_connected.assert_called_once() mock_dask_is_worker_connected.reset_mock() @@ -598,6 +632,7 @@ async def test_cluster_scaling_up_and_down( # noqa: PLR0915 fake_attached_node, tags=fake_attached_node.Spec.Labels | { + _OSPARC_NODE_EMPTY_DATETIME_LABEL_KEY: mock.ANY, _OSPARC_SERVICE_READY_LABEL_KEY: "false", _OSPARC_SERVICES_READY_DATETIME_LABEL_KEY: mock.ANY, }, diff --git a/services/autoscaling/tests/unit/test_modules_auto_scaling_dynamic.py b/services/autoscaling/tests/unit/test_modules_auto_scaling_dynamic.py index d7e76c8cb0f..d8743dcd078 100644 --- a/services/autoscaling/tests/unit/test_modules_auto_scaling_dynamic.py +++ b/services/autoscaling/tests/unit/test_modules_auto_scaling_dynamic.py @@ -40,7 +40,6 @@ from simcore_service_autoscaling.models import AssociatedInstance, Cluster from simcore_service_autoscaling.modules.auto_scaling_core import ( _activate_drained_nodes, - _deactivate_empty_nodes, _find_terminateable_instances, _try_scale_down_cluster, auto_scale_cluster, @@ -53,6 +52,7 @@ get_docker_client, ) from simcore_service_autoscaling.utils.utils_docker import ( + _OSPARC_NODE_EMPTY_DATETIME_LABEL_KEY, _OSPARC_SERVICE_READY_LABEL_KEY, _OSPARC_SERVICES_READY_DATETIME_LABEL_KEY, ) @@ -669,6 +669,32 @@ async def test_cluster_scaling_up_and_down( # noqa: PLR0915 instance_type=expected_ec2_type, instance_state="running", ) + # the node shall be waiting before draining + mock_docker_set_node_availability.assert_not_called() + mock_docker_tag_node.assert_called_once_with( + get_docker_client(initialized_app), + fake_attached_node, + tags=fake_attached_node.Spec.Labels + | { + _OSPARC_NODE_EMPTY_DATETIME_LABEL_KEY: mock.ANY, + }, + available=True, + ) + mock_docker_tag_node.reset_mock() + + # now update the fake node to have the required label as expected + assert app_settings.AUTOSCALING_EC2_INSTANCES + fake_attached_node.Spec.Labels[_OSPARC_NODE_EMPTY_DATETIME_LABEL_KEY] = ( + arrow.utcnow() + .shift( + seconds=-app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_TIME_BEFORE_DRAINING.total_seconds() + - 1 + ) + .datetime.isoformat() + ) + + # now it will drain + await auto_scale_cluster(app=initialized_app, auto_scaling_mode=auto_scaling_mode) mock_docker_set_node_availability.assert_not_called() mock_docker_tag_node.assert_called_once_with( get_docker_client(initialized_app), @@ -1034,150 +1060,6 @@ async def test_long_pending_ec2_is_detected_as_broken_terminated_and_restarted( ) -async def test__deactivate_empty_nodes( - minimal_configuration: None, - initialized_app: FastAPI, - cluster: Callable[..., Cluster], - host_node: Node, - fake_ec2_instance_data: Callable[..., EC2InstanceData], - mock_docker_set_node_availability: mock.Mock, - mock_docker_tag_node: mock.Mock, - with_drain_nodes_labelled: bool, -): - # since we have no service running, we expect the passed node to be set to drain - active_cluster = cluster( - active_nodes=[ - AssociatedInstance(node=host_node, ec2_instance=fake_ec2_instance_data()) - ] - ) - updated_cluster = await _deactivate_empty_nodes(initialized_app, active_cluster) - assert not updated_cluster.active_nodes - assert len(updated_cluster.drained_nodes) == len(active_cluster.active_nodes) - mock_docker_set_node_availability.assert_not_called() - assert host_node.Spec - assert host_node.Spec.Labels - mock_docker_tag_node.assert_called_once_with( - mock.ANY, - host_node, - tags=host_node.Spec.Labels - | { - _OSPARC_SERVICE_READY_LABEL_KEY: "false", - _OSPARC_SERVICES_READY_DATETIME_LABEL_KEY: mock.ANY, - }, - available=with_drain_nodes_labelled, - ) - - -async def test__deactivate_empty_nodes_to_drain_when_services_running_are_missing_labels( - minimal_configuration: None, - initialized_app: FastAPI, - cluster: Callable[..., Cluster], - host_node: Node, - fake_ec2_instance_data: Callable[..., EC2InstanceData], - mock_docker_set_node_availability: mock.Mock, - mock_docker_tag_node: mock.Mock, - create_service: Callable[ - [dict[str, Any], dict[DockerLabelKey, str], str], Awaitable[Service] - ], - task_template: dict[str, Any], - create_task_reservations: Callable[[int, int], dict[str, Any]], - host_cpu_count: int, - with_drain_nodes_labelled: bool, -): - # create a service that runs without task labels - task_template_that_runs = task_template | create_task_reservations( - int(host_cpu_count / 2 + 1), 0 - ) - await create_service( - task_template_that_runs, - {}, - "running", - ) - active_cluster = cluster( - active_nodes=[ - AssociatedInstance(node=host_node, ec2_instance=fake_ec2_instance_data()) - ] - ) - updated_cluster = await _deactivate_empty_nodes(initialized_app, active_cluster) - assert not updated_cluster.active_nodes - assert len(updated_cluster.drained_nodes) == len(active_cluster.active_nodes) - mock_docker_set_node_availability.assert_not_called() - assert host_node.Spec - assert host_node.Spec.Labels - mock_docker_tag_node.assert_called_once_with( - mock.ANY, - host_node, - tags=host_node.Spec.Labels - | { - _OSPARC_SERVICE_READY_LABEL_KEY: "false", - _OSPARC_SERVICES_READY_DATETIME_LABEL_KEY: mock.ANY, - }, - available=with_drain_nodes_labelled, - ) - - -async def test__deactivate_empty_nodes_does_not_drain_if_service_is_running_with_correct_labels( - minimal_configuration: None, - app_settings: ApplicationSettings, - initialized_app: FastAPI, - cluster: Callable[..., Cluster], - host_node: Node, - fake_ec2_instance_data: Callable[..., EC2InstanceData], - mock_docker_set_node_availability: mock.Mock, - mock_docker_tag_node: mock.Mock, - create_service: Callable[ - [dict[str, Any], dict[DockerLabelKey, str], str], Awaitable[Service] - ], - task_template: dict[str, Any], - create_task_reservations: Callable[[int, int], dict[str, Any]], - service_monitored_labels: dict[DockerLabelKey, str], - host_cpu_count: int, -): - # create a service that runs without task labels - task_template_that_runs = task_template | create_task_reservations( - int(host_cpu_count / 2 + 1), 0 - ) - assert app_settings.AUTOSCALING_NODES_MONITORING - await create_service( - task_template_that_runs, - service_monitored_labels, - "running", - ) - - # since we have no service running, we expect the passed node to be set to drain - assert host_node.Description - assert host_node.Description.Resources - assert host_node.Description.Resources.NanoCPUs - host_node_resources = Resources.parse_obj( - { - "ram": host_node.Description.Resources.MemoryBytes, - "cpus": host_node.Description.Resources.NanoCPUs / 10**9, - } - ) - fake_ec2_instance = fake_ec2_instance_data(resources=host_node_resources) - fake_associated_instance = AssociatedInstance( - node=host_node, ec2_instance=fake_ec2_instance - ) - node_used_resources = await DynamicAutoscaling().compute_node_used_resources( - initialized_app, fake_associated_instance - ) - assert node_used_resources - - active_cluster = cluster( - active_nodes=[ - AssociatedInstance( - node=host_node, - ec2_instance=fake_ec2_instance, - available_resources=host_node_resources - node_used_resources, - ) - ] - ) - updated_cluster = await _deactivate_empty_nodes(initialized_app, active_cluster) - assert updated_cluster == active_cluster - mock_docker_set_node_availability.assert_not_called() - mock_docker_tag_node.assert_not_called() - - async def test__find_terminateable_nodes_with_no_hosts( minimal_configuration: None, initialized_app: FastAPI, diff --git a/services/autoscaling/tests/unit/test_utils_docker.py b/services/autoscaling/tests/unit/test_utils_docker.py index e21a720cfca..730e2510e3b 100644 --- a/services/autoscaling/tests/unit/test_utils_docker.py +++ b/services/autoscaling/tests/unit/test_utils_docker.py @@ -11,6 +11,7 @@ from typing import Any import aiodocker +import arrow import pytest from aws_library.ec2.models import EC2InstanceData, Resources from deepdiff import DeepDiff @@ -33,9 +34,11 @@ from pytest_mock.plugin import MockerFixture from pytest_simcore.helpers.utils_envs import EnvVarsDict from servicelib.docker_utils import to_datetime +from settings_library.docker_registry import RegistrySettings from simcore_service_autoscaling.core.settings import ApplicationSettings from simcore_service_autoscaling.modules.docker import AutoscalingDocker from simcore_service_autoscaling.utils.utils_docker import ( + _OSPARC_NODE_EMPTY_DATETIME_LABEL_KEY, _OSPARC_SERVICE_READY_LABEL_KEY, _OSPARC_SERVICES_READY_DATETIME_LABEL_KEY, Node, @@ -46,22 +49,28 @@ compute_node_used_resources, compute_tasks_needed_resources, find_node_with_name, + get_docker_login_on_start_bash_command, get_docker_pull_images_crontab, get_docker_pull_images_on_start_bash_command, get_docker_swarm_join_bash_command, get_max_resources_from_docker_task, get_monitored_nodes, get_new_node_docker_tags, + get_node_empty_since, + get_node_last_readyness_update, get_node_total_resources, + get_task_instance_restriction, get_worker_nodes, is_node_osparc_ready, is_node_ready_and_available, pending_service_tasks_with_insufficient_resources, remove_nodes, set_node_availability, + set_node_found_empty, set_node_osparc_ready, tag_node, ) +from types_aiobotocore_ec2.literals import InstanceTypeType @pytest.fixture @@ -320,12 +329,12 @@ async def test_pending_service_task_with_insufficient_resources_with_service_lac diff = DeepDiff( pending_tasks[0], service_tasks[0], - exclude_paths={ + exclude_paths=[ "UpdatedAt", "Version", "root['Status']['Err']", "root['Status']['Timestamp']", - }, + ], ) assert not diff, f"{diff}" @@ -388,12 +397,12 @@ async def test_pending_service_task_with_insufficient_resources_with_labelled_se diff = DeepDiff( pending_tasks[0], service_tasks[0], - exclude_paths={ + exclude_paths=[ "UpdatedAt", "Version", "root['Status']['Err']", "root['Status']['Timestamp']", - }, + ], ) assert not diff, f"{diff}" @@ -563,6 +572,61 @@ async def test_get_resources_from_docker_task_with_reservations_and_limits_retur ) +@pytest.mark.parametrize( + "placement_constraints, expected_instance_type", + [ + (None, None), + (["blahblah==true", "notsoblahblah!=true"], None), + (["blahblah==true", "notsoblahblah!=true", "node.labels.blahblah==true"], None), + ( + [ + "blahblah==true", + "notsoblahblah!=true", + f"node.labels.{DOCKER_TASK_EC2_INSTANCE_TYPE_PLACEMENT_CONSTRAINT_KEY}==true", + ], + None, + ), + ( + [ + "blahblah==true", + "notsoblahblah!=true", + f"node.labels.{DOCKER_TASK_EC2_INSTANCE_TYPE_PLACEMENT_CONSTRAINT_KEY}==t3.medium", + ], + "t3.medium", + ), + ], +) +async def test_get_task_instance_restriction( + autoscaling_docker: AutoscalingDocker, + host_node: Node, + create_service: Callable[ + [dict[str, Any], dict[DockerLabelKey, str] | None, str, list[str] | None], + Awaitable[Service], + ], + task_template: dict[str, Any], + create_task_reservations: Callable[[int, int], dict[str, Any]], + faker: Faker, + placement_constraints: list[str] | None, + expected_instance_type: InstanceTypeType | None, +): + # this one has no instance restriction + service = await create_service( + task_template, + None, + "pending" if placement_constraints else "running", + placement_constraints, + ) + assert service.Spec + service_tasks = parse_obj_as( + list[Task], + await autoscaling_docker.tasks.list(filters={"service": service.Spec.Name}), + ) + instance_type_or_none = await get_task_instance_restriction( + autoscaling_docker, service_tasks[0] + ) + assert instance_type_or_none == expected_instance_type + + async def test_compute_tasks_needed_resources( autoscaling_docker: AutoscalingDocker, host_node: Node, @@ -595,6 +659,7 @@ async def test_compute_tasks_needed_resources( ) all_tasks = service_tasks for s in services: + assert s.Spec service_tasks = parse_obj_as( list[Task], await autoscaling_docker.tasks.list(filters={"service": s.Spec.Name}), @@ -653,7 +718,7 @@ async def test_compute_node_used_resources_with_service( # 3. if we look for services with some other label, they should then become invisible again node_used_resources = await compute_node_used_resources( - autoscaling_docker, host_node, service_labels=[faker.pystr()] + autoscaling_docker, host_node, service_labels=[DockerLabelKey(faker.pystr())] ) assert node_used_resources == Resources(cpus=0, ram=ByteSize(0)) # 4. if we look for services with 1 correct label, they should then become visible again @@ -798,6 +863,17 @@ async def test_get_docker_swarm_join_script_returning_unexpected_command_raises( await asyncio.sleep(2) +def test_get_docker_login_on_start_bash_command(): + registry_settings = RegistrySettings( + **RegistrySettings.Config.schema_extra["examples"][0] + ) + returned_command = get_docker_login_on_start_bash_command(registry_settings) + assert ( + f'echo "{registry_settings.REGISTRY_PW.get_secret_value()}" | docker login --username {registry_settings.REGISTRY_USER} --password-stdin {registry_settings.resolved_registry_url}' + == returned_command + ) + + async def test_try_get_node_with_name( autoscaling_docker: AutoscalingDocker, host_node: Node ): @@ -1082,6 +1158,8 @@ async def test_set_node_osparc_ready( ): # initial state assert is_node_ready_and_available(host_node, availability=Availability.active) + host_node_last_readyness_update = get_node_last_readyness_update(host_node) + assert host_node_last_readyness_update # set the node to drain updated_node = await set_node_availability( autoscaling_docker, host_node, available=False @@ -1089,6 +1167,9 @@ async def test_set_node_osparc_ready( assert is_node_ready_and_available(updated_node, availability=Availability.drain) # the node is also not osparc ready assert not is_node_osparc_ready(updated_node) + # the node readyness label was not updated here + updated_last_readyness = get_node_last_readyness_update(updated_node) + assert updated_last_readyness == host_node_last_readyness_update # this implicitely make the node active as well updated_node = await set_node_osparc_ready( @@ -1096,12 +1177,58 @@ async def test_set_node_osparc_ready( ) assert is_node_ready_and_available(updated_node, availability=Availability.active) assert is_node_osparc_ready(updated_node) + updated_last_readyness = get_node_last_readyness_update(updated_node) + assert updated_last_readyness > host_node_last_readyness_update # make it not osparc ready updated_node = await set_node_osparc_ready( app_settings, autoscaling_docker, host_node, ready=False ) assert not is_node_osparc_ready(updated_node) assert is_node_ready_and_available(updated_node, availability=Availability.drain) + assert get_node_last_readyness_update(updated_node) > updated_last_readyness + + +async def test_set_node_found_empty( + disabled_rabbitmq: None, + disabled_ec2: None, + mocked_redis_server: None, + enabled_dynamic_mode: EnvVarsDict, + disable_dynamic_service_background_task: None, + host_node: Node, + autoscaling_docker: AutoscalingDocker, +): + # initial state + assert is_node_ready_and_available(host_node, availability=Availability.active) + assert host_node.Spec + assert host_node.Spec.Labels + assert _OSPARC_NODE_EMPTY_DATETIME_LABEL_KEY not in host_node.Spec.Labels + + # the date is in the future as nothing was done + node_empty_since = await get_node_empty_since(host_node) + assert node_empty_since is None + + # now we set it to empty + updated_node = await set_node_found_empty(autoscaling_docker, host_node, empty=True) + assert updated_node.Spec + assert updated_node.Spec.Labels + assert _OSPARC_NODE_EMPTY_DATETIME_LABEL_KEY in updated_node.Spec.Labels + + # we can get that empty date back + node_empty_since = await get_node_empty_since(updated_node) + assert node_empty_since is not None + assert node_empty_since < arrow.utcnow().datetime + + # now we remove the empty label + updated_node = await set_node_found_empty( + autoscaling_docker, host_node, empty=False + ) + assert updated_node.Spec + assert updated_node.Spec.Labels + assert _OSPARC_NODE_EMPTY_DATETIME_LABEL_KEY not in updated_node.Spec.Labels + + # we can get the date again in the future + node_empty_since = await get_node_empty_since(updated_node) + assert node_empty_since is None async def test_attach_node( diff --git a/services/clusters-keeper/src/simcore_service_clusters_keeper/core/settings.py b/services/clusters-keeper/src/simcore_service_clusters_keeper/core/settings.py index 7b5a50ed140..579267de6b4 100644 --- a/services/clusters-keeper/src/simcore_service_clusters_keeper/core/settings.py +++ b/services/clusters-keeper/src/simcore_service_clusters_keeper/core/settings.py @@ -90,6 +90,12 @@ class WorkersEC2InstancesSettings(BaseCustomSettings): "this is required to start a new EC2 instance", ) + WORKERS_EC2_INSTANCES_TIME_BEFORE_DRAINING: datetime.timedelta = Field( + default=datetime.timedelta(minutes=1), + description="Time after which an EC2 instance may be terminated (min 0 max 1 minute) " + "(default to seconds, or see https://pydantic-docs.helpmanual.io/usage/types/#datetime-types for string formating)", + ) + WORKERS_EC2_INSTANCES_TIME_BEFORE_TERMINATION: datetime.timedelta = Field( default=datetime.timedelta(minutes=3), description="Time after which an EC2 instance may be terminated (min 0, max 59 minutes) " diff --git a/services/clusters-keeper/src/simcore_service_clusters_keeper/data/docker-compose.yml b/services/clusters-keeper/src/simcore_service_clusters_keeper/data/docker-compose.yml index d078da7beb3..a5b24f37cef 100644 --- a/services/clusters-keeper/src/simcore_service_clusters_keeper/data/docker-compose.yml +++ b/services/clusters-keeper/src/simcore_service_clusters_keeper/data/docker-compose.yml @@ -102,6 +102,7 @@ services: EC2_INSTANCES_NAME_PREFIX: ${EC2_INSTANCES_NAME_PREFIX} EC2_INSTANCES_SECURITY_GROUP_IDS: ${WORKERS_EC2_INSTANCES_SECURITY_GROUP_IDS} EC2_INSTANCES_SUBNET_ID: ${WORKERS_EC2_INSTANCES_SUBNET_ID} + EC2_INSTANCES_TIME_BEFORE_DRAINING: ${WORKERS_EC2_INSTANCES_TIME_BEFORE_DRAINING} EC2_INSTANCES_TIME_BEFORE_TERMINATION: ${WORKERS_EC2_INSTANCES_TIME_BEFORE_TERMINATION} LOG_FORMAT_LOCAL_DEV_ENABLED: 1 LOG_LEVEL: ${LOG_LEVEL:-WARNING} diff --git a/services/clusters-keeper/src/simcore_service_clusters_keeper/utils/clusters.py b/services/clusters-keeper/src/simcore_service_clusters_keeper/utils/clusters.py index 043783c5116..d7aaa2303e3 100644 --- a/services/clusters-keeper/src/simcore_service_clusters_keeper/utils/clusters.py +++ b/services/clusters-keeper/src/simcore_service_clusters_keeper/utils/clusters.py @@ -98,6 +98,7 @@ def _convert_to_env_dict(entries: dict[str, Any]) -> str: f"WORKERS_EC2_INSTANCES_MAX_INSTANCES={app_settings.CLUSTERS_KEEPER_WORKERS_EC2_INSTANCES.WORKERS_EC2_INSTANCES_MAX_INSTANCES}", f"WORKERS_EC2_INSTANCES_SECURITY_GROUP_IDS={_convert_to_env_list(app_settings.CLUSTERS_KEEPER_WORKERS_EC2_INSTANCES.WORKERS_EC2_INSTANCES_SECURITY_GROUP_IDS)}", f"WORKERS_EC2_INSTANCES_SUBNET_ID={app_settings.CLUSTERS_KEEPER_WORKERS_EC2_INSTANCES.WORKERS_EC2_INSTANCES_SUBNET_ID}", + f"WORKERS_EC2_INSTANCES_TIME_BEFORE_DRAINING={app_settings.CLUSTERS_KEEPER_WORKERS_EC2_INSTANCES.WORKERS_EC2_INSTANCES_TIME_BEFORE_DRAINING}", f"WORKERS_EC2_INSTANCES_TIME_BEFORE_TERMINATION={app_settings.CLUSTERS_KEEPER_WORKERS_EC2_INSTANCES.WORKERS_EC2_INSTANCES_TIME_BEFORE_TERMINATION}", ] diff --git a/services/clusters-keeper/tests/manual/README.md b/services/clusters-keeper/tests/manual/README.md index e48d1423530..4ef8e0bd72c 100644 --- a/services/clusters-keeper/tests/manual/README.md +++ b/services/clusters-keeper/tests/manual/README.md @@ -78,6 +78,7 @@ WORKERS_EC2_INSTANCES_KEY_NAME=XXXXXXX WORKERS_EC2_INSTANCES_MAX_INSTANCES=10 WORKERS_EC2_INSTANCES_SECURITY_GROUP_IDS="[\"XXXXXXX\"]" WORKERS_EC2_INSTANCES_SUBNET_ID=XXXXXXX +WORKERS_EC2_INSTANCES_TIME_BEFORE_DRAINING="00:00:20" WORKERS_EC2_INSTANCES_TIME_BEFORE_TERMINATION="00:03:00" WORKERS_EC2_INSTANCES_CUSTOM_TAGS='{"osparc-tag": "some fun tag value"}' ``` diff --git a/services/docker-compose.yml b/services/docker-compose.yml index bc04bc8995c..7b025b5ff67 100644 --- a/services/docker-compose.yml +++ b/services/docker-compose.yml @@ -92,6 +92,7 @@ services: EC2_INSTANCES_SECURITY_GROUP_IDS: ${EC2_INSTANCES_SECURITY_GROUP_IDS} EC2_INSTANCES_SUBNET_ID: ${EC2_INSTANCES_SUBNET_ID} EC2_INSTANCES_KEY_NAME: ${EC2_INSTANCES_KEY_NAME} + EC2_INSTANCES_TIME_BEFORE_DRAINING: ${EC2_INSTANCES_TIME_BEFORE_DRAINING} EC2_INSTANCES_TIME_BEFORE_TERMINATION: ${EC2_INSTANCES_TIME_BEFORE_TERMINATION} EC2_INSTANCES_CUSTOM_TAGS: ${EC2_INSTANCES_CUSTOM_TAGS} @@ -194,6 +195,7 @@ services: SWARM_STACK_NAME: ${SWARM_STACK_NAME} CLUSTERS_KEEPER_WORKERS_EC2_INSTANCES: ${CLUSTERS_KEEPER_WORKERS_EC2_INSTANCES} WORKERS_EC2_INSTANCES_ALLOWED_TYPES: ${WORKERS_EC2_INSTANCES_ALLOWED_TYPES} + WORKERS_EC2_INSTANCES_TIME_BEFORE_DRAINING: ${WORKERS_EC2_INSTANCES_TIME_BEFORE_DRAINING} WORKERS_EC2_INSTANCES_TIME_BEFORE_TERMINATION: ${WORKERS_EC2_INSTANCES_TIME_BEFORE_TERMINATION} WORKERS_EC2_INSTANCES_KEY_NAME: ${WORKERS_EC2_INSTANCES_KEY_NAME} WORKERS_EC2_INSTANCES_MAX_INSTANCES: ${WORKERS_EC2_INSTANCES_MAX_INSTANCES}