From 60dbc85fbfea35aedac22f7ea99f397d67919ac8 Mon Sep 17 00:00:00 2001 From: Sylvain <35365065+sanderegg@users.noreply.github.com> Date: Fri, 20 Oct 2023 15:52:05 +0200 Subject: [PATCH] =?UTF-8?q?=E2=9C=A8Autoscaling:=201st=20draft=20on=20auto?= =?UTF-8?q?-scaling=20computational=20clusters=20(#4711)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../src/pytest_simcore/dask_scheduler.py | 24 +- services/autoscaling/requirements/_base.in | 2 + services/autoscaling/requirements/_base.txt | 104 +++- services/autoscaling/requirements/_test.txt | 16 +- services/autoscaling/requirements/_tools.txt | 4 +- services/autoscaling/sandbox/script.py | 534 ----------------- .../src/simcore_service_autoscaling/_meta.py | 10 + .../core/application.py | 3 + .../core/errors.py | 12 + .../core/settings.py | 41 +- .../src/simcore_service_autoscaling/models.py | 11 + .../auto_scaling_mode_computational.py | 139 +++++ .../modules/auto_scaling_task.py | 29 +- .../modules/dask.py | 210 +++++++ .../utils/auto_scaling_core.py | 18 + .../utils/computational_scaling.py | 112 ++++ .../simcore_service_autoscaling/utils/ec2.py | 15 +- .../utils/rabbitmq.py | 3 +- .../utils/utils_docker.py | 7 + services/autoscaling/tests/manual/.env-devel | 22 + services/autoscaling/tests/manual/Makefile | 39 ++ services/autoscaling/tests/manual/README.md | 69 +++ .../manual/docker-compose-computational.yml | 46 ++ .../tests/manual/docker-compose.yml | 61 ++ services/autoscaling/tests/unit/conftest.py | 47 +- .../tests/unit/test_core_settings.py | 20 + ...test_modules_auto_scaling_computational.py | 564 ++++++++++++++++++ .../unit/test_modules_auto_scaling_task.py | 12 + .../tests/unit/test_modules_dask.py | 306 ++++++++++ .../unit/test_utils_auto_scaling_core.py | 4 +- .../unit/test_utils_computational_scaling.py | 230 +++++++ .../tests/unit/test_utils_docker.py | 9 + 32 files changed, 2161 insertions(+), 562 deletions(-) delete mode 100644 services/autoscaling/sandbox/script.py create mode 100644 services/autoscaling/src/simcore_service_autoscaling/modules/auto_scaling_mode_computational.py create mode 100644 services/autoscaling/src/simcore_service_autoscaling/modules/dask.py create mode 100644 services/autoscaling/src/simcore_service_autoscaling/utils/computational_scaling.py create mode 100644 services/autoscaling/tests/manual/.env-devel create mode 100644 services/autoscaling/tests/manual/Makefile create mode 100644 services/autoscaling/tests/manual/README.md create mode 100644 services/autoscaling/tests/manual/docker-compose-computational.yml create mode 100644 services/autoscaling/tests/manual/docker-compose.yml create mode 100644 services/autoscaling/tests/unit/test_modules_auto_scaling_computational.py create mode 100644 services/autoscaling/tests/unit/test_modules_dask.py create mode 100644 services/autoscaling/tests/unit/test_utils_computational_scaling.py diff --git a/packages/pytest-simcore/src/pytest_simcore/dask_scheduler.py b/packages/pytest-simcore/src/pytest_simcore/dask_scheduler.py index 211d044ce26..c3af235c220 100644 --- a/packages/pytest-simcore/src/pytest_simcore/dask_scheduler.py +++ b/packages/pytest-simcore/src/pytest_simcore/dask_scheduler.py @@ -63,14 +63,34 @@ async def dask_spec_local_cluster( monkeypatch: pytest.MonkeyPatch, dask_workers_config: dict[str, Any], dask_scheduler_config: dict[str, Any], -) -> AsyncIterable[distributed.SpecCluster]: +) -> AsyncIterator[distributed.SpecCluster]: # in this mode we can precisely create a specific cluster async 
with distributed.SpecCluster( workers=dask_workers_config, scheduler=dask_scheduler_config, asynchronous=True, - name="pytest_cluster", + name="pytest_dask_spec_local_cluster", + ) as cluster: + scheduler_address = URL(cluster.scheduler_address) + monkeypatch.setenv( + "COMPUTATIONAL_BACKEND_DEFAULT_CLUSTER_URL", + f"{scheduler_address}" or "invalid", + ) + yield cluster + + +@pytest.fixture +async def dask_local_cluster_without_workers( + monkeypatch: pytest.MonkeyPatch, + dask_scheduler_config: dict[str, Any], +) -> AsyncIterable[distributed.SpecCluster]: + # in this mode we can precisely create a specific cluster + + async with distributed.SpecCluster( + scheduler=dask_scheduler_config, + asynchronous=True, + name="pytest_dask_local_cluster_without_workers", ) as cluster: scheduler_address = URL(cluster.scheduler_address) monkeypatch.setenv( diff --git a/services/autoscaling/requirements/_base.in b/services/autoscaling/requirements/_base.in index 2b727fc6470..2d19068def7 100644 --- a/services/autoscaling/requirements/_base.in +++ b/services/autoscaling/requirements/_base.in @@ -4,6 +4,7 @@ # NOTE: ALL version constraints MUST be commented --constraint ../../../requirements/constraints.txt --constraint ./constraints.txt +--constraint ../../../services/dask-sidecar/requirements/_dask-distributed.txt # intra-repo required dependencies --requirement ../../../packages/models-library/requirements/_base.in @@ -15,6 +16,7 @@ aiodocker aioboto3 +dask[distributed] fastapi packaging types-aiobotocore[ec2] diff --git a/services/autoscaling/requirements/_base.txt b/services/autoscaling/requirements/_base.txt index 99015cd7a79..d9c3375c34d 100644 --- a/services/autoscaling/requirements/_base.txt +++ b/services/autoscaling/requirements/_base.txt @@ -11,7 +11,9 @@ aio-pika==9.1.2 aioboto3==10.4.0 # via -r requirements/_base.in aiobotocore==2.4.2 - # via aioboto3 + # via + # aioboto3 + # aiobotocore aiodebug==2.3.0 # via # -c requirements/../../../packages/service-library/requirements/./_base.in @@ -87,10 +89,27 @@ certifi==2023.7.22 # httpx charset-normalizer==3.0.1 # via aiohttp -click==8.1.3 +click==8.1.7 # via + # -c requirements/../../../services/dask-sidecar/requirements/_dask-distributed.txt + # dask + # distributed # typer # uvicorn +cloudpickle==2.2.1 + # via + # -c requirements/../../../services/dask-sidecar/requirements/_dask-distributed.txt + # dask + # distributed +dask==2023.3.2 + # via + # -c requirements/../../../services/dask-sidecar/requirements/_dask-distributed.txt + # -r requirements/_base.in + # distributed +distributed==2023.3.2 + # via + # -c requirements/../../../services/dask-sidecar/requirements/_dask-distributed.txt + # dask dnspython==2.3.0 # via email-validator email-validator==1.3.1 @@ -112,6 +131,10 @@ frozenlist==1.3.3 # via # aiohttp # aiosignal +fsspec==2023.6.0 + # via + # -c requirements/../../../services/dask-sidecar/requirements/_dask-distributed.txt + # dask h11==0.14.0 # via # httpcore @@ -136,6 +159,23 @@ idna==3.4 # email-validator # httpx # yarl +importlib-metadata==6.8.0 + # via + # -c requirements/../../../services/dask-sidecar/requirements/_dask-distributed.txt + # dask +jinja2==3.1.2 + # via + # -c requirements/../../../packages/models-library/requirements/../../../requirements/constraints.txt + # -c requirements/../../../packages/service-library/requirements/../../../packages/models-library/requirements/../../../requirements/constraints.txt + # -c 
requirements/../../../packages/service-library/requirements/../../../packages/settings-library/requirements/../../../requirements/constraints.txt + # -c requirements/../../../packages/service-library/requirements/../../../requirements/constraints.txt + # -c requirements/../../../packages/service-library/requirements/./../../../packages/models-library/requirements/../../../requirements/constraints.txt + # -c requirements/../../../packages/service-library/requirements/./../../../packages/settings-library/requirements/../../../requirements/constraints.txt + # -c requirements/../../../packages/service-library/requirements/./../../../requirements/constraints.txt + # -c requirements/../../../packages/settings-library/requirements/../../../requirements/constraints.txt + # -c requirements/../../../requirements/constraints.txt + # -c requirements/../../../services/dask-sidecar/requirements/_dask-distributed.txt + # distributed jmespath==1.0.1 # via # boto3 @@ -145,10 +185,23 @@ jsonschema==3.2.0 # -r requirements/../../../packages/models-library/requirements/_base.in # -r requirements/../../../packages/service-library/requirements/../../../packages/models-library/requirements/_base.in # -r requirements/../../../packages/service-library/requirements/./../../../packages/models-library/requirements/_base.in +locket==1.0.0 + # via + # -c requirements/../../../services/dask-sidecar/requirements/_dask-distributed.txt + # distributed + # partd markdown-it-py==3.0.0 # via rich +markupsafe==2.1.3 + # via + # -c requirements/../../../services/dask-sidecar/requirements/_dask-distributed.txt + # jinja2 mdurl==0.1.2 # via markdown-it-py +msgpack==1.0.5 + # via + # -c requirements/../../../services/dask-sidecar/requirements/_dask-distributed.txt + # distributed multidict==6.0.4 # via # aiohttp @@ -158,10 +211,22 @@ orjson==3.9.7 # -r requirements/../../../packages/models-library/requirements/_base.in # -r requirements/../../../packages/service-library/requirements/../../../packages/models-library/requirements/_base.in # -r requirements/../../../packages/service-library/requirements/./../../../packages/models-library/requirements/_base.in -packaging==23.0 - # via -r requirements/_base.in +packaging==23.1 + # via + # -c requirements/../../../services/dask-sidecar/requirements/_dask-distributed.txt + # -r requirements/_base.in + # dask + # distributed pamqp==3.2.1 # via aiormq +partd==1.4.0 + # via + # -c requirements/../../../services/dask-sidecar/requirements/_dask-distributed.txt + # dask +psutil==5.9.5 + # via + # -c requirements/../../../services/dask-sidecar/requirements/_dask-distributed.txt + # distributed pydantic==1.10.2 # via # -c requirements/../../../packages/models-library/requirements/../../../requirements/constraints.txt @@ -206,7 +271,10 @@ pyyaml==6.0.1 # -c requirements/../../../packages/service-library/requirements/./_base.in # -c requirements/../../../packages/settings-library/requirements/../../../requirements/constraints.txt # -c requirements/../../../requirements/constraints.txt + # -c requirements/../../../services/dask-sidecar/requirements/_dask-distributed.txt # -r requirements/../../../packages/service-library/requirements/_base.in + # dask + # distributed redis==4.5.4 # via # -c requirements/../../../packages/models-library/requirements/../../../requirements/constraints.txt @@ -236,6 +304,10 @@ sniffio==1.3.0 # anyio # httpcore # httpx +sortedcontainers==2.4.0 + # via + # -c requirements/../../../services/dask-sidecar/requirements/_dask-distributed.txt + # distributed starlette==0.27.0 
# via # -c requirements/../../../packages/models-library/requirements/../../../requirements/constraints.txt @@ -248,6 +320,10 @@ starlette==0.27.0 # -c requirements/../../../packages/settings-library/requirements/../../../requirements/constraints.txt # -c requirements/../../../requirements/constraints.txt # fastapi +tblib==2.0.0 + # via + # -c requirements/../../../services/dask-sidecar/requirements/_dask-distributed.txt + # distributed tenacity==8.2.1 # via # -c requirements/../../../packages/service-library/requirements/./_base.in @@ -255,7 +331,15 @@ tenacity==8.2.1 toolz==0.12.0 # via # -c requirements/../../../packages/service-library/requirements/./_base.in + # -c requirements/../../../services/dask-sidecar/requirements/_dask-distributed.txt # -r requirements/../../../packages/service-library/requirements/_base.in + # dask + # distributed + # partd +tornado==6.3.3 + # via + # -c requirements/../../../services/dask-sidecar/requirements/_dask-distributed.txt + # distributed tqdm==4.64.1 # via # -c requirements/../../../packages/service-library/requirements/./_base.in @@ -278,7 +362,7 @@ typing-extensions==4.5.0 # pydantic # types-aiobotocore # types-aiobotocore-ec2 -urllib3==1.26.14 +urllib3==1.26.16 # via # -c requirements/../../../packages/models-library/requirements/../../../requirements/constraints.txt # -c requirements/../../../packages/service-library/requirements/../../../packages/models-library/requirements/../../../requirements/constraints.txt @@ -289,7 +373,9 @@ urllib3==1.26.14 # -c requirements/../../../packages/service-library/requirements/./../../../requirements/constraints.txt # -c requirements/../../../packages/settings-library/requirements/../../../requirements/constraints.txt # -c requirements/../../../requirements/constraints.txt + # -c requirements/../../../services/dask-sidecar/requirements/_dask-distributed.txt # botocore + # distributed uvicorn==0.20.0 # via -r requirements/../../../packages/service-library/requirements/_fastapi.in wrapt==1.14.1 @@ -299,6 +385,14 @@ yarl==1.9.2 # aio-pika # aiohttp # aiormq +zict==3.0.0 + # via + # -c requirements/../../../services/dask-sidecar/requirements/_dask-distributed.txt + # distributed +zipp==3.16.2 + # via + # -c requirements/../../../services/dask-sidecar/requirements/_dask-distributed.txt + # importlib-metadata # The following packages are considered to be unsafe in a requirements file: # setuptools diff --git a/services/autoscaling/requirements/_test.txt b/services/autoscaling/requirements/_test.txt index 90c17c410cd..17155aa4e3f 100644 --- a/services/autoscaling/requirements/_test.txt +++ b/services/autoscaling/requirements/_test.txt @@ -53,7 +53,7 @@ charset-normalizer==3.0.1 # via # -c requirements/_base.txt # requests -click==8.1.3 +click==8.1.7 # via # -c requirements/_base.txt # flask @@ -121,6 +121,7 @@ itsdangerous==2.1.2 jinja2==3.1.2 # via # -c requirements/../../../requirements/constraints.txt + # -c requirements/_base.txt # flask # moto jmespath==1.0.1 @@ -151,6 +152,7 @@ lupa==2.0 # via fakeredis markupsafe==2.1.3 # via + # -c requirements/_base.txt # jinja2 # werkzeug moto==4.2.2 @@ -165,7 +167,7 @@ openapi-spec-validator==0.4.0 # via moto ordered-set==4.1.0 # via deepdiff -packaging==23.0 +packaging==23.1 # via # -c requirements/_base.txt # docker @@ -179,7 +181,9 @@ pluggy==1.3.0 pprintpp==0.4.0 # via pytest-icdiff psutil==5.9.5 - # via -r requirements/_test.in + # via + # -c requirements/_base.txt + # -r requirements/_test.in py-partiql-parser==0.3.6 # via moto pyasn1==0.5.0 @@ -277,7 +281,9 @@ 
sniffio==1.3.0 # httpcore # httpx sortedcontainers==2.4.0 - # via fakeredis + # via + # -c requirements/_base.txt + # fakeredis sshpubkeys==3.3.1 # via moto sympy==1.12 @@ -293,7 +299,7 @@ typing-extensions==4.5.0 # -c requirements/_base.txt # aws-sam-translator # pydantic -urllib3==1.26.14 +urllib3==1.26.16 # via # -c requirements/../../../requirements/constraints.txt # -c requirements/_base.txt diff --git a/services/autoscaling/requirements/_tools.txt b/services/autoscaling/requirements/_tools.txt index 9041fca26cb..7050ede9912 100644 --- a/services/autoscaling/requirements/_tools.txt +++ b/services/autoscaling/requirements/_tools.txt @@ -14,7 +14,7 @@ bump2version==1.0.1 # via -r requirements/../../../requirements/devenv.txt cfgv==3.4.0 # via pre-commit -click==8.1.3 +click==8.1.7 # via # -c requirements/_base.txt # -c requirements/_test.txt @@ -40,7 +40,7 @@ mypy-extensions==1.0.0 # via black nodeenv==1.8.0 # via pre-commit -packaging==23.0 +packaging==23.1 # via # -c requirements/_base.txt # -c requirements/_test.txt diff --git a/services/autoscaling/sandbox/script.py b/services/autoscaling/sandbox/script.py deleted file mode 100644 index cbd4c720111..00000000000 --- a/services/autoscaling/sandbox/script.py +++ /dev/null @@ -1,534 +0,0 @@ -# pylint: skip-file - -# -# Moved by pcrespov from allexandre/osparc-dask-auto-scaling/-/blob/master/script.py -# - -import math -import re -import time -from datetime import datetime, timedelta - -import boto3 -import dask_gateway -import docker -from dask.distributed import Client, Scheduler -from environs import Env - -# Init :Check that schduler is working, if not use the notebook to start it - remove old node in the swarm and in aws -# TODO add python -m pip install dask distributed --upgrade to requirements -# TODO Case when ressources asked are not meet by any sidecar resource resctrictions all tasks mis dictionnaire cluster scheduler worker_Info -# Sylvain 3.8.10 -env = Env() -env.read_env() - -local_file_directory = "data" -Scheduler.pickle = True -docker_client = docker.from_env() - -list_created_clusters = [] - - -aws_EC2 = [ - # { "name": "t2.nano", "CPUs" : 1, "RAM" : 0.5}, - # { "name": "t2.micro", "CPUs" : 1, "RAM" : 1}, - # { "name": "t2.small", "CPUs" : 1, "RAM" : 2}, - # { "name": "t2.medium", "CPUs" : 2, "RAM" : 4}, - # { "name": "t2.large", "CPUs" : 2, "RAM" : 8}, - {"name": "t2.xlarge", "CPUs": 4, "RAM": 16}, - {"name": "t2.2xlarge", "CPUs": 8, "RAM": 32}, - {"name": "r5n.4xlarge", "CPUs": 16, "RAM": 128}, - {"name": "r5n.8xlarge", "CPUs": 32, "RAM": 256} - # { "name": "r5n.12xlarge", "CPUs" : 48, "RAM" : 384}, - # { "name": "r5n.16xlarge", "CPUs" : 64, "RAM" : 512}, - # { "name": "r5n.24xlarge", "CPUs" : 96, "RAM" : 768} -] - -# THanks to https://gist.github.com/shawnbutts/3906915 -def bytesto(bytes_size, to, bsize=1024): - """convert bytes to megabytes, etc. 
- sample code: - print('mb= ' + str(bytesto(314575262000000, 'm'))) - sample output: - mb= 300002347.946 - """ - a = {"k": 1, "m": 2, "g": 3, "t": 4, "p": 5, "e": 6} - r = float(bytes_size) - for _ in range(a[to]): - r = r / bsize - return r - - -# Inteveral between checks in s -check_time = int(env.str("INTERVAL_CHECK")) - - -def get_number_of_tasks(dask_scheduler=None): - return f"{dask_scheduler.tasks}" - - -def get_workers_info(dask_scheduler=None): - return f"{dask_scheduler.workers}" - - -def check_node_resources(): - nodes = docker_client.nodes.list() - # We compile RAM and CPU capabilities of each node who have the label sidecar - # TODO take in account personalized workers - # Total resources of the cluster - nodes_sidecar_data = [] - for node in nodes: - for label in node.attrs["Spec"]["Labels"]: - if label == "sidecar": - nodes_sidecar_data.append( - { - "ID": node.attrs["ID"], - "RAM": bytesto( - node.attrs["Description"]["Resources"]["MemoryBytes"], - "g", - bsize=1024, - ), - "CPU": int(node.attrs["Description"]["Resources"]["NanoCPUs"]) - / 1000000000, - } - ) - - total_nodes_cpus = 0 - total_nodes_ram = 0 - nodes_ids = [] - for node in nodes_sidecar_data: - total_nodes_cpus = total_nodes_cpus + node["CPU"] - total_nodes_ram = total_nodes_ram + node["RAM"] - nodes_ids.append(node["ID"]) - - return { - "total_cpus": total_nodes_cpus, - "total_ram": total_nodes_ram, - "nodes_ids": nodes_ids, - } - - -# TODO discuss with the team consideration between limits and reservations on dy services -def check_tasks_resources(nodes_ids): - total_tasks_cpus = 0 - total_tasks_ram = 0 - tasks_ressources = [] - total_pending_tasks_cpus = 0 - total_pending_tasks_ram = 0 - tasks_pending_ressources = [] - serv = docker_client.services.list() - count_tasks_pending = 0 - for service in serv: - tasks = service.tasks() - for task in tasks: - if task["Status"]["State"] == "running" and task["NodeID"] in nodes_ids: - if "Resources" in task["Spec"] and task["Spec"]["Resources"] != {}: - ram = 0 - cpu = 0 - if "Reservations" in task["Spec"]["Resources"]: - if "MemoryBytes" in task["Spec"]["Resources"]["Reservations"]: - ram = bytesto( - task["Spec"]["Resources"]["Reservations"][ - "MemoryBytes" - ], - "g", - bsize=1024, - ) - if "NanoCPUs" in task["Spec"]["Resources"]["Reservations"]: - cpu = ( - int( - task["Spec"]["Resources"]["Reservations"][ - "NanoCPUs" - ] - ) - / 1000000000 - ) - tasks_ressources.append({"ID": task["ID"], "RAM": ram, "CPU": cpu}) - - elif ( - task["Status"]["State"] == "pending" - and task["Status"]["Message"] == "pending task scheduling" - and "insufficient resources on" in task["Status"]["Err"] - ): - count_tasks_pending = count_tasks_pending + 1 - if "Resources" in task["Spec"] and task["Spec"]["Resources"] != {}: - ram = 0 - cpu = 0 - if "Reservations" in task["Spec"]["Resources"]: - if "MemoryBytes" in task["Spec"]["Resources"]["Reservations"]: - ram = bytesto( - task["Spec"]["Resources"]["Reservations"][ - "MemoryBytes" - ], - "g", - bsize=1024, - ) - if "NanoCPUs" in task["Spec"]["Resources"]["Reservations"]: - cpu = ( - int( - task["Spec"]["Resources"]["Reservations"][ - "NanoCPUs" - ] - ) - / 1000000000 - ) - tasks_pending_ressources.append( - {"ID": task["ID"], "RAM": ram, "CPU": cpu} - ) - - total_tasks_cpus = 0 - total_tasks_ram = 0 - for task in tasks_ressources: - total_tasks_cpus = total_tasks_cpus + task["CPU"] - total_tasks_ram = total_tasks_ram + task["RAM"] - - for task in tasks_pending_ressources: - total_pending_tasks_cpus = total_pending_tasks_cpus + 
task["CPU"] - total_pending_tasks_ram = total_pending_tasks_ram + task["RAM"] - return { - "total_cpus_running_tasks": total_tasks_cpus, - "total_ram_running_tasks": total_tasks_ram, - "total_cpus_pending_tasks": total_pending_tasks_cpus, - "total_ram_pending_tasks": total_pending_tasks_ram, - "count_tasks_pending": count_tasks_pending, - } - - -# Check if the swarm need to scale up -# TODO currently the script has to be executed directly on the manager. Implenting a version that connect with ssh and handle the case when one manager is down to be able to have redundancy -def check_dynamic(): - user_data = ( - """#!/bin/bash - cd /home/ubuntu - hostname=$(ssh -i """ - + env.str("AWS_KEY_NAME") - + """.pem -oStrictHostKeyChecking=no ubuntu@""" - + env.str("AWS_DNS") - + """ "hostname" 2>&1) - token=$(ssh -i """ - + env.str("AWS_KEY_NAME") - + """.pem -oStrictHostKeyChecking=no ubuntu@""" - + env.str("AWS_DNS") - + """ "docker swarm join-token -q worker") - host=$(ssh -i """ - + env.str("AWS_KEY_NAME") - + """.pem -oStrictHostKeyChecking=no ubuntu@""" - + env.str("AWS_DNS") - + """ "docker swarm join-token worker" 2>&1) - docker swarm join --token ${token} ${host##* } - label=$(ssh -i """ - + env.str("AWS_KEY_NAME") - + """.pem -oStrictHostKeyChecking=no ubuntu@""" - + env.str("AWS_DNS") - + """ "docker node ls | grep $(hostname)") - label="$(cut -d' ' -f1 <<<"$label")" - ssh -i """ - + env.str("AWS_KEY_NAME") - + """.pem -oStrictHostKeyChecking=no ubuntu@""" - + env.str("AWS_DNS") - + """ "docker node update --label-add sidecar=true $label" - ssh -i """ - + env.str("AWS_KEY_NAME") - + """.pem -oStrictHostKeyChecking=no ubuntu@""" - + env.str("AWS_DNS") - + """ "docker node update --label-add standardworker=true $label" - """ - ) - # docker_client.containers.run("ubuntu:latest", "echo hello world") - serv = docker_client.services.list() - # We need the data of each task and the data of each node to know if we need to scale up or not - # Test if some tasks are in a pending mode because of a lack of resources - need_resources = False - for service in serv: - tasks = service.tasks() - for task in tasks: - if ( - task["Status"]["State"] == "pending" - and task["Status"]["Message"] == "pending task scheduling" - and "insufficient resources on" in task["Status"]["Err"] - ): - need_resources = True - break - - # We compile RAM and CPU capabilities of each node who have the label sidecar - # TODO take in account personalized workers - # Total resources of the cluster - if need_resources: - total_nodes = check_node_resources() - total_tasks = check_tasks_resources(total_nodes["nodes_ids"]) - available_cpus = ( - total_nodes["total_cpus"] - total_tasks["total_cpus_running_tasks"] - ) - available_ram = ( - total_nodes["total_ram"] - total_tasks["total_ram_running_tasks"] - ) - # print("avail cpuz" + str(available_cpus) + " avail ram" + str(available_ram)) - needed_cpus = ( - available_cpus - total_tasks["total_cpus_pending_tasks"] - ) * -1 + 2 # Cpus used for other tasks - needed_ram = ( - available_ram - total_tasks["total_ram_pending_tasks"] - ) * -1 + 4 # Ram used for other stasks - # print("taskcpus_needed : " + str(total_tasks["total_cpus_pending_tasks"]) + " staskRAMneeded : " + str(total_tasks["total_ram_pending_tasks"])) - print( - "The Swarm currently has " - + str(total_tasks["count_tasks_pending"]) - + " task(s) in pending mode" - ) - # print("Theses task require a total of " + str(needed_cpus) + " cpus and " + str(needed_ram) + " GB of RAM in order to be executed.") - print( - "Theses 
task(s) require a total of " - + str(math.ceil(total_tasks["total_cpus_pending_tasks"])) - + " cpus and " - + str(math.ceil(total_tasks["total_ram_pending_tasks"])) - + " GB of RAM in order to be executed." - ) - for instance in aws_EC2: - # if instance["CPUs"] >= needed_cpus and instance["RAM"] >= needed_ram: - if instance["CPUs"] >= math.ceil( - total_tasks["total_cpus_pending_tasks"] - ) and instance["RAM"] >= math.ceil(total_tasks["total_ram_pending_tasks"]): - now = datetime.now() + timedelta(hours=2) - dt_string = now.strftime("%d/%m/%Y %H:%M:%S") - print( - "A new EC2 instance has been selected to add more resources to the cluster. Name : " - + instance["name"] - + " Cpus : " - + str(instance["CPUs"]) - + " RAM : " - + str(instance["RAM"]) - + "GB" - ) - start_instance_aws( - "ami-097895f2d7d86f07e", - instance["name"], - "Autoscaling node " + dt_string, - "dynamic", - user_data, - ) - break - else: - print("No pending task(s) on the swarm detected.") - - # TODO Better algorythm - - -# TODO VPn handling is bad -# If no cluster I create one -# Test how it works without cluster -# To start the script for the first time, create the cluster with a jupyter notebook -def check_computationnal(): - # When we launch a new task, we check if the desired capacity doesn't exceed the total cluster capacity or the most powerful worker capacity - g = dask_gateway.Gateway( - address=env.str("DASK_GATEWAY_ADDRESS"), - auth=dask_gateway.BasicAuth( - env.str("DASK_GATEWAY_LOGIN"), env.str("DASK_GATEWAY_PWD") - ), - ) - - # At first, we need to create a cluster if there is none - if g.list_clusters() == []: - print("Currently 0 cluster in the gateway. We create a new one") - list_created_clusters.append(g.new_cluster()) - - cluster = g.connect(g.list_clusters()[0].name) - # cluster.adapt(minimum=1, maximum=100) - scheduler_infos = cluster.scheduler_info - client = cluster.get_client() - - max_worker_CPUs = 0 - max_worker_RAM = 0 - total_worker_CPUs = 0 - total_worker_RAM = 0 - print(scheduler_infos) - # cluster.adapt(minimum=1, maximum=15) - # TODO: case where a task want something which has enough RAM on one sidecar and enough CPU in another one but no sidecar has both ressources - for worker in scheduler_infos["workers"].values(): - total_worker_CPUs = total_worker_CPUs + int(worker["resources"]["CPU"]) - total_worker_RAM = total_worker_RAM + int(worker["resources"]["RAM"]) - if int(worker["resources"]["CPU"]) > max_worker_CPUs: - max_worker_CPUs = int(worker["resources"]["CPU"]) - if int(worker["resources"]["RAM"]) > max_worker_RAM: - max_worker_RAM = int(worker["resources"]["RAM"]) - - max_worker_RAM = bytesto(max_worker_RAM, "g", bsize=1024) - total_worker_RAM = bytesto(total_worker_RAM, "g", bsize=1024) - # cl= Client("gateway://test.test.osparc.io:8000/993bb0c4a51f4d44bd41393679a56c8d") - - # print("Total workers CPUs : " + str(total_worker_CPUs)) - # print("Total workers RAM : " + str(round(total_worker_RAM, 1)) + "G") - # print("Max worker CPUs : " + str(max_worker_CPUs)) - # print("Total workers RAM : " + str(round(max_worker_RAM, 1)) + "G") - # s = Scheduler() - # print(cluster.scheduler_comm) - cl = Client(cluster, security=cluster.security) - # print(g.proxy_address) - # print(cl.dashboard_link) - - # s = Scheduler(host="test.test.osparc.io/993bb0c4a51f4d44bd41393679a56c8d", port=8000, protocol="gateway", interface=) - # s.workers_list - # print(s.status) - tasks_infos = cl.run_on_scheduler(get_number_of_tasks) - # print(tasks_infos) - workers_infos = cl.run_on_scheduler(get_workers_info) - 
# workers_infos_dic_formatted = workers_infos.replace('SortedDict(', '')[:-1] - # print(workers_infos_dic_formatted) - # res = json.loads(workers_infos_dic_formatted) - result = re.search("processing: (.*)>", workers_infos) - if result is None: - total_tasks = 0 - else: - total_tasks = int(result.group(1)) - print("Current number of tasks managed by the scheduler : " + str(total_tasks)) - - # print(workers_infos.get("processing")) - # print(workers_infos) - # res = json.loads(workers_infos_dic_formatted) - print("Current number of workers : " + str(len(client.scheduler_info()["workers"]))) - task_handled = 0 - # IN this scenario, we look at the first worker only. In the future we need to look at all the workers - if len(client.scheduler_info()["workers"]) > 0: - workers_keys = list(client.scheduler_info()["workers"].keys())[0] - print( - "Number of tasks currently executed by the workers : " - + str( - client.scheduler_info()["workers"][workers_keys]["metrics"]["executing"] - ) - ) - task_handled = client.scheduler_info()["workers"][workers_keys]["metrics"][ - "executing" - ] - if task_handled < total_tasks: - print( - "The clusted can't handle the current load... Auto-scaling to add a new host" - ) - scale_up(2, 4) - else: - print("Computational services :Current cluster state OK, pausing for 30s.....") - - # print(client.status) - # Worker.ge - - # if task[CPU] > max_worker_CPUs or task[RAM] > max_worker_RAM: - - # Sample task - # future = client.submit(add, 132,423, resources={"CPU":10}, pure=False) - # future.result() - - -def add(x, y): - time.sleep(120) - return x + y - - -def scale_up(CPUs, RAM): - print("Processing the new instance on AWS..") - - # Has to be disccused - for host in aws_EC2: - if host["CPUs"] >= CPUs and host["RAM"] >= RAM: - new_host = host - - # Do we pass our scaling limits ? - # if total_worker_CPUs + host["CPUs"] >= int(env.str("MAX_CPUs_CLUSTER")) or total_worker_RAM + host["RAM"] >= int(env.str("MAX_RAM_CLUSTER")): - # print("Error : We would pass the defined cluster limits in term of RAM/CPUs. 
We can't scale up") - # else: - now = datetime.now() - dt_string = now.strftime("%d/%m/%Y %H:%M:%S") - user_data = ( - """#!/bin/bash - cd /home/ubuntu - hostname=$(ssh -i """ - + env.str("AWS_KEY_NAME") - + """.pem -oStrictHostKeyChecking=no ubuntu@""" - + env.str("AWS_DNS") - + """ "hostname" 2>&1) - token=$(ssh -i """ - + env.str("AWS_KEY_NAME") - + """.pem -oStrictHostKeyChecking=no ubuntu@""" - + env.str("AWS_DNS") - + """ "docker swarm join-token -q worker") - host=$(ssh -i """ - + env.str("AWS_KEY_NAME") - + """.pem -oStrictHostKeyChecking=no ubuntu@""" - + env.str("AWS_DNS") - + """ "docker swarm join-token worker" 2>&1) - docker swarm join --token ${token} ${host##* } - label=$(ssh -i """ - + env.str("AWS_KEY_NAME") - + """.pem -oStrictHostKeyChecking=no ubuntu@""" - + env.str("AWS_DNS") - + """ "docker node ls | grep $(hostname)") - label="$(cut -d' ' -f1 <<<"$label")" - ssh -i """ - + env.str("AWS_KEY_NAME") - + """.pem -oStrictHostKeyChecking=no ubuntu@""" - + env.str("AWS_DNS") - + """ "docker node update --label-add sidecar=true $label" - reboot_hour=$(last reboot | head -1 | awk '{print $8}') - reboot_mn="${reboot_hour: -2}" - if [ $reboot_mn -gt 4 ] - then - cron_mn=$((${reboot_mn} - 5)) - else - cron_mn=55 - fi - echo ${cron_mn} - cron_mn+=" * * * * /home/ubuntu/cron_terminate.bash" - cron_mn="*/10 * * * * /home/ubuntu/cron_terminate.bash" - echo "${cron_mn}" - (crontab -u ubuntu -l; echo "$cron_mn" ) | crontab -u ubuntu - - """ - ) - start_instance_aws( - "ami-0699f9dc425967eba", - "t2.2xlarge", - "Autoscaling node " + dt_string, - "computational", - user_data, - ) - - -def start_instance_aws(ami_id, instance_type, tag, service_type, user_data): - ec2Client = boto3.client( - "ec2", - aws_access_key_id=env.str("AWS_ACCESS_KEY_ID"), - aws_secret_access_key=env.str("AWS_SECRET_ACCESS_KEY"), - region_name="us-east-1", - ) - ec2Resource = boto3.resource("ec2", region_name="us-east-1") - ec2 = boto3.resource("ec2", region_name="us-east-1") - # TODO check bug on the auto-terminate ? - # Create the instance - instanceDict = ec2.create_instances( - ImageId=ami_id, - KeyName=env.str("AWS_KEY_NAME"), - InstanceType=instance_type, - SecurityGroupIds=[env.str("SECURITY_GROUP_IDS")], # Have to be parametrized - MinCount=1, - MaxCount=1, - InstanceInitiatedShutdownBehavior="terminate", - SubnetId=env.str("SUBNET_ID"), # Have to be parametrized - TagSpecifications=[ - {"ResourceType": "instance", "Tags": [{"Key": "Name", "Value": tag}]} - ], - UserData=user_data, - ) - instanceDict = instanceDict[0] - print( - "New instance launched for " - + service_type - + " services. 
Estimated time to launch and join the cluster : 2mns" - ) - print("Pausing for 10mns before next check") - time.sleep(600) - # print("Instance state: %s" % instanceDict.state) - # print("Public dns: %s" % instanceDict.public_dns_name) - # print("Instance id: %s" % instanceDict.id) - - -if __name__ == "__main__": - while True: - # check_computationnal() - check_dynamic() - time.sleep(check_time) diff --git a/services/autoscaling/src/simcore_service_autoscaling/_meta.py b/services/autoscaling/src/simcore_service_autoscaling/_meta.py index 25f8c2d6f5f..1e9e2b9f9f9 100644 --- a/services/autoscaling/src/simcore_service_autoscaling/_meta.py +++ b/services/autoscaling/src/simcore_service_autoscaling/_meta.py @@ -28,6 +28,16 @@ f"v{__version__}" ) +APP_STARTED_COMPUTATIONAL_BANNER_MSG = r""" + _ _ _ _ + | | | | (_) | | + ___ ___ _ __ ___ _ __ _ _ | |_ __ _ | |_ _ ___ _ __ __ _ | | + / __|/ _ \ | '_ ` _ \ | '_ \ | | | || __|/ _` || __|| | / _ \ | '_ \ / _` || | + | (__| (_) || | | | | || |_) || |_| || |_| (_| || |_ | || (_) || | | || (_| || | + \___|\___/ |_| |_| |_|| .__/ \__,_| \__|\__,_| \__||_| \___/ |_| |_| \__,_||_| + | | + |_| +""" APP_STARTED_DYNAMIC_BANNER_MSG = r""" _ _ diff --git a/services/autoscaling/src/simcore_service_autoscaling/core/application.py b/services/autoscaling/src/simcore_service_autoscaling/core/application.py index 06dc4779076..8cff6595b16 100644 --- a/services/autoscaling/src/simcore_service_autoscaling/core/application.py +++ b/services/autoscaling/src/simcore_service_autoscaling/core/application.py @@ -8,6 +8,7 @@ APP_FINISHED_BANNER_MSG, APP_NAME, APP_STARTED_BANNER_MSG, + APP_STARTED_COMPUTATIONAL_BANNER_MSG, APP_STARTED_DISABLED_BANNER_MSG, APP_STARTED_DYNAMIC_BANNER_MSG, ) @@ -54,6 +55,8 @@ async def _on_startup() -> None: print(APP_STARTED_BANNER_MSG, flush=True) # noqa: T201 if settings.AUTOSCALING_NODES_MONITORING: print(APP_STARTED_DYNAMIC_BANNER_MSG, flush=True) # noqa: T201 + elif settings.AUTOSCALING_DASK: + print(APP_STARTED_COMPUTATIONAL_BANNER_MSG, flush=True) # noqa: T201 else: print(APP_STARTED_DISABLED_BANNER_MSG, flush=True) # noqa: T201 diff --git a/services/autoscaling/src/simcore_service_autoscaling/core/errors.py b/services/autoscaling/src/simcore_service_autoscaling/core/errors.py index e4399a95182..522cd6638f8 100644 --- a/services/autoscaling/src/simcore_service_autoscaling/core/errors.py +++ b/services/autoscaling/src/simcore_service_autoscaling/core/errors.py @@ -25,3 +25,15 @@ class Ec2TooManyInstancesError(AutoscalingRuntimeError): class Ec2InvalidDnsNameError(AutoscalingRuntimeError): msg_template: str = "Invalid EC2 private DNS name {aws_private_dns_name}" + + +class DaskSchedulerNotFoundError(AutoscalingRuntimeError): + msg_template: str = "Scheduler in {url} was not found!" + + +class DaskNoWorkersError(AutoscalingRuntimeError): + msg_template: str = "There are no dask workers connected to scheduler in {url}" + + +class DaskWorkerNotFoundError(AutoscalingRuntimeError): + msg_template: str = "Dask worker running on {worker_host} is not registered to scheduler in {url}, it is not found!" 
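Note: the three Dask error classes added above are raised by the new `modules/dask.py` module that appears later in this patch (for example, its `_scheduler_client()` context manager raises `DaskSchedulerNotFoundError(url=url)` when the scheduler cannot be reached), and their `msg_template` strings are formatted with the keyword arguments passed at raise time. A minimal usage sketch follows; the helper function and its name are illustrative only and not part of this PR:

```python
# Hedged sketch (not part of the patch): how a caller might consume the new
# errors together with modules/dask.py, both of which are added in this PR.
from simcore_service_autoscaling.core.errors import DaskSchedulerNotFoundError
from simcore_service_autoscaling.models import DaskTask
from simcore_service_autoscaling.modules import dask


async def list_pending_tasks_or_empty(scheduler_url) -> list[DaskTask]:
    """scheduler_url: the value of the new DASK_MONITORING_URL setting."""
    try:
        # list_unrunnable_tasks() is defined in modules/dask.py further below
        return await dask.list_unrunnable_tasks(scheduler_url)
    except DaskSchedulerNotFoundError as err:
        # the message comes from msg_template, e.g. "Scheduler in ... was not found!"
        print(f"skipping this autoscaling iteration: {err}")
        return []
```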
diff --git a/services/autoscaling/src/simcore_service_autoscaling/core/settings.py b/services/autoscaling/src/simcore_service_autoscaling/core/settings.py index c904bc950ae..719227808ef 100644 --- a/services/autoscaling/src/simcore_service_autoscaling/core/settings.py +++ b/services/autoscaling/src/simcore_service_autoscaling/core/settings.py @@ -10,7 +10,15 @@ VersionTag, ) from models_library.docker import DockerGenericTag, DockerLabelKey -from pydantic import Field, NonNegativeInt, PositiveInt, parse_obj_as, validator +from pydantic import ( + AnyUrl, + Field, + NonNegativeInt, + PositiveInt, + parse_obj_as, + root_validator, + validator, +) from settings_library.base import BaseCustomSettings from settings_library.docker_registry import RegistrySettings from settings_library.rabbit import RabbitSettings @@ -99,6 +107,12 @@ class EC2InstancesSettings(BaseCustomSettings): description="script(s) to run on EC2 instance startup (be careful!), each entry is run one after the other using '&&' operator", ) + EC2_INSTANCES_NAME_PREFIX: str = Field( + default="autoscaling", + min_length=1, + description="prefix used to name the EC2 instances created by this instance of autoscaling", + ) + @validator("EC2_INSTANCES_TIME_BEFORE_TERMINATION") @classmethod def ensure_time_is_in_range(cls, value): @@ -134,6 +148,12 @@ class NodesMonitoringSettings(BaseCustomSettings): ) +class DaskMonitoringSettings(BaseCustomSettings): + DASK_MONITORING_URL: AnyUrl = Field( + ..., description="the url to the osparc-dask-scheduler" + ) + + class ApplicationSettings(BaseCustomSettings, MixinLoggingSettings): # CODE STATICS --------------------------------------------------------- API_VERSION: str = API_VERSION @@ -161,14 +181,14 @@ class ApplicationSettings(BaseCustomSettings, MixinLoggingSettings): # RUNTIME ----------------------------------------------------------- AUTOSCALING_DEBUG: bool = Field( - False, description="Debug mode", env=["AUTOSCALING_DEBUG", "DEBUG"] + default=False, description="Debug mode", env=["AUTOSCALING_DEBUG", "DEBUG"] ) AUTOSCALING_LOGLEVEL: LogLevel = Field( LogLevel.INFO, env=["AUTOSCALING_LOGLEVEL", "LOG_LEVEL", "LOGLEVEL"] ) AUTOSCALING_LOG_FORMAT_LOCAL_DEV_ENABLED: bool = Field( - False, + default=False, env=[ "AUTOSCALING_LOG_FORMAT_LOCAL_DEV_ENABLED", "LOG_FORMAT_LOCAL_DEV_ENABLED", @@ -197,8 +217,10 @@ class ApplicationSettings(BaseCustomSettings, MixinLoggingSettings): AUTOSCALING_REGISTRY: RegistrySettings | None = Field(auto_default_from_env=True) + AUTOSCALING_DASK: DaskMonitoringSettings | None = Field(auto_default_from_env=True) + @cached_property - def LOG_LEVEL(self): + def LOG_LEVEL(self): # noqa: N802 return self.AUTOSCALING_LOGLEVEL @validator("AUTOSCALING_LOGLEVEL") @@ -207,6 +229,17 @@ def valid_log_level(cls, value: str) -> str: # NOTE: mypy is not happy without the cast return cast(str, cls.validate_log_level(value)) + @root_validator() + @classmethod + def exclude_both_dynamic_computational_mode(cls, values): + if ( + values.get("AUTOSCALING_DASK") is not None + and values.get("AUTOSCALING_NODES_MONITORING") is not None + ): + msg = "Autoscaling cannot be set to monitor both computational and dynamic services (both AUTOSCALING_DASK and AUTOSCALING_NODES_MONITORING are currently set!)" + raise ValueError(msg) + return values + def get_application_settings(app: FastAPI) -> ApplicationSettings: return cast(ApplicationSettings, app.state.settings) diff --git a/services/autoscaling/src/simcore_service_autoscaling/models.py 
b/services/autoscaling/src/simcore_service_autoscaling/models.py index 8b408745888..f14c30026f1 100644 --- a/services/autoscaling/src/simcore_service_autoscaling/models.py +++ b/services/autoscaling/src/simcore_service_autoscaling/models.py @@ -1,5 +1,6 @@ import datetime from dataclasses import dataclass, field +from typing import Any, TypeAlias from models_library.generated_models.docker_rest_api import Node from pydantic import BaseModel, ByteSize, NonNegativeFloat, PositiveInt @@ -91,3 +92,13 @@ class Cluster: } ) terminated_instances: list[EC2InstanceData] + + +DaskTaskId: TypeAlias = str +DaskTaskResources: TypeAlias = dict[str, Any] + + +@dataclass(frozen=True, kw_only=True) +class DaskTask: + task_id: DaskTaskId + required_resources: DaskTaskResources diff --git a/services/autoscaling/src/simcore_service_autoscaling/modules/auto_scaling_mode_computational.py b/services/autoscaling/src/simcore_service_autoscaling/modules/auto_scaling_mode_computational.py new file mode 100644 index 00000000000..68c17ff5fff --- /dev/null +++ b/services/autoscaling/src/simcore_service_autoscaling/modules/auto_scaling_mode_computational.py @@ -0,0 +1,139 @@ +import collections +import logging + +from fastapi import FastAPI +from models_library.docker import DockerLabelKey +from models_library.generated_models.docker_rest_api import Node +from pydantic import AnyUrl, ByteSize +from servicelib.logging_utils import LogLevelInt +from servicelib.utils import logged_gather + +from ..core.settings import get_application_settings +from ..models import ( + AssociatedInstance, + DaskTask, + EC2InstanceData, + EC2InstanceType, + Resources, +) +from ..utils import computational_scaling as utils +from ..utils import ec2, utils_docker +from . import dask +from .auto_scaling_mode_base import BaseAutoscaling +from .docker import get_docker_client + +_logger = logging.getLogger(__name__) + + +def _scheduler_url(app: FastAPI) -> AnyUrl: + app_settings = get_application_settings(app) + assert app_settings.AUTOSCALING_DASK # nosec + return app_settings.AUTOSCALING_DASK.DASK_MONITORING_URL + + +class ComputationalAutoscaling(BaseAutoscaling): + @staticmethod + async def get_monitored_nodes(app: FastAPI) -> list[Node]: + return await utils_docker.get_worker_nodes(get_docker_client(app)) + + @staticmethod + def get_ec2_tags(app: FastAPI) -> dict[str, str]: + app_settings = get_application_settings(app) + return ec2.get_ec2_tags_computational(app_settings) + + @staticmethod + def get_new_node_docker_tags(app: FastAPI) -> dict[DockerLabelKey, str]: + assert app # nosec + return {} + + @staticmethod + async def list_unrunnable_tasks(app: FastAPI) -> list[DaskTask]: + return await dask.list_unrunnable_tasks(_scheduler_url(app)) + + @staticmethod + def try_assigning_task_to_node( + task: DaskTask, + instance_to_tasks: list[tuple[AssociatedInstance, list[DaskTask]]], + ) -> bool: + return utils.try_assigning_task_to_node(task, instance_to_tasks) + + @staticmethod + async def try_assigning_task_to_pending_instances( + app: FastAPI, + pending_task, + list_of_pending_instance_to_tasks: list[tuple[EC2InstanceData, list]], + type_to_instance_map: dict[str, EC2InstanceType], + *, + notify_progress: bool + ) -> bool: + return await utils.try_assigning_task_to_pending_instances( + app, + pending_task, + list_of_pending_instance_to_tasks, + type_to_instance_map, + notify_progress=notify_progress, + ) + + @staticmethod + def try_assigning_task_to_instance_types( + pending_task, + list_of_instance_to_tasks: list[tuple[EC2InstanceType, 
list]], + ) -> bool: + return utils.try_assigning_task_to_instance_types( + pending_task, list_of_instance_to_tasks + ) + + @staticmethod + async def log_message_from_tasks( + app: FastAPI, tasks: list, message: str, *, level: LogLevelInt + ) -> None: + assert app # nosec + assert tasks # nosec + _logger.log(level, "LOG: %s", message) + + @staticmethod + async def progress_message_from_tasks(app: FastAPI, tasks: list, progress: float): + assert app # nosec + assert tasks # nosec + _logger.info("PROGRESS: %s", progress) + + @staticmethod + def get_max_resources_from_task(task) -> Resources: + return utils.get_max_resources_from_dask_task(task) + + @staticmethod + async def compute_node_used_resources( + app: FastAPI, instance: AssociatedInstance + ) -> Resources: + num_results_in_memory = await dask.get_worker_still_has_results_in_memory( + _scheduler_url(app), instance.ec2_instance + ) + if num_results_in_memory > 0: + # NOTE: this is a trick to consider the node still useful + return Resources(cpus=1, ram=ByteSize()) + return await dask.get_worker_used_resources( + _scheduler_url(app), instance.ec2_instance + ) + + @staticmethod + async def compute_cluster_used_resources( + app: FastAPI, instances: list[AssociatedInstance] + ) -> Resources: + list_of_used_resources = await logged_gather( + *( + ComputationalAutoscaling.compute_node_used_resources(app, i) + for i in instances + ) + ) + counter = collections.Counter({k: 0 for k in Resources.__fields__}) + for result in list_of_used_resources: + counter.update(result.dict()) + return Resources.parse_obj(dict(counter)) + + @staticmethod + async def compute_cluster_total_resources( + app: FastAPI, instances: list[AssociatedInstance] + ) -> Resources: + return await dask.compute_cluster_total_resources( + _scheduler_url(app), instances + ) diff --git a/services/autoscaling/src/simcore_service_autoscaling/modules/auto_scaling_task.py b/services/autoscaling/src/simcore_service_autoscaling/modules/auto_scaling_task.py index 0d0c5aee9ff..f8b41161d25 100644 --- a/services/autoscaling/src/simcore_service_autoscaling/modules/auto_scaling_task.py +++ b/services/autoscaling/src/simcore_service_autoscaling/modules/auto_scaling_task.py @@ -8,6 +8,7 @@ from ..core.settings import ApplicationSettings from .auto_scaling_core import auto_scale_cluster +from .auto_scaling_mode_computational import ComputationalAutoscaling from .auto_scaling_mode_dynamic import DynamicAutoscaling from .redis import get_redis_client @@ -19,15 +20,28 @@ def on_app_startup(app: FastAPI) -> Callable[[], Awaitable[None]]: async def _startup() -> None: app_settings: ApplicationSettings = app.state.settings - lock_key = f"{app.title}:{app.version}:" + lock_key_parts = [app.title, app.version] lock_value = "" if app_settings.AUTOSCALING_NODES_MONITORING: - lock_key += f"dynamic:{app_settings.AUTOSCALING_NODES_MONITORING.NODES_MONITORING_NODE_LABELS}" + lock_key_parts += [ + "dynamic", + app_settings.AUTOSCALING_NODES_MONITORING.NODES_MONITORING_NODE_LABELS, + ] lock_value = json.dumps( { "node_labels": app_settings.AUTOSCALING_NODES_MONITORING.NODES_MONITORING_NODE_LABELS } ) + elif app_settings.AUTOSCALING_DASK: + lock_key_parts += [ + "computational", + app_settings.AUTOSCALING_DASK.DASK_MONITORING_URL, + ] + lock_value = json.dumps( + {"scheduler_url": app_settings.AUTOSCALING_DASK.DASK_MONITORING_URL} + ) + lock_key = ":".join(f"{k}" for k in lock_key_parts) + assert lock_key # nosec assert lock_value # nosec app.state.autoscaler_task = start_periodic_task( 
exclusive(get_redis_client(app), lock_key=lock_key, lock_value=lock_value)( @@ -36,7 +50,9 @@ async def _startup() -> None: interval=app_settings.AUTOSCALING_POLL_INTERVAL, task_name=_TASK_NAME, app=app, - auto_scaling_mode=DynamicAutoscaling(), + auto_scaling_mode=DynamicAutoscaling() + if app_settings.AUTOSCALING_NODES_MONITORING is not None + else ComputationalAutoscaling(), ) return _startup @@ -54,10 +70,15 @@ def setup(app: FastAPI): if any( s is None for s in [ - app_settings.AUTOSCALING_NODES_MONITORING, app_settings.AUTOSCALING_EC2_ACCESS, app_settings.AUTOSCALING_EC2_INSTANCES, ] + ) or all( + s is None + for s in [ + app_settings.AUTOSCALING_NODES_MONITORING, + app_settings.AUTOSCALING_DASK, + ] ): logger.warning( "the autoscaling background task is disabled by settings, nothing will happen!" diff --git a/services/autoscaling/src/simcore_service_autoscaling/modules/dask.py b/services/autoscaling/src/simcore_service_autoscaling/modules/dask.py new file mode 100644 index 00000000000..d94c855a60d --- /dev/null +++ b/services/autoscaling/src/simcore_service_autoscaling/modules/dask.py @@ -0,0 +1,210 @@ +import contextlib +import logging +from collections.abc import AsyncIterator, Coroutine +from typing import Any, Final, TypeAlias + +import distributed +from pydantic import AnyUrl, ByteSize, parse_obj_as + +from ..core.errors import ( + DaskNoWorkersError, + DaskSchedulerNotFoundError, + DaskWorkerNotFoundError, +) +from ..models import ( + AssociatedInstance, + DaskTask, + DaskTaskId, + DaskTaskResources, + EC2InstanceData, + Resources, +) +from ..utils.auto_scaling_core import ( + node_host_name_from_ec2_private_dns, + node_ip_from_ec2_private_dns, +) + +_logger = logging.getLogger(__name__) + + +async def _wrap_client_async_routine( + client_coroutine: Coroutine[Any, Any, Any] | Any | None +) -> Any: + """Dask async behavior does not go well with Pylance as it returns + a union of types. 
this wrapper makes both mypy and pylance happy""" + assert client_coroutine # nosec + return await client_coroutine + + +_DASK_SCHEDULER_CONNECT_TIMEOUT_S: Final[int] = 5 + + +@contextlib.asynccontextmanager +async def _scheduler_client(url: AnyUrl) -> AsyncIterator[distributed.Client]: + """ + Raises: + DaskSchedulerNotFoundError: if the scheduler was not found/cannot be reached + """ + try: + async with distributed.Client( + url, asynchronous=True, timeout=f"{_DASK_SCHEDULER_CONNECT_TIMEOUT_S}" + ) as client: + yield client + except OSError as exc: + raise DaskSchedulerNotFoundError(url=url) from exc + + +async def list_unrunnable_tasks(url: AnyUrl) -> list[DaskTask]: + """ + Raises: + DaskSchedulerNotFoundError + """ + + def _list_tasks( + dask_scheduler: distributed.Scheduler, + ) -> dict[str, dict[str, Any]]: + return { + task.key: task.resource_restrictions for task in dask_scheduler.unrunnable + } + + async with _scheduler_client(url) as client: + list_of_tasks: dict[ + DaskTaskId, DaskTaskResources + ] = await _wrap_client_async_routine(client.run_on_scheduler(_list_tasks)) + _logger.info("found unrunnable tasks: %s", list_of_tasks) + return [ + DaskTask(task_id=task_id, required_resources=task_resources) + for task_id, task_resources in list_of_tasks.items() + ] + + +async def list_processing_tasks(url: AnyUrl) -> list[DaskTaskId]: + """ + Raises: + DaskSchedulerNotFoundError + """ + async with _scheduler_client(url) as client: + processing_tasks = set() + if worker_to_processing_tasks := await _wrap_client_async_routine( + client.processing() + ): + _logger.info("cluster worker processing: %s", worker_to_processing_tasks) + for tasks in worker_to_processing_tasks.values(): + processing_tasks |= set(tasks) + + return list(processing_tasks) + + +DaskWorkerUrl: TypeAlias = str +DaskWorkerDetails: TypeAlias = dict[str, Any] + + +def _dask_worker_from_ec2_instance( + client: distributed.Client, ec2_instance: EC2InstanceData +) -> tuple[DaskWorkerUrl, DaskWorkerDetails]: + """ + Raises: + Ec2InvalidDnsNameError + DaskNoWorkersError + DaskWorkerNotFoundError + """ + node_hostname = node_host_name_from_ec2_private_dns(ec2_instance) + node_ip = node_ip_from_ec2_private_dns(ec2_instance) + scheduler_info = client.scheduler_info() + assert client.scheduler # nosec + if "workers" not in scheduler_info or not scheduler_info["workers"]: + raise DaskNoWorkersError(url=client.scheduler.address) + workers: dict[DaskWorkerUrl, DaskWorkerDetails] = scheduler_info["workers"] + + _logger.debug("looking for %s in %s", f"{ec2_instance=}", f"{workers=}") + + # dict is of type dask_worker_address: worker_details + def _find_by_worker_host( + dask_worker: tuple[DaskWorkerUrl, DaskWorkerDetails] + ) -> bool: + _, details = dask_worker + return bool(details["host"] == node_ip) or bool( + node_hostname in details["name"] + ) + + filtered_workers = dict(filter(_find_by_worker_host, workers.items())) + if not filtered_workers: + raise DaskWorkerNotFoundError( + worker_host=ec2_instance.aws_private_dns, url=client.scheduler.address + ) + assert len(filtered_workers) == 1 # nosec + return next(iter(filtered_workers.items())) + + +async def get_worker_still_has_results_in_memory( + url: AnyUrl, ec2_instance: EC2InstanceData +) -> int: + """ + Raises: + DaskSchedulerNotFoundError + Ec2InvalidDnsNameError + DaskWorkerNotFoundError + DaskNoWorkersError + """ + async with _scheduler_client(url) as client: + _, worker_details = _dask_worker_from_ec2_instance(client, ec2_instance) + + worker_metrics: dict[str, Any] = 
worker_details["metrics"] + return 1 if worker_metrics.get("task_counts") else 0 + + +async def get_worker_used_resources( + url: AnyUrl, ec2_instance: EC2InstanceData +) -> Resources: + """ + Raises: + DaskSchedulerNotFoundError + Ec2InvalidDnsNameError + DaskWorkerNotFoundError + DaskNoWorkersError + """ + + def _get_worker_used_resources( + dask_scheduler: distributed.Scheduler, + ) -> dict[str, dict]: + used_resources = {} + for worker_name, worker_state in dask_scheduler.workers.items(): + used_resources[worker_name] = worker_state.used_resources + return used_resources + + async with _scheduler_client(url) as client: + worker_url, _ = _dask_worker_from_ec2_instance(client, ec2_instance) + + # now get the used resources + used_resources_per_worker: dict[ + str, dict[str, Any] + ] = await _wrap_client_async_routine( + client.run_on_scheduler(_get_worker_used_resources) + ) + if worker_url not in used_resources_per_worker: + raise DaskWorkerNotFoundError(worker_host=worker_url, url=url) + worker_used_resources = used_resources_per_worker[worker_url] + return Resources( + cpus=worker_used_resources.get("CPU", 0), + ram=parse_obj_as(ByteSize, worker_used_resources.get("RAM", 0)), + ) + + +async def compute_cluster_total_resources( + url: AnyUrl, instances: list[AssociatedInstance] +) -> Resources: + if not instances: + return Resources.create_as_empty() + async with _scheduler_client(url) as client: + instance_hosts = ( + node_ip_from_ec2_private_dns(i.ec2_instance) for i in instances + ) + scheduler_info = client.scheduler_info() + if "workers" not in scheduler_info or not scheduler_info["workers"]: + raise DaskNoWorkersError(url=url) + workers: dict[str, Any] = scheduler_info["workers"] + for worker_details in workers.values(): + if worker_details["host"] not in instance_hosts: + continue + + return Resources.create_as_empty() diff --git a/services/autoscaling/src/simcore_service_autoscaling/utils/auto_scaling_core.py b/services/autoscaling/src/simcore_service_autoscaling/utils/auto_scaling_core.py index 3fa6448fa42..a337f34041e 100644 --- a/services/autoscaling/src/simcore_service_autoscaling/utils/auto_scaling_core.py +++ b/services/autoscaling/src/simcore_service_autoscaling/utils/auto_scaling_core.py @@ -16,11 +16,29 @@ def node_host_name_from_ec2_private_dns( ec2_instance_data: EC2InstanceData, ) -> str: + """returns the node host name 'ip-10-2-3-22' from the ec2 private dns + Raises: + Ec2InvalidDnsNameError: if the dns name does not follow the expected pattern + """ if match := re.match(_EC2_INTERNAL_DNS_RE, ec2_instance_data.aws_private_dns): return match.group("host_name") raise Ec2InvalidDnsNameError(aws_private_dns_name=ec2_instance_data.aws_private_dns) +def node_ip_from_ec2_private_dns( + ec2_instance_data: EC2InstanceData, +) -> str: + """returns the node ipv4 from the ec2 private dns string + Raises: + Ec2InvalidDnsNameError: if the dns name does not follow the expected pattern + """ + return ( + node_host_name_from_ec2_private_dns(ec2_instance_data) + .removeprefix("ip-") + .replace("-", ".") + ) + + async def associate_ec2_instances_with_nodes( nodes: list[Node], ec2_instances: list[EC2InstanceData] ) -> tuple[list[AssociatedInstance], list[EC2InstanceData]]: diff --git a/services/autoscaling/src/simcore_service_autoscaling/utils/computational_scaling.py b/services/autoscaling/src/simcore_service_autoscaling/utils/computational_scaling.py new file mode 100644 index 00000000000..22c1f430371 --- /dev/null +++ 
b/services/autoscaling/src/simcore_service_autoscaling/utils/computational_scaling.py @@ -0,0 +1,112 @@ +import datetime +import logging +from typing import Final + +from fastapi import FastAPI +from servicelib.utils_formatting import timedelta_as_minute_second + +from ..core.settings import get_application_settings +from ..models import ( + AssociatedInstance, + DaskTask, + EC2InstanceData, + EC2InstanceType, + Resources, +) +from . import utils_docker + +_logger = logging.getLogger(__name__) + +_DEFAULT_MAX_CPU: Final[float] = 1 +_DEFAULT_MAX_RAM: Final[int] = 1024 + + +def get_max_resources_from_dask_task(task: DaskTask) -> Resources: + return Resources( + cpus=task.required_resources.get("CPU", _DEFAULT_MAX_CPU), + ram=task.required_resources.get("RAM", _DEFAULT_MAX_RAM), + ) + + +def _compute_tasks_needed_resources(tasks: list[DaskTask]) -> Resources: + total = Resources.create_as_empty() + for t in tasks: + total += get_max_resources_from_dask_task(t) + return total + + +def try_assigning_task_to_node( + pending_task: DaskTask, + instance_to_tasks: list[tuple[AssociatedInstance, list[DaskTask]]], +) -> bool: + for instance, node_assigned_tasks in instance_to_tasks: + instance_total_resource = utils_docker.get_node_total_resources(instance.node) + tasks_needed_resources = _compute_tasks_needed_resources(node_assigned_tasks) + if ( + instance_total_resource - tasks_needed_resources + ) >= get_max_resources_from_dask_task(pending_task): + node_assigned_tasks.append(pending_task) + return True + return False + + +async def try_assigning_task_to_pending_instances( + app: FastAPI, + pending_task: DaskTask, + list_of_pending_instance_to_tasks: list[tuple[EC2InstanceData, list[DaskTask]]], + type_to_instance_map: dict[str, EC2InstanceType], + *, + notify_progress: bool, +) -> bool: + app_settings = get_application_settings(app) + assert app_settings.AUTOSCALING_EC2_INSTANCES # nosec + instance_max_time_to_start = ( + app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_MAX_START_TIME + ) + for instance, instance_assigned_tasks in list_of_pending_instance_to_tasks: + instance_type = type_to_instance_map[instance.type] + instance_total_resources = Resources( + cpus=instance_type.cpus, ram=instance_type.ram + ) + tasks_needed_resources = _compute_tasks_needed_resources( + instance_assigned_tasks + ) + if ( + instance_total_resources - tasks_needed_resources + ) >= get_max_resources_from_dask_task(pending_task): + instance_assigned_tasks.append(pending_task) + if notify_progress: + now = datetime.datetime.now(datetime.timezone.utc) + time_since_launch = now - instance.launch_time + estimated_time_to_completion = ( + instance.launch_time + instance_max_time_to_start - now + ) + _logger.info( + "LOG: %s", + f"adding machines to the cluster (time waiting: {timedelta_as_minute_second(time_since_launch)}," + f" est. 
remaining time: {timedelta_as_minute_second(estimated_time_to_completion)})...please wait...", + ) + _logger.info( + "PROGRESS: %s", + time_since_launch.total_seconds() + / instance_max_time_to_start.total_seconds(), + ) + return True + return False + + +def try_assigning_task_to_instance_types( + pending_task: DaskTask, + list_of_instance_to_tasks: list[tuple[EC2InstanceType, list[DaskTask]]], +) -> bool: + for instance, instance_assigned_tasks in list_of_instance_to_tasks: + instance_total_resource = Resources(cpus=instance.cpus, ram=instance.ram) + tasks_needed_resources = _compute_tasks_needed_resources( + instance_assigned_tasks + ) + if ( + instance_total_resource - tasks_needed_resources + ) >= get_max_resources_from_dask_task(pending_task): + instance_assigned_tasks.append(pending_task) + return True + return False diff --git a/services/autoscaling/src/simcore_service_autoscaling/utils/ec2.py b/services/autoscaling/src/simcore_service_autoscaling/utils/ec2.py index a610789be6c..672a8c76733 100644 --- a/services/autoscaling/src/simcore_service_autoscaling/utils/ec2.py +++ b/services/autoscaling/src/simcore_service_autoscaling/utils/ec2.py @@ -28,7 +28,20 @@ def get_ec2_tags_dynamic(app_settings: ApplicationSettings) -> dict[str, str]: app_settings.AUTOSCALING_NODES_MONITORING.NODES_MONITORING_SERVICE_LABELS ), # NOTE: this one gets special treatment in AWS GUI and is applied to the name of the instance - "Name": f"autoscaling-{app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_KEY_NAME}", + "Name": f"{app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_NAME_PREFIX}-{app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_KEY_NAME}", + } + + +def get_ec2_tags_computational(app_settings: ApplicationSettings) -> dict[str, str]: + assert app_settings.AUTOSCALING_DASK # nosec + assert app_settings.AUTOSCALING_EC2_INSTANCES # nosec + return { + "io.simcore.autoscaling.version": f"{VERSION}", + "io.simcore.autoscaling.dask-scheduler_url": json.dumps( + app_settings.AUTOSCALING_DASK.DASK_MONITORING_URL + ), + # NOTE: this one gets special treatment in AWS GUI and is applied to the name of the instance + "Name": f"{app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_NAME_PREFIX}-{app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_KEY_NAME}", } diff --git a/services/autoscaling/src/simcore_service_autoscaling/utils/rabbitmq.py b/services/autoscaling/src/simcore_service_autoscaling/utils/rabbitmq.py index 1413a3c66bd..e8bce0ba5aa 100644 --- a/services/autoscaling/src/simcore_service_autoscaling/utils/rabbitmq.py +++ b/services/autoscaling/src/simcore_service_autoscaling/utils/rabbitmq.py @@ -75,7 +75,8 @@ async def create_autoscaling_status_message( origin = "unknown" if app_settings.AUTOSCALING_NODES_MONITORING: origin = f"dynamic:node_labels={app_settings.AUTOSCALING_NODES_MONITORING.NODES_MONITORING_NODE_LABELS}" - + elif app_settings.AUTOSCALING_DASK: + origin = f"computational:scheduler_url={app_settings.AUTOSCALING_DASK.DASK_MONITORING_URL}" return RabbitAutoscalingStatusMessage.construct( origin=origin, nodes_total=len(cluster.active_nodes) diff --git a/services/autoscaling/src/simcore_service_autoscaling/utils/utils_docker.py b/services/autoscaling/src/simcore_service_autoscaling/utils/utils_docker.py index 88844b79684..92c5fb5d116 100644 --- a/services/autoscaling/src/simcore_service_autoscaling/utils/utils_docker.py +++ b/services/autoscaling/src/simcore_service_autoscaling/utils/utils_docker.py @@ -62,6 +62,13 @@ async def get_monitored_nodes( ) +async def 
get_worker_nodes(docker_client: AutoscalingDocker) -> list[Node]: + return parse_obj_as( + list[Node], + await docker_client.nodes.list(filters={"role": ["worker"]}), + ) + + async def remove_nodes( docker_client: AutoscalingDocker, nodes: list[Node], force: bool = False ) -> list[Node]: diff --git a/services/autoscaling/tests/manual/.env-devel b/services/autoscaling/tests/manual/.env-devel new file mode 100644 index 00000000000..63bbdbb9fb2 --- /dev/null +++ b/services/autoscaling/tests/manual/.env-devel @@ -0,0 +1,22 @@ +AUTOSCALING_DEBUG=true +AUTOSCALING_LOGLEVEL=INFO +AUTOSCALING_TASK_INTERVAL=30 +EC2_ACCESS_KEY_ID=XXXXXXXXXX +EC2_INSTANCES_ALLOWED_TYPES="[\"t2.micro\"]" +EC2_INSTANCES_AMI_ID=XXXXXXXXXX +EC2_INSTANCES_KEY_NAME=XXXXXXXXXX +EC2_INSTANCES_SECURITY_GROUP_IDS="[\"XXXXXXXXXX\"]" +EC2_INSTANCES_SUBNET_ID=XXXXXXXXXX +EC2_SECRET_ACCESS_KEY=XXXXXXXXXX +EC2_INSTANCES_NAME_PREFIX=testing-osparc-computational-cluster +LOG_FORMAT_LOCAL_DEV_ENABLED=True +# may be activated or not +# RABBIT_HOST=rabbit +# RABBIT_PASSWORD=test +# RABBIT_PORT=5672 +# RABBIT_SECURE=false +# RABBIT_USER=test +REDIS_HOST=redis +REDIS_PORT=6379 +SC_BOOT_MODE=debug-ptvsd +SC_BUILD_TARGET=development diff --git a/services/autoscaling/tests/manual/Makefile b/services/autoscaling/tests/manual/Makefile new file mode 100644 index 00000000000..48ab85d9068 --- /dev/null +++ b/services/autoscaling/tests/manual/Makefile @@ -0,0 +1,39 @@ +include ../../../../scripts/common.Makefile +include ../../../../scripts/common-service.Makefile + + +.PHONY: up-devel up-computational-devel down + + +.stack-devel.yml: .env + ../../../../scripts/docker/docker-compose-config.bash -e .env \ + docker-compose.yml \ + > $@ + +.stack-computational-devel.yml: .env + ../../../../scripts/docker/docker-compose-config.bash -e .env \ + docker-compose.yml \ + docker-compose-computational.yml \ + > $@ + +up-devel: .init-swarm .stack-devel.yml ## starts local test application + @docker stack deploy --with-registry-auth --compose-file=.stack-devel.yml autoscaling + +up-computational-devel: .init-swarm .stack-computational-devel.yml ## starts local test application in computational mode + # DASK_MONITORING_URL set to $(DASK_MONITORING_URL) + @docker stack deploy --with-registry-auth --compose-file=.stack-computational-devel.yml comp-autoscaling + +down: .env ## stops local test app dependencies (running bare metal against AWS) + # remove stacks + -@docker stack rm comp-autoscaling + -@docker stack rm autoscaling + # remove stack files + -rm -rf .stack-devel.yml + -rm -rf .stack-computational-devel.yml + + +SWARM_HOSTS = $(shell docker node ls --format="{{.Hostname}}" 2>$(if $(IS_WIN),NUL,/dev/null)) +.PHONY: .init-swarm +.init-swarm: + # Ensures swarm is initialized + $(if $(SWARM_HOSTS),,docker swarm init --advertise-addr=$(get_my_ip)) diff --git a/services/autoscaling/tests/manual/README.md b/services/autoscaling/tests/manual/README.md new file mode 100644 index 00000000000..a8be43853f5 --- /dev/null +++ b/services/autoscaling/tests/manual/README.md @@ -0,0 +1,69 @@ +# autoscaling service manual testing + +The autoscaling service may be started either in computational mode or in dynamic mode. + +The computational mode is used in conjunction with a dask-scheduler/dask-worker subsystem. +The dynamic mode is used directly with docker swarm facilities. + +## computational mode + +When ```DASK_MONITORING_URL``` is set the computational mode is enabled. + +### requirements + +1. AWS EC2 access +2. 
a machine running in EC2 with docker installed and access to the osparc-simcore repository + +### instructions + +1. prepare autoscaling + +```bash +# run on EC2 instance +git clone https://github.com/ITISFoundation/osparc-simcore.git +cd osparc-simcore/services/autoscaling +make build-devel # this will build the autoscaling devel image +``` + +2. set up the environment variables +```bash +# run on EC2 instance +cd osparc-simcore/services/autoscaling/tests/manual +make .env # generate an initial .env file +nano .env # edit .env and set the variables as needed +``` + +3. start autoscaling/dask-scheduler stack +```bash +# run on EC2 instance +cd osparc-simcore/services/autoscaling/tests/manual +make up-computational-devel # this will deploy the autoscaling/dask-scheduler/worker stack +``` + +4. start some dask tasks to trigger autoscaling +```bash +# run on any host +cd osparc-simcore/services/autoscaling +make install-dev +pip install ipython +ipython +``` +```python +import distributed +# connect to the dask-scheduler running on the EC2 machine +client = distributed.Client("tcp://{EC2_INSTANCE_PUBLIC_IP}:8786") + +# some dummy test function to run remotely +def test_fct(x,y): + return x+y + +# send the task over to the dask-scheduler +future = client.submit(test_fct, 3, 54, resources={"CPU": 1}, pure=False) + +# this will trigger the autoscaling to create a new machine (ensure the EC2_INSTANCES_ALLOWED_TYPES variable allows machine types capable of running the job with the requested resources) +# after about 3 minutes the job will run +future.done() # shall return True once done + +# removing the future from the dask-scheduler memory shall trigger the autoscaling service to remove the created machine +del future +```
diff --git a/services/autoscaling/tests/manual/docker-compose-computational.yml b/services/autoscaling/tests/manual/docker-compose-computational.yml new file mode 100644 index 00000000000..ed3aca7d163 --- /dev/null +++ b/services/autoscaling/tests/manual/docker-compose-computational.yml @@ -0,0 +1,46 @@ +version: "3.8" +services: + autoscaling: + environment: + - DASK_MONITORING_URL=tcp://dask-scheduler:8786 + dask-sidecar: + dns: 8.8.8.8 # needed to access internet + image: itisfoundation/dask-sidecar:master-github-latest + init: true + hostname: "{{.Node.Hostname}}-{{.Service.Name}}" + volumes: + - computational_shared_data:${SIDECAR_COMP_SERVICES_SHARED_FOLDER:-/home/scu/computational_shared_data} + - /var/run/docker.sock:/var/run/docker.sock:ro + - ${ETC_HOSTNAME:-/etc/hostname}:/home/scu/hostname:ro + + environment: + DASK_LOG_FORMAT_LOCAL_DEV_ENABLED: 1 + DASK_NPROCS: 1 + DASK_SCHEDULER_URL: ${DASK_SCHEDULER_URL:-tcp://dask-scheduler:8786} + DASK_SIDECAR_NON_USABLE_RAM: 0 + DASK_SIDECAR_NUM_NON_USABLE_CPUS: 0 + LOG_LEVEL: ${LOG_LEVEL:-INFO} + SIDECAR_COMP_SERVICES_SHARED_FOLDER: ${SIDECAR_COMP_SERVICES_SHARED_FOLDER:-/home/scu/computational_shared_data} + SIDECAR_COMP_SERVICES_SHARED_VOLUME_NAME: computational_shared_data + + deploy: + mode: global + placement: + constraints: + - "node.role==worker" + + dask-scheduler: + dns: 8.8.8.8 # needed to access internet + image: itisfoundation/dask-sidecar:master-github-latest + init: true + hostname: "{{.Node.Hostname}}-{{.Service.Name}}-{{.Task.Slot}}" + environment: + DASK_START_AS_SCHEDULER: 1 + LOG_LEVEL: ${LOG_LEVEL:-INFO} + ports: + - 8786:8786 + - 8787:8787 + +volumes: + computational_shared_data: + name: computational_shared_data
diff --git a/services/autoscaling/tests/manual/docker-compose.yml
b/services/autoscaling/tests/manual/docker-compose.yml new file mode 100644 index 00000000000..e70eb62ca3c --- /dev/null +++ b/services/autoscaling/tests/manual/docker-compose.yml @@ -0,0 +1,61 @@ +version: "3.8" +services: + rabbit: + image: itisfoundation/rabbitmq:3.11.2-management + init: true + hostname: "{{.Node.Hostname}}-{{.Service.Name}}-{{.Task.Slot}}" + ports: + - "5672:5672" + - "15672:15672" + - "15692" + environment: + - RABBITMQ_DEFAULT_USER=${RABBIT_USER} + - RABBITMQ_DEFAULT_PASS=${RABBIT_PASSWORD} + healthcheck: + # see https://www.rabbitmq.com/monitoring.html#individual-checks for info about health-checks available in rabbitmq + test: rabbitmq-diagnostics -q status + interval: 5s + timeout: 30s + retries: 5 + start_period: 5s + + redis: + image: "redis:6.2.6@sha256:4bed291aa5efb9f0d77b76ff7d4ab71eee410962965d052552db1fb80576431d" + init: true + hostname: "{{.Node.Hostname}}-{{.Service.Name}}-{{.Task.Slot}}" + ports: + - "6379:6379" + healthcheck: + test: [ "CMD", "redis-cli", "ping" ] + interval: 5s + timeout: 30s + retries: 50 + volumes: + - redis-data:/data + + redis-commander: + image: rediscommander/redis-commander:latest + init: true + hostname: "{{.Node.Hostname}}-{{.Service.Name}}-{{.Task.Slot}}" + ports: + - "18081:8081" + environment: + - REDIS_HOSTS=resources:${REDIS_HOST}:${REDIS_PORT}:0,locks:${REDIS_HOST}:${REDIS_PORT}:1,validation_codes:${REDIS_HOST}:${REDIS_PORT}:2,scheduled_maintenance:${REDIS_HOST}:${REDIS_PORT}:3,user_notifications:${REDIS_HOST}:${REDIS_PORT}:4,announcements:${REDIS_HOST}:${REDIS_PORT}:5 + # If you add/remove a db, do not forget to update the --databases entry in the docker-compose.yml + + autoscaling: + image: local/autoscaling:development + dns: 8.8.8.8 # needed to access internet + init: true + hostname: "{{.Node.Hostname}}-{{.Service.Name}}-{{.Task.Slot}}" + ports: + - "8006:8000" + - "3012:3000" + env_file: + - .env + volumes: + - "/var/run/docker.sock:/var/run/docker.sock" + - ../../:/devel/services/autoscaling + - ../../../../packages:/devel/packages +volumes: + redis-data: diff --git a/services/autoscaling/tests/unit/conftest.py b/services/autoscaling/tests/unit/conftest.py index 39665cbea5d..02dbe18a6bd 100644 --- a/services/autoscaling/tests/unit/conftest.py +++ b/services/autoscaling/tests/unit/conftest.py @@ -12,6 +12,7 @@ from typing import Any, Final, cast import aiodocker +import distributed import httpx import psutil import pytest @@ -41,7 +42,7 @@ from settings_library.rabbit import RabbitSettings from simcore_service_autoscaling.core.application import create_app from simcore_service_autoscaling.core.settings import ApplicationSettings, EC2Settings -from simcore_service_autoscaling.models import Cluster +from simcore_service_autoscaling.models import Cluster, DaskTaskResources from simcore_service_autoscaling.modules.docker import AutoscalingDocker from simcore_service_autoscaling.modules.ec2 import AutoscalingEC2, EC2InstanceData from tenacity import retry @@ -53,6 +54,7 @@ from types_aiobotocore_ec2.literals import InstanceTypeType pytest_plugins = [ + "pytest_simcore.dask_scheduler", "pytest_simcore.docker_compose", "pytest_simcore.docker_swarm", "pytest_simcore.environment_configs", @@ -140,6 +142,20 @@ def enabled_dynamic_mode( ) +@pytest.fixture +def enabled_computational_mode( + app_environment: EnvVarsDict, monkeypatch: pytest.MonkeyPatch, faker: Faker +) -> EnvVarsDict: + return app_environment | setenvs_from_dict( + monkeypatch, + { + "DASK_MONITORING_URL": faker.url(), + "DASK_MONITORING_USER_NAME": 
faker.user_name(), + "DASK_MONITORING_PASSWORD": faker.password(), + }, + ) + + @pytest.fixture def disabled_rabbitmq(app_environment: EnvVarsDict, monkeypatch: pytest.MonkeyPatch): monkeypatch.delenv("RABBIT_HOST") @@ -652,7 +668,7 @@ def _creator(**overrides) -> EC2InstanceData: { "launch_time": faker.date_time(tzinfo=timezone.utc), "id": faker.uuid4(), - "aws_private_dns": faker.name(), + "aws_private_dns": f"ip-{faker.ipv4().replace('.', '-')}.ec2.internal", "type": faker.pystr(), "state": faker.pystr(), } @@ -663,6 +679,15 @@ def _creator(**overrides) -> EC2InstanceData: return _creator +@pytest.fixture +def fake_localhost_ec2_instance_data( + fake_ec2_instance_data: Callable[..., EC2InstanceData] +) -> EC2InstanceData: + local_ip = get_localhost_ip() + fake_local_ec2_private_dns = f"ip-{local_ip.replace('.', '-')}.ec2.internal" + return fake_ec2_instance_data(aws_private_dns=fake_local_ec2_private_dns) + + @pytest.fixture async def mocked_redis_server(mocker: MockerFixture) -> None: mock_redis = FakeRedis() @@ -685,3 +710,21 @@ def _creator(**cluter_overrides) -> Cluster: ) return _creator + + +@pytest.fixture +async def create_dask_task( + dask_spec_cluster_client: distributed.Client, +) -> Callable[[DaskTaskResources], distributed.Future]: + def _remote_pytest_fct(x: int, y: int) -> int: + return x + y + + def _creator(required_resources: DaskTaskResources) -> distributed.Future: + # NOTE: pure will ensure dask does not re-use the task results if we run it several times + future = dask_spec_cluster_client.submit( + _remote_pytest_fct, 23, 43, resources=required_resources, pure=False + ) + assert future + return future + + return _creator diff --git a/services/autoscaling/tests/unit/test_core_settings.py b/services/autoscaling/tests/unit/test_core_settings.py index cca0450f9af..45376d5963e 100644 --- a/services/autoscaling/tests/unit/test_core_settings.py +++ b/services/autoscaling/tests/unit/test_core_settings.py @@ -6,6 +6,7 @@ import json import pytest +from pydantic import ValidationError from pytest_simcore.helpers.utils_envs import EnvVarsDict from simcore_service_autoscaling.core.settings import ApplicationSettings @@ -15,6 +16,7 @@ def test_settings(app_environment: EnvVarsDict): assert settings.AUTOSCALING_EC2_ACCESS assert settings.AUTOSCALING_EC2_INSTANCES assert settings.AUTOSCALING_NODES_MONITORING is None + assert settings.AUTOSCALING_DASK is None assert settings.AUTOSCALING_RABBITMQ assert settings.AUTOSCALING_REDIS @@ -24,10 +26,28 @@ def test_settings_dynamic_mode(enabled_dynamic_mode: EnvVarsDict): assert settings.AUTOSCALING_EC2_ACCESS assert settings.AUTOSCALING_EC2_INSTANCES assert settings.AUTOSCALING_NODES_MONITORING + assert settings.AUTOSCALING_DASK is None assert settings.AUTOSCALING_RABBITMQ assert settings.AUTOSCALING_REDIS +def test_settings_computational_mode(enabled_computational_mode: EnvVarsDict): + settings = ApplicationSettings.create_from_envs() + assert settings.AUTOSCALING_EC2_ACCESS + assert settings.AUTOSCALING_EC2_INSTANCES + assert settings.AUTOSCALING_NODES_MONITORING is None + assert settings.AUTOSCALING_DASK + assert settings.AUTOSCALING_RABBITMQ + assert settings.AUTOSCALING_REDIS + + +def test_defining_both_computational_and_dynamic_modes_is_invalid_and_raises( + enabled_dynamic_mode: EnvVarsDict, enabled_computational_mode: EnvVarsDict +): + with pytest.raises(ValidationError): + ApplicationSettings.create_from_envs() + + def test_invalid_EC2_INSTANCES_TIME_BEFORE_TERMINATION( app_environment: EnvVarsDict, monkeypatch: 
pytest.MonkeyPatch ): diff --git a/services/autoscaling/tests/unit/test_modules_auto_scaling_computational.py b/services/autoscaling/tests/unit/test_modules_auto_scaling_computational.py new file mode 100644 index 00000000000..4adc33735cd --- /dev/null +++ b/services/autoscaling/tests/unit/test_modules_auto_scaling_computational.py @@ -0,0 +1,564 @@ +# pylint: disable=no-value-for-parameter +# pylint: disable=redefined-outer-name +# pylint: disable=unused-argument +# pylint: disable=unused-variable +# pylint: disable=too-many-arguments + + +import asyncio +import base64 +from collections.abc import Callable, Iterator +from copy import deepcopy +from dataclasses import dataclass +from typing import Any +from unittest import mock + +import distributed +import pytest +from fastapi import FastAPI +from models_library.generated_models.docker_rest_api import Availability +from models_library.generated_models.docker_rest_api import Node as DockerNode +from models_library.generated_models.docker_rest_api import NodeState, NodeStatus +from models_library.rabbitmq_messages import RabbitAutoscalingStatusMessage +from pydantic import ByteSize, parse_obj_as +from pytest_mock import MockerFixture +from pytest_simcore.helpers.utils_envs import EnvVarsDict, setenvs_from_dict +from simcore_service_autoscaling.core.settings import ApplicationSettings +from simcore_service_autoscaling.models import ( + AssociatedInstance, + Cluster, + EC2InstanceData, + Resources, +) +from simcore_service_autoscaling.modules.auto_scaling_core import ( + _deactivate_empty_nodes, + auto_scale_cluster, +) +from simcore_service_autoscaling.modules.auto_scaling_mode_computational import ( + ComputationalAutoscaling, +) +from simcore_service_autoscaling.modules.dask import DaskTaskResources +from simcore_service_autoscaling.modules.docker import get_docker_client +from types_aiobotocore_ec2.client import EC2Client + + +@pytest.fixture +def local_dask_scheduler_server_envs( + app_environment: EnvVarsDict, + monkeypatch: pytest.MonkeyPatch, + dask_spec_local_cluster: distributed.SpecCluster, +) -> EnvVarsDict: + return app_environment | setenvs_from_dict( + monkeypatch, + { + "DASK_MONITORING_URL": dask_spec_local_cluster.scheduler_address, + }, + ) + + +@pytest.fixture +def minimal_configuration( + docker_swarm: None, + enabled_computational_mode: EnvVarsDict, + local_dask_scheduler_server_envs: EnvVarsDict, + disabled_rabbitmq: None, + disable_dynamic_service_background_task: None, + aws_subnet_id: str, + aws_security_group_id: str, + aws_ami_id: str, + aws_allowed_ec2_instance_type_names: list[str], + mocked_redis_server: None, +) -> None: + ... 
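The computational scale-up tests below run against a deliberately weak local Dask cluster (see the `dask_workers_config` override that follows). As a rough, standalone illustration of the mechanism only, reusing no fixture or helper from this repository and with all names chosen for the example, a resource-constrained `distributed.SpecCluster` leaves an over-demanding task unrunnable, which is the signal the computational autoscaling mode reacts to:

```python
# Hypothetical, self-contained sketch: a weak local Dask cluster where a task
# asking for more RAM than any worker advertises stays unrunnable.
import distributed

if __name__ == "__main__":
    worker_spec = {
        "weak-worker": {
            "cls": distributed.Worker,
            # the worker advertises 2 CPUs and ~2GB of RAM as dask resources
            "options": {"nthreads": 2, "resources": {"CPU": 2, "RAM": 2e9}},
        }
    }
    scheduler_spec = {"cls": distributed.Scheduler, "options": {"dashboard_address": ":0"}}
    with distributed.SpecCluster(
        workers=worker_spec, scheduler=scheduler_spec
    ) as cluster, distributed.Client(cluster) as client:
        # needs far more RAM than any worker offers -> the scheduler keeps it
        # unrunnable, which an autoscaler can translate into "start a bigger machine"
        future = client.submit(
            lambda x, y: x + y, 2, 3, resources={"RAM": 128e9}, pure=False
        )
        print(future.done())  # expected: False, the task cannot be scheduled yet
```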
+ + +@pytest.fixture +def dask_workers_config() -> dict[str, Any]: + # NOTE: we override here the config to get a "weak" cluster + return { + "weak-worker": { + "cls": distributed.Worker, + "options": {"nthreads": 2, "resources": {"CPU": 2, "RAM": 2e9}}, + } + } + + +@pytest.fixture +def empty_cluster(cluster: Callable[..., Cluster]) -> Cluster: + return cluster() + + +async def _assert_ec2_instances( + ec2_client: EC2Client, + *, + num_reservations: int, + num_instances: int, + instance_type: str, + instance_state: str, +) -> list[str]: + all_instances = await ec2_client.describe_instances() + + assert len(all_instances["Reservations"]) == num_reservations + internal_dns_names = [] + for reservation in all_instances["Reservations"]: + assert "Instances" in reservation + assert len(reservation["Instances"]) == num_instances + for instance in reservation["Instances"]: + assert "InstanceType" in instance + assert instance["InstanceType"] == instance_type + assert "Tags" in instance + assert instance["Tags"] + expected_tag_keys = [ + "io.simcore.autoscaling.version", + "io.simcore.autoscaling.dask-scheduler_url", + "Name", + ] + for tag_dict in instance["Tags"]: + assert "Key" in tag_dict + assert "Value" in tag_dict + + assert tag_dict["Key"] in expected_tag_keys + assert "PrivateDnsName" in instance + instance_private_dns_name = instance["PrivateDnsName"] + assert instance_private_dns_name.endswith(".ec2.internal") + internal_dns_names.append(instance_private_dns_name) + assert "State" in instance + state = instance["State"] + assert "Name" in state + assert state["Name"] == instance_state + + assert "InstanceId" in instance + user_data = await ec2_client.describe_instance_attribute( + Attribute="userData", InstanceId=instance["InstanceId"] + ) + assert "UserData" in user_data + assert "Value" in user_data["UserData"] + user_data = base64.b64decode(user_data["UserData"]["Value"]).decode() + assert user_data.count("docker swarm join") == 1 + return internal_dns_names + + +def _assert_rabbit_autoscaling_message_sent( + mock_rabbitmq_post_message: mock.Mock, + app_settings: ApplicationSettings, + app: FastAPI, + scheduler_address: str, + **message_update_kwargs, +): + default_message = RabbitAutoscalingStatusMessage( + origin=f"computational:scheduler_url={scheduler_address}", + nodes_total=0, + nodes_active=0, + nodes_drained=0, + cluster_total_resources=Resources.create_as_empty().dict(), + cluster_used_resources=Resources.create_as_empty().dict(), + instances_pending=0, + instances_running=0, + ) + expected_message = default_message.copy(update=message_update_kwargs) + mock_rabbitmq_post_message.assert_called_once_with( + app, + expected_message, + ) + + +@pytest.fixture +def mock_tag_node(mocker: MockerFixture) -> mock.Mock: + async def fake_tag_node(*args, **kwargs) -> DockerNode: + return args[1] + + return mocker.patch( + "simcore_service_autoscaling.modules.auto_scaling_core.utils_docker.tag_node", + autospec=True, + side_effect=fake_tag_node, + ) + + +@pytest.fixture +def mock_find_node_with_name( + mocker: MockerFixture, fake_node: DockerNode +) -> Iterator[mock.Mock]: + return mocker.patch( + "simcore_service_autoscaling.modules.auto_scaling_core.utils_docker.find_node_with_name", + autospec=True, + return_value=fake_node, + ) + + +@pytest.fixture +def mock_set_node_availability(mocker: MockerFixture) -> mock.Mock: + return mocker.patch( + "simcore_service_autoscaling.modules.auto_scaling_core.utils_docker.set_node_availability", + autospec=True, + ) + + +@pytest.fixture +def 
mock_cluster_used_resources(mocker: MockerFixture) -> mock.Mock: + return mocker.patch( + "simcore_service_autoscaling.modules.auto_scaling_core.utils_docker.compute_cluster_used_resources", + autospec=True, + return_value=Resources.create_as_empty(), + ) + + +@pytest.fixture +def mock_compute_node_used_resources(mocker: MockerFixture) -> mock.Mock: + return mocker.patch( + "simcore_service_autoscaling.modules.auto_scaling_core.utils_docker.compute_node_used_resources", + autospec=True, + return_value=Resources.create_as_empty(), + ) + + +@pytest.fixture +def mock_rabbitmq_post_message(mocker: MockerFixture) -> Iterator[mock.Mock]: + return mocker.patch( + "simcore_service_autoscaling.utils.rabbitmq.post_message", autospec=True + ) + + +@pytest.fixture +def mock_terminate_instances(mocker: MockerFixture) -> Iterator[mock.Mock]: + return mocker.patch( + "simcore_service_autoscaling.modules.ec2.AutoscalingEC2.terminate_instances", + autospec=True, + ) + + +@pytest.fixture +def mock_start_aws_instance( + mocker: MockerFixture, + aws_instance_private_dns: str, + fake_ec2_instance_data: Callable[..., EC2InstanceData], +) -> Iterator[mock.Mock]: + return mocker.patch( + "simcore_service_autoscaling.modules.ec2.AutoscalingEC2.start_aws_instance", + autospec=True, + return_value=fake_ec2_instance_data(aws_private_dns=aws_instance_private_dns), + ) + + +async def test_cluster_scaling_with_no_tasks_does_nothing( + minimal_configuration: None, + app_settings: ApplicationSettings, + initialized_app: FastAPI, + mock_start_aws_instance: mock.Mock, + mock_terminate_instances: mock.Mock, + mock_rabbitmq_post_message: mock.Mock, + dask_spec_local_cluster: distributed.SpecCluster, +): + await auto_scale_cluster( + app=initialized_app, auto_scaling_mode=ComputationalAutoscaling() + ) + mock_start_aws_instance.assert_not_called() + mock_terminate_instances.assert_not_called() + _assert_rabbit_autoscaling_message_sent( + mock_rabbitmq_post_message, + app_settings, + initialized_app, + dask_spec_local_cluster.scheduler_address, + ) + + +async def test_cluster_scaling_with_task_with_too_much_resources_starts_nothing( + minimal_configuration: None, + app_settings: ApplicationSettings, + initialized_app: FastAPI, + create_dask_task: Callable[[DaskTaskResources], distributed.Future], + mock_start_aws_instance: mock.Mock, + mock_terminate_instances: mock.Mock, + mock_rabbitmq_post_message: mock.Mock, + dask_spec_local_cluster: distributed.SpecCluster, +): + # create a task that needs too much power + dask_future = create_dask_task({"RAM": int(parse_obj_as(ByteSize, "12800GiB"))}) + assert dask_future + + await auto_scale_cluster( + app=initialized_app, auto_scaling_mode=ComputationalAutoscaling() + ) + mock_start_aws_instance.assert_not_called() + mock_terminate_instances.assert_not_called() + _assert_rabbit_autoscaling_message_sent( + mock_rabbitmq_post_message, + app_settings, + initialized_app, + dask_spec_local_cluster.scheduler_address, + ) + + +async def test_cluster_scaling_up( + minimal_configuration: None, + app_settings: ApplicationSettings, + initialized_app: FastAPI, + create_dask_task: Callable[[DaskTaskResources], distributed.Future], + ec2_client: EC2Client, + mock_tag_node: mock.Mock, + fake_node: DockerNode, + mock_rabbitmq_post_message: mock.Mock, + mock_find_node_with_name: mock.Mock, + mock_set_node_availability: mock.Mock, + mock_compute_node_used_resources: mock.Mock, + mocker: MockerFixture, + dask_spec_local_cluster: distributed.SpecCluster, +): + # we have nothing running now + all_instances 
= await ec2_client.describe_instances() + assert not all_instances["Reservations"] + + # create a task that needs more power + dask_future = create_dask_task({"RAM": int(parse_obj_as(ByteSize, "128GiB"))}) + assert dask_future + + # this should trigger a scaling up as we have no nodes + await auto_scale_cluster( + app=initialized_app, auto_scaling_mode=ComputationalAutoscaling() + ) + + # check the instance was started and we have exactly 1 + await _assert_ec2_instances( + ec2_client, + num_reservations=1, + num_instances=1, + instance_type="r5n.4xlarge", + instance_state="running", + ) + + # as the new node is already running, but is not yet connected, hence not tagged and drained + mock_find_node_with_name.assert_not_called() + mock_tag_node.assert_not_called() + mock_set_node_availability.assert_not_called() + mock_compute_node_used_resources.assert_not_called() + # check rabbit messages were sent + _assert_rabbit_autoscaling_message_sent( + mock_rabbitmq_post_message, + app_settings, + initialized_app, + dask_spec_local_cluster.scheduler_address, + instances_running=0, + instances_pending=1, + ) + mock_rabbitmq_post_message.reset_mock() + + # 2. running this again should not scale again, but tag the node and make it available + await auto_scale_cluster( + app=initialized_app, auto_scaling_mode=ComputationalAutoscaling() + ) + mock_compute_node_used_resources.assert_not_called() + internal_dns_names = await _assert_ec2_instances( + ec2_client, + num_reservations=1, + num_instances=1, + instance_type="r5n.4xlarge", + instance_state="running", + ) + assert len(internal_dns_names) == 1 + internal_dns_name = internal_dns_names[0].removesuffix(".ec2.internal") + + # the node is tagged and made active right away since we still have the pending task + mock_find_node_with_name.assert_called_once() + mock_find_node_with_name.reset_mock() + expected_docker_node_tags = {} + mock_tag_node.assert_called_once_with( + get_docker_client(initialized_app), + fake_node, + tags=expected_docker_node_tags, + available=False, + ) + mock_tag_node.reset_mock() + mock_set_node_availability.assert_called_once_with( + get_docker_client(initialized_app), fake_node, available=True + ) + mock_set_node_availability.reset_mock() + # in this case there is no message sent since the worker was not started yet + mock_rabbitmq_post_message.assert_not_called() + + # now we have 1 monitored node that needs to be mocked + auto_scaling_mode = ComputationalAutoscaling() + mocker.patch.object( + auto_scaling_mode, + "get_monitored_nodes", + autospec=True, + return_value=[fake_node], + ) + fake_node.Status = NodeStatus(State=NodeState.ready, Message=None, Addr=None) + assert fake_node.Spec + fake_node.Spec.Availability = Availability.active + assert fake_node.Description + fake_node.Description.Hostname = internal_dns_name + + # 3. 
calling this multiple times should do nothing + for _ in range(10): + await auto_scale_cluster( + app=initialized_app, auto_scaling_mode=auto_scaling_mode + ) + mock_compute_node_used_resources.assert_not_called() + mock_find_node_with_name.assert_not_called() + mock_tag_node.assert_not_called() + mock_set_node_availability.assert_not_called() + # check the number of instances did not change and is still running + await _assert_ec2_instances( + ec2_client, + num_reservations=1, + num_instances=1, + instance_type="r5n.4xlarge", + instance_state="running", + ) + + # check rabbit messages were sent + # NOTE: we currently have no real dask-worker here + mock_rabbitmq_post_message.assert_not_called() + + +@dataclass(frozen=True) +class _ScaleUpParams: + task_resources: Resources + num_tasks: int + expected_instance_type: str + expected_num_instances: int + + +def _dask_task_resources_from_resources(resources: Resources) -> DaskTaskResources: + return { + res_key.upper(): res_value for res_key, res_value in resources.dict().items() + } + + +@pytest.mark.parametrize( + "scale_up_params", + [ + pytest.param( + _ScaleUpParams( + task_resources=Resources(cpus=5, ram=parse_obj_as(ByteSize, "36Gib")), + num_tasks=10, + expected_instance_type="g3.4xlarge", + expected_num_instances=4, + ), + id="isolve", + ) + ], +) +async def test_cluster_scaling_up_starts_multiple_instances( + minimal_configuration: None, + app_settings: ApplicationSettings, + initialized_app: FastAPI, + create_dask_task: Callable[[DaskTaskResources], distributed.Future], + ec2_client: EC2Client, + mock_tag_node: mock.Mock, + scale_up_params: _ScaleUpParams, + mock_rabbitmq_post_message: mock.Mock, + mock_find_node_with_name: mock.Mock, + mock_set_node_availability: mock.Mock, + dask_spec_local_cluster: distributed.SpecCluster, +): + # we have nothing running now + all_instances = await ec2_client.describe_instances() + assert not all_instances["Reservations"] + + # create several tasks that needs more power + dask_futures = await asyncio.gather( + *( + asyncio.get_event_loop().run_in_executor( + None, + create_dask_task, + _dask_task_resources_from_resources(scale_up_params.task_resources), + ) + for _ in range(scale_up_params.num_tasks) + ) + ) + assert dask_futures + + # run the code + await auto_scale_cluster( + app=initialized_app, auto_scaling_mode=ComputationalAutoscaling() + ) + + # check the instances were started + await _assert_ec2_instances( + ec2_client, + num_reservations=1, + num_instances=scale_up_params.expected_num_instances, + instance_type="g3.4xlarge", + instance_state="running", + ) + + # as the new node is already running, but is not yet connected, hence not tagged and drained + mock_find_node_with_name.assert_not_called() + mock_tag_node.assert_not_called() + mock_set_node_availability.assert_not_called() + # check rabbit messages were sent + _assert_rabbit_autoscaling_message_sent( + mock_rabbitmq_post_message, + app_settings, + initialized_app, + dask_spec_local_cluster.scheduler_address, + instances_pending=scale_up_params.expected_num_instances, + ) + mock_rabbitmq_post_message.reset_mock() + + +@pytest.fixture +def fake_associated_host_instance( + host_node: DockerNode, + fake_localhost_ec2_instance_data: EC2InstanceData, +) -> AssociatedInstance: + return AssociatedInstance( + host_node, + fake_localhost_ec2_instance_data, + ) + + +async def test__deactivate_empty_nodes( + minimal_configuration: None, + initialized_app: FastAPI, + cluster: Callable[..., Cluster], + host_node: DockerNode, + 
fake_associated_host_instance: AssociatedInstance, + mock_set_node_availability: mock.Mock, +): + # since we have no service running, we expect the passed node to be set to drain + active_cluster = cluster(active_nodes=[fake_associated_host_instance]) + updated_cluster = await _deactivate_empty_nodes( + initialized_app, active_cluster, ComputationalAutoscaling() + ) + assert not updated_cluster.active_nodes + assert updated_cluster.drained_nodes == active_cluster.active_nodes + mock_set_node_availability.assert_called_once_with( + mock.ANY, host_node, available=False + ) + + +async def test__deactivate_empty_nodes_with_finished_tasks_should_not_deactivate_until_tasks_are_retrieved( + minimal_configuration: None, + initialized_app: FastAPI, + cluster: Callable[..., Cluster], + host_node: DockerNode, + fake_associated_host_instance: AssociatedInstance, + mock_set_node_availability: mock.Mock, + create_dask_task: Callable[[DaskTaskResources], distributed.Future], +): + dask_future = create_dask_task({}) + assert dask_future + # NOTE: this sucks, but it seems that as soon as we use any method of the future it returns the data to the caller + await asyncio.sleep(4) + # since we have result still in memory, the node shall remain active + active_cluster = cluster(active_nodes=[fake_associated_host_instance]) + + updated_cluster = await _deactivate_empty_nodes( + initialized_app, deepcopy(active_cluster), ComputationalAutoscaling() + ) + assert updated_cluster.active_nodes + mock_set_node_availability.assert_not_called() + + # now removing the dask_future shall remove the result from the memory + del dask_future + await asyncio.sleep(4) + updated_cluster = await _deactivate_empty_nodes( + initialized_app, deepcopy(active_cluster), ComputationalAutoscaling() + ) + assert not updated_cluster.active_nodes + assert updated_cluster.drained_nodes == active_cluster.active_nodes + mock_set_node_availability.assert_called_once_with( + mock.ANY, host_node, available=False + ) diff --git a/services/autoscaling/tests/unit/test_modules_auto_scaling_task.py b/services/autoscaling/tests/unit/test_modules_auto_scaling_task.py index ae23418e25f..ceb7816531b 100644 --- a/services/autoscaling/tests/unit/test_modules_auto_scaling_task.py +++ b/services/autoscaling/tests/unit/test_modules_auto_scaling_task.py @@ -61,3 +61,15 @@ async def test_auto_scaling_task_created_and_deleted_with_dynamic_mode( assert hasattr(initialized_app.state, "autoscaler_task") await asyncio.sleep(5 * _FAST_POLL_INTERVAL) mock_background_task.assert_called() + + +async def test_auto_scaling_task_created_and_deleted_with_computational_mode( + enabled_computational_mode: EnvVarsDict, + mock_background_task: mock.Mock, + initialized_app: FastAPI, + app_settings: ApplicationSettings, +): + assert app_settings.AUTOSCALING_POLL_INTERVAL.total_seconds() == _FAST_POLL_INTERVAL + assert hasattr(initialized_app.state, "autoscaler_task") + await asyncio.sleep(5 * _FAST_POLL_INTERVAL) + mock_background_task.assert_called() diff --git a/services/autoscaling/tests/unit/test_modules_dask.py b/services/autoscaling/tests/unit/test_modules_dask.py new file mode 100644 index 00000000000..561ed1755d3 --- /dev/null +++ b/services/autoscaling/tests/unit/test_modules_dask.py @@ -0,0 +1,306 @@ +# pylint: disable=redefined-outer-name +# pylint: disable=unused-argument +# pylint: disable=unused-variable + + +import asyncio +from collections.abc import Callable +from typing import Any, Final + +import distributed +import pytest +from faker import Faker +from pydantic 
import AnyUrl, ByteSize, parse_obj_as +from simcore_service_autoscaling.core.errors import ( + DaskNoWorkersError, + DaskSchedulerNotFoundError, + DaskWorkerNotFoundError, + Ec2InvalidDnsNameError, +) +from simcore_service_autoscaling.models import ( + DaskTaskId, + DaskTaskResources, + EC2InstanceData, + Resources, +) +from simcore_service_autoscaling.modules.dask import ( + DaskTask, + _scheduler_client, + get_worker_still_has_results_in_memory, + get_worker_used_resources, + list_processing_tasks, + list_unrunnable_tasks, +) +from tenacity import retry, stop_after_delay, wait_fixed + + +async def test__scheduler_client_with_wrong_url(faker: Faker): + with pytest.raises(DaskSchedulerNotFoundError): + async with _scheduler_client( + parse_obj_as(AnyUrl, f"tcp://{faker.ipv4()}:{faker.port_number()}") + ): + ... + + +@pytest.fixture +def scheduler_url(dask_spec_local_cluster: distributed.SpecCluster) -> AnyUrl: + return parse_obj_as(AnyUrl, dask_spec_local_cluster.scheduler_address) + + +@pytest.fixture +def dask_workers_config() -> dict[str, Any]: + # NOTE: override of pytest-simcore dask_workers_config to have only 1 worker + return { + "single-cpu-worker": { + "cls": distributed.Worker, + "options": { + "nthreads": 2, + "resources": {"CPU": 2, "RAM": 48e9}, + }, + } + } + + +async def test__scheduler_client(scheduler_url: AnyUrl): + async with _scheduler_client(scheduler_url): + ... + + +async def test_list_unrunnable_tasks_with_no_workers( + dask_local_cluster_without_workers: distributed.SpecCluster, +): + scheduler_url = parse_obj_as( + AnyUrl, dask_local_cluster_without_workers.scheduler_address + ) + assert await list_unrunnable_tasks(scheduler_url) == [] + + +async def test_list_unrunnable_tasks( + scheduler_url: AnyUrl, + create_dask_task: Callable[[DaskTaskResources], distributed.Future], +): + # we have nothing running now + assert await list_unrunnable_tasks(scheduler_url) == [] + # start a task that cannot run + dask_task_impossible_resources = {"XRAM": 213} + future = create_dask_task(dask_task_impossible_resources) + assert future + assert await list_unrunnable_tasks(scheduler_url) == [ + DaskTask(task_id=future.key, required_resources=dask_task_impossible_resources) + ] + # remove that future, will remove the task + del future + assert await list_unrunnable_tasks(scheduler_url) == [] + + +_REMOTE_FCT_SLEEP_TIME_S: Final[int] = 3 + + +async def test_list_processing_tasks( + scheduler_url: AnyUrl, + dask_spec_cluster_client: distributed.Client, +): + def _add_fct(x: int, y: int) -> int: + import time + + time.sleep(_REMOTE_FCT_SLEEP_TIME_S) + return x + y + + # there is nothing now + assert await list_processing_tasks(url=scheduler_url) == [] + + # this function will be queued and executed as there are no specific resources needed + future_queued_task = dask_spec_cluster_client.submit(_add_fct, 2, 5) + assert future_queued_task + + assert await list_processing_tasks(scheduler_url) == [ + DaskTaskId(future_queued_task.key) + ] + + result = await future_queued_task.result(timeout=_REMOTE_FCT_SLEEP_TIME_S + 4) # type: ignore + assert result == 7 + + # nothing processing anymore + assert await list_processing_tasks(url=scheduler_url) == [] + + +_DASK_SCHEDULER_REACTION_TIME_S: Final[int] = 4 + + +@retry(stop=stop_after_delay(_DASK_SCHEDULER_REACTION_TIME_S), wait=wait_fixed(1)) +async def _wait_for_task_done(future: distributed.Future) -> None: + assert future.done() is True + + +async def _wait_for_dask_scheduler_to_change_state() -> None: + # NOTE: I know this is kind of stupid 
+ await asyncio.sleep(_DASK_SCHEDULER_REACTION_TIME_S) + + +@pytest.fixture +def fake_ec2_instance_data_with_invalid_ec2_name( + fake_ec2_instance_data: Callable[..., EC2InstanceData], faker: Faker +) -> EC2InstanceData: + return fake_ec2_instance_data(aws_private_dns=faker.name()) + + +async def test_get_worker_still_has_results_in_memory_with_invalid_ec2_name_raises( + scheduler_url: AnyUrl, + fake_ec2_instance_data_with_invalid_ec2_name: EC2InstanceData, +): + with pytest.raises(Ec2InvalidDnsNameError): + await get_worker_still_has_results_in_memory( + scheduler_url, fake_ec2_instance_data_with_invalid_ec2_name + ) + + +async def test_get_worker_still_has_results_in_memory_with_no_workers_raises( + dask_local_cluster_without_workers: distributed.SpecCluster, + fake_localhost_ec2_instance_data: EC2InstanceData, +): + scheduler_url = parse_obj_as( + AnyUrl, dask_local_cluster_without_workers.scheduler_address + ) + with pytest.raises(DaskNoWorkersError): + await get_worker_still_has_results_in_memory( + scheduler_url, fake_localhost_ec2_instance_data + ) + + +async def test_get_worker_still_has_results_in_memory_with_invalid_worker_host_raises( + scheduler_url: AnyUrl, + fake_ec2_instance_data: Callable[..., EC2InstanceData], +): + ec2_instance_data = fake_ec2_instance_data() + with pytest.raises(DaskWorkerNotFoundError): + await get_worker_still_has_results_in_memory(scheduler_url, ec2_instance_data) + + +@pytest.mark.parametrize("fct_shall_err", [True, False], ids=str) +async def test_get_worker_still_has_results_in_memory( + scheduler_url: AnyUrl, + dask_spec_cluster_client: distributed.Client, + fake_localhost_ec2_instance_data: EC2InstanceData, + fct_shall_err: bool, +): + # nothing ran, so it's 0 + assert ( + await get_worker_still_has_results_in_memory( + scheduler_url, fake_localhost_ec2_instance_data + ) + == 0 + ) + + # now run something quickly + def _add_fct(x: int, y: int) -> int: + if fct_shall_err: + msg = "BAM" + raise RuntimeError(msg) + return x + y + + # this will run right away and remain in memory until we fetch it + future_queued_task = dask_spec_cluster_client.submit(_add_fct, 2, 5) + assert future_queued_task + await _wait_for_task_done(future_queued_task) + assert ( + await get_worker_still_has_results_in_memory( + scheduler_url, fake_localhost_ec2_instance_data + ) + == 1 + ) + + # get the result will NOT bring the data back + if fct_shall_err: + exc = await future_queued_task.exception( # type: ignore + timeout=_DASK_SCHEDULER_REACTION_TIME_S + ) + assert isinstance(exc, RuntimeError) + else: + result = await future_queued_task.result(timeout=_DASK_SCHEDULER_REACTION_TIME_S) # type: ignore + assert result == 7 + + await _wait_for_dask_scheduler_to_change_state() + assert ( + await get_worker_still_has_results_in_memory( + scheduler_url, fake_localhost_ec2_instance_data + ) + == 1 + ) + + # this should remove the memory + del future_queued_task + await _wait_for_dask_scheduler_to_change_state() + assert ( + await get_worker_still_has_results_in_memory( + scheduler_url, fake_localhost_ec2_instance_data + ) + == 0 + ) + + +async def test_worker_used_resources_with_invalid_ec2_name_raises( + scheduler_url: AnyUrl, + fake_ec2_instance_data_with_invalid_ec2_name: EC2InstanceData, +): + with pytest.raises(Ec2InvalidDnsNameError): + await get_worker_used_resources( + scheduler_url, fake_ec2_instance_data_with_invalid_ec2_name + ) + + +async def test_worker_used_resources_with_no_workers_raises( + dask_local_cluster_without_workers: distributed.SpecCluster, + 
fake_localhost_ec2_instance_data: EC2InstanceData, +): + scheduler_url = parse_obj_as( + AnyUrl, dask_local_cluster_without_workers.scheduler_address + ) + with pytest.raises(DaskNoWorkersError): + await get_worker_used_resources(scheduler_url, fake_localhost_ec2_instance_data) + + +async def test_worker_used_resources_with_invalid_worker_host_raises( + scheduler_url: AnyUrl, + fake_ec2_instance_data: Callable[..., EC2InstanceData], +): + ec2_instance_data = fake_ec2_instance_data() + with pytest.raises(DaskWorkerNotFoundError): + await get_worker_used_resources(scheduler_url, ec2_instance_data) + + +async def test_worker_used_resources( + scheduler_url: AnyUrl, + dask_spec_cluster_client: distributed.Client, + fake_localhost_ec2_instance_data: EC2InstanceData, +): + # initial state + assert ( + await get_worker_used_resources(scheduler_url, fake_localhost_ec2_instance_data) + == Resources.create_as_empty() + ) + + def _add_fct(x: int, y: int) -> int: + import time + + time.sleep(_DASK_SCHEDULER_REACTION_TIME_S * 2) + return x + y + + # run something that uses resources + num_cpus = 2 + future_queued_task = dask_spec_cluster_client.submit( + _add_fct, 2, 5, resources={"CPU": num_cpus} + ) + assert future_queued_task + await _wait_for_dask_scheduler_to_change_state() + assert await get_worker_used_resources( + scheduler_url, fake_localhost_ec2_instance_data + ) == Resources(cpus=num_cpus, ram=ByteSize(0)) + + result = await future_queued_task.result(timeout=_DASK_SCHEDULER_REACTION_TIME_S) # type: ignore + assert result == 7 + + # back to no use + assert ( + await get_worker_used_resources(scheduler_url, fake_localhost_ec2_instance_data) + == Resources.create_as_empty() + ) diff --git a/services/autoscaling/tests/unit/test_utils_auto_scaling_core.py b/services/autoscaling/tests/unit/test_utils_auto_scaling_core.py index 1e5027ffe79..345a757f575 100644 --- a/services/autoscaling/tests/unit/test_utils_auto_scaling_core.py +++ b/services/autoscaling/tests/unit/test_utils_auto_scaling_core.py @@ -62,9 +62,9 @@ def test_node_host_name_from_ec2_private_dns( def test_node_host_name_from_ec2_private_dns_raises_with_invalid_name( - fake_ec2_instance_data: Callable[..., EC2InstanceData] + fake_ec2_instance_data: Callable[..., EC2InstanceData], faker: Faker ): - instance = fake_ec2_instance_data() + instance = fake_ec2_instance_data(aws_private_dns=faker.name()) with pytest.raises(Ec2InvalidDnsNameError): node_host_name_from_ec2_private_dns(instance) diff --git a/services/autoscaling/tests/unit/test_utils_computational_scaling.py b/services/autoscaling/tests/unit/test_utils_computational_scaling.py new file mode 100644 index 00000000000..76bf27a4857 --- /dev/null +++ b/services/autoscaling/tests/unit/test_utils_computational_scaling.py @@ -0,0 +1,230 @@ +# pylint: disable=redefined-outer-name +# pylint: disable=unused-argument +# pylint: disable=unused-variable +# pylint: disable=too-many-arguments + +import datetime +from collections.abc import Callable +from unittest import mock + +import pytest +from faker import Faker +from models_library.generated_models.docker_rest_api import Node as DockerNode +from pydantic import ByteSize, parse_obj_as +from pytest_mock import MockerFixture +from simcore_service_autoscaling.models import ( + AssociatedInstance, + DaskTask, + DaskTaskResources, + EC2InstanceData, + EC2InstanceType, + Resources, +) +from simcore_service_autoscaling.utils.computational_scaling import ( + _DEFAULT_MAX_CPU, + _DEFAULT_MAX_RAM, + get_max_resources_from_dask_task, + 
try_assigning_task_to_instance_types, + try_assigning_task_to_node, + try_assigning_task_to_pending_instances, +) + + +@pytest.mark.parametrize( + "dask_task, expected_resource", + [ + pytest.param( + DaskTask(task_id="fake", required_resources=DaskTaskResources()), + Resources( + cpus=_DEFAULT_MAX_CPU, ram=parse_obj_as(ByteSize, _DEFAULT_MAX_RAM) + ), + id="missing resources returns defaults", + ), + pytest.param( + DaskTask(task_id="fake", required_resources={"CPU": 2.5}), + Resources(cpus=2.5, ram=parse_obj_as(ByteSize, _DEFAULT_MAX_RAM)), + id="only cpus defined", + ), + pytest.param( + DaskTask( + task_id="fake", + required_resources={"CPU": 2.5, "RAM": 2 * 1024 * 1024 * 1024}, + ), + Resources(cpus=2.5, ram=parse_obj_as(ByteSize, "2GiB")), + id="cpu and ram defined", + ), + pytest.param( + DaskTask( + task_id="fake", + required_resources={"CPU": 2.5, "ram": 2 * 1024 * 1024 * 1024}, + ), + Resources(cpus=2.5, ram=parse_obj_as(ByteSize, _DEFAULT_MAX_RAM)), + id="invalid naming", + ), + ], +) +def test_get_max_resources_from_dask_task( + dask_task: DaskTask, expected_resource: Resources +): + assert get_max_resources_from_dask_task(dask_task) == expected_resource + + +@pytest.fixture +def fake_app(mocker: MockerFixture) -> mock.Mock: + app = mocker.Mock() + app.state.settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_MAX_START_TIME = ( + datetime.timedelta(minutes=1) + ) + return app + + +@pytest.fixture +def fake_task(faker: Faker) -> Callable[..., DaskTask]: + def _creator(**overrides) -> DaskTask: + return DaskTask( + **( + { + "task_id": faker.uuid4(), + "required_resources": DaskTaskResources(faker.pydict()), + } + | overrides + ) + ) + + return _creator + + +async def test_try_assigning_task_to_node_with_no_instances( + fake_task: Callable[..., DaskTask], +): + task = fake_task() + assert try_assigning_task_to_node(task, []) is False + + +@pytest.fixture +def fake_associated_host_instance( + host_node: DockerNode, + fake_ec2_instance_data: Callable[..., EC2InstanceData], +) -> AssociatedInstance: + return AssociatedInstance( + host_node, + fake_ec2_instance_data(), + ) + + +async def test_try_assigning_task_to_node( + fake_task: Callable[..., DaskTask], + fake_associated_host_instance: AssociatedInstance, +): + task = fake_task(required_resources={"CPU": 2}) + assert fake_associated_host_instance.node.Description + assert fake_associated_host_instance.node.Description.Resources + # we set the node to have 4 CPUs + fake_associated_host_instance.node.Description.Resources.NanoCPUs = int(4e9) + instance_to_tasks: list[tuple[AssociatedInstance, list[DaskTask]]] = [ + (fake_associated_host_instance, []) + ] + assert try_assigning_task_to_node(task, instance_to_tasks) is True + assert instance_to_tasks[0][1] == [task] + # this should work again + assert try_assigning_task_to_node(task, instance_to_tasks) is True + assert instance_to_tasks[0][1] == [task, task] + # this should now fail + assert try_assigning_task_to_node(task, instance_to_tasks) is False + assert instance_to_tasks[0][1] == [task, task] + + +async def test_try_assigning_task_to_pending_instances_with_no_instances( + fake_app: mock.Mock, + fake_task: Callable[..., DaskTask], +): + task = fake_task() + assert ( + await try_assigning_task_to_pending_instances( + fake_app, task, [], {}, notify_progress=True + ) + is False + ) + + +async def test_try_assigning_task_to_pending_instances( + fake_app: mock.Mock, + fake_task: Callable[..., DaskTask], + fake_ec2_instance_data: Callable[..., EC2InstanceData], +): + task = 
fake_task(required_resources={"CPU": 2}) + ec2_instance = fake_ec2_instance_data() + pending_instance_to_tasks: list[tuple[EC2InstanceData, list[DaskTask]]] = [ + (ec2_instance, []) + ] + type_to_instance_map = { + ec2_instance.type: EC2InstanceType( + name=ec2_instance.type, cpus=4, ram=ByteSize(1024 * 1024) + ) + } + # calling once should allow to add that task to the instance + assert ( + await try_assigning_task_to_pending_instances( + fake_app, + task, + pending_instance_to_tasks, + type_to_instance_map, + notify_progress=True, + ) + is True + ) + assert pending_instance_to_tasks[0][1] == [task] + # calling a second time as well should allow to add that task to the instance + assert ( + await try_assigning_task_to_pending_instances( + fake_app, + task, + pending_instance_to_tasks, + type_to_instance_map, + notify_progress=True, + ) + is True + ) + assert pending_instance_to_tasks[0][1] == [task, task] + # calling a third time should fail + assert ( + await try_assigning_task_to_pending_instances( + fake_app, + task, + pending_instance_to_tasks, + type_to_instance_map, + notify_progress=True, + ) + is False + ) + assert pending_instance_to_tasks[0][1] == [task, task] + + +def test_try_assigning_task_to_instance_types_with_empty_types( + fake_task: Callable[..., DaskTask] +): + task = fake_task(required_resources={"CPU": 2}) + assert try_assigning_task_to_instance_types(task, []) is False + + +def test_try_assigning_task_to_instance_types( + fake_task: Callable[..., DaskTask], faker: Faker +): + task = fake_task(required_resources={"CPU": 2}) + # create an instance type with some CPUs + fake_instance_type = EC2InstanceType( + name=faker.name(), cpus=6, ram=parse_obj_as(ByteSize, "2GiB") + ) + instance_type_to_tasks: list[tuple[EC2InstanceType, list[DaskTask]]] = [ + (fake_instance_type, []) + ] + # now this should work 3 times + assert try_assigning_task_to_instance_types(task, instance_type_to_tasks) is True + assert instance_type_to_tasks[0][1] == [task] + assert try_assigning_task_to_instance_types(task, instance_type_to_tasks) is True + assert instance_type_to_tasks[0][1] == [task, task] + assert try_assigning_task_to_instance_types(task, instance_type_to_tasks) is True + assert instance_type_to_tasks[0][1] == [task, task, task] + # now it should fail + assert try_assigning_task_to_instance_types(task, instance_type_to_tasks) is False + assert instance_type_to_tasks[0][1] == [task, task, task] diff --git a/services/autoscaling/tests/unit/test_utils_docker.py b/services/autoscaling/tests/unit/test_utils_docker.py index 64f704bbe52..3a5985becf0 100644 --- a/services/autoscaling/tests/unit/test_utils_docker.py +++ b/services/autoscaling/tests/unit/test_utils_docker.py @@ -40,6 +40,7 @@ get_max_resources_from_docker_task, get_monitored_nodes, get_node_total_resources, + get_worker_nodes, pending_service_tasks_with_insufficient_resources, remove_nodes, tag_node, @@ -133,6 +134,14 @@ async def test_get_monitored_nodes_with_valid_label( ) +async def test_worker_nodes( + autoscaling_docker: AutoscalingDocker, + host_node: Node, +): + worker_nodes = await get_worker_nodes(autoscaling_docker) + assert not worker_nodes + + async def test_remove_monitored_down_nodes_with_empty_list_does_nothing( autoscaling_docker: AutoscalingDocker, ):
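To close, a minimal standalone sketch of the node listing that the new `get_worker_nodes()` helper and its test exercise, written here against plain `aiodocker` rather than the service's `AutoscalingDocker` wrapper; the printed fields are standard swarm node attributes and may vary with the engine version:

```python
# Hypothetical sketch: list docker swarm worker nodes, mirroring the filter
# used by the new get_worker_nodes() helper.
import asyncio

import aiodocker


async def list_swarm_worker_nodes() -> list[dict]:
    docker = aiodocker.Docker()
    try:
        # same filter as get_worker_nodes(): only nodes with the "worker" role
        return await docker.nodes.list(filters={"role": ["worker"]})
    finally:
        await docker.close()


if __name__ == "__main__":
    for node in asyncio.run(list_swarm_worker_nodes()):
        print(node["Description"]["Hostname"], node["Spec"]["Availability"])
```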