Skip to content

Commit

Permalink
split process into process and collect steps
Browse files Browse the repository at this point in the history
  • Loading branch information
leoschwarz committed Sep 23, 2024
1 parent 87b46dd commit efbeca2
Show file tree
Hide file tree
Showing 3 changed files with 11 additions and 0 deletions.
2 changes: 2 additions & 0 deletions docs/changelog.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ Versioning currently follows `X.Y.Z` where
### Changed

- (internal) migrate to src layout
- (experimental) the former `process` step of the app runner has been split into a `process` and `collect` step where,
the collect step is responsible for generating the `output.yml` file that will then be used to register the results.

## \[1.13.7\] - 2024-09-17

Expand Down
1 change: 1 addition & 0 deletions src/bfabric/experimental/app_interface/app_runner/_spec.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
class CommandsSpec(BaseModel):
dispatch: str
process: str
collect: str


class AppSpec(BaseModel):
Expand Down
8 changes: 8 additions & 0 deletions src/bfabric/experimental/app_interface/app_runner/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,13 @@ def run_prepare_input(self, chunk_dir: Path) -> None:
inputs_yaml=chunk_dir / "inputs.yml", target_folder=chunk_dir, client=self._client, ssh_user=self._ssh_user
)

def run_collect(self, workunit_ref: int | Path, chunk_dir: Path) -> None:
subprocess.run(
f"{self._app_spec.commands.collect} " f"{shlex.quote(str(workunit_ref))} " f"{shlex.quote(str(chunk_dir))}",
shell=True,
check=True,
)

def run_process(self, chunk_dir: Path) -> None:
subprocess.run(f"{self._app_spec.commands.process} {shlex.quote(str(chunk_dir))}", shell=True, check=True)

Expand Down Expand Up @@ -71,6 +78,7 @@ def run_app(
logger.info(f"Processing chunk {chunk}")
runner.run_prepare_input(chunk_dir=chunk)
runner.run_process(chunk_dir=chunk)
runner.run_collect(workunit_ref=workunit_ref, chunk_dir=chunk)
if not read_only:
runner.run_register_outputs(chunk_dir=chunk, workunit_ref=workunit_ref)

Expand Down

0 comments on commit efbeca2

Please sign in to comment.