feat(gcp_sqs_config): adds support for min and max replicas #891

Closed. Wants to merge 1 commit.
7 changes: 6 additions & 1 deletion zetta_utils/cloud_management/resource_allocation/k8s/keda.py
@@ -207,17 +207,22 @@ def scaled_job_ctx_mngr(
     cluster_info: ClusterInfo,
     job_spec: k8s_client.V1JobSpec,
     secrets: list[k8s_client.V1Secret],
-    max_replicas: int,
+    replicas: int,
     sqs_trigger_name: str,
     queue: SQSQueue,
+    max_replicas: int = 0,
     namespace: str | None = "default",
 ):
+    if max_replicas == 0:
+        max_replicas = replicas
+        replicas = 0
     configuration, _ = get_cluster_data(cluster_info)
     with secrets_ctx_mngr(run_id, secrets, cluster_info, namespace=namespace):
         manifest = _get_scaled_job_manifest(
             f"{run_id}-{group_name}",
             [_get_sqs_trigger(sqs_trigger_name, queue)],
             job_spec=job_spec,
+            min_replicas=replicas,
Review thread on the min_replicas=replicas line:

Member: Does the deployment start with min replicas or max replicas?

Member: I don't think we would ever want a deployment that cannot scale to 0, so I would guess min replicas should always be 0.
             max_replicas=max_replicas,
         )
         so_name = manifest["metadata"]["name"]
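The effect of the new defaulting can be sketched in isolation. The helper below is hypothetical (it is not part of zetta_utils) and only mirrors the added if block: leaving max_replicas at its default of 0 keeps the old single-knob behavior, with the scale floor dropping to 0 and the requested replica count becoming the ceiling.

    def _resolve_replica_bounds(replicas: int, max_replicas: int = 0) -> tuple[int, int]:
        # Mirrors the logic added to scaled_job_ctx_mngr: with max_replicas=0,
        # `replicas` becomes the ceiling and the floor drops to 0; otherwise the
        # caller's (replicas, max_replicas) pair is used as (floor, ceiling).
        if max_replicas == 0:
            max_replicas = replicas
            replicas = 0
        return replicas, max_replicas

    assert _resolve_replica_bounds(10) == (0, 10)     # existing callers: scale 0 -> 10
    assert _resolve_replica_bounds(2, 10) == (2, 10)  # opting in: scale 2 -> 10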
@@ -207,17 +207,22 @@ def scaled_job_ctx_mngr(
     cluster_info: ClusterInfo,
     job_spec: k8s_client.V1JobSpec,
     secrets: list[k8s_client.V1Secret],
-    max_replicas: int,
+    replicas: int,
     queue: SQSQueue,
+    max_replicas: int = 0,
     namespace: str | None = "default",
 ):
+    if max_replicas == 0:
+        max_replicas = replicas
+        replicas = 0
     configuration, _ = get_cluster_data(cluster_info)
     with secrets_ctx_mngr(run_id, secrets, cluster_info, namespace=namespace):
         with sqs_trigger_ctx_mngr(run_id, cluster_info, namespace) as trigger_name:
             manifest = _get_scaled_job_manifest(
                 run_id,
                 [_get_sqs_trigger(trigger_name, queue)],
                 job_spec=job_spec,
+                min_replicas=replicas,
                 max_replicas=max_replicas,
             )
             so_name = manifest["metadata"]["name"]
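_get_scaled_job_manifest itself is not part of this diff, so the keys it emits are an assumption; the sketch below only illustrates where min_replicas and max_replicas would typically land in a KEDA ScaledJob spec (minReplicaCount and maxReplicaCount), with placeholder values.

    # Hypothetical shape of the manifest built by _get_scaled_job_manifest;
    # the real helper is not shown in this PR.
    scaled_job_manifest = {
        "apiVersion": "keda.sh/v1alpha1",
        "kind": "ScaledJob",
        "metadata": {"name": "run-example"},
        "spec": {
            "jobTargetRef": {},     # populated from job_spec
            "minReplicaCount": 0,   # from min_replicas=replicas
            "maxReplicaCount": 10,  # from max_replicas
            "triggers": [],         # populated from _get_sqs_trigger(...)
        },
    }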
@@ -66,6 +66,7 @@ def get_gcp_with_sqs_config(
     semaphores_spec: dict[SemaphoreType, int] | None = None,
     provisioning_model: Literal["standard", "spot"] = "spot",
     idle_worker_timeout: int = 300,
+    max_worker_replicas: int = 0,
 ) -> tuple[PushMessageQueue[Task], PullMessageQueue[OutcomeReport], list[AbstractContextManager]]:
     work_queue_name = f"run-{execution_id}-work"
     outcome_queue_name = f"run-{execution_id}-outcome"
@@ -114,7 +115,8 @@ def get_gcp_with_sqs_config(
         cluster_info=worker_cluster,
         job_spec=job_spec,
         secrets=secrets,
-        max_replicas=worker_replicas,
+        replicas=worker_replicas,
+        max_replicas=max_worker_replicas,
         queue=task_queue,
     )
     ctx_managers.append(scaled_job_ctx_mngr)
@@ -171,6 +173,7 @@ def execute_on_gcp_with_sqs(  # pylint: disable=too-many-locals
     provisioning_model: Literal["standard", "spot"] = "spot",
     sqs_based_scaling: bool = True,
     idle_worker_timeout: int = 300,
+    max_worker_replicas: int = 0,
 ):
     if debug and not local_test:
         raise ValueError("`debug` can only be set to `True` when `local_test` is also `True`.")
@@ -240,6 +243,7 @@ def execute_on_gcp_with_sqs(  # pylint: disable=too-many-locals
         provisioning_model=provisioning_model,
         sqs_based_scaling=sqs_based_scaling,
         idle_worker_timeout=idle_worker_timeout,
+        max_worker_replicas=max_worker_replicas,
     )

     with ExitStack() as stack:
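A hedged usage sketch, not taken from the PR: max_worker_replicas is the knob this diff introduces, worker_replicas is assumed to be the pre-existing replica argument that get_gcp_with_sqs_config forwards, and every other argument to execute_on_gcp_with_sqs is elided because it is unchanged and not shown in these hunks.

    execute_on_gcp_with_sqs(
        # ... existing arguments (execution spec, cluster info, resources, ...) unchanged
        worker_replicas=4,       # forwarded as replicas= (scale floor once a max is set)
        max_worker_replicas=64,  # new: forwarded as max_replicas= (scale ceiling)
        # Leaving max_worker_replicas at its default of 0 keeps the old behavior:
        # the worker job scales from 0 up to worker_replicas.
    )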
@@ -61,6 +61,7 @@ def _ensure_required_env_vars():
 class WorkerGroup:
     replicas: int
     resource_limits: dict[str, int | float | str]
+    max_replicas: int = 0
     queue_tags: list[str] | None = None
     num_procs: int = 1
     sqs_based_scaling: bool = True
@@ -75,6 +76,7 @@ class WorkerGroup:
 class WorkerGroupDict(TypedDict, total=False):
     replicas: int
     resource_limits: dict[str, int | float | str]
+    max_replicas: NotRequired[int]
     queue_tags: NotRequired[list[str]]
     num_procs: NotRequired[int]
     sqs_based_scaling: NotRequired[bool]
@@ -134,7 +136,8 @@ def _get_group_taskqueue_and_contexts(
             job_spec=job_spec,
             secrets=[],
             sqs_trigger_name=sqs_trigger_name,
-            max_replicas=group.replicas,
+            replicas=group.replicas,
+            max_replicas=group.max_replicas,
             queue=task_queue,
         )
         ctx_managers.append(scaled_job_ctx_mngr)
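A minimal sketch of a worker group spec using the new field; how groups are keyed and loaded is not shown in this diff, so the dict-of-groups shape and the resource values below are assumptions.

    worker_groups: dict[str, WorkerGroupDict] = {
        "gpu-workers": {
            "replicas": 2,          # scale floor once max_replicas is set
            "max_replicas": 16,     # new optional field; omit or set 0 to keep old behavior
            "resource_limits": {"nvidia.com/gpu": 1},
        },
    }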