From ec12cfc42fe3b79457d3673281b04b7e655ee432 Mon Sep 17 00:00:00 2001
From: Sylvain <35365065+sanderegg@users.noreply.github.com>
Date: Tue, 5 Dec 2023 11:41:04 +0100
Subject: [PATCH] =?UTF-8?q?=F0=9F=90=9BBugfix/autoscaling=20does=20not=20s?=
 =?UTF-8?q?cale=20above=20limit=20(#5129)?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
 .../resource_constraints.py                   |  18 ++
 .../tests/test_resource_constraints.py        |  34 +++
 .../src/simcore_service_autoscaling/models.py |   4 +-
 .../modules/auto_scaling_core.py              |  93 ++++++-
 .../modules/dask.py                           |   3 +-
 .../utils/computational_scaling.py            |  12 +-
 ...test_modules_auto_scaling_computational.py | 236 ++++++++++++++++--
 .../modules/dask_client.py                    |   8 +-
 8 files changed, 376 insertions(+), 32 deletions(-)
 create mode 100644 packages/dask-task-models-library/src/dask_task_models_library/resource_constraints.py
 create mode 100644 packages/dask-task-models-library/tests/test_resource_constraints.py

diff --git a/packages/dask-task-models-library/src/dask_task_models_library/resource_constraints.py b/packages/dask-task-models-library/src/dask_task_models_library/resource_constraints.py
new file mode 100644
index 00000000000..3a81114ef87
--- /dev/null
+++ b/packages/dask-task-models-library/src/dask_task_models_library/resource_constraints.py
@@ -0,0 +1,18 @@
+from typing import Any, TypeAlias
+
+from .constants import DASK_TASK_EC2_RESOURCE_RESTRICTION_KEY
+
+DaskTaskResources: TypeAlias = dict[str, Any]
+
+
+def create_ec2_resource_constraint_key(ec2_instance_type: str) -> str:
+    return f"{DASK_TASK_EC2_RESOURCE_RESTRICTION_KEY}:{ec2_instance_type}"
+
+
+def get_ec2_instance_type_from_resources(
+    task_resources: DaskTaskResources,
+) -> str | None:
+    for resource_name in task_resources:
+        if resource_name.startswith(DASK_TASK_EC2_RESOURCE_RESTRICTION_KEY):
+            return resource_name.split(":")[-1]
+    return None
diff --git a/packages/dask-task-models-library/tests/test_resource_constraints.py b/packages/dask-task-models-library/tests/test_resource_constraints.py
new file mode 100644
index 00000000000..9a2c1e59e26
--- /dev/null
+++ b/packages/dask-task-models-library/tests/test_resource_constraints.py
@@ -0,0 +1,34 @@
+from dask_task_models_library.constants import DASK_TASK_EC2_RESOURCE_RESTRICTION_KEY
+from dask_task_models_library.resource_constraints import (
+    create_ec2_resource_constraint_key,
+    get_ec2_instance_type_from_resources,
+)
+from faker import Faker
+
+
+def test_create_ec2_resource_constraint_key(faker: Faker):
+    faker_instance_type = faker.pystr()
+    assert (
+        create_ec2_resource_constraint_key(faker_instance_type)
+        == f"{DASK_TASK_EC2_RESOURCE_RESTRICTION_KEY}:{faker_instance_type}"
+    )
+
+    empty_instance_type = ""
+    assert (
+        create_ec2_resource_constraint_key(empty_instance_type)
+        == f"{DASK_TASK_EC2_RESOURCE_RESTRICTION_KEY}:"
+    )
+
+
+def test_get_ec2_instance_type_from_resources(faker: Faker):
+    empty_task_resources = {}
+    assert get_ec2_instance_type_from_resources(empty_task_resources) is None
+    no_ec2_types_in_resources = {"blahblah": 1}
+    assert get_ec2_instance_type_from_resources(no_ec2_types_in_resources) is None
+
+    faker_instance_type = faker.pystr()
+    ec2_type_in_resources = {create_ec2_resource_constraint_key(faker_instance_type): 1}
+    assert (
+        get_ec2_instance_type_from_resources(ec2_type_in_resources)
+        == faker_instance_type
+    )
diff --git a/services/autoscaling/src/simcore_service_autoscaling/models.py b/services/autoscaling/src/simcore_service_autoscaling/models.py
index d9c3bf3978d..de2df37d83c 100644
--- a/services/autoscaling/src/simcore_service_autoscaling/models.py
+++ b/services/autoscaling/src/simcore_service_autoscaling/models.py
@@ -1,7 +1,8 @@
 from dataclasses import dataclass, field
-from typing import Any, TypeAlias
+from typing import TypeAlias
 
 from aws_library.ec2.models import EC2InstanceData, EC2InstanceType, Resources
+from dask_task_models_library.resource_constraints import DaskTaskResources
 from models_library.generated_models.docker_rest_api import Node
 
 
@@ -55,7 +56,6 @@ class Cluster:
 
 
 DaskTaskId: TypeAlias = str
-DaskTaskResources: TypeAlias = dict[str, Any]
 
 
 @dataclass(frozen=True, kw_only=True)
diff --git a/services/autoscaling/src/simcore_service_autoscaling/modules/auto_scaling_core.py b/services/autoscaling/src/simcore_service_autoscaling/modules/auto_scaling_core.py
index 1e0c3f28a9e..208f10f5790 100644
--- a/services/autoscaling/src/simcore_service_autoscaling/modules/auto_scaling_core.py
+++ b/services/autoscaling/src/simcore_service_autoscaling/modules/auto_scaling_core.py
@@ -11,6 +11,7 @@
     EC2InstanceConfig,
     EC2InstanceData,
     EC2InstanceType,
+    EC2Tags,
     Resources,
 )
 from fastapi import FastAPI
@@ -441,6 +442,73 @@ async def _find_needed_instances(
     return num_instances_per_type
 
+
+async def _cap_needed_instances(
+    app: FastAPI, needed_instances: dict[EC2InstanceType, int], ec2_tags: EC2Tags
+) -> dict[EC2InstanceType, int]:
+    """caps the needed instances dict[EC2InstanceType, int] to the maximal allowed number of instances by
+    1. limiting to 1 per asked type
+    2. increasing each by 1 until the maximum allowed number of instances is reached
+    NOTE: the maximum allowed number of instances contains the current number of running/pending machines
+
+    Raises:
+        Ec2TooManyInstancesError: raised when the maximum of machines is already running/pending
+    """
+    ec2_client = get_ec2_client(app)
+    app_settings = get_application_settings(app)
+    assert app_settings.AUTOSCALING_EC2_INSTANCES  # nosec
+    current_instances = await ec2_client.get_instances(
+        key_names=[app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_KEY_NAME],
+        tags=ec2_tags,
+    )
+    current_number_of_instances = len(current_instances)
+    if (
+        current_number_of_instances
+        >= app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_MAX_INSTANCES
+    ):
+        # ok that is already too much
+        raise Ec2TooManyInstancesError(
+            num_instances=app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_MAX_INSTANCES
+        )
+
+    total_number_of_needed_instances = sum(needed_instances.values())
+    if (
+        current_number_of_instances + total_number_of_needed_instances
+        <= app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_MAX_INSTANCES
+    ):
+        # ok that fits no need to do anything here
+        return needed_instances
+
+    # this is asking for too many, so let's cap them
+    max_number_of_creatable_instances = (
+        app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_MAX_INSTANCES
+        - current_number_of_instances
+    )
+
+    # we start with 1 machine of each type until the max
+    capped_needed_instances = {
+        k: 1
+        for count, k in enumerate(needed_instances)
+        if (count + 1) <= max_number_of_creatable_instances
+    }
+
+    if len(capped_needed_instances) < len(needed_instances):
+        # there were too many types for the number of possible instances
+        return capped_needed_instances
+
+    # all instance types were added, now create more of them if possible
+    while sum(capped_needed_instances.values()) < max_number_of_creatable_instances:
+        for instance_type, num_to_create in needed_instances.items():
+            if (
+                sum(capped_needed_instances.values())
+                == max_number_of_creatable_instances
+            ):
+                break
+            if num_to_create > capped_needed_instances[instance_type]:
+                capped_needed_instances[instance_type] += 1
+
+    return capped_needed_instances
+
+
 async def _start_instances(
     app: FastAPI,
     needed_instances: dict[EC2InstanceType, int],
@@ -450,14 +518,28 @@ async def _start_instances(
     ec2_client = get_ec2_client(app)
     app_settings = get_application_settings(app)
     assert app_settings.AUTOSCALING_EC2_INSTANCES  # nosec
+    new_instance_tags = auto_scaling_mode.get_ec2_tags(app)
+    capped_needed_machines = {}
+    try:
+        capped_needed_machines = await _cap_needed_instances(
+            app, needed_instances, new_instance_tags
+        )
+    except Ec2TooManyInstancesError:
+        await auto_scaling_mode.log_message_from_tasks(
+            app,
+            tasks,
+            "The maximum number of machines in the cluster was reached. Please wait for your running jobs "
+            "to complete and try again later or contact osparc support if this issue does not resolve.",
+            level=logging.ERROR,
+        )
+        return []
 
-    instance_tags = auto_scaling_mode.get_ec2_tags(app)
     results = await asyncio.gather(
         *[
             ec2_client.start_aws_instance(
                 EC2InstanceConfig(
                     type=instance_type,
-                    tags=instance_tags,
+                    tags=new_instance_tags,
                     startup_script=await ec2_startup_script(
                         app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_ALLOWED_TYPES[
                             instance_type.name
@@ -474,7 +556,7 @@
                     number_of_instances=instance_num,
                     max_number_of_instances=app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_MAX_INSTANCES,
                 )
-                for instance_type, instance_num in needed_instances.items()
+                for instance_type, instance_num in capped_needed_machines.items()
             ],
             return_exceptions=True,
         )
@@ -497,7 +579,10 @@
         else:
             new_pending_instances.append(r)
 
-    log_message = f"{sum(n for n in needed_instances.values())} new machines launched, it might take up to 3 minutes to start, Please wait..."
+    log_message = (
+        f"{sum(n for n in capped_needed_machines.values())} new machines launched"
+        ", it might take up to 3 minutes to start, Please wait..."
+    )
     await auto_scaling_mode.log_message_from_tasks(
         app, tasks, log_message, level=logging.INFO
     )
diff --git a/services/autoscaling/src/simcore_service_autoscaling/modules/dask.py b/services/autoscaling/src/simcore_service_autoscaling/modules/dask.py
index a40dacbb17e..7982672f91d 100644
--- a/services/autoscaling/src/simcore_service_autoscaling/modules/dask.py
+++ b/services/autoscaling/src/simcore_service_autoscaling/modules/dask.py
@@ -5,6 +5,7 @@
 
 import distributed
 from aws_library.ec2.models import EC2InstanceData, Resources
+from dask_task_models_library.resource_constraints import DaskTaskResources
 from pydantic import AnyUrl, ByteSize, parse_obj_as
 
 from ..core.errors import (
@@ -12,7 +13,7 @@
     DaskSchedulerNotFoundError,
     DaskWorkerNotFoundError,
 )
-from ..models import AssociatedInstance, DaskTask, DaskTaskId, DaskTaskResources
+from ..models import AssociatedInstance, DaskTask, DaskTaskId
 from ..utils.auto_scaling_core import (
     node_host_name_from_ec2_private_dns,
     node_ip_from_ec2_private_dns,
diff --git a/services/autoscaling/src/simcore_service_autoscaling/utils/computational_scaling.py b/services/autoscaling/src/simcore_service_autoscaling/utils/computational_scaling.py
index f7ff8253716..a640ca9b7c8 100644
--- a/services/autoscaling/src/simcore_service_autoscaling/utils/computational_scaling.py
+++ b/services/autoscaling/src/simcore_service_autoscaling/utils/computational_scaling.py
@@ -4,10 +4,11 @@
 from typing import Final
 
 from aws_library.ec2.models import Resources
-from dask_task_models_library.constants import DASK_TASK_EC2_RESOURCE_RESTRICTION_KEY
+from dask_task_models_library.resource_constraints import (
+    get_ec2_instance_type_from_resources,
+)
 from fastapi import FastAPI
 from servicelib.utils_formatting import timedelta_as_minute_second
-from types_aiobotocore_ec2.literals import InstanceTypeType
 
 from ..core.settings import get_application_settings
 from ..models import (
@@ -30,8 +31,11 @@ def get_max_resources_from_dask_task(task: DaskTask) -> Resources:
     )
 
 
-def get_task_instance_restriction(task: DaskTask) -> InstanceTypeType | None:
-    return task.required_resources.get(DASK_TASK_EC2_RESOURCE_RESTRICTION_KEY)
+def get_task_instance_restriction(task: DaskTask) -> str | None:
+    instance_ec2_type: str | None = get_ec2_instance_type_from_resources(
+        task.required_resources
+    )
+    return instance_ec2_type
 
 
 def _compute_tasks_needed_resources(tasks: list[DaskTask]) -> Resources:
diff --git a/services/autoscaling/tests/unit/test_modules_auto_scaling_computational.py b/services/autoscaling/tests/unit/test_modules_auto_scaling_computational.py
index 9446bdfb2d2..f204e659cf3 100644
--- a/services/autoscaling/tests/unit/test_modules_auto_scaling_computational.py
+++ b/services/autoscaling/tests/unit/test_modules_auto_scaling_computational.py
@@ -10,6 +10,7 @@
 import base64
 import datetime
 import logging
+from collections import defaultdict
 from collections.abc import Callable, Iterator
 from copy import deepcopy
 from dataclasses import dataclass
@@ -19,7 +20,9 @@
 import distributed
 import pytest
 from aws_library.ec2.models import Resources
-from dask_task_models_library.constants import DASK_TASK_EC2_RESOURCE_RESTRICTION_KEY
+from dask_task_models_library.resource_constraints import (
+    create_ec2_resource_constraint_key,
+)
 from faker import Faker
 from fastapi import FastAPI
 from models_library.docker import DOCKER_TASK_EC2_INSTANCE_TYPE_PLACEMENT_CONSTRAINT_KEY
@@ -282,7 +285,7 @@ def _do(
             }
         )
         if ec2_instance_type is not None:
-            resources[DASK_TASK_EC2_RESOURCE_RESTRICTION_KEY] = ec2_instance_type
+            resources[create_ec2_resource_constraint_key(ec2_instance_type)] = 1
         return resources
 
     return _do
@@ -306,6 +309,34 @@ def mock_dask_get_worker_used_resources(mocker: MockerFixture) -> mock.Mock:
     )
 
+
+async def _create_task_with_resources(
+    ec2_client: EC2Client,
+    dask_task_imposed_ec2_type: InstanceTypeType | None,
+    dask_ram: ByteSize | None,
+    create_dask_task_resources: Callable[..., DaskTaskResources],
+    create_dask_task: Callable[[DaskTaskResources], distributed.Future],
+) -> distributed.Future:
+    if dask_task_imposed_ec2_type and not dask_ram:
+        instance_types = await ec2_client.describe_instance_types(
+            InstanceTypes=[dask_task_imposed_ec2_type]
+        )
+        assert instance_types
+        assert "InstanceTypes" in instance_types
+        assert instance_types["InstanceTypes"]
+        assert "MemoryInfo" in instance_types["InstanceTypes"][0]
+        assert "SizeInMiB" in instance_types["InstanceTypes"][0]["MemoryInfo"]
+        dask_ram = parse_obj_as(
+            ByteSize,
+            f"{instance_types['InstanceTypes'][0]['MemoryInfo']['SizeInMiB']}MiB",
+        )
+    dask_task_resources = create_dask_task_resources(
+        dask_task_imposed_ec2_type, dask_ram
+    )
+    dask_future = create_dask_task(dask_task_resources)
+    assert dask_future
+    return dask_future
+
+
 @pytest.mark.acceptance_test()
 @pytest.mark.parametrize(
     "dask_task_imposed_ec2_type, dask_ram, expected_ec2_type",
@@ -362,24 +393,13 @@ async def test_cluster_scaling_up_and_down(  # noqa: PLR0915
     assert not all_instances["Reservations"]
 
     # create a task that needs more power
-    if dask_task_imposed_ec2_type and not dask_ram:
-        instance_types = await ec2_client.describe_instance_types(
-            InstanceTypes=[dask_task_imposed_ec2_type]
-        )
-        assert instance_types
-        assert "InstanceTypes" in instance_types
-        assert instance_types["InstanceTypes"]
-        assert "MemoryInfo" in instance_types["InstanceTypes"][0]
-        assert "SizeInMiB" in instance_types["InstanceTypes"][0]["MemoryInfo"]
-        dask_ram = parse_obj_as(
-            ByteSize,
-            f"{instance_types['InstanceTypes'][0]['MemoryInfo']['SizeInMiB']}MiB",
-        )
-    dask_task_resources = create_dask_task_resources(
-        dask_task_imposed_ec2_type, dask_ram
+    dask_future = await _create_task_with_resources(
+        ec2_client,
+        dask_task_imposed_ec2_type,
+        dask_ram,
+        create_dask_task_resources,
+        create_dask_task,
     )
-    dask_future = create_dask_task(dask_task_resources)
-    assert dask_future
 
     # this should trigger a scaling up as we have no nodes
     await auto_scale_cluster(
@@ -726,6 +746,184 @@ async def test_cluster_scaling_up_starts_multiple_instances(
     mock_rabbitmq_post_message.reset_mock()
 
+
+async def test_cluster_scaling_up_more_than_allowed_max_starts_max_instances_and_not_more(
+    minimal_configuration: None,
+    app_settings: ApplicationSettings,
+    initialized_app: FastAPI,
+    create_dask_task: Callable[[DaskTaskResources], distributed.Future],
+    ec2_client: EC2Client,
+    dask_spec_local_cluster: distributed.SpecCluster,
+    create_dask_task_resources: Callable[..., DaskTaskResources],
+    mock_docker_tag_node: mock.Mock,
+    mock_rabbitmq_post_message: mock.Mock,
+    mock_docker_find_node_with_name: mock.Mock,
+    mock_docker_set_node_availability: mock.Mock,
+    mock_docker_compute_node_used_resources: mock.Mock,
+    mock_dask_get_worker_has_results_in_memory: mock.Mock,
+    mock_dask_get_worker_used_resources: mock.Mock,
+):
+    ec2_instance_type = "r5n.8xlarge"
+
+    # we have nothing running now
+    all_instances = await ec2_client.describe_instances()
+    assert not all_instances["Reservations"]
+    assert app_settings.AUTOSCALING_EC2_INSTANCES
+    assert app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_MAX_INSTANCES > 0
+    num_tasks = 3 * app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_MAX_INSTANCES
+
+    # create the tasks
+    task_futures = await asyncio.gather(
+        *(
+            _create_task_with_resources(
+                ec2_client,
+                ec2_instance_type,
+                None,
+                create_dask_task_resources,
+                create_dask_task,
+            )
+            for _ in range(num_tasks)
+        )
+    )
+    assert all(task_futures)
+
+    # this should trigger a scaling up as we have no nodes
+    await auto_scale_cluster(
+        app=initialized_app, auto_scaling_mode=ComputationalAutoscaling()
+    )
+    await _assert_ec2_instances(
+        ec2_client,
+        num_reservations=1,
+        num_instances=app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_MAX_INSTANCES,
+        instance_type=ec2_instance_type,
+        instance_state="running",
+    )
+    # as the new node is already running, but is not yet connected, hence not tagged and drained
+    mock_docker_find_node_with_name.assert_not_called()
+    mock_docker_tag_node.assert_not_called()
+    mock_docker_set_node_availability.assert_not_called()
+    mock_docker_compute_node_used_resources.assert_not_called()
+    mock_dask_get_worker_has_results_in_memory.assert_not_called()
+    mock_dask_get_worker_used_resources.assert_not_called()
+    # check rabbit messages were sent
+    _assert_rabbit_autoscaling_message_sent(
+        mock_rabbitmq_post_message,
+        app_settings,
+        initialized_app,
+        dask_spec_local_cluster.scheduler_address,
+        instances_running=0,
+        instances_pending=app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_MAX_INSTANCES,
+    )
+    mock_rabbitmq_post_message.reset_mock()
+
+    # 2. calling this multiple times should do nothing
+    num_useless_calls = 10
+    for _ in range(num_useless_calls):
+        await auto_scale_cluster(
+            app=initialized_app, auto_scaling_mode=ComputationalAutoscaling()
+        )
+    await _assert_ec2_instances(
+        ec2_client,
+        num_reservations=1,
+        num_instances=app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_MAX_INSTANCES,
+        instance_type=ec2_instance_type,
+        instance_state="running",
+    )
+
+
+async def test_cluster_scaling_up_more_than_allowed_with_multiple_types_max_starts_max_instances_and_not_more(
+    minimal_configuration: None,
+    app_settings: ApplicationSettings,
+    initialized_app: FastAPI,
+    create_dask_task: Callable[[DaskTaskResources], distributed.Future],
+    ec2_client: EC2Client,
+    dask_spec_local_cluster: distributed.SpecCluster,
+    create_dask_task_resources: Callable[..., DaskTaskResources],
+    mock_docker_tag_node: mock.Mock,
+    mock_rabbitmq_post_message: mock.Mock,
+    mock_docker_find_node_with_name: mock.Mock,
+    mock_docker_set_node_availability: mock.Mock,
+    mock_docker_compute_node_used_resources: mock.Mock,
+    mock_dask_get_worker_has_results_in_memory: mock.Mock,
+    mock_dask_get_worker_used_resources: mock.Mock,
+    aws_allowed_ec2_instance_type_names: list[InstanceTypeType],
+):
+    # we have nothing running now
+    all_instances = await ec2_client.describe_instances()
+    assert not all_instances["Reservations"]
+    assert app_settings.AUTOSCALING_EC2_INSTANCES
+    assert app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_MAX_INSTANCES > 0
+    num_tasks = 3 * app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_MAX_INSTANCES
+
+    # create the tasks
+    task_futures = await asyncio.gather(
+        *(
+            _create_task_with_resources(
+                ec2_client,
+                ec2_instance_type,
+                None,
+                create_dask_task_resources,
+                create_dask_task,
+            )
+            for ec2_instance_type in aws_allowed_ec2_instance_type_names
+            for _ in range(num_tasks)
+        )
+    )
+    assert all(task_futures)
+
+    # this should trigger a scaling up as we have no nodes
+    await auto_scale_cluster(
+        app=initialized_app, auto_scaling_mode=ComputationalAutoscaling()
+    )
+
+    # one of each type is created with some that will have 2 instances
+    all_instances = await ec2_client.describe_instances()
+    assert len(all_instances["Reservations"]) == len(
+        aws_allowed_ec2_instance_type_names
+    )
+    instances_found = defaultdict(int)
+    for reservation in all_instances["Reservations"]:
+        assert "Instances" in reservation
+        for instance in reservation["Instances"]:
+            assert "InstanceType" in instance
+            instance_type = instance["InstanceType"]
+            instances_found[instance_type] += 1
+
+    assert sorted(instances_found.keys()) == sorted(aws_allowed_ec2_instance_type_names)
+    assert (
+        sum(instances_found.values())
+        == app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_MAX_INSTANCES
+    )
+
+    # as the new node is already running, but is not yet connected, hence not tagged and drained
+    mock_docker_find_node_with_name.assert_not_called()
+    mock_docker_tag_node.assert_not_called()
+    mock_docker_set_node_availability.assert_not_called()
+    mock_docker_compute_node_used_resources.assert_not_called()
+    mock_dask_get_worker_has_results_in_memory.assert_not_called()
+    mock_dask_get_worker_used_resources.assert_not_called()
+    # check rabbit messages were sent
+    _assert_rabbit_autoscaling_message_sent(
+        mock_rabbitmq_post_message,
+        app_settings,
+        initialized_app,
+        dask_spec_local_cluster.scheduler_address,
+        instances_running=0,
+        instances_pending=app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_MAX_INSTANCES,
+    )
+    mock_rabbitmq_post_message.reset_mock()
+
+    # 2. calling this multiple times should do nothing
+    num_useless_calls = 10
+    for _ in range(num_useless_calls):
+        await auto_scale_cluster(
+            app=initialized_app, auto_scaling_mode=ComputationalAutoscaling()
+        )
+    all_instances = await ec2_client.describe_instances()
+    assert len(all_instances["Reservations"]) == len(
+        aws_allowed_ec2_instance_type_names
+    )
+
+
 @pytest.fixture
 def fake_associated_host_instance(
     host_node: DockerNode,
diff --git a/services/director-v2/src/simcore_service_director_v2/modules/dask_client.py b/services/director-v2/src/simcore_service_director_v2/modules/dask_client.py
index 829cd6f0a4f..a2a3ed99780 100644
--- a/services/director-v2/src/simcore_service_director_v2/modules/dask_client.py
+++ b/services/director-v2/src/simcore_service_director_v2/modules/dask_client.py
@@ -19,7 +19,6 @@
 from typing import Any
 
 import distributed
-from dask_task_models_library.constants import DASK_TASK_EC2_RESOURCE_RESTRICTION_KEY
 from dask_task_models_library.container_tasks.docker import DockerBasicAuth
 from dask_task_models_library.container_tasks.errors import TaskCancelledError
 from dask_task_models_library.container_tasks.io import (
@@ -31,6 +30,9 @@
     ContainerTaskParameters,
     LogFileUploadURL,
 )
+from dask_task_models_library.resource_constraints import (
+    create_ec2_resource_constraint_key,
+)
 from distributed.scheduler import TaskStateState as DaskSchedulerTaskState
 from fastapi import FastAPI
 from models_library.api_schemas_directorv2.clusters import ClusterDetails, Scheduler
@@ -228,7 +230,9 @@ def _comp_sidecar_fct(
         )
         if hardware_info.aws_ec2_instances:
             dask_resources[
-                f"{DASK_TASK_EC2_RESOURCE_RESTRICTION_KEY}:{hardware_info.aws_ec2_instances[0]}"
+                create_ec2_resource_constraint_key(
+                    hardware_info.aws_ec2_instances[0]
+                )
             ] = 1
 
         dask_utils.check_scheduler_is_still_the_same(
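
Illustration (not part of the patch): a minimal, self-contained sketch of the capping strategy that _cap_needed_instances introduces, namely one machine per requested type first, then round-robin increments until the remaining budget is used up. The helper name, the plain-string instance types and the numbers below are hypothetical; the EC2 lookup of already running machines and the Ec2TooManyInstancesError path are left out, and the sketch assumes the caller only invokes it when the request exceeds the budget, as the patch guarantees.

def cap_needed_instances(needed: dict[str, int], max_creatable: int) -> dict[str, int]:
    # start with one machine per requested type, as long as the budget allows
    capped = {
        instance_type: 1
        for count, instance_type in enumerate(needed)
        if (count + 1) <= max_creatable
    }
    if len(capped) < len(needed):
        # more types were requested than machines may be created: stop here
        return capped
    # round-robin: grow each type by one until the budget is exhausted
    # (terminates because the caller guarantees sum(needed.values()) > max_creatable)
    while sum(capped.values()) < max_creatable:
        for instance_type, wanted in needed.items():
            if sum(capped.values()) == max_creatable:
                break
            if wanted > capped[instance_type]:
                capped[instance_type] += 1
    return capped


# hypothetical numbers in the spirit of the new tests: a single type is capped at the
# maximum, and two types share a budget of 4 machines evenly
assert cap_needed_instances({"r5n.8xlarge": 30}, 10) == {"r5n.8xlarge": 10}
assert cap_needed_instances({"t2.micro": 3, "g4dn.xlarge": 3}, 4) == {"t2.micro": 2, "g4dn.xlarge": 2}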