diff --git a/bfabric/experimental/app_interface/app_runner/__main__.py b/bfabric/experimental/app_interface/app_runner/__main__.py new file mode 100644 index 0000000..922c217 --- /dev/null +++ b/bfabric/experimental/app_interface/app_runner/__main__.py @@ -0,0 +1,47 @@ +from __future__ import annotations + +import argparse +from pathlib import Path +from typing import Union +from venv import logger + +import yaml +from pydantic import TypeAdapter, BaseModel + +from bfabric.bfabric import Bfabric +from bfabric.cli_formatting import setup_script_logging +from bfabric.experimental.app_interface.app_runner._spec import AppSpec +from bfabric.experimental.app_interface.app_runner.runner import Runner + + +class ChunksFile(BaseModel): + # TODO move to better location + chunks: list[Path] + + +def main() -> None: + setup_script_logging() + client = Bfabric.from_config() + parser = argparse.ArgumentParser() + parser.add_argument("action", default="run", choices=["run"]) + parser.add_argument("--app-spec", type=Path, required=True) + parser.add_argument("--workunit-ref", type=TypeAdapter(Union[int, Path]).validate_strings, required=True) + parser.add_argument("--work-dir", type=Path, required=True) + parser.add_argument("--ssh-user", type=str, required=False) + parser.add_argument("--read-only", action="store_true") + args = parser.parse_args() + + app_spec = AppSpec.model_validate(yaml.safe_load(args.app_spec.read_text())) + runner = Runner(spec=app_spec, client=client, ssh_user=args.ssh_user) + runner.run_dispatch(workunit_ref=args.workunit_ref, work_dir=args.work_dir) + chunks_file = ChunksFile.model_validate(yaml.safe_load((args.work_dir / "chunks.yml").read_text())) + for chunk in chunks_file.chunks: + logger.info(f"Processing chunk {chunk}") + runner.run_prepare_input(chunk_dir=chunk) + runner.run_process(chunk_dir=chunk) + if not args.read_only: + runner.run_register_outputs(chunk_dir=chunk, workunit_ref=args.workunit_ref) + + +if __name__ == "__main__": + main() diff --git a/bfabric/experimental/app_interface/app_runner/runner.py b/bfabric/experimental/app_interface/app_runner/runner.py index 44897c4..a27e662 100644 --- a/bfabric/experimental/app_interface/app_runner/runner.py +++ b/bfabric/experimental/app_interface/app_runner/runner.py @@ -7,6 +7,7 @@ from bfabric.experimental.app_interface.app_runner._spec import AppSpec from bfabric.experimental.app_interface.input_preparation import prepare_folder from bfabric.experimental.app_interface.output_registration import register_outputs +from bfabric.experimental.app_interface.workunit.definition import WorkunitDefinition class Runner: @@ -17,23 +18,24 @@ def __init__(self, spec: AppSpec, client: Bfabric, ssh_user: str | None = None) def run_dispatch(self, workunit_ref: int | Path, work_dir: Path) -> None: subprocess.run( - f"{self._app_spec.dispatch} {shlex.quote(str(workunit_ref))} {shlex.quote(str(work_dir))}", + f"{self._app_spec.commands.dispatch} {shlex.quote(str(workunit_ref))} {shlex.quote(str(work_dir))}", shell=True, check=True, ) def run_prepare_input(self, chunk_dir: Path) -> None: prepare_folder( - inputs_yaml=chunk_dir / "inputs.yaml", target_folder=chunk_dir, client=self._client, ssh_user=self._ssh_user + inputs_yaml=chunk_dir / "inputs.yml", target_folder=chunk_dir, client=self._client, ssh_user=self._ssh_user ) def run_process(self, chunk_dir: Path) -> None: - subprocess.run(f"{self._app_spec.process} {shlex.quote(str(chunk_dir))}", shell=True, check=True) + subprocess.run(f"{self._app_spec.commands.process} {shlex.quote(str(chunk_dir))}", shell=True, check=True) - def run_register_outputs(self, chunk_dir: Path, workunit_id: int) -> None: + def run_register_outputs(self, chunk_dir: Path, workunit_ref: int | Path) -> None: + workunit_definition = WorkunitDefinition.from_ref(workunit_ref, client=self._client) register_outputs( - outputs_yaml=chunk_dir / "outputs.yaml", - workunit_id=workunit_id, + outputs_yaml=chunk_dir / "outputs.yml", + workunit_id=workunit_definition.registration.workunit_id, client=self._client, ssh_user=self._ssh_user, )