Skip to content

Commit

Permalink
Merge pull request #87 from seung-lab/remove_workers
Browse files Browse the repository at this point in the history
Accelerate chunkflow and synaptor dags by removing idle workers after task queues are empty.
  • Loading branch information
ranlu authored Apr 13, 2024
2 parents 83211e2 + 8f1954d commit 2b783c5
Show file tree
Hide file tree
Showing 4 changed files with 39 additions and 15 deletions.
11 changes: 2 additions & 9 deletions dags/chunkflow_dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@

from helper_ops import placeholder_op, mark_done_op, scale_up_cluster_op, scale_down_cluster_op, setup_redis_op, collect_metrics_op

from dag_utils import estimate_worker_instances
from dag_utils import estimate_worker_instances, remove_workers

from cloudvolume import CloudVolume
from cloudvolume.lib import Bbox
Expand Down Expand Up @@ -381,14 +381,6 @@ def setup_env_op(dag, param, queue):
dag=dag
)

def remove_workers():
    """Shrink the inference worker pool down to a single task slot.

    Reads the ``inference_param`` Airflow Variable and, when more than one
    worker task is configured, rewrites ``TASK_NUM`` to 1 so extra workers
    stop picking up work, then pauses to let the change propagate.
    """
    from time import sleep

    inference_param = Variable.get("inference_param", deserialize_json=True)
    if inference_param["TASK_NUM"] > 1:
        inference_param["TASK_NUM"] = 1
        Variable.set("inference_param", inference_param, serialize_json=True)
        # NOTE(review): indentation was lost in extraction — the sleep may
        # originally have run unconditionally; confirm against file history.
        sleep(60)


def inference_op(dag, param, queue, wid):
from airflow import configuration as conf
Expand Down Expand Up @@ -539,6 +531,7 @@ def process_output(**kwargs):
remove_workers_op = PythonOperator(
task_id="remove_extra_workers",
python_callable=remove_workers,
op_args=("gpu",),
priority_weight=100000,
weight_rule=WeightRule.ABSOLUTE,
queue="manager",
Expand Down
18 changes: 18 additions & 0 deletions dags/dag_utils.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
from airflow.utils.db import provide_session


def check_manager_node(ntasks):
import psutil
import humanize
Expand Down Expand Up @@ -60,3 +63,18 @@ def get_connection(conn, default_var=None):
return default_var

return ig_conn


@provide_session
def query_task_instances(queue, session=None):
    """Return all unfinished task instances assigned to *queue*.

    :param queue: name of the Celery queue to filter task instances by.
    :param session: SQLAlchemy session; injected by ``@provide_session`` when
        not supplied. Defaulting to ``None`` follows the Airflow convention
        and keeps the undecorated signature callable.
    :return: list of ``TaskInstance`` rows whose state is still unfinished
        (e.g. queued, running, up_for_retry).
    """
    from airflow import models
    from airflow.utils.state import State

    TI = models.TaskInstance
    return (
        session.query(TI)
        .filter(TI.queue == queue)
        .filter(TI.state.in_(State.unfinished))
        .all()
    )


def remove_workers(queue):
    """Force-finish every pending task instance on *queue*.

    Marking the leftover task instances as SUCCESS empties the queue so the
    idle workers listening on it can be shut down.

    :param queue: name of the Celery queue whose tasks should be cleared.
    """
    from airflow.utils.state import State

    for task_instance in query_task_instances(queue=queue):
        task_instance.set_state(State.SUCCESS)
10 changes: 4 additions & 6 deletions dags/synaptor_dags.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

from helper_ops import scale_up_cluster_op, scale_down_cluster_op, collect_metrics_op
from param_default import synaptor_param_default, default_synaptor_image
from synaptor_ops import manager_op, drain_op
from synaptor_ops import manager_op, drain_op, self_destruct_op
from synaptor_ops import synaptor_op, wait_op, generate_op, nglink_op


Expand Down Expand Up @@ -217,12 +217,10 @@ def add_task(
"""Adds a processing step to a DAG."""

if task.name == "self_destruct":
cluster_size = 1 if tag.startswith("synaptor-seggraph") else MAX_CLUSTER_SIZE
extra_args = {"workercount": str(cluster_size)}
cluster_key = cluster_key_from_tag(tag)
generate = self_destruct_op(dag, queue=cluster_key, tag=tag)
else:
extra_args = None

generate = generate_op(dag, task.name, image=SYNAPTOR_IMAGE, extra_args=extra_args, tag=tag)
generate = generate_op(dag, task.name, image=SYNAPTOR_IMAGE, tag=tag)

if tag:
wait = wait_op(dag, f"{task.name}_{tag}")
Expand Down
15 changes: 15 additions & 0 deletions dags/synaptor_ops.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
from slack_message import task_failure_alert, task_retry_alert, task_done_alert, slack_message
from nglinks import ImageLayer, SegLayer, generate_ng_payload, wrap_payload
from kombu_helper import drain_messages
from dag_utils import remove_workers

from airflow import configuration as conf

Expand Down Expand Up @@ -326,3 +327,17 @@ def wait_op(dag: DAG, taskname: str) -> PythonOperator:
queue="manager",
dag=dag,
)


def self_destruct_op(dag: DAG, queue: str, tag: str) -> PythonOperator:
    """Remove workers after finishing tasks."""
    operator_kwargs = {
        "task_id": f"self_destruct_for_queue_{tag}",
        "python_callable": remove_workers,
        "op_args": (queue,),
        # High absolute weight so this cleanup step is scheduled ahead of
        # anything else once it becomes runnable.
        "priority_weight": 100_000,
        "weight_rule": WeightRule.ABSOLUTE,
        "on_success_callback": task_done_alert,
        "queue": "manager",
        "dag": dag,
    }
    return PythonOperator(**operator_kwargs)

0 comments on commit 2b783c5

Please sign in to comment.