Commit: Merge dev and resolve conflicts

gadorlhiac committed Mar 18, 2024
2 parents 326b2be + ee31424 commit 678006a

Showing 22 changed files with 2,761 additions and 80 deletions.
12 changes: 12 additions & 0 deletions .github/workflows/black.yml
@@ -0,0 +1,12 @@
name: Black-format
on: [push, pull_request]

jobs:
format:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- uses: psf/black@stable
with:
options: "--verbose"
src: "./lute"
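For reference, the same check this workflow applies on push and pull_request can be reproduced locally. The sketch below is a minimal local equivalent; the "--check" flag is an addition of this example (for a non-destructive dry run) and is not part of the workflow options above.

# Minimal local sketch of the Black-format job; the CI job itself only passes
# "--verbose" and formats ./lute, while "--check" here avoids rewriting files.
import subprocess
import sys

result = subprocess.run(
    [sys.executable, "-m", "black", "--verbose", "--check", "./lute"]
)
print("Formatting OK" if result.returncode == 0 else "black would reformat files")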
59 changes: 59 additions & 0 deletions config/test.yaml
@@ -126,4 +126,63 @@ TestBinaryErr:
TestSocket:
array_size: 8000 # Size of arrays to send. 8000 floats ~ 6.4e4 bytes
num_arrays: 10 # Number of arrays to send.

+FindPeaksPyAlgos:
+ outdir: ""
+ n_events: 100
+ det_name: "Rayonix"
+ event_receiver: "evr0"
+ tag: "red"
+ event_logic: false
+ psana_mask: false
+ mask_file: null
+ min_peaks: 10
+ max_peaks: 2048
+ npix_min: 2
+ npix_max: 30
+ amax_thr: 40
+ atot_thr: 180
+ son_min: 3.0
+ peak_rank: 3
+ r0: 3.0
+ dr: 2.0
+ nsigm: 10.0
+ compression:
+ compressor: "sz3"
+ abs_error: 10.0
+ bin_size: 2
+ roi_window_size: 9

IndexCrystFEL:
#in_file: "" # Location of a `.lst` file listing CXI files
#out_file: "" # Where to write the output stream file
geometry: "" # Location of a geometry file
indexing: "mosflm" # Indexing methods
int_radius: "4,5,7" # Integration radii
tolerance: "5,5,5,1.5" # Tolerances
multi: True
profile: True
no_revalidate: True

MergePartialator:
#in_file: ""
#out_file: ""
#model: "unity"
#niter: 1
symmetry: "mmm"

CompareHKL:
#in_files: ""
#fom: "Rsplit"
#nshells: 10
#shell_file: ""
#cell_file: ""
symmetry: "mmm"

#ManipulateHKL:
#output_format: "mtz"
#out_file: "..."

DimpleSolve:
pdb: "/path/to/pdb"
...
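As a rough illustration of how these new sections are consumed, the sketch below reads the FindPeaksPyAlgos block with PyYAML. The assumption that the Task parameters sit in the last (or only) YAML document of the file is ours; LUTE's own loader may instead validate these sections against pydantic models.

# A minimal sketch of pulling one Task's parameters out of config/test.yaml.
# Assumes PyYAML is installed and the Task sections live in the final YAML
# document of the file.
import yaml

with open("config/test.yaml", "r") as f:
    *_, task_doc = yaml.safe_load_all(f)

peak_params = task_doc["FindPeaksPyAlgos"]
print(peak_params["det_name"], peak_params["min_peaks"], peak_params["max_peaks"])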
2 changes: 1 addition & 1 deletion docs/design/database.md
@@ -52,7 +52,7 @@ The `Executor` table contains information on the environment provided to the `Ex
| `communicator_desc` | Description of the Communicators used. |
| | |

**NOTE**: The `env` column is currently being ignored while a method is decided on to choose appropriate environment variables to save.
**NOTE**: The `env` column currently only stores variables related to `SLURM` or `LUTE` itself.

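A sketch of what "variables related to SLURM or LUTE itself" could look like in practice is shown below; the prefix-based filter is an assumption made for illustration, not the exact selection logic used by the database code.

# Hypothetical selection of SLURM- and LUTE-related variables for the `env`
# column; the real selection logic may differ.
import os

env_to_store = {
    key: val
    for key, val in os.environ.items()
    if key.startswith(("SLURM_", "LUTE_"))
}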
### `Task` tables
For every `Task`, a table of the following format will be created. The exact number of columns depends on the specific `Task`, since the number of parameters varies between them and each parameter gets its own column. Within a table, multiple experiments and runs can coexist; the experiment and run are not recorded directly. Instead, the first two columns point to the ids of entries in the general configuration and `Executor` tables, respectively. The general configuration entry contains the experiment and run information.
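To make the layout concrete, a hypothetical per-Task table might be created as below. The Task name and the parameter columns are invented for the example; real tables carry one column per parameter of the specific Task.

# Hypothetical shape of a per-Task table as described above.
import sqlite3

con = sqlite3.connect(":memory:")
con.execute(
    "CREATE TABLE TestTask ("
    "  id INTEGER PRIMARY KEY,"
    "  gen_cfg_id INTEGER,"  # points to the general configuration table entry
    "  exec_id INTEGER,"     # points to the Executor table entry
    "  param_a TEXT,"
    "  param_b REAL"
    ")"
)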
309 changes: 309 additions & 0 deletions docs/tutorial/new_task.md

Large diffs are not rendered by default.

108 changes: 100 additions & 8 deletions lute/execution/executor.py
@@ -23,7 +23,7 @@
"""

__all__ = ["BaseExecutor", "Executor"]
__all__ = ["BaseExecutor", "Executor", "MPIExecutor"]
__author__ = "Gabriel Dorlhiac"

import _io
@@ -208,8 +208,35 @@ def update_environment(
" Options are: prepend, append, overwrite."
)
)
os.environ.update(env)
self._analysis_desc.task_env.update(env)

def shell_source(self, env: str) -> None:
"""Source a script.
Unlike `update_environment`, this method sources a new file.
Args:
env (str): Path to the script to source.
"""
import sys

if not os.path.exists(env):
logger.info(f"Cannot source environment from {env}!")
return

script: str = (
f"set -a\n"
f'source "{env}" >/dev/null\n'
f'{sys.executable} -c "import os; print(dict(os.environ))"\n'
)
logger.info(f"Sourcing file {env}")
o, e = subprocess.Popen(
["bash", "-c", script], stdout=subprocess.PIPE
).communicate()
new_environment: Dict[str, str] = eval(o)
self._analysis_desc.task_env = new_environment

def _pre_task(self) -> None:
"""Any actions to be performed before task submission.
@@ -250,14 +277,11 @@ def _finalize_task(self, proc: subprocess.Popen) -> None:
def execute_task(self) -> None:
"""Run the requested Task as a subprocess."""
lute_path: Optional[str] = os.getenv("LUTE_PATH")
executable_path: str
if lute_path is not None:
executable_path = f"{lute_path}/subprocess_task.py"
else:
logger.debug("Absolute path to subprocess.py not found.")
if lute_path is None:
logger.debug("Absolute path to subprocess_task.py not found.")
lute_path = os.path.abspath(f"{os.path.dirname(__file__)}/../..")
os.environ["LUTE_PATH"] = lute_path
executable_path = f"{lute_path}/subprocess_task.py"
self.update_environment({"LUTE_PATH": lute_path})
executable_path: str = f"{lute_path}/subprocess_task.py"
config_path: str = self._analysis_desc.task_env["LUTE_CONFIGPATH"]
params: str = f"-c {config_path} -t {self._analysis_desc.task_result.task_name}"

@@ -428,3 +452,71 @@ def _finalize_task(self, proc: subprocess.Popen) -> None:
reporting to third party services, etc.
"""
self._task_loop(proc) # Perform a final read.


class MPIExecutor(Executor):
"""Runs first-party Tasks that require MPI.
This Executor is otherwise identical to the standard Executor, except it
uses `mpirun` for `Task` submission. Currently this Executor assumes a job
has been submitted using SLURM as a first step. It will determine the number
of MPI ranks based on the resources requested. As a fallback, it will try
to determine the number of local cores available for cases where a job has
not been submitted via SLURM. On S3DF, the second determination mechanism
should accurately match the environment variable provided by SLURM indicating
resources allocated.
This Executor will submit the Task to run with a number of processes equal
to the total number of cores available minus 1. A single core is reserved
for the Executor itself.
Methods:
execute_task(): Run the task as a subprocess using `mpirun`.
"""

def execute_task(self) -> None:
"""Run the requested Task as a subprocess."""
lute_path: Optional[str] = os.getenv("LUTE_PATH")
if lute_path is None:
logger.debug("Absolute path to subprocess.py not found.")
lute_path = os.path.abspath(f"{os.path.dirname(__file__)}/../..")
os.environ["LUTE_PATH"] = lute_path
executable_path: str = f"{lute_path}/subprocess_task.py"
config_path: str = self._analysis_desc.task_env["LUTE_CONFIGPATH"]
params: str = f"-c {config_path} -t {self._analysis_desc.task_result.task_name}"

py_cmd: str = ""
nprocs: int = max(
int(os.environ.get("SLURM_NPROCS", len(os.sched_getaffinity(0)))) - 1, 1
)
mpi_cmd: str = f"mpirun -np {nprocs}"
if __debug__:
py_cmd = f"python -B -u -m mpi4py.run {executable_path} {params}"
else:
py_cmd = f"python -OB -u -m mpi4py.run {executable_path} {params}"

cmd: str = f"{mpi_cmd} {py_cmd}"
proc: subprocess.Popen = self._submit_task(cmd)

while self._task_is_running(proc):
self._task_loop(proc)
time.sleep(self._analysis_desc.poll_interval)

os.set_blocking(proc.stdout.fileno(), True)
os.set_blocking(proc.stderr.fileno(), True)

self._finalize_task(proc)
proc.stdout.close()
proc.stderr.close()
proc.wait()
if ret := proc.returncode:
logger.info(f"Task failed with return code: {ret}")
self._analysis_desc.task_result.task_status = TaskStatus.FAILED
elif self._analysis_desc.task_result.task_status == TaskStatus.RUNNING:
# Ret code is 0, no exception was thrown, task forgot to set status
self._analysis_desc.task_result.task_status = TaskStatus.COMPLETED
logger.debug(f"Task did not change from RUNNING status. Assume COMPLETED.")
self._store_configuration()
for comm in self._communicators:
comm.clear_communicator()

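The rank-count logic in MPIExecutor.execute_task reduces to a small calculation. The standalone sketch below reproduces it outside the class so the SLURM and fallback paths are easier to see; the helper name is ours, and the printed command only illustrates the shape of the mpirun invocation.

# Prefer SLURM_NPROCS if the job came through SLURM, otherwise fall back to
# the cores visible to this process, and always keep one core free for the
# Executor itself (never dropping below 1 rank).
import os

def determine_nprocs() -> int:
    total_cores = int(os.environ.get("SLURM_NPROCS", len(os.sched_getaffinity(0))))
    return max(total_cores - 1, 1)

nprocs = determine_nprocs()
print(f"mpirun -np {nprocs} python -B -u -m mpi4py.run subprocess_task.py ...")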
103 changes: 80 additions & 23 deletions lute/execution/ipc.py
@@ -161,34 +161,31 @@ def read(self, proc: subprocess.Popen) -> Message:
Returns:
msg (Message): The message read, containing contents and signal.
"""
if self._use_pickle:
signal: bytes = proc.stderr.read()
if signal is not None:
signal: str = signal.decode()
raw_contents: bytes = proc.stdout.read()
if raw_contents:
signal: Optional[str]
contents: Optional[str]
raw_signal: bytes = proc.stderr.read()
raw_contents: bytes = proc.stdout.read()
if raw_signal is not None:
signal = raw_signal.decode()
else:
signal = raw_signal
if raw_contents:
if self._use_pickle:
try:
contents: Any = pickle.loads(raw_contents)
contents = pickle.loads(raw_contents)
except pickle.UnpicklingError as err:
# Can occur if Task switches to unpickled mode before the
# Executor can
logger.debug("PipeCommunicator (Executor) - Set _use_pickle=False")
self._use_pickle = False
contents: str = raw_contents.decode()
contents = self._safe_unpickle_decode(raw_contents)
else:
contents: str = ""
else:
signal: bytes = proc.stderr.read()
if signal is not None:
try:
signal: str = signal.decode()
except UnicodeDecodeError as err:
signal: str = pickle.loads(signal)
contents: bytes = proc.stdout.read()
if contents is not None:
try:
contents: str = contents.decode()
contents = raw_contents.decode()
except UnicodeDecodeError as err:
contents: str = pickle.loads(contents)
logger.debug("PipeCommunicator (Executor) - Set _use_pickle=True")
self._use_pickle = True
contents = self._safe_unpickle_decode(raw_contents)
else:
contents = None

if signal and signal not in LUTE_SIGNALS:
# Some tasks write on stderr
@@ -198,9 +195,69 @@ def read(self, proc: subprocess.Popen) -> Message:
contents = f"({signal})"
else:
contents = f"{contents} ({signal})"
signal: str = ""
signal = None

return Message(contents=contents, signal=signal)

def _safe_unpickle_decode(self, maybe_mixed: bytes) -> Optional[str]:
"""This method is used to unpickle and/or decode a bytes object.
It attempts to handle cases where contents can be mixed, i.e., part of
the message must be decoded and the other part unpickled. It handles
only two-way splits. If there are more complex arrangements such as:
<pickled>:<unpickled>:<pickled> etc, it will give up.
The simpler two-way splits are unlikely to occur in normal usage. They
may arise when debugging if, e.g., `print` statements are mixed with the
usage of the `_report_to_executor` method.
Note that this method works because ONLY text data is assumed to be
sent via the pipes. The method needs to be revised to handle non-text
data if the `Task` is modified to also send that via PipeCommunicator.
The use of pickle is supported to provide for this option if it is
necessary. It may be deprecated in the future.
Be careful when making changes. This method has seemingly redundant
checks because unpickling will not throw an error if a full object can
be retrieved. That is, the library will ignore extraneous bytes. This
method attempts to retrieve that information if the pickled data comes
first in the stream.
Args:
maybe_mixed (bytes): A bytes object which could require unpickling,
decoding, or both.
Returns:
contents (Optional[str]): The unpickled/decoded contents if possible.
Otherwise, None.
"""
contents: Optional[str]
try:
contents = pickle.loads(maybe_mixed)
repickled: bytes = pickle.dumps(contents)
if len(repickled) < len(maybe_mixed):
# Successful unpickling, but pickle stops even if there are more bytes
additional_data: str = maybe_mixed[len(repickled) :].decode()
contents = f"{contents}{additional_data}"
except pickle.UnpicklingError as err:
try:
contents = maybe_mixed.decode()
except UnicodeDecodeError as err2:
try:
contents = maybe_mixed[: err2.start].decode()
contents = f"{contents}{pickle.loads(maybe_mixed[err2.start:])}"
except Exception as err3:
logger.debug(
f"PipeCommunicator unable to decode/parse data! {err3}"
)
contents = None
except UnicodeDecodeError as err3:
missing_bytes: int = len(maybe_mixed) - len(repickled)
logger.debug(
f"PipeCommunicator has truncated message. Unable to retrieve {missing_bytes} bytes."
)
return contents

def write(self, msg: Message) -> None:
"""Write to stdout and stderr.
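The mixed-stream case handled by _safe_unpickle_decode can be reproduced directly. The sketch below demonstrates the same repickle-and-compare trick on a contrived payload; the payload contents are invented for illustration.

# A pickled object followed by plain UTF-8 text in the same byte stream.
# pickle.loads silently ignores the trailing bytes, so re-pickling the
# recovered object and comparing lengths reveals how much text was left over.
import pickle

payload = pickle.dumps("status: RUNNING") + b" extra print() output"

obj = pickle.loads(payload)                   # -> "status: RUNNING"
repickled = pickle.dumps(obj)
trailing = payload[len(repickled):].decode()  # -> " extra print() output"
print(f"{obj}{trailing}")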
12 changes: 8 additions & 4 deletions lute/io/_sqlite.py
@@ -283,10 +283,14 @@ def _add_row_no_duplicate(

def _select_from_db(
con: sqlite3.Connection, table_name: str, col_name: str, condition: Dict[str, str]
) -> Any:
param, val = next(iter(condition.items()))
sql: str = f"SELECT {col_name} FROM {table_name} WHERE {param} = {val}"
) -> Optional[Any]:
sql: str
if condition:
param, val = next(iter(condition.items()))
sql = f"SELECT {col_name} FROM {table_name} WHERE {param} = {val}"
else:
sql = f"SELECT {col_name} FROM {table_name}"
with con:
res: sqlite3.Cursor = con.execute(sql)
entries: List[Any] = res.fetchall()
return entries[-1][0]
return entries[-1][0] if entries else None
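A usage sketch for the updated helper follows. The table name, column, and experiment value are made up for the example, and it assumes the LUTE package is importable from the current environment.

# With a condition, _select_from_db returns the latest matching value; with an
# empty condition it now queries the whole table instead of failing on
# next(iter({}.items())).
import sqlite3

from lute.io._sqlite import _select_from_db

con = sqlite3.connect(":memory:")
with con:
    con.execute("CREATE TABLE gen_cfg (id INTEGER PRIMARY KEY, experiment TEXT)")
    con.execute("INSERT INTO gen_cfg (experiment) VALUES ('mfxl1001')")

print(_select_from_db(con, "gen_cfg", "experiment", {"id": "1"}))  # -> mfxl1001
print(_select_from_db(con, "gen_cfg", "experiment", {}))           # no WHERE clause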