Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

🐛 Autoscaling: Warm buffers do not replace hot buffers #6962

Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,10 @@ async def assert_autoscaled_dynamic_ec2_instances(
expected_instance_state: InstanceStateNameType,
expected_additional_tag_keys: list[str],
instance_filters: Sequence[FilterTypeDef] | None,
expected_user_data: list[str] | None = None,
) -> list[InstanceTypeDef]:
if expected_user_data is None:
expected_user_data = ["docker swarm join"]
return await assert_ec2_instances(
ec2_client,
expected_num_reservations=expected_num_reservations,
Expand All @@ -54,7 +57,7 @@ async def assert_autoscaled_dynamic_ec2_instances(
"io.simcore.autoscaling.monitored_services_labels",
*expected_additional_tag_keys,
],
expected_user_data=["docker swarm join"],
expected_user_data=expected_user_data,
instance_filters=instance_filters,
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -418,15 +418,43 @@ async def _activate_drained_nodes(
)


async def _start_buffer_instances(
async def _start_warm_buffer_instances(
app: FastAPI, cluster: Cluster, auto_scaling_mode: BaseAutoscaling
) -> Cluster:
"""starts warm buffer if there are assigned tasks, or if a hot buffer of the same type is needed"""

app_settings = get_application_settings(app)
assert app_settings.AUTOSCALING_EC2_INSTANCES # nosec

instances_to_start = [
i.ec2_instance for i in cluster.buffer_ec2s if i.assigned_tasks
]

if (
len(cluster.buffer_drained_nodes)
< app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_MACHINES_BUFFER
):
# check if we can migrate warm buffers to hot buffers
hot_buffer_instance_type = cast(
InstanceTypeType,
next(
sanderegg marked this conversation as resolved.
Show resolved Hide resolved
iter(app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_ALLOWED_TYPES)
),
)
free_starteable_warm_buffers_to_replace_hot_buffers = [
sanderegg marked this conversation as resolved.
Show resolved Hide resolved
warm_buffer.ec2_instance
for warm_buffer in cluster.buffer_ec2s
if (warm_buffer.ec2_instance.type == hot_buffer_instance_type)
and not warm_buffer.assigned_tasks
]
instances_to_start += free_starteable_warm_buffers_to_replace_hot_buffers[
: app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_MACHINES_BUFFER
- len(cluster.buffer_drained_nodes)
]

if not instances_to_start:
return cluster
# change the buffer machine to an active one

with log_context(
_logger, logging.INFO, f"start {len(instances_to_start)} buffer machines"
):
Expand Down Expand Up @@ -1187,8 +1215,8 @@ async def _autoscale_cluster(
# 2. activate available drained nodes to cover some of the tasks
cluster = await _activate_drained_nodes(app, cluster, auto_scaling_mode)

# 3. start buffer instances to cover the remaining tasks
cluster = await _start_buffer_instances(app, cluster, auto_scaling_mode)
# 3. start warm buffer instances to cover the remaining tasks
cluster = await _start_warm_buffer_instances(app, cluster, auto_scaling_mode)

# 4. scale down unused instances
cluster = await _scale_down_unused_cluster_instances(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -273,7 +273,7 @@ def _list_processing_tasks_on_worker(
async with _scheduler_client(scheduler_url, authentication) as client:
worker_url, _ = _dask_worker_from_ec2_instance(client, ec2_instance)

_logger.debug("looking for processing tasksfor %s", f"{worker_url=}")
_logger.debug("looking for processing tasks for %s", f"{worker_url=}")

# now get the used resources
worker_processing_tasks: list[
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -521,8 +521,14 @@ async def tag_node(
tags: dict[DockerLabelKey, str],
available: bool,
) -> Node:
assert node.spec # nosec
if (node.spec.labels == tags) and (
(node.spec.availability is Availability.active) == available
):
# nothing to do
return node
with log_context(
logger, logging.DEBUG, msg=f"tagging {node.id=} with {tags=} and {available=}"
logger, logging.DEBUG, msg=f"tag {node.id=} with {tags=} and {available=}"
):
assert node.id # nosec

Expand Down
202 changes: 195 additions & 7 deletions services/autoscaling/tests/unit/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,16 @@
EC2InstanceType,
Resources,
)
from common_library.json_serialization import json_dumps
from deepdiff import DeepDiff
from faker import Faker
from fakeredis.aioredis import FakeRedis
from fastapi import FastAPI
from models_library.docker import DockerLabelKey, StandardSimcoreDockerLabels
from models_library.docker import (
DockerGenericTag,
DockerLabelKey,
StandardSimcoreDockerLabels,
)
from models_library.generated_models.docker_rest_api import Availability
from models_library.generated_models.docker_rest_api import Node as DockerNode
from models_library.generated_models.docker_rest_api import (
Expand All @@ -45,7 +50,7 @@
Service,
TaskSpec,
)
from pydantic import ByteSize, PositiveInt, TypeAdapter
from pydantic import ByteSize, NonNegativeInt, PositiveInt, TypeAdapter
from pytest_mock import MockType
from pytest_mock.plugin import MockerFixture
from pytest_simcore.helpers.host import get_localhost_ip
Expand All @@ -57,6 +62,7 @@
)
from settings_library.rabbit import RabbitSettings
from settings_library.ssm import SSMSettings
from simcore_service_autoscaling.constants import PRE_PULLED_IMAGES_EC2_TAG_KEY
from simcore_service_autoscaling.core.application import create_app
from simcore_service_autoscaling.core.settings import (
AUTOSCALING_ENV_PREFIX,
Expand All @@ -71,8 +77,14 @@
DaskTaskResources,
)
from simcore_service_autoscaling.modules import auto_scaling_core
from simcore_service_autoscaling.modules.auto_scaling_mode_dynamic import (
DynamicAutoscaling,
)
from simcore_service_autoscaling.modules.docker import AutoscalingDocker
from simcore_service_autoscaling.modules.ec2 import SimcoreEC2API
from simcore_service_autoscaling.utils.buffer_machines_pool_core import (
get_deactivated_buffer_ec2_tags,
)
from simcore_service_autoscaling.utils.utils_docker import (
_OSPARC_SERVICE_READY_LABEL_KEY,
_OSPARC_SERVICES_READY_DATETIME_LABEL_KEY,
Expand All @@ -81,7 +93,9 @@
from tenacity.retry import retry_if_exception_type
from tenacity.stop import stop_after_delay
from tenacity.wait import wait_fixed
from types_aiobotocore_ec2.literals import InstanceTypeType
from types_aiobotocore_ec2 import EC2Client
from types_aiobotocore_ec2.literals import InstanceStateNameType, InstanceTypeType
from types_aiobotocore_ec2.type_defs import TagTypeDef

pytest_plugins = [
"pytest_simcore.aws_server",
Expand Down Expand Up @@ -991,10 +1005,22 @@ def _creator(


@pytest.fixture
def mock_machines_buffer(monkeypatch: pytest.MonkeyPatch) -> int:
num_machines_in_buffer = 5
monkeypatch.setenv("EC2_INSTANCES_MACHINES_BUFFER", f"{num_machines_in_buffer}")
return num_machines_in_buffer
def num_hot_buffer() -> NonNegativeInt:
return 5


@pytest.fixture
def with_instances_machines_hot_buffer(
    num_hot_buffer: int,
    app_environment: EnvVarsDict,
    monkeypatch: pytest.MonkeyPatch,
) -> EnvVarsDict:
    """App environment overridden with a hot-buffer size of ``num_hot_buffer`` machines."""
    hot_buffer_envs = setenvs_from_dict(
        monkeypatch,
        {"EC2_INSTANCES_MACHINES_BUFFER": f"{num_hot_buffer}"},
    )
    return {**app_environment, **hot_buffer_envs}


@pytest.fixture
Expand Down Expand Up @@ -1042,3 +1068,165 @@ async def _(
autospec=True,
side_effect=_,
)


@pytest.fixture
def fake_pre_pull_images() -> list[DockerGenericTag]:
    """A fixed list of image tags used to simulate pre-pulled docker images in tests."""
    raw_image_tags = [
        "nginx:latest",
        "itisfoundation/my-very-nice-service:latest",
        "simcore/services/dynamic/another-nice-one:2.4.5",
        "asd",
    ]
    tag_list_adapter = TypeAdapter(list[DockerGenericTag])
    return tag_list_adapter.validate_python(raw_image_tags)


@pytest.fixture
def ec2_instances_allowed_types_with_only_1_buffered(
    faker: Faker,
    fake_pre_pull_images: list[DockerGenericTag],
    external_ec2_instances_allowed_types: None | dict[str, EC2InstanceBootSpecific],
) -> dict[InstanceTypeType, EC2InstanceBootSpecific]:
    """Returns exactly one allowed EC2 instance type that defines a warm-buffer count.

    When no external configuration is provided, a synthetic ``t2.micro`` entry
    with a random positive ``buffer_count`` is generated instead.
    """
    if not external_ec2_instances_allowed_types:
        return {
            "t2.micro": EC2InstanceBootSpecific(
                ami_id=faker.pystr(),
                pre_pull_images=fake_pre_pull_images,
                buffer_count=faker.pyint(min_value=1, max_value=10),
            )
        }

    # keep only the externally-defined types that declare a positive buffer count
    buffered_types = {
        ec2_type: boot_specific
        for ec2_type, boot_specific in external_ec2_instances_allowed_types.items()
        if boot_specific.buffer_count > 0
    }
    assert buffered_types, "one type with buffer is needed for the tests!"
    assert (
        len(buffered_types) == 1
    ), "more than one type with buffer is disallowed in this test!"
    return {
        TypeAdapter(InstanceTypeType).validate_python(ec2_type): boot_specific
        for ec2_type, boot_specific in buffered_types.items()
    }


@pytest.fixture
def buffer_count(
    ec2_instances_allowed_types_with_only_1_buffered: dict[
        InstanceTypeType, EC2InstanceBootSpecific
    ],
) -> int:
    """The warm-buffer count of the single buffered EC2 type used in these tests."""
    # filter down to the types declaring a positive buffer count
    buffered_types = {
        ec2_type: boot_specific
        for ec2_type, boot_specific in (
            ec2_instances_allowed_types_with_only_1_buffered.items()
        )
        if boot_specific.buffer_count > 0
    }
    assert buffered_types, "you need one type with buffer"
    assert (
        len(buffered_types) == 1
    ), "more than one type with buffer is disallowed in this test!"
    (single_boot_specific,) = buffered_types.values()
    return single_boot_specific.buffer_count


@pytest.fixture
async def create_buffer_machines(
    ec2_client: EC2Client,
    aws_ami_id: str,
    app_settings: ApplicationSettings,
    initialized_app: FastAPI,
) -> Callable[
    [int, InstanceTypeType, InstanceStateNameType, list[DockerGenericTag] | None],
    Awaitable[list[str]],
]:
    """Factory fixture creating EC2 warm-buffer machines tagged as deactivated buffers.

    The returned coroutine creates ``num`` instances of ``instance_type``,
    optionally advertises ``pre_pull_images`` via a dedicated EC2 tag, leaves the
    instances in the requested state ('running' or 'stopped') and returns their ids.
    """

    async def _assert_instances_in_state(
        instance_ids: list[str], num: int, expected_state: InstanceStateNameType
    ) -> None:
        # verify all created instances are reported by EC2 in the expected state
        described = await ec2_client.describe_instances(InstanceIds=instance_ids)
        assert "Reservations" in described
        assert described["Reservations"]
        assert "Instances" in described["Reservations"][0]
        assert len(described["Reservations"][0]["Instances"]) == num
        for instance in described["Reservations"][0]["Instances"]:
            assert "State" in instance
            assert "Name" in instance["State"]
            assert instance["State"]["Name"] == expected_state

    async def _do(
        num: int,
        instance_type: InstanceTypeType,
        instance_state_name: InstanceStateNameType,
        pre_pull_images: list[DockerGenericTag] | None,
    ) -> list[str]:
        assert app_settings.AUTOSCALING_EC2_INSTANCES

        assert instance_state_name in [
            "running",
            "stopped",
        ], "only 'running' and 'stopped' are supported for testing"

        # tag the machines as deactivated warm buffers of the dynamic autoscaler
        resource_tags: list[TagTypeDef] = [
            {"Key": tag_key, "Value": tag_value}
            for tag_key, tag_value in get_deactivated_buffer_ec2_tags(
                initialized_app, DynamicAutoscaling()
            ).items()
        ]
        if pre_pull_images is not None and instance_state_name == "stopped":
            # pre-pulled images are advertised via a dedicated EC2 tag
            resource_tags.append(
                {
                    "Key": PRE_PULLED_IMAGES_EC2_TAG_KEY,
                    "Value": f"{json_dumps(pre_pull_images)}",
                }
            )
        with log_context(
            logging.INFO, f"creating {num} buffer machines of {instance_type}"
        ):
            instances = await ec2_client.run_instances(
                ImageId=aws_ami_id,
                MaxCount=num,
                MinCount=num,
                InstanceType=instance_type,
                KeyName=app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_KEY_NAME,
                SecurityGroupIds=app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_SECURITY_GROUP_IDS,
                SubnetId=app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_SUBNET_ID,
                IamInstanceProfile={
                    "Arn": app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_ATTACHED_IAM_PROFILE
                },
                TagSpecifications=[
                    {"ResourceType": "instance", "Tags": resource_tags},
                    {"ResourceType": "volume", "Tags": resource_tags},
                    {"ResourceType": "network-interface", "Tags": resource_tags},
                ],
                UserData="echo 'I am pytest'",
            )
            instance_ids = [
                i["InstanceId"] for i in instances["Instances"] if "InstanceId" in i
            ]

            # wait until EC2 reports the instances exist, then check they are running
            waiter = ec2_client.get_waiter("instance_exists")
            await waiter.wait(InstanceIds=instance_ids)
            await _assert_instances_in_state(instance_ids, num, "running")

            if instance_state_name == "stopped":
                await ec2_client.stop_instances(InstanceIds=instance_ids)
                await _assert_instances_in_state(instance_ids, num, "stopped")

            return instance_ids

    return _do
Loading
Loading