diff --git a/launch_scripts/launch_airflow.py b/launch_scripts/launch_airflow.py index 8913e680..c82b5dcf 100755 --- a/launch_scripts/launch_airflow.py +++ b/launch_scripts/launch_airflow.py @@ -14,7 +14,7 @@ import datetime import logging import argparse -from typing import Dict, Union, List, Any +from typing import Dict, Union, List import requests from requests.auth import HTTPBasicAuth @@ -26,6 +26,19 @@ logger: logging.Logger = logging.getLogger(__name__) + +def _retrieve_pw(instance: str = "prod") -> str: + path: str = "/sdf/group/lcls/ds/tools/lute/airflow_{instance}.txt" + if instance == "prod" or instance == "test": + path = path.format(instance) + else: + raise ValueError('`instance` must be either "test" or "prod"!') + + with open(path, "r") as f: + pw: str = f.readline().strip() + return pw + + if __name__ == "__main__": parser = argparse.ArgumentParser( prog="trigger_airflow_lute_dag", @@ -45,10 +58,13 @@ extra_args: List[str] # Should contain all SLURM arguments! args, extra_args = parser.parse_known_args() airflow_instance: str + instance_str: str if args.test: airflow_instance = "http://172.24.5.190:8080/" + instance_str = "test" else: airflow_instance = "http://172.24.5.247:8080/" + instance_str = "prod" airflow_api_endpoints: Dict[str, str] = { "health": "api/v1/health", @@ -57,7 +73,7 @@ resp: requests.models.Response = requests.get( f"{airflow_instance}/{airflow_api_endpoints['health']}", - auth=HTTPBasicAuth(), # NEED AUTH SOLUTION + auth=HTTPBasicAuth("btx", _retrieve_pw(instance_str)), ) resp.raise_for_status() @@ -86,7 +102,7 @@ resp: requests.models.Response = requests.post( f"{airflow_instance}/{airflow_api_endpoints['run_dag']}", json=dag_run_data, - auth=HTTPBasicAuth(), # NEED AUTH SOLUTION + auth=HTTPBasicAuth("btx", _retrieve_pw(instance_str)), ) resp.raise_for_status() logger.info(resp.text) diff --git a/workflows/airflow/pyalgos_sfx_phasing.py b/workflows/airflow/pyalgos_sfx_phasing.py index c9217a44..1bdbe02b 100644 --- a/workflows/airflow/pyalgos_sfx_phasing.py +++ b/workflows/airflow/pyalgos_sfx_phasing.py @@ -21,7 +21,9 @@ from lute.operators.jidoperators import JIDSlurmOperator dag_id: str = f"lute_{os.path.splitext(os.path.basename(__file__))[0]}" -description: str = "Run SFX processing using PyAlgos peak finding and experimental phasing" +description: str = ( + "Run SFX processing using PyAlgos peak finding and experimental phasing" +) dag: DAG = DAG( dag_id=dag_id, @@ -32,19 +34,29 @@ peak_finder: JIDSlurmOperator = JIDSlurmOperator(task_id="PeakFinderPyAlgos", dag=dag) -indexer: JIDSlurmOperator = JIDSlurmOperator(max_cores=120, task_id="CrystFELIndexer", dag=dag) +indexer: JIDSlurmOperator = JIDSlurmOperator( + max_cores=120, task_id="CrystFELIndexer", dag=dag +) # Merge -merger: JIDSlurmOperator = JIDSlurmOperator(max_cores=120, task_id="PartialatorMerger", dag=dag) +merger: JIDSlurmOperator = JIDSlurmOperator( + max_cores=120, task_id="PartialatorMerger", dag=dag +) # Figures of merit -hkl_comparer: JIDSlurmOperator = JIDSlurmOperator(max_cores=8, task_id="HKLComparer", dag=dag) +hkl_comparer: JIDSlurmOperator = JIDSlurmOperator( + max_cores=8, task_id="HKLComparer", dag=dag +) # HKL conversions -hkl_manipulator: JIDSlurmOperator = JIDSlurmOperator(max_cores=8, task_id="HKLManipulator", dag=dag) +hkl_manipulator: JIDSlurmOperator = JIDSlurmOperator( + max_cores=8, task_id="HKLManipulator", dag=dag +) # SHELX Tasks -shelxc: JIDSlurmOperator = JIDSlurmOperator(max_cores=20, task_id="SHELXCRunner", dag=dag) +shelxc: JIDSlurmOperator = JIDSlurmOperator( + max_cores=20, task_id="SHELXCRunner", dag=dag +) peak_finder >> indexer >> merger >> hkl_manipulator >> shelxc