šŸ›Autoscaling/Comp backend: drain retired nodes so that they can be reā€¦
Browse files Browse the repository at this point in the history
ā€¦-used (#6345)
  • Loading branch information
sanderegg committed Sep 12, 2024
1 parent f6d362b commit 07c200e
Showing 8 changed files with 98 additions and 5 deletions.
10 changes: 9 additions & 1 deletion services/autoscaling/src/simcore_service_autoscaling/models.py
@@ -98,6 +98,11 @@ class Cluster: # pylint: disable=too-many-instance-attributes
"description": "This is a EC2-backed docker node which is docker drained and waiting for termination"
}
)
retired_nodes: list[AssociatedInstance] = field(
metadata={
"description": "This is a EC2-backed docker node which was retired and waiting to be drained and eventually terminated or re-used"
}
)
terminated_instances: list[NonAssociatedInstance]

def can_scale_down(self) -> bool:
@@ -107,6 +112,7 @@
or self.drained_nodes
or self.pending_ec2s
or self.terminating_nodes
or self.retired_nodes
)

def total_number_of_machines(self) -> int:
@@ -119,6 +125,7 @@
+ len(self.pending_ec2s)
+ len(self.broken_ec2s)
+ len(self.terminating_nodes)
+ len(self.retired_nodes)
)

def __repr__(self) -> str:
@@ -137,7 +144,8 @@ def _get_instance_ids(
f"buffer-ec2s: count={len(self.buffer_ec2s)} {_get_instance_ids(self.buffer_ec2s)}, "
f"disconnected-nodes: count={len(self.disconnected_nodes)}, "
f"terminating-nodes: count={len(self.terminating_nodes)} {_get_instance_ids(self.terminating_nodes)}, "
f"terminated-ec2s: count={len(self.terminated_instances)} {_get_instance_ids(self.terminated_instances)}, "
f"retired-nodes: count={len(self.retired_nodes)} {_get_instance_ids(self.retired_nodes)}, "
f"terminated-ec2s: count={len(self.terminated_instances)} {_get_instance_ids(self.terminated_instances)})"
)
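The two accessors above are why the new field matters beyond bookkeeping: a cluster whose only machines are retired nodes must still be eligible for scale-down, and those machines must still be counted as owned by the autoscaler. A minimal sketch of that invariant — the `empty_cluster` and `retired` arguments are illustrative, standing in for a Cluster with every collection empty (as built by the fixture in tests/unit/conftest.py) and a non-empty list of instances:

import dataclasses

from simcore_service_autoscaling.models import AssociatedInstance, Cluster


def check_retired_invariants(
    empty_cluster: Cluster, retired: list[AssociatedInstance]
) -> None:
    only_retired = dataclasses.replace(empty_cluster, retired_nodes=retired)
    # retired nodes keep the scale-down path alive...
    assert only_retired.can_scale_down()
    # ...and still count as machines the autoscaler owns
    assert only_retired.total_number_of_machines() == len(retired)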


@@ -113,7 +113,7 @@ async def _analyze_current_cluster(
]

# analyse attached ec2s
active_nodes, pending_nodes, all_drained_nodes = [], [], []
active_nodes, pending_nodes, all_drained_nodes, retired_nodes = [], [], [], []
for instance in attached_ec2s:
if await auto_scaling_mode.is_instance_active(app, instance):
node_used_resources = await auto_scaling_mode.compute_node_used_resources(
@@ -128,6 +128,9 @@
)
elif auto_scaling_mode.is_instance_drained(instance):
all_drained_nodes.append(instance)
elif await auto_scaling_mode.is_instance_retired(app, instance):
# the node is retired but not yet drained: force-drain it so that it can be re-used if needed
retired_nodes.append(instance)
else:
pending_nodes.append(instance)

@@ -149,6 +152,7 @@
NonAssociatedInstance(ec2_instance=i) for i in terminated_ec2_instances
],
disconnected_nodes=[n for n in docker_nodes if _node_not_ready(n)],
retired_nodes=retired_nodes,
)
_logger.info("current state: %s", f"{cluster!r}")
return cluster
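The order of the checks above is significant: an instance only lands in retired_nodes after the active and drained tests have failed, i.e. it is still osparc-ready in docker terms but its scheduler-side worker has been retired. A condensed sketch of the same decision chain — the `_classify` helper is hypothetical, written against the same auto_scaling_mode interface:

async def _classify(app, auto_scaling_mode, instance) -> str:
    # mirrors the if/elif chain in _analyze_current_cluster:
    # each attached instance lands in exactly one bucket
    if await auto_scaling_mode.is_instance_active(app, instance):
        return "active"
    if auto_scaling_mode.is_instance_drained(instance):
        return "drained"
    if await auto_scaling_mode.is_instance_retired(app, instance):
        # still ready in docker terms, but retired by the scheduler
        return "retired"
    return "pending"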
@@ -969,6 +973,43 @@ async def _notify_machine_creation_progress(
)


async def _drain_retired_nodes(
app: FastAPI,
cluster: Cluster,
) -> Cluster:
if not cluster.retired_nodes:
return cluster

app_settings = get_application_settings(app)
docker_client = get_docker_client(app)
# drain these retired nodes
updated_nodes: list[Node] = await asyncio.gather(
*(
utils_docker.set_node_osparc_ready(
app_settings,
docker_client,
node.node,
ready=False,
)
for node in cluster.retired_nodes
)
)
if updated_nodes:
_logger.info(
"following nodes were set to drain: '%s'",
f"{[node.Description.Hostname for node in updated_nodes if node.Description]}",
)
newly_drained_instances = [
AssociatedInstance(node=node, ec2_instance=instance.ec2_instance)
for instance, node in zip(cluster.retired_nodes, updated_nodes, strict=True)
]
return dataclasses.replace(
cluster,
retired_nodes=[],
drained_nodes=cluster.drained_nodes + newly_drained_instances,
)
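Note the strict=True in the zip above: asyncio.gather returns its results in the order of the awaitables it was given, so each updated node pairs with the retired instance that produced it, and a length mismatch raises instead of silently mispairing nodes with instances.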


async def _autoscale_cluster(
app: FastAPI,
cluster: Cluster,
@@ -1071,6 +1112,7 @@ async def auto_scale_cluster(
cluster = await _try_attach_pending_ec2s(
app, cluster, auto_scaling_mode, allowed_instance_types
)
cluster = await _drain_retired_nodes(app, cluster)

cluster = await _autoscale_cluster(
app, cluster, auto_scaling_mode, allowed_instance_types
@@ -86,6 +86,11 @@ async def compute_cluster_total_resources(
async def is_instance_active(app: FastAPI, instance: AssociatedInstance) -> bool:
...

@staticmethod
@abstractmethod
async def is_instance_retired(app: FastAPI, instance: AssociatedInstance) -> bool:
...

@staticmethod
def is_instance_drained(instance: AssociatedInstance) -> bool:
return not utils_docker.is_node_osparc_ready(instance.node)
@@ -177,6 +177,14 @@ async def is_instance_active(app: FastAPI, instance: AssociatedInstance) -> bool
_scheduler_url(app), _scheduler_auth(app), instance.ec2_instance
)

@staticmethod
async def is_instance_retired(app: FastAPI, instance: AssociatedInstance) -> bool:
if not utils_docker.is_node_osparc_ready(instance.node):
return False
return await dask.is_worker_retired(
_scheduler_url(app), _scheduler_auth(app), instance.ec2_instance
)

@staticmethod
async def try_retire_nodes(app: FastAPI) -> None:
await dask.try_retire_nodes(_scheduler_url(app), _scheduler_auth(app))
@@ -102,6 +102,13 @@ async def is_instance_active(app: FastAPI, instance: AssociatedInstance) -> bool
assert app # nosec
return utils_docker.is_node_osparc_ready(instance.node)

@staticmethod
async def is_instance_retired(app: FastAPI, instance: AssociatedInstance) -> bool:
assert app # nosec
assert instance # nosec
# nothing to do here
return False

@staticmethod
async def try_retire_nodes(app: FastAPI) -> None:
assert app # nosec
@@ -11,6 +11,7 @@
import distributed.scheduler
from aws_library.ec2.models import EC2InstanceData, Resources
from dask_task_models_library.resource_constraints import DaskTaskResources
from distributed.core import Status
from models_library.clusters import InternalClusterAuthentication, TLSAuthentication
from pydantic import AnyUrl, ByteSize, parse_obj_as

@@ -120,8 +121,28 @@ async def is_worker_connected(
) -> bool:
with contextlib.suppress(DaskNoWorkersError, DaskWorkerNotFoundError):
async with _scheduler_client(scheduler_url, authentication) as client:
_dask_worker_from_ec2_instance(client, worker_ec2_instance)
return True
_, worker_details = _dask_worker_from_ec2_instance(
client, worker_ec2_instance
)
return Status(worker_details["status"]) == Status.running
return False


async def is_worker_retired(
scheduler_url: AnyUrl,
authentication: InternalClusterAuthentication,
worker_ec2_instance: EC2InstanceData,
) -> bool:
with contextlib.suppress(DaskNoWorkersError, DaskWorkerNotFoundError):
async with _scheduler_client(scheduler_url, authentication) as client:
_, worker_details = _dask_worker_from_ec2_instance(
client, worker_ec2_instance
)
return Status(worker_details["status"]) in {
Status.closed,
Status.closing,
Status.closing_gracefully,
}
return False
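The check piggybacks on distributed's own Status enum, so a worker counts as retired from the moment the scheduler starts closing it gracefully, not only once it is fully gone. A small sketch of the same mapping — the `_is_retired_status` helper is illustrative, not part of the module:

from distributed.core import Status

_RETIRED_STATES = {Status.closed, Status.closing, Status.closing_gracefully}


def _is_retired_status(raw_status: str) -> bool:
    # worker_details["status"] is the raw status reported by the scheduler,
    # converted back to the enum exactly as in is_worker_retired above
    return Status(raw_status) in _RETIRED_STATES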


@@ -1,8 +1,9 @@
services:
autoscaling:
environment:
- AUTOSCALING_DASK={}
- DASK_MONITORING_URL=tcp://dask-scheduler:8786
- DASK_SCHEDULER_AUTH='{}'
- DASK_SCHEDULER_AUTH={}
dask-sidecar:
image: itisfoundation/dask-sidecar:master-github-latest
init: true
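The DASK_SCHEDULER_AUTH change is smaller than it looks but deliberate: in a compose environment: list, YAML does not strip single quotes from the middle of a plain scalar, so they ended up inside the variable's value, and '{}' (quotes included) is not valid JSON. A minimal illustration with the standard json module — the service itself parses this value through its pydantic settings:

import json

json.loads("{}")  # the new value: a valid, empty JSON object
try:
    json.loads("'{}'")  # the old value, with the quotes inside the env var
except json.JSONDecodeError:
    print("single quotes make the value unparsable")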
1 change: 1 addition & 0 deletions services/autoscaling/tests/unit/conftest.py
@@ -738,6 +738,7 @@ def _creator(**cluter_overrides) -> Cluster:
buffer_ec2s=[],
disconnected_nodes=[],
terminating_nodes=[],
retired_nodes=[],
terminated_instances=[],
),
**cluter_overrides,
