Skip to content

Commit

Permalink
Do not fetch broker_url for every operator
Browse files Browse the repository at this point in the history
Makeing broker_url global instead
  • Loading branch information
ranlu committed Mar 21, 2024
1 parent ff1a8d3 commit 374fe3f
Showing 1 changed file with 6 additions and 12 deletions.
18 changes: 6 additions & 12 deletions dags/synaptor_ops.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@
from nglinks import ImageLayer, SegLayer, generate_ng_payload, wrap_payload
from kombu_helper import drain_messages

from airflow import configuration as conf

airflow_broker_url = conf.get("celery", "broker_url")

# hard-coding these for now
MOUNT_POINT = "/root/.cloudvolume/secrets/"
Expand Down Expand Up @@ -177,15 +180,12 @@ def drain_op(
queue: Optional[str] = "manager",
) -> PythonOperator:
"""Drains leftover messages from the RabbitMQ."""
from airflow import configuration as conf

broker_url = conf.get("celery", "broker_url")

return PythonOperator(
task_id="drain_messages",
python_callable=drain_messages,
priority_weight=100_000,
op_args=(broker_url, task_queue_name),
op_args=(airflow_broker_url, task_queue_name),
weight_rule=WeightRule.ABSOLUTE,
on_failure_callback=task_failure_alert,
on_success_callback=task_done_alert,
Expand Down Expand Up @@ -232,14 +232,11 @@ def generate_op(
image: str = default_synaptor_image,
) -> BaseOperator:
"""Generates tasks to run and adds them to the RabbitMQ."""
from airflow import configuration as conf

broker_url = conf.get("celery", "broker_url")
config_path = os.path.join(MOUNT_POINT, "synaptor_param.json")

command = (
f"generate {taskname} {config_path}"
f" --queueurl {broker_url}"
f" --queueurl {airflow_broker_url}"
f" --queuename {task_queue_name}"
)
if taskname == "self_destruct":
Expand Down Expand Up @@ -276,14 +273,11 @@ def synaptor_op(
image: str = default_synaptor_image,
) -> BaseOperator:
"""Runs a synaptor worker until it receives a self-destruct task."""
from airflow import configuration as conf

broker_url = conf.get("celery", "broker_url")
config_path = os.path.join(MOUNT_POINT, "synaptor_param.json")

command = (
f"worker --configfilename {config_path}"
f" --queueurl {broker_url} "
f" --queueurl {airflow_broker_url} "
f" --queuename {task_queue_name}"
" --lease_seconds 300"
)
Expand Down

0 comments on commit 374fe3f

Please sign in to comment.