🐛Autoscaling: Warm buffers do not replace hot buffers #6962

10 changes: 5 additions & 5 deletions .codecov.yml
@@ -10,10 +10,10 @@ flag_management:
statuses:
- type: project
target: auto
threshold: 1%
threshold: 2%
- type: patch
target: auto
threshold: 1%
threshold: 2%


component_management:
@@ -22,7 +22,7 @@ component_management:
statuses:
- type: project
target: auto
threshold: 1%
threshold: 2%
branches:
- "!master"
individual_components:
@@ -116,12 +116,12 @@ coverage:
project:
default:
informational: true
threshold: 1%
threshold: 2%

patch:
default:
informational: true
threshold: 1%
threshold: 2%

comment:
layout: "header,diff,flags,components,footer"
24 changes: 12 additions & 12 deletions .github/workflows/ci-testing-deploy.yml
@@ -772,7 +772,7 @@ jobs:
if: ${{ !cancelled() }}
run: ./ci/github/unit-testing/catalog.bash test
- name: upload failed tests logs
if: ${{ !cancelled() }}
if: ${{ failure() }}
uses: actions/upload-artifact@v4
with:
name: ${{ github.job }}_docker_logs
@@ -879,7 +879,7 @@ jobs:
if: ${{ !cancelled() }}
run: ./ci/github/unit-testing/datcore-adapter.bash test
- name: upload failed tests logs
if: ${{ !cancelled() }}
if: ${{ failure() }}
uses: actions/upload-artifact@v4
with:
name: ${{ github.job }}_docker_logs
@@ -930,7 +930,7 @@ jobs:
if: ${{ !cancelled() }}
run: ./ci/github/unit-testing/director.bash test
- name: upload failed tests logs
if: ${{ !cancelled() }}
if: ${{ failure() }}
uses: actions/upload-artifact@v4
with:
name: ${{ github.job }}_docker_logs
@@ -981,7 +981,7 @@ jobs:
if: ${{ !cancelled() }}
run: ./ci/github/unit-testing/director-v2.bash test
- name: upload failed tests logs
if: ${{ !cancelled() }}
if: ${{ failure() }}
uses: actions/upload-artifact@v4
with:
name: ${{ github.job }}_docker_logs
@@ -1910,7 +1910,7 @@ jobs:
- name: test
run: ./ci/github/integration-testing/webserver.bash test 01
- name: upload failed tests logs
if: ${{ !cancelled() }}
if: ${{ failure() }}
uses: actions/upload-artifact@v4
with:
name: ${{ github.job }}_docker_logs
@@ -1974,7 +1974,7 @@ jobs:
- name: test
run: ./ci/github/integration-testing/webserver.bash test 02
- name: upload failed tests logs
if: ${{ !cancelled() }}
if: ${{ failure() }}
uses: actions/upload-artifact@v4
with:
name: ${{ github.job }}_docker_logs
@@ -2038,7 +2038,7 @@ jobs:
- name: test
run: ./ci/github/integration-testing/director-v2.bash test 01
- name: upload failed tests logs
if: ${{ !cancelled() }}
if: ${{ failure() }}
uses: actions/upload-artifact@v4
with:
name: ${{ github.job }}_docker_logs
@@ -2111,7 +2111,7 @@ jobs:
- name: test
run: ./ci/github/integration-testing/director-v2.bash test 02
- name: upload failed tests logs
if: ${{ !cancelled() }}
if: ${{ failure() }}
uses: actions/upload-artifact@v4
with:
name: ${{ github.job }}_docker_logs
@@ -2177,7 +2177,7 @@ jobs:
- name: test
run: ./ci/github/integration-testing/dynamic-sidecar.bash test 01
- name: upload failed tests logs
if: ${{ !cancelled() }}
if: ${{ failure() }}
uses: actions/upload-artifact@v4
with:
name: ${{ github.job }}_docker_logs
@@ -2241,7 +2241,7 @@ jobs:
- name: test
run: ./ci/github/integration-testing/simcore-sdk.bash test
- name: upload failed tests logs
if: ${{ !cancelled() }}
if: ${{ failure() }}
uses: actions/upload-artifact@v4
with:
name: ${{ github.job }}_docker_logs
@@ -2330,7 +2330,7 @@ jobs:
- name: test
run: ./ci/github/system-testing/public-api.bash test
- name: upload failed tests logs
if: ${{ !cancelled() }}
if: ${{ failure() }}
uses: actions/upload-artifact@v4
with:
name: ${{ github.job }}_docker_logs
@@ -2395,7 +2395,7 @@ jobs:
name: ${{ github.job }}_services_settings_schemas
path: ./services/**/settings-schema.json
- name: upload failed tests logs
if: ${{ !cancelled() }}
if: ${{ failure() }}
uses: actions/upload-artifact@v4
with:
name: ${{ github.job }}_docker_logs
@@ -42,7 +42,10 @@ async def assert_autoscaled_dynamic_ec2_instances(
expected_instance_state: InstanceStateNameType,
expected_additional_tag_keys: list[str],
instance_filters: Sequence[FilterTypeDef] | None,
expected_user_data: list[str] | None = None,
) -> list[InstanceTypeDef]:
if expected_user_data is None:
expected_user_data = ["docker swarm join"]
return await assert_ec2_instances(
ec2_client,
expected_num_reservations=expected_num_reservations,
@@ -54,7 +57,7 @@ async def assert_autoscaled_dynamic_ec2_instances(
"io.simcore.autoscaling.monitored_services_labels",
*expected_additional_tag_keys,
],
expected_user_data=["docker swarm join"],
expected_user_data=expected_user_data,
instance_filters=instance_filters,
)

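Note that the new expected_user_data parameter defaults to None and is replaced with ["docker swarm join"] inside the function, rather than using the list literal as the default. A minimal, generic sketch of why that idiom is preferred (the function below is illustrative and not part of the repository):

# Mutable default arguments are evaluated once and shared across calls in Python,
# so the conventional pattern is a None sentinel that is replaced in the body.
def greet(names: list[str] | None = None) -> list[str]:
    if names is None:
        names = ["world"]  # a fresh list is created on every call
    return [f"hello {name}" for name in names]

assert greet() == ["hello world"]
assert greet(["alice"]) == ["hello alice"]
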
@@ -418,15 +418,43 @@ async def _activate_drained_nodes(
)


async def _start_buffer_instances(
async def _start_warm_buffer_instances(
app: FastAPI, cluster: Cluster, auto_scaling_mode: BaseAutoscaling
) -> Cluster:
"""starts warm buffer if there are assigned tasks, or if a hot buffer of the same type is needed"""

app_settings = get_application_settings(app)
assert app_settings.AUTOSCALING_EC2_INSTANCES # nosec

instances_to_start = [
i.ec2_instance for i in cluster.buffer_ec2s if i.assigned_tasks
]

if (
len(cluster.buffer_drained_nodes)
< app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_MACHINES_BUFFER
):
# check if we can migrate warm buffers to hot buffers
hot_buffer_instance_type = cast(
InstanceTypeType,
next(
iter(app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_ALLOWED_TYPES)
),
)
free_startable_warm_buffers_to_replace_hot_buffers = [
warm_buffer.ec2_instance
for warm_buffer in cluster.buffer_ec2s
if (warm_buffer.ec2_instance.type == hot_buffer_instance_type)
and not warm_buffer.assigned_tasks
]
instances_to_start += free_startable_warm_buffers_to_replace_hot_buffers[
: app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_MACHINES_BUFFER
- len(cluster.buffer_drained_nodes)
]

if not instances_to_start:
return cluster
# change the buffer machine to an active one

with log_context(
_logger, logging.INFO, f"start {len(instances_to_start)} buffer machines"
):
@@ -1187,8 +1215,8 @@ async def _autoscale_cluster(
# 2. activate available drained nodes to cover some of the tasks
cluster = await _activate_drained_nodes(app, cluster, auto_scaling_mode)

# 3. start buffer instances to cover the remaining tasks
cluster = await _start_buffer_instances(app, cluster, auto_scaling_mode)
# 3. start warm buffer instances to cover the remaining tasks
cluster = await _start_warm_buffer_instances(app, cluster, auto_scaling_mode)

# 4. scale down unused instances
cluster = await _scale_down_unused_cluster_instances(
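The promotion logic added above boils down to a capped selection: idle warm buffers of the hot-buffer instance type may be started, but only up to the number of hot buffers still missing. A self-contained sketch of that arithmetic (WarmBuffer and warm_buffers_to_promote are illustrative names, not the project's types):

from dataclasses import dataclass, field

@dataclass
class WarmBuffer:
    # hypothetical stand-in for a stopped warm-buffer EC2 entry
    instance_type: str
    assigned_tasks: list = field(default_factory=list)

def warm_buffers_to_promote(
    warm_buffers: list[WarmBuffer],
    hot_buffer_type: str,
    machines_buffer: int,  # EC2_INSTANCES_MACHINES_BUFFER
    current_hot_buffers: int,  # len(cluster.buffer_drained_nodes)
) -> list[WarmBuffer]:
    missing_hot_buffers = machines_buffer - current_hot_buffers
    if missing_hot_buffers <= 0:
        return []
    # only idle warm buffers of the configured hot-buffer type qualify
    candidates = [
        b
        for b in warm_buffers
        if b.instance_type == hot_buffer_type and not b.assigned_tasks
    ]
    return candidates[:missing_hot_buffers]

# e.g. 5 hot buffers configured, 3 already present -> at most 2 idle warm
# buffers of the matching type are started as replacements
assert len(
    warm_buffers_to_promote(
        [WarmBuffer("t3.medium") for _ in range(4)], "t3.medium", 5, 3
    )
) == 2
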
@@ -273,7 +273,7 @@ def _list_processing_tasks_on_worker(
async with _scheduler_client(scheduler_url, authentication) as client:
worker_url, _ = _dask_worker_from_ec2_instance(client, ec2_instance)

_logger.debug("looking for processing tasksfor %s", f"{worker_url=}")
_logger.debug("looking for processing tasks for %s", f"{worker_url=}")

# now get the used resources
worker_processing_tasks: list[
@@ -521,8 +521,14 @@ async def tag_node(
tags: dict[DockerLabelKey, str],
available: bool,
) -> Node:
assert node.spec # nosec
if (node.spec.labels == tags) and (
(node.spec.availability is Availability.active) == available
):
# nothing to do
return node
with log_context(
logger, logging.DEBUG, msg=f"tagging {node.id=} with {tags=} and {available=}"
logger, logging.DEBUG, msg=f"tag {node.id=} with {tags=} and {available=}"
):
assert node.id # nosec

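The guard added to tag_node skips the Docker API round-trip when the node already carries the requested labels and availability. A minimal sketch of the same check with plain data types (FakeNodeSpec and needs_tagging are illustrative, not the project's Docker models):

from dataclasses import dataclass

@dataclass
class FakeNodeSpec:
    # hypothetical stand-in for the docker node spec inspected by tag_node
    labels: dict[str, str]
    active: bool

def needs_tagging(spec: FakeNodeSpec, tags: dict[str, str], available: bool) -> bool:
    # nothing to do when both the labels and the availability already match
    return not (spec.labels == tags and spec.active == available)

assert needs_tagging(FakeNodeSpec({"role": "worker"}, True), {"role": "worker"}, True) is False
assert needs_tagging(FakeNodeSpec({"role": "worker"}, False), {"role": "worker"}, True) is True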