From 5da7a719b6019c3a6a6e5598ad2b96d28948d888 Mon Sep 17 00:00:00 2001
From: Ran Lu
Date: Sat, 13 Apr 2024 17:42:31 -0400
Subject: [PATCH 1/3] Add a function to remove unfinished tasks in a queue

Do not require the workers to finish themselves; mark the tasks as
success to proceed into the next step. This is faster and more reliable.
---
Review notes (text between the three-dash line and the diff is ignored by git-am):
* remove_workers() now takes a provide_session-managed session and hands it
  to both query_task_instances() and TaskInstance.set_state(), so the lookup
  and every state change run in one session instead of one per task instance.
* query_task_instances() gained a session=None default; provide_session
  fills it in when the caller omits it.
* Diffstat and hunk line counts were recomputed for the added lines; the
  post-image blob id on the index line was NOT regenerated.
* Indentation and whitespace-only context lines in this series were restored
  from the hunk line counts; verify them against the target tree before
  applying.

 dags/dag_utils.py | 27 +++++++++++++++++++++++++++
 1 file changed, 27 insertions(+)

diff --git a/dags/dag_utils.py b/dags/dag_utils.py
index 2589d8be..e2e51f5b 100644
--- a/dags/dag_utils.py
+++ b/dags/dag_utils.py
@@ -1,3 +1,6 @@
+from airflow.utils.db import provide_session
+
+
 def check_manager_node(ntasks):
     import psutil
     import humanize
@@ -60,3 +63,27 @@ def get_connection(conn, default_var=None):
         return default_var
 
     return ig_conn
+
+
+@provide_session
+def query_task_instances(queue, session=None):
+    """Return the task instances in ``queue`` that are in an unfinished state.
+
+    ``session`` is supplied by ``provide_session`` when the caller omits it.
+    """
+    from airflow import models
+    from airflow.utils.state import State
+    TI = models.TaskInstance
+    return session.query(TI).filter(TI.queue == queue).filter(TI.state.in_(State.unfinished)).all()
+
+
+@provide_session
+def remove_workers(queue, session=None):
+    """Mark every unfinished task instance in ``queue`` as success.
+
+    The lookup and all state changes share one session, so they are
+    committed together instead of once per task instance.
+    """
+    from airflow.utils.state import State
+    for ti in query_task_instances(queue=queue, session=session):
+        ti.set_state(State.SUCCESS, session=session)
From 9cdee0a4d7c9cdd776e03a2ae5e1ee5c4d742c20 Mon Sep 17 00:00:00 2001
From: Ran Lu
Date: Sat, 13 Apr 2024 17:45:21 -0400
Subject: [PATCH 2/3] Use the common remove_workers function

Replacing the chunkflow dag specific hack
---
Review notes (ignored by git-am):
* The hunk header "@@ -381,14 +381,6 @@" had been split across two lines
  and is rejoined here.
* Positions of whitespace-only context lines are reconstructed from the
  hunk line counts; verify against the target tree before applying.

 dags/chunkflow_dag.py | 11 ++---------
 1 file changed, 2 insertions(+), 9 deletions(-)

diff --git a/dags/chunkflow_dag.py b/dags/chunkflow_dag.py
index dd3afe34..1a39e384 100644
--- a/dags/chunkflow_dag.py
+++ b/dags/chunkflow_dag.py
@@ -12,7 +12,7 @@
 
 from helper_ops import placeholder_op, mark_done_op, scale_up_cluster_op, scale_down_cluster_op, setup_redis_op, collect_metrics_op
 
-from dag_utils import estimate_worker_instances
+from dag_utils import estimate_worker_instances, remove_workers
 from cloudvolume import CloudVolume
 from cloudvolume.lib import Bbox
 
@@ -381,14 +381,6 @@ def setup_env_op(dag, param, queue):
         dag=dag
     )
 
-def remove_workers():
-    from time import sleep
-    param = Variable.get("inference_param", deserialize_json=True)
-    if param["TASK_NUM"] > 1:
-        param["TASK_NUM"] = 1
-        Variable.set("inference_param", param, serialize_json=True)
-        sleep(60)
-
 def inference_op(dag, param, queue, wid):
     from airflow import configuration as conf
 
@@ -539,6 +531,7 @@ def process_output(**kwargs):
 remove_workers_op = PythonOperator(
     task_id="remove_extra_workers",
     python_callable=remove_workers,
+    op_args=("gpu",),
     priority_weight=100000,
     weight_rule=WeightRule.ABSOLUTE,
     queue="manager",
From 8f1954d0fa826e9e444951f880e0451afe440176 Mon Sep 17 00:00:00 2001
From: Ran Lu
Date: Sat, 13 Apr 2024 17:46:41 -0400
Subject: [PATCH 3/3] Replace self_destruct signals with remove_workers call

---
Review notes (ignored by git-am):
* Only the self_destruct_op docstring text differs from the submitted
  change; line counts are unchanged.
* Positions of whitespace-only context lines are reconstructed from the
  hunk line counts; verify against the target tree before applying.

 dags/synaptor_dags.py | 10 ++++------
 dags/synaptor_ops.py  | 15 +++++++++++++++
 2 files changed, 19 insertions(+), 6 deletions(-)

diff --git a/dags/synaptor_dags.py b/dags/synaptor_dags.py
index 3c09bad0..6719fa0d 100644
--- a/dags/synaptor_dags.py
+++ b/dags/synaptor_dags.py
@@ -8,7 +8,7 @@
 
 from helper_ops import scale_up_cluster_op, scale_down_cluster_op, collect_metrics_op
 from param_default import synaptor_param_default, default_synaptor_image
-from synaptor_ops import manager_op, drain_op
+from synaptor_ops import manager_op, drain_op, self_destruct_op
 from synaptor_ops import synaptor_op, wait_op, generate_op, nglink_op
 
 
@@ -217,12 +217,10 @@ def add_task(
     """Adds a processing step to a DAG."""
 
     if task.name == "self_destruct":
-        cluster_size = 1 if tag.startswith("synaptor-seggraph") else MAX_CLUSTER_SIZE
-        extra_args = {"workercount": str(cluster_size)}
+        cluster_key = cluster_key_from_tag(tag)
+        generate = self_destruct_op(dag, queue=cluster_key, tag=tag)
     else:
-        extra_args = None
-
-    generate = generate_op(dag, task.name, image=SYNAPTOR_IMAGE, extra_args=extra_args, tag=tag)
+        generate = generate_op(dag, task.name, image=SYNAPTOR_IMAGE, tag=tag)
 
     if tag:
         wait = wait_op(dag, f"{task.name}_{tag}")
diff --git a/dags/synaptor_ops.py b/dags/synaptor_ops.py
index 414ff971..eadc784b 100644
--- a/dags/synaptor_ops.py
+++ b/dags/synaptor_ops.py
@@ -14,6 +14,7 @@
 from slack_message import task_failure_alert, task_retry_alert, task_done_alert, slack_message
 from nglinks import ImageLayer, SegLayer, generate_ng_payload, wrap_payload
 from kombu_helper import drain_messages
+from dag_utils import remove_workers
 
 from airflow import configuration as conf
 
@@ -326,3 +327,17 @@ def wait_op(dag: DAG, taskname: str) -> PythonOperator:
         queue="manager",
         dag=dag,
     )
+
+
+def self_destruct_op(dag: DAG, queue: str, tag: str) -> PythonOperator:
+    """Build the manager-queue task that calls remove_workers for ``queue``."""
+    return PythonOperator(
+        task_id=f"self_destruct_for_queue_{tag}",
+        python_callable=remove_workers,
+        op_args=(queue,),
+        priority_weight=100_000,
+        weight_rule=WeightRule.ABSOLUTE,
+        on_success_callback=task_done_alert,
+        queue="manager",
+        dag=dag,
+    )