Skip to content

Commit

Permalink
ENH Authentication workaround until LDAP is working. Requires launche…
Browse files Browse the repository at this point in the history
…r binary on S3DF
  • Loading branch information
gadorlhiac committed Mar 18, 2024
1 parent 687e6c3 commit 1f6d81a
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 9 deletions.
22 changes: 19 additions & 3 deletions launch_scripts/launch_airflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
import datetime
import logging
import argparse
from typing import Dict, Union, List, Any
from typing import Dict, Union, List

import requests
from requests.auth import HTTPBasicAuth
Expand All @@ -26,6 +26,19 @@

logger: logging.Logger = logging.getLogger(__name__)


def _retrieve_pw(instance: str = "prod") -> str:
path: str = "/sdf/group/lcls/ds/tools/lute/airflow_{instance}.txt"
if instance == "prod" or instance == "test":
path = path.format(instance)
else:
raise ValueError('`instance` must be either "test" or "prod"!')

with open(path, "r") as f:
pw: str = f.readline().strip()
return pw


if __name__ == "__main__":
parser = argparse.ArgumentParser(
prog="trigger_airflow_lute_dag",
Expand All @@ -45,10 +58,13 @@
extra_args: List[str] # Should contain all SLURM arguments!
args, extra_args = parser.parse_known_args()
airflow_instance: str
instance_str: str
if args.test:
airflow_instance = "http://172.24.5.190:8080/"
instance_str = "test"
else:
airflow_instance = "http://172.24.5.247:8080/"
instance_str = "prod"

airflow_api_endpoints: Dict[str, str] = {
"health": "api/v1/health",
Expand All @@ -57,7 +73,7 @@

resp: requests.models.Response = requests.get(
f"{airflow_instance}/{airflow_api_endpoints['health']}",
auth=HTTPBasicAuth(), # NEED AUTH SOLUTION
auth=HTTPBasicAuth("btx", _retrieve_pw(instance_str)),
)
resp.raise_for_status()

Expand Down Expand Up @@ -86,7 +102,7 @@
resp: requests.models.Response = requests.post(
f"{airflow_instance}/{airflow_api_endpoints['run_dag']}",
json=dag_run_data,
auth=HTTPBasicAuth(), # NEED AUTH SOLUTION
auth=HTTPBasicAuth("btx", _retrieve_pw(instance_str)),
)
resp.raise_for_status()
logger.info(resp.text)
24 changes: 18 additions & 6 deletions workflows/airflow/pyalgos_sfx_phasing.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@
from lute.operators.jidoperators import JIDSlurmOperator

dag_id: str = f"lute_{os.path.splitext(os.path.basename(__file__))[0]}"
description: str = "Run SFX processing using PyAlgos peak finding and experimental phasing"
description: str = (
"Run SFX processing using PyAlgos peak finding and experimental phasing"
)

dag: DAG = DAG(
dag_id=dag_id,
Expand All @@ -32,19 +34,29 @@

peak_finder: JIDSlurmOperator = JIDSlurmOperator(task_id="PeakFinderPyAlgos", dag=dag)

indexer: JIDSlurmOperator = JIDSlurmOperator(max_cores=120, task_id="CrystFELIndexer", dag=dag)
indexer: JIDSlurmOperator = JIDSlurmOperator(
max_cores=120, task_id="CrystFELIndexer", dag=dag
)

# Merge
merger: JIDSlurmOperator = JIDSlurmOperator(max_cores=120, task_id="PartialatorMerger", dag=dag)
merger: JIDSlurmOperator = JIDSlurmOperator(
max_cores=120, task_id="PartialatorMerger", dag=dag
)

# Figures of merit
hkl_comparer: JIDSlurmOperator = JIDSlurmOperator(max_cores=8, task_id="HKLComparer", dag=dag)
hkl_comparer: JIDSlurmOperator = JIDSlurmOperator(
max_cores=8, task_id="HKLComparer", dag=dag
)

# HKL conversions
hkl_manipulator: JIDSlurmOperator = JIDSlurmOperator(max_cores=8, task_id="HKLManipulator", dag=dag)
hkl_manipulator: JIDSlurmOperator = JIDSlurmOperator(
max_cores=8, task_id="HKLManipulator", dag=dag
)

# SHELX Tasks
shelxc: JIDSlurmOperator = JIDSlurmOperator(max_cores=20, task_id="SHELXCRunner", dag=dag)
shelxc: JIDSlurmOperator = JIDSlurmOperator(
max_cores=20, task_id="SHELXCRunner", dag=dag
)


peak_finder >> indexer >> merger >> hkl_manipulator >> shelxc
Expand Down

0 comments on commit 1f6d81a

Please sign in to comment.