From 374fe3f50fe35ddf98fdaef47f46dcad23c878c6 Mon Sep 17 00:00:00 2001 From: Ran Lu Date: Sun, 10 Mar 2024 18:38:19 -0400 Subject: [PATCH] Do not fetch broker_url for every operator Makeing broker_url global instead --- dags/synaptor_ops.py | 18 ++++++------------ 1 file changed, 6 insertions(+), 12 deletions(-) diff --git a/dags/synaptor_ops.py b/dags/synaptor_ops.py index 4120770e..6374e592 100644 --- a/dags/synaptor_ops.py +++ b/dags/synaptor_ops.py @@ -15,6 +15,9 @@ from nglinks import ImageLayer, SegLayer, generate_ng_payload, wrap_payload from kombu_helper import drain_messages +from airflow import configuration as conf + +airflow_broker_url = conf.get("celery", "broker_url") # hard-coding these for now MOUNT_POINT = "/root/.cloudvolume/secrets/" @@ -177,15 +180,12 @@ def drain_op( queue: Optional[str] = "manager", ) -> PythonOperator: """Drains leftover messages from the RabbitMQ.""" - from airflow import configuration as conf - - broker_url = conf.get("celery", "broker_url") return PythonOperator( task_id="drain_messages", python_callable=drain_messages, priority_weight=100_000, - op_args=(broker_url, task_queue_name), + op_args=(airflow_broker_url, task_queue_name), weight_rule=WeightRule.ABSOLUTE, on_failure_callback=task_failure_alert, on_success_callback=task_done_alert, @@ -232,14 +232,11 @@ def generate_op( image: str = default_synaptor_image, ) -> BaseOperator: """Generates tasks to run and adds them to the RabbitMQ.""" - from airflow import configuration as conf - - broker_url = conf.get("celery", "broker_url") config_path = os.path.join(MOUNT_POINT, "synaptor_param.json") command = ( f"generate {taskname} {config_path}" - f" --queueurl {broker_url}" + f" --queueurl {airflow_broker_url}" f" --queuename {task_queue_name}" ) if taskname == "self_destruct": @@ -276,14 +273,11 @@ def synaptor_op( image: str = default_synaptor_image, ) -> BaseOperator: """Runs a synaptor worker until it receives a self-destruct task.""" - from airflow import configuration as conf - - broker_url = conf.get("celery", "broker_url") config_path = os.path.join(MOUNT_POINT, "synaptor_param.json") command = ( f"worker --configfilename {config_path}" - f" --queueurl {broker_url} " + f" --queueurl {airflow_broker_url} " f" --queuename {task_queue_name}" " --lease_seconds 300" )