Skip to content

Commit

Permalink
Merge pull request #81 from seung-lab/refresh_dags
Browse files Browse the repository at this point in the history
Wait for dags to refresh before triggering them
  • Loading branch information
ranlu authored Mar 21, 2024
2 parents aeea779 + aa1a534 commit d65426e
Showing 1 changed file with 17 additions and 0 deletions.
17 changes: 17 additions & 0 deletions slackbot/airflow_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,24 @@ def __run_dag(dag_id):
return trigger_dag(dag_id)


def wait_for_dag_refresh(dag_id):
last_parsed_time = None
for _ in range(10):
dag = DagModel.get_dagmodel(dag_id)
if not last_parsed_time:
last_parsed_time = dag.last_parsed_time
else:
if dag.last_parsed_time > last_parsed_time:
print(dag.last_parsed_time)
print(last_parsed_time)
return
time.sleep(5)


def run_dag(dag_id, wait_for_completion=False):
dags_need_refresh = ["segmentation", "chunkflow_worker"]
if dag_id in dags_need_refresh:
wait_for_dag_refresh(dag_id)
dagrun = run_in_executor(__run_dag, dag_id)
if wait_for_completion:
while True:
Expand Down

0 comments on commit d65426e

Please sign in to comment.