From aa1a534da19d2e4d074565ecb66856c8dfe9be4d Mon Sep 17 00:00:00 2001 From: Ran Lu Date: Wed, 6 Mar 2024 23:34:19 -0500 Subject: [PATCH] Wait for dags to refresh before triggering them For segmentation and chunkflow_worker, the dag will change after sanity check, wait until they are refreshed using the last_parsed_time field --- slackbot/airflow_api.py | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/slackbot/airflow_api.py b/slackbot/airflow_api.py index 130eab02..a529a5ff 100644 --- a/slackbot/airflow_api.py +++ b/slackbot/airflow_api.py @@ -91,7 +91,24 @@ def __run_dag(dag_id): return trigger_dag(dag_id) +def wait_for_dag_refresh(dag_id): + last_parsed_time = None + for _ in range(10): + dag = DagModel.get_dagmodel(dag_id) + if not last_parsed_time: + last_parsed_time = dag.last_parsed_time + else: + if dag.last_parsed_time > last_parsed_time: + print(dag.last_parsed_time) + print(last_parsed_time) + return + time.sleep(5) + + def run_dag(dag_id, wait_for_completion=False): + dags_need_refresh = ["segmentation", "chunkflow_worker"] + if dag_id in dags_need_refresh: + wait_for_dag_refresh(dag_id) dagrun = run_in_executor(__run_dag, dag_id) if wait_for_completion: while True: