From 07c200e79a921d447e6455b6cef86cce0850bcbd Mon Sep 17 00:00:00 2001 From: Sylvain <35365065+sanderegg@users.noreply.github.com> Date: Tue, 10 Sep 2024 21:18:15 +0200 Subject: [PATCH] =?UTF-8?q?=F0=9F=90=9BAutoscaling/Comp=20backend:=20drain?= =?UTF-8?q?=20retired=20nodes=20so=20that=20they=20can=20be=20re-used=20(#?= =?UTF-8?q?6345)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../src/simcore_service_autoscaling/models.py | 10 ++++- .../modules/auto_scaling_core.py | 44 ++++++++++++++++++- .../modules/auto_scaling_mode_base.py | 5 +++ .../auto_scaling_mode_computational.py | 8 ++++ .../modules/auto_scaling_mode_dynamic.py | 7 +++ .../modules/dask.py | 25 ++++++++++- .../manual/docker-compose-computational.yml | 3 +- services/autoscaling/tests/unit/conftest.py | 1 + 8 files changed, 98 insertions(+), 5 deletions(-) diff --git a/services/autoscaling/src/simcore_service_autoscaling/models.py b/services/autoscaling/src/simcore_service_autoscaling/models.py index 2134b6ce36f..74ce27aae19 100644 --- a/services/autoscaling/src/simcore_service_autoscaling/models.py +++ b/services/autoscaling/src/simcore_service_autoscaling/models.py @@ -98,6 +98,11 @@ class Cluster: # pylint: disable=too-many-instance-attributes "description": "This is a EC2-backed docker node which is docker drained and waiting for termination" } ) + retired_nodes: list[AssociatedInstance] = field( + metadata={ + "description": "This is a EC2-backed docker node which was retired and waiting to be drained and eventually terminated or re-used" + } + ) terminated_instances: list[NonAssociatedInstance] def can_scale_down(self) -> bool: @@ -107,6 +112,7 @@ def can_scale_down(self) -> bool: or self.drained_nodes or self.pending_ec2s or self.terminating_nodes + or self.retired_nodes ) def total_number_of_machines(self) -> int: @@ -119,6 +125,7 @@ def total_number_of_machines(self) -> int: + len(self.pending_ec2s) + len(self.broken_ec2s) + len(self.terminating_nodes) + + len(self.retired_nodes) ) def __repr__(self) -> str: @@ -137,7 +144,8 @@ def _get_instance_ids( f"buffer-ec2s: count={len(self.buffer_ec2s)} {_get_instance_ids(self.buffer_ec2s)}, " f"disconnected-nodes: count={len(self.disconnected_nodes)}, " f"terminating-nodes: count={len(self.terminating_nodes)} {_get_instance_ids(self.terminating_nodes)}, " - f"terminated-ec2s: count={len(self.terminated_instances)} {_get_instance_ids(self.terminated_instances)}, " + f"retired-nodes: count={len(self.retired_nodes)} {_get_instance_ids(self.retired_nodes)}, " + f"terminated-ec2s: count={len(self.terminated_instances)} {_get_instance_ids(self.terminated_instances)})" ) diff --git a/services/autoscaling/src/simcore_service_autoscaling/modules/auto_scaling_core.py b/services/autoscaling/src/simcore_service_autoscaling/modules/auto_scaling_core.py index 714708fe2df..835e8a1e50e 100644 --- a/services/autoscaling/src/simcore_service_autoscaling/modules/auto_scaling_core.py +++ b/services/autoscaling/src/simcore_service_autoscaling/modules/auto_scaling_core.py @@ -113,7 +113,7 @@ async def _analyze_current_cluster( ] # analyse attached ec2s - active_nodes, pending_nodes, all_drained_nodes = [], [], [] + active_nodes, pending_nodes, all_drained_nodes, retired_nodes = [], [], [], [] for instance in attached_ec2s: if await auto_scaling_mode.is_instance_active(app, instance): node_used_resources = await auto_scaling_mode.compute_node_used_resources( @@ -128,6 +128,9 @@ async def _analyze_current_cluster( ) elif auto_scaling_mode.is_instance_drained(instance): all_drained_nodes.append(instance) + elif await auto_scaling_mode.is_instance_retired(app, instance): + # it should be drained, but it is not, so we force it to be drained such that it might be re-used if needed + retired_nodes.append(instance) else: pending_nodes.append(instance) @@ -149,6 +152,7 @@ async def _analyze_current_cluster( NonAssociatedInstance(ec2_instance=i) for i in terminated_ec2_instances ], disconnected_nodes=[n for n in docker_nodes if _node_not_ready(n)], + retired_nodes=retired_nodes, ) _logger.info("current state: %s", f"{cluster!r}") return cluster @@ -969,6 +973,43 @@ async def _notify_machine_creation_progress( ) +async def _drain_retired_nodes( + app: FastAPI, + cluster: Cluster, +) -> Cluster: + if not cluster.retired_nodes: + return cluster + + app_settings = get_application_settings(app) + docker_client = get_docker_client(app) + # drain this empty nodes + updated_nodes: list[Node] = await asyncio.gather( + *( + utils_docker.set_node_osparc_ready( + app_settings, + docker_client, + node.node, + ready=False, + ) + for node in cluster.retired_nodes + ) + ) + if updated_nodes: + _logger.info( + "following nodes were set to drain: '%s'", + f"{[node.Description.Hostname for node in updated_nodes if node.Description]}", + ) + newly_drained_instances = [ + AssociatedInstance(node=node, ec2_instance=instance.ec2_instance) + for instance, node in zip(cluster.retired_nodes, updated_nodes, strict=True) + ] + return dataclasses.replace( + cluster, + retired_nodes=[], + drained_nodes=cluster.drained_nodes + newly_drained_instances, + ) + + async def _autoscale_cluster( app: FastAPI, cluster: Cluster, @@ -1071,6 +1112,7 @@ async def auto_scale_cluster( cluster = await _try_attach_pending_ec2s( app, cluster, auto_scaling_mode, allowed_instance_types ) + cluster = await _drain_retired_nodes(app, cluster) cluster = await _autoscale_cluster( app, cluster, auto_scaling_mode, allowed_instance_types diff --git a/services/autoscaling/src/simcore_service_autoscaling/modules/auto_scaling_mode_base.py b/services/autoscaling/src/simcore_service_autoscaling/modules/auto_scaling_mode_base.py index 4de40db4a1d..35db81b0a22 100644 --- a/services/autoscaling/src/simcore_service_autoscaling/modules/auto_scaling_mode_base.py +++ b/services/autoscaling/src/simcore_service_autoscaling/modules/auto_scaling_mode_base.py @@ -86,6 +86,11 @@ async def compute_cluster_total_resources( async def is_instance_active(app: FastAPI, instance: AssociatedInstance) -> bool: ... + @staticmethod + @abstractmethod + async def is_instance_retired(app: FastAPI, instance: AssociatedInstance) -> bool: + ... + @staticmethod def is_instance_drained(instance: AssociatedInstance) -> bool: return not utils_docker.is_node_osparc_ready(instance.node) diff --git a/services/autoscaling/src/simcore_service_autoscaling/modules/auto_scaling_mode_computational.py b/services/autoscaling/src/simcore_service_autoscaling/modules/auto_scaling_mode_computational.py index 3fe01af0350..b20fb0e7c67 100644 --- a/services/autoscaling/src/simcore_service_autoscaling/modules/auto_scaling_mode_computational.py +++ b/services/autoscaling/src/simcore_service_autoscaling/modules/auto_scaling_mode_computational.py @@ -177,6 +177,14 @@ async def is_instance_active(app: FastAPI, instance: AssociatedInstance) -> bool _scheduler_url(app), _scheduler_auth(app), instance.ec2_instance ) + @staticmethod + async def is_instance_retired(app: FastAPI, instance: AssociatedInstance) -> bool: + if not utils_docker.is_node_osparc_ready(instance.node): + return False + return await dask.is_worker_retired( + _scheduler_url(app), _scheduler_auth(app), instance.ec2_instance + ) + @staticmethod async def try_retire_nodes(app: FastAPI) -> None: await dask.try_retire_nodes(_scheduler_url(app), _scheduler_auth(app)) diff --git a/services/autoscaling/src/simcore_service_autoscaling/modules/auto_scaling_mode_dynamic.py b/services/autoscaling/src/simcore_service_autoscaling/modules/auto_scaling_mode_dynamic.py index e1d356d9b06..7b7c8cefd6c 100644 --- a/services/autoscaling/src/simcore_service_autoscaling/modules/auto_scaling_mode_dynamic.py +++ b/services/autoscaling/src/simcore_service_autoscaling/modules/auto_scaling_mode_dynamic.py @@ -102,6 +102,13 @@ async def is_instance_active(app: FastAPI, instance: AssociatedInstance) -> bool assert app # nosec return utils_docker.is_node_osparc_ready(instance.node) + @staticmethod + async def is_instance_retired(app: FastAPI, instance: AssociatedInstance) -> bool: + assert app # nosec + assert instance # nosec + # nothing to do here + return False + @staticmethod async def try_retire_nodes(app: FastAPI) -> None: assert app # nosec diff --git a/services/autoscaling/src/simcore_service_autoscaling/modules/dask.py b/services/autoscaling/src/simcore_service_autoscaling/modules/dask.py index a8417ab69b9..edd00e72314 100644 --- a/services/autoscaling/src/simcore_service_autoscaling/modules/dask.py +++ b/services/autoscaling/src/simcore_service_autoscaling/modules/dask.py @@ -11,6 +11,7 @@ import distributed.scheduler from aws_library.ec2.models import EC2InstanceData, Resources from dask_task_models_library.resource_constraints import DaskTaskResources +from distributed.core import Status from models_library.clusters import InternalClusterAuthentication, TLSAuthentication from pydantic import AnyUrl, ByteSize, parse_obj_as @@ -120,8 +121,28 @@ async def is_worker_connected( ) -> bool: with contextlib.suppress(DaskNoWorkersError, DaskWorkerNotFoundError): async with _scheduler_client(scheduler_url, authentication) as client: - _dask_worker_from_ec2_instance(client, worker_ec2_instance) - return True + _, worker_details = _dask_worker_from_ec2_instance( + client, worker_ec2_instance + ) + return Status(worker_details["status"]) == Status.running + return False + + +async def is_worker_retired( + scheduler_url: AnyUrl, + authentication: InternalClusterAuthentication, + worker_ec2_instance: EC2InstanceData, +) -> bool: + with contextlib.suppress(DaskNoWorkersError, DaskWorkerNotFoundError): + async with _scheduler_client(scheduler_url, authentication) as client: + _, worker_details = _dask_worker_from_ec2_instance( + client, worker_ec2_instance + ) + return Status(worker_details["status"]) in { + Status.closed, + Status.closing, + Status.closing_gracefully, + } return False diff --git a/services/autoscaling/tests/manual/docker-compose-computational.yml b/services/autoscaling/tests/manual/docker-compose-computational.yml index 29575c76f7e..d97387ca95b 100644 --- a/services/autoscaling/tests/manual/docker-compose-computational.yml +++ b/services/autoscaling/tests/manual/docker-compose-computational.yml @@ -1,8 +1,9 @@ services: autoscaling: environment: + - AUTOSCALING_DASK={} - DASK_MONITORING_URL=tcp://dask-scheduler:8786 - - DASK_SCHEDULER_AUTH='{}' + - DASK_SCHEDULER_AUTH={} dask-sidecar: image: itisfoundation/dask-sidecar:master-github-latest init: true diff --git a/services/autoscaling/tests/unit/conftest.py b/services/autoscaling/tests/unit/conftest.py index 9bf74f1b9d5..ea6cfb46cb2 100644 --- a/services/autoscaling/tests/unit/conftest.py +++ b/services/autoscaling/tests/unit/conftest.py @@ -738,6 +738,7 @@ def _creator(**cluter_overrides) -> Cluster: buffer_ec2s=[], disconnected_nodes=[], terminating_nodes=[], + retired_nodes=[], terminated_instances=[], ), **cluter_overrides,