Skip to content

Commit

Permalink
Merge pull request #16 from gadorlhiac/ENH/mod_sfx_dag
Browse files Browse the repository at this point in the history
ENH Complete changes to Psocake/Experimental Phasing DAG
  • Loading branch information
valmar authored Apr 8, 2024
2 parents 885b25b + 75b4483 commit fe2386c
Show file tree
Hide file tree
Showing 6 changed files with 83 additions and 18 deletions.
6 changes: 3 additions & 3 deletions lute/io/models/sfx_find_peaks.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ class SZCompressorParameters(BaseModel):
rename_param="o",
)

@validator("out_file")
@validator("out_file", always=True)
def validate_out_file(cls, out_file: str, values: Dict[str, Any]) -> str:
if out_file == "":
fname: Path = (
Expand Down Expand Up @@ -268,13 +268,13 @@ class SZParameters(BaseModel):
description="Configuration parameters for SZ Compression", flag_type=""
)

@validator("e")
@validator("e", always=True)
def validate_e(cls, e: str, values: Dict[str, Any]) -> str:
    """Fill in the experiment from `lute_config` when `e` is left empty.

    Args:
        e: The supplied value; the empty string means "not provided".
        values: Previously validated fields; `values["lute_config"]` is read
            only when `e` is empty.

    Returns:
        `e` unchanged if it is non-empty, otherwise
        `values["lute_config"].experiment`.
    """
    if e != "":
        return e
    return values["lute_config"].experiment

@validator("r")
@validator("r", always=True)
def validate_r(cls, r: int, values: Dict[str, Any]) -> int:
if r == -1:
return values["lute_config"].run
Expand Down
6 changes: 3 additions & 3 deletions lute/io/models/sfx_index.py
Original file line number Diff line number Diff line change
Expand Up @@ -423,7 +423,7 @@ class ConcatenateStreamFilesParameters(TaskParameters):
description="Path to merged output stream file.",
)

@validator("in_file")
@validator("in_file", always=True)
def validate_in_file(cls, in_file: str, values: Dict[str, Any]) -> str:
if in_file == "":
stream_file: Optional[str] = read_latest_db_entry(
Expand All @@ -434,7 +434,7 @@ def validate_in_file(cls, in_file: str, values: Dict[str, Any]) -> str:
return stream_dir
return in_file

@validator("tag")
@validator("tag", always=True)
def validate_tag(cls, tag: str, values: Dict[str, Any]) -> str:
if tag == "":
stream_file: Optional[str] = read_latest_db_entry(
Expand All @@ -445,7 +445,7 @@ def validate_tag(cls, tag: str, values: Dict[str, Any]) -> str:
return stream_tag
return tag

@validator("out_file")
@validator("out_file", always=True)
def validate_out_file(cls, tag: str, values: Dict[str, Any]) -> str:
if tag == "":
stream_out_file: str = str(
Expand Down
39 changes: 31 additions & 8 deletions lute/io/models/sfx_merge.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ class Config(BaseBinaryParameters.Config):
"", description="Path to input stream.", flag_type="-", rename_param="i"
)
out_file: str = Field(
"partialator.hkl",
"",
description="Path to output file.",
flag_type="-",
rename_param="o",
Expand Down Expand Up @@ -185,6 +185,29 @@ class Config(BaseBinaryParameters.Config):
rename_param="harvest-file",
)

@validator("in_file", always=True)
def validate_in_file(cls, in_file: str, values: Dict[str, Any]) -> str:
    """Default the input stream to the latest recorded concatenated stream.

    Args:
        in_file: The supplied input path; the empty string means "not
            provided".
        values: Previously validated fields; `values["lute_config"].work_dir`
            is passed to the database lookup.

    Returns:
        `in_file` unchanged if it is non-empty. Otherwise the value returned
        by `read_latest_db_entry` for the `ConcatenateStreamFiles` /
        `out_file` entry when that value is truthy, else the (empty)
        `in_file`.
    """
    if in_file != "":
        return in_file
    # NOTE(review): presumably this returns the `out_file` recorded by the
    # most recent ConcatenateStreamFiles run, or None - confirm in the helper.
    latest_stream: Optional[str] = read_latest_db_entry(
        f"{values['lute_config'].work_dir}",
        "ConcatenateStreamFiles",
        "out_file",
    )
    return latest_stream if latest_stream else in_file

@validator("out_file", always=True)
def validate_out_file(cls, out_file: str, values: Dict[str, Any]) -> str:
    """Derive a default output path from the input stream when none is given.

    Args:
        out_file: The supplied output path; the empty string means "not
            provided".
        values: Previously validated fields; `in_file` is read from here.

    Returns:
        `out_file` unchanged if it is non-empty. Otherwise `in_file` with its
        final extension replaced by `.hkl`, or `"partialator.hkl"` when no
        input file is available.
    """
    # Function-scope import: the file's top-level imports are not visible here.
    import os

    if out_file != "":
        return out_file

    # A field that failed its own validation is missing from `values`, so use
    # .get() rather than indexing to avoid leaking a bare KeyError.
    in_file: str = values.get("in_file", "")
    if not in_file:
        return "partialator.hkl"

    # Strip only the final extension. Splitting on the first "." would
    # truncate paths such as "./run.stream" or "/data/exp.v2/run.stream".
    stem, _ext = os.path.splitext(in_file)
    return f"{stem}.hkl"


class CompareHKLParameters(BaseBinaryParameters):
"""Parameters for CrystFEL's `compare_hkl` for calculating figures of merit.
Expand Down Expand Up @@ -267,7 +290,7 @@ class Config(BaseBinaryParameters.Config):
flag_type="--",
)

@validator("in_files")
@validator("in_files", always=True)
def validate_in_files(cls, in_files: str, values: Dict[str, Any]) -> str:
if in_files == "":
partialator_file: Optional[str] = read_latest_db_entry(
Expand All @@ -278,7 +301,7 @@ def validate_in_files(cls, in_files: str, values: Dict[str, Any]) -> str:
return hkls
return in_files

@validator("cell_file")
@validator("cell_file", always=True)
def validate_cell_file(cls, cell_file: str, values: Dict[str, Any]) -> str:
if cell_file == "":
idx_cell_file: Optional[str] = read_latest_db_entry(
Expand All @@ -291,7 +314,7 @@ def validate_cell_file(cls, cell_file: str, values: Dict[str, Any]) -> str:
return idx_cell_file
return cell_file

@validator("symmetry")
@validator("symmetry", always=True)
def validate_symmetry(cls, symmetry: str, values: Dict[str, Any]) -> str:
if symmetry == "":
partialator_sym: Optional[str] = read_latest_db_entry(
Expand All @@ -301,7 +324,7 @@ def validate_symmetry(cls, symmetry: str, values: Dict[str, Any]) -> str:
return partialator_sym
return symmetry

@validator("shell_file")
@validator("shell_file", always=True)
def validate_shell_file(cls, shell_file: str, values: Dict[str, Any]) -> str:
if shell_file == "":
partialator_file: Optional[str] = read_latest_db_entry(
Expand Down Expand Up @@ -421,7 +444,7 @@ class Config(BaseBinaryParameters.Config):
flag_type="--",
)

@validator("in_file")
@validator("in_file", always=True)
def validate_in_file(cls, in_file: str, values: Dict[str, Any]) -> str:
if in_file == "":
partialator_file: Optional[str] = read_latest_db_entry(
Expand All @@ -431,7 +454,7 @@ def validate_in_file(cls, in_file: str, values: Dict[str, Any]) -> str:
return partialator_file
return in_file

@validator("out_file")
@validator("out_file", always=True)
def validate_out_file(cls, out_file: str, values: Dict[str, Any]) -> str:
if out_file == "":
partialator_file: Optional[str] = read_latest_db_entry(
Expand All @@ -443,7 +466,7 @@ def validate_out_file(cls, out_file: str, values: Dict[str, Any]) -> str:
return mtz_out
return out_file

@validator("cell_file")
@validator("cell_file", always=True)
def validate_cell_file(cls, cell_file: str, values: Dict[str, Any]) -> str:
if cell_file == "":
idx_cell_file: Optional[str] = read_latest_db_entry(
Expand Down
6 changes: 3 additions & 3 deletions lute/io/models/sfx_solve.py
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ class DimpleSolveParameters(BaseBinaryParameters):
rename_param="ItoF-prog",
)

@validator("in_file")
@validator("in_file", always=True)
def validate_in_file(cls, in_file: str, values: Dict[str, Any]) -> str:
if in_file == "":
get_hkl_file: Optional[str] = read_latest_db_entry(
Expand All @@ -186,7 +186,7 @@ def validate_in_file(cls, in_file: str, values: Dict[str, Any]) -> str:
return get_hkl_file
return in_file

@validator("out_dir")
@validator("out_dir", always=True)
def validate_out_dir(cls, out_dir: str, values: Dict[str, Any]) -> str:
if out_dir == "":
get_hkl_file: Optional[str] = read_latest_db_entry(
Expand Down Expand Up @@ -220,7 +220,7 @@ class RunSHELXCParameters(BaseBinaryParameters):
flag_type="",
)

@validator("in_file")
@validator("in_file", always=True)
def validate_in_file(cls, in_file: str, values: Dict[str, Any]) -> str:
if in_file == "":
# get_hkl needed to be run to produce an XDS format file...
Expand Down
37 changes: 37 additions & 0 deletions workflows/airflow/find_peaks_index.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
"""SFX Peak finding and indexing only.
Runs peak finding with psocake followed by CrystFEL indexing.
Note:
The task_id MUST match the managed task name when defining DAGs - it is used
by the operator to properly launch it.
dag_id names must be unique, and they are not namespaced via folder
hierarchy. I.e. all DAGs on an Airflow instance must have unique ids. The
Airflow instance used by LUTE is currently shared by other software - DAG
IDs should always be prefixed with `lute_`. LUTE scripts should prepend this
internally, so a DAG "lute_test" can be triggered by asking for "test".
"""

from datetime import datetime
import os
from airflow import DAG
from lute.operators.jidoperators import JIDSlurmOperator

# Build the DAG id from this file's base name (extension removed) with the
# `lute_` prefix.
dag_id: str = f"lute_{os.path.splitext(os.path.basename(__file__))[0]}"
description: str = "Run SFX peak finding and indexing."

# No schedule interval is set (schedule_interval=None).
# NOTE(review): presumably runs are only started by an external trigger - confirm.
dag: DAG = DAG(
dag_id=dag_id,
start_date=datetime(2024, 3, 18),
schedule_interval=None,
description=description,
)

# Per the module docstring, each task_id must match the managed task name.
peak_finder: JIDSlurmOperator = JIDSlurmOperator(task_id="PeakFinderPsocake", dag=dag)

indexer: JIDSlurmOperator = JIDSlurmOperator(
max_cores=120, task_id="CrystFELIndexer", dag=dag
)

# Task ordering: peak finding first, then indexing.
peak_finder >> indexer
7 changes: 6 additions & 1 deletion workflows/airflow/psocake_sfx_phasing.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,11 @@
max_cores=120, task_id="CrystFELIndexer", dag=dag
)

# Concatenate stream files from all previous runs with same tag
stream_concatenator: JIDSlurmOperator = JIDSlurmOperator(
max_cores=2, task_id="StreamFileConcatenator", dag=dag
)

# Merge
merger: JIDSlurmOperator = JIDSlurmOperator(
max_cores=120, task_id="PartialatorMerger", dag=dag
Expand All @@ -59,7 +64,7 @@
)


peak_finder >> indexer >> merger >> hkl_manipulator >> shelxc
peak_finder >> indexer >> stream_concatenator >> merger >> hkl_manipulator >> shelxc
merger >> hkl_comparer

# Run summaries

0 comments on commit fe2386c

Please sign in to comment.