how to specify celery queue dynamic by parmater before dag run #14987
-
Hi,
My purpose is that set the key4 which is the machine ip to the queue param, because i want to the DAG execute
It doesn't work i also add queue to my BashExtendOperator template_fields , doesn't work either.
And use socket.gethostbyname(socket.gethostname()) just give me the same ip which runs the scheduler instance,
FULL DAG
Many thanks to your help! |
Beta Was this translation helpful? Give feedback.
Replies: 1 comment 3 replies
-
Templating does not work as it happen at the moment of execution, so it's already on worker. I would suggest experimenting with def task_policy(task: BaseOperator):
task.queue = task.dag.conf.get("key4", defaul_queue) or if you use it for all task in DAG def dag_policy(dag: DAG):
queue = dag.conf.get("key4", default_queue)
for task in dag.tasks:
task.queue = queue |
Beta Was this translation helpful? Give feedback.
Templating does not work as it happen at the moment of execution, so it's already on worker. I would suggest experimenting with
cluster policies
https://airflow.apache.org/docs/apache-airflow/stable/concepts.html#cluster-policyor if you use it for all task in DAG