
🐛Bugfix/autoscaling does not scale above limit (ITISFoundation#5129)
sanderegg authored Dec 5, 2023
1 parent 7c30bc0 commit ec12cfc
Showing 8 changed files with 376 additions and 32 deletions.
@@ -0,0 +1,18 @@
from typing import Any, TypeAlias

from .constants import DASK_TASK_EC2_RESOURCE_RESTRICTION_KEY

DaskTaskResources: TypeAlias = dict[str, Any]


def create_ec2_resource_constraint_key(ec2_instance_type: str) -> str:
    return f"{DASK_TASK_EC2_RESOURCE_RESTRICTION_KEY}:{ec2_instance_type}"


def get_ec2_instance_type_from_resources(
    task_resources: DaskTaskResources,
) -> str | None:
    for resource_name in task_resources:
        if resource_name.startswith(DASK_TASK_EC2_RESOURCE_RESTRICTION_KEY):
            return resource_name.split(":")[-1]
    return None
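
For orientation, here is a minimal usage sketch of the two helpers added above. It only assumes that DASK_TASK_EC2_RESOURCE_RESTRICTION_KEY (defined in .constants, not shown in this diff) is a plain string prefix; the instance type "g4dn.xlarge" is just an illustrative value, not something this commit prescribes.

from dask_task_models_library.resource_constraints import (
    create_ec2_resource_constraint_key,
    get_ec2_instance_type_from_resources,
)

# a task that must run on a specific EC2 instance type carries a resource named
# "<DASK_TASK_EC2_RESOURCE_RESTRICTION_KEY>:<instance type>" in its required resources
task_resources = {create_ec2_resource_constraint_key("g4dn.xlarge"): 1}

# the autoscaler later recovers the instance type back from those task resources
assert get_ec2_instance_type_from_resources(task_resources) == "g4dn.xlarge"
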
@@ -0,0 +1,34 @@
from dask_task_models_library.constants import DASK_TASK_EC2_RESOURCE_RESTRICTION_KEY
from dask_task_models_library.resource_constraints import (
create_ec2_resource_constraint_key,
get_ec2_instance_type_from_resources,
)
from faker import Faker


def test_create_ec2_resource_constraint_key(faker: Faker):
    faker_instance_type = faker.pystr()
    assert (
        create_ec2_resource_constraint_key(faker_instance_type)
        == f"{DASK_TASK_EC2_RESOURCE_RESTRICTION_KEY}:{faker_instance_type}"
    )

    empty_instance_type = ""
    assert (
        create_ec2_resource_constraint_key(empty_instance_type)
        == f"{DASK_TASK_EC2_RESOURCE_RESTRICTION_KEY}:"
    )


def test_get_ec2_instance_type_from_resources(faker: Faker):
    empty_task_resources = {}
    assert get_ec2_instance_type_from_resources(empty_task_resources) is None
    no_ec2_types_in_resources = {"blahblah": 1}
    assert get_ec2_instance_type_from_resources(no_ec2_types_in_resources) is None

    faker_instance_type = faker.pystr()
    ec2_type_in_resources = {create_ec2_resource_constraint_key(faker_instance_type): 1}
    assert (
        get_ec2_instance_type_from_resources(ec2_type_in_resources)
        == faker_instance_type
    )
@@ -1,7 +1,8 @@
from dataclasses import dataclass, field
from typing import Any, TypeAlias
from typing import TypeAlias

from aws_library.ec2.models import EC2InstanceData, EC2InstanceType, Resources
from dask_task_models_library.resource_constraints import DaskTaskResources
from models_library.generated_models.docker_rest_api import Node


@@ -55,7 +56,6 @@ class Cluster:


DaskTaskId: TypeAlias = str
DaskTaskResources: TypeAlias = dict[str, Any]


@dataclass(frozen=True, kw_only=True)
@@ -11,6 +11,7 @@
    EC2InstanceConfig,
    EC2InstanceData,
    EC2InstanceType,
    EC2Tags,
    Resources,
)
from fastapi import FastAPI
@@ -441,6 +442,73 @@ async def _find_needed_instances(
    return num_instances_per_type


async def _cap_needed_instances(
    app: FastAPI, needed_instances: dict[EC2InstanceType, int], ec2_tags: EC2Tags
) -> dict[EC2InstanceType, int]:
    """caps the needed instances dict[EC2InstanceType, int] to the maximal allowed number of instances by
    1. limiting to 1 per asked type
    2. increasing each by 1 until the maximum allowed number of instances is reached
    NOTE: the maximum allowed number of instances contains the current number of running/pending machines

    Raises:
        Ec2TooManyInstancesError: raised when the maximum of machines is already running/pending
    """
    ec2_client = get_ec2_client(app)
    app_settings = get_application_settings(app)
    assert app_settings.AUTOSCALING_EC2_INSTANCES  # nosec
    current_instances = await ec2_client.get_instances(
        key_names=[app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_KEY_NAME],
        tags=ec2_tags,
    )
    current_number_of_instances = len(current_instances)
    if (
        current_number_of_instances
        >= app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_MAX_INSTANCES
    ):
        # ok that is already too much
        raise Ec2TooManyInstancesError(
            num_instances=app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_MAX_INSTANCES
        )

    total_number_of_needed_instances = sum(needed_instances.values())
    if (
        current_number_of_instances + total_number_of_needed_instances
        <= app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_MAX_INSTANCES
    ):
        # ok that fits no need to do anything here
        return needed_instances

    # this is asking for too many, so let's cap them
    max_number_of_creatable_instances = (
        app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_MAX_INSTANCES
        - current_number_of_instances
    )

    # we start with 1 machine of each type until the max
    capped_needed_instances = {
        k: 1
        for count, k in enumerate(needed_instances)
        if (count + 1) <= max_number_of_creatable_instances
    }

    if len(capped_needed_instances) < len(needed_instances):
        # there were too many types for the number of possible instances
        return capped_needed_instances

    # all instance types were added, now create more of them if possible
    while sum(capped_needed_instances.values()) < max_number_of_creatable_instances:
        for instance_type, num_to_create in needed_instances.items():
            if (
                sum(capped_needed_instances.values())
                == max_number_of_creatable_instances
            ):
                break
            if num_to_create > capped_needed_instances[instance_type]:
                capped_needed_instances[instance_type] += 1

    return capped_needed_instances


async def _start_instances(
    app: FastAPI,
    needed_instances: dict[EC2InstanceType, int],
@@ -450,14 +518,28 @@ async def _start_instances(
    ec2_client = get_ec2_client(app)
    app_settings = get_application_settings(app)
    assert app_settings.AUTOSCALING_EC2_INSTANCES  # nosec
    new_instance_tags = auto_scaling_mode.get_ec2_tags(app)
    capped_needed_machines = {}
    try:
        capped_needed_machines = await _cap_needed_instances(
            app, needed_instances, new_instance_tags
        )
    except Ec2TooManyInstancesError:
        await auto_scaling_mode.log_message_from_tasks(
            app,
            tasks,
            "The maximum number of machines in the cluster was reached. Please wait for your running jobs "
            "to complete and try again later or contact osparc support if this issue does not resolve.",
            level=logging.ERROR,
        )
        return []

    instance_tags = auto_scaling_mode.get_ec2_tags(app)
    results = await asyncio.gather(
        *[
            ec2_client.start_aws_instance(
                EC2InstanceConfig(
                    type=instance_type,
                    tags=instance_tags,
                    tags=new_instance_tags,
                    startup_script=await ec2_startup_script(
                        app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_ALLOWED_TYPES[
                            instance_type.name
@@ -474,7 +556,7 @@
                number_of_instances=instance_num,
                max_number_of_instances=app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_MAX_INSTANCES,
            )
            for instance_type, instance_num in needed_instances.items()
            for instance_type, instance_num in capped_needed_machines.items()
        ],
        return_exceptions=True,
    )
@@ -497,7 +579,10 @@
        else:
            new_pending_instances.append(r)

    log_message = f"{sum(n for n in needed_instances.values())} new machines launched, it might take up to 3 minutes to start, Please wait..."
    log_message = (
        f"{sum(n for n in capped_needed_machines.values())} new machines launched"
        ", it might take up to 3 minutes to start, Please wait..."
    )
    await auto_scaling_mode.log_message_from_tasks(
        app, tasks, log_message, level=logging.INFO
    )
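
As an aside on the capping logic introduced above: the strategy of _cap_needed_instances (one machine per requested type first, then round-robin increments up to the creatable budget) can be illustrated on plain dicts. The sketch below is not part of this commit; it re-implements only the pure capping step, with hypothetical instance-type names and an explicit max_creatable argument standing in for EC2_INSTANCES_MAX_INSTANCES minus the currently running/pending machines.

def cap(needed: dict[str, int], max_creatable: int) -> dict[str, int]:
    # everything fits: nothing to cap (mirrors the early return in _cap_needed_instances)
    if sum(needed.values()) <= max_creatable:
        return dict(needed)
    # start with one machine per requested type, as long as the budget allows
    capped = {k: 1 for count, k in enumerate(needed) if (count + 1) <= max_creatable}
    if len(capped) < len(needed):
        # more types than budget: the remaining types get nothing at all
        return capped
    # round-robin: add one machine of each type until the budget is exhausted
    while sum(capped.values()) < max_creatable:
        for instance_type, num_to_create in needed.items():
            if sum(capped.values()) == max_creatable:
                break
            if num_to_create > capped[instance_type]:
                capped[instance_type] += 1
    return capped


# e.g. 5 machines requested while only 3 may still be created:
assert cap({"t2.large": 3, "m5.xlarge": 2}, max_creatable=3) == {"t2.large": 2, "m5.xlarge": 1}
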
@@ -5,14 +5,15 @@

import distributed
from aws_library.ec2.models import EC2InstanceData, Resources
from dask_task_models_library.resource_constraints import DaskTaskResources
from pydantic import AnyUrl, ByteSize, parse_obj_as

from ..core.errors import (
    DaskNoWorkersError,
    DaskSchedulerNotFoundError,
    DaskWorkerNotFoundError,
)
from ..models import AssociatedInstance, DaskTask, DaskTaskId, DaskTaskResources
from ..models import AssociatedInstance, DaskTask, DaskTaskId
from ..utils.auto_scaling_core import (
    node_host_name_from_ec2_private_dns,
    node_ip_from_ec2_private_dns,
@@ -4,10 +4,11 @@
from typing import Final

from aws_library.ec2.models import Resources
from dask_task_models_library.constants import DASK_TASK_EC2_RESOURCE_RESTRICTION_KEY
from dask_task_models_library.resource_constraints import (
    get_ec2_instance_type_from_resources,
)
from fastapi import FastAPI
from servicelib.utils_formatting import timedelta_as_minute_second
from types_aiobotocore_ec2.literals import InstanceTypeType

from ..core.settings import get_application_settings
from ..models import (
@@ -30,8 +31,11 @@ def get_max_resources_from_dask_task(task: DaskTask) -> Resources:
    )


def get_task_instance_restriction(task: DaskTask) -> InstanceTypeType | None:
    return task.required_resources.get(DASK_TASK_EC2_RESOURCE_RESTRICTION_KEY)
def get_task_instance_restriction(task: DaskTask) -> str | None:
    instance_ec2_type: str | None = get_ec2_instance_type_from_resources(
        task.required_resources
    )
    return instance_ec2_type


def _compute_tasks_needed_resources(tasks: list[DaskTask]) -> Resources:

