forked from datirium/workflows
-
Notifications
You must be signed in to change notification settings - Fork 0
/
workflows_create.py
33 lines (25 loc) · 897 Bytes
/
workflows_create.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
#!/usr/bin/env python3
import logging
from cwl_airflow.cwldag import CWLDAG
from datetime import timedelta
from biowardrobe_cwl_workflows import available
from cwl_airflow import CWLJobGatherer, CWLJobDispatcher
_logger = logging.getLogger(__name__)
def create_biowardrobe_workflow(workflow):
_workflow_file = available(workflow=workflow)
dag = CWLDAG(default_args={
'owner': 'airflow',
'email': ['[email protected]'],
'email_on_failure': False,
'email_on_retry': False,
'pool': 'basic_analysis',
'retries': 10,
'retry_exponential_backoff': True,
'retry_delay': timedelta(minutes=60),
'max_retry_delay': timedelta(minutes=60 * 24)
},
cwl_workflow=_workflow_file)
dag.create()
dag.add(CWLJobDispatcher(dag=dag), to='top')
dag.add(CWLJobGatherer(dag=dag), to='bottom')
return dag