adds on_crash hook and updates doc on main. #458

Merged
merged 3 commits into from
Feb 20, 2024
51 changes: 42 additions & 9 deletions docs/source/prefect.rst
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,6 @@ In HPC, you can use the following command to create a work pool.

If the prefect server is running but the workpool is not available, you can create a workpool from the website where the prefect server is hosted. Go to the `Work Pools` tab and create a workpool with the name `workpool`. This is the name of the workpool that is defined in the [prefect.yaml > definitions > work_pools > name](https://github.com/niaid/image_portal_workflows/pull/353/files#diff-b49a6f022232810a70f1a0c2feffbbe84d018b2418a7996e52430c6063ada3a3R23) file.

Deploying workflows
-------------------

Once the prefect server is running and the workpool is created, you can log in to the respective *BigSky* instance (dev, qa, prod) and deploy the workflows.

.. code-block::

prefect deploy --all

Continuous Deployment (dev to qa to prod)
-----------------------------------------

Expand All @@ -41,3 +32,45 @@ Afterwards, we can deploy the aws infrastructure for `qa` as,
SPACES_SOLUTION_ENV=qa spaces task -f hedwig.spaces-solution.yaml build-deploy

This can then again be applied for the `qa` to `prod` changes.

Managing Prefect Worker
=======================

Deploying workflows
-------------------

Once the prefect server is running and the workpool is created, you can log in to the respective *BigSky* instance (dev, qa, prod) and deploy the workflows.

Make sure the configurations are correct:

1. Update prefect.yaml to change the user and/or directory names

.. code-block::

# by default pull.directory setting is set for prod environment
directory: /gs1/home/hedwig_prod/image_portal_workflows

# change it to dev or qa, based on your environment

2. Check prefect config with view

.. code-block::

prefect config view

# Update config only if required
export PREFECT_API_KEY=xyz
export PREFECT_API_URL=abc.com

3. Deploy flows with prefect deploy

.. code-block::

prefect deploy
# Or deploy all based on prefect.yaml using the following setting
# However, this will also deploy pytest_runner workflow in other envs (where it's not needed)
# prefect deploy --all

4. Run worker (properly via the helper_scripts/.service file)

The service files should restart the worker when it is killed. Normally, we would need to do this step
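Steps 1 and 2 above can be partly automated before running `prefect deploy`. The following is a minimal sketch, not part of the repository: the helper names are hypothetical, it only inspects environment variables (whereas `prefect config view` also reports profile settings), and the `hedwig_<env>` directory naming for dev and qa is an assumption extrapolated from the prod path shown above.

```python
from typing import List, Mapping

# Settings named in step 2 of the docs above.
REQUIRED_SETTINGS = ("PREFECT_API_URL", "PREFECT_API_KEY")


def missing_prefect_settings(env: Mapping[str, str]) -> List[str]:
    """Return the required setting names that are unset or blank in `env`."""
    return [name for name in REQUIRED_SETTINGS if not env.get(name, "").strip()]


def directory_matches_env(directory: str, env_name: str) -> bool:
    """Check that a pull directory contains a `hedwig_<env_name>` path component.

    Assumption: dev and qa follow the same naming as the prod path in the docs.
    """
    return f"hedwig_{env_name}" in directory.split("/")


# Example with explicit values instead of the real environment.
example_env = {"PREFECT_API_URL": "abc.com", "PREFECT_API_KEY": ""}
print(missing_prefect_settings(example_env))
print(directory_matches_env("/gs1/home/hedwig_prod/image_portal_workflows", "qa"))
```

In practice `os.environ` could be passed as `env`; a non-empty result from `missing_prefect_settings` would suggest exporting the missing values before deploying.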
14 changes: 13 additions & 1 deletion docs/source/usage.rst
Expand Up @@ -8,7 +8,7 @@ Workflow servers are deployed in `development <https://prefect2.hedwig-workflow-

In order to submit a workflow job, you will need deployment IDs of each of the workflows, as shown:

.. list-table:: Deployment IDs (**Updated: 12/11/2023**)
.. list-table:: Deployment IDs (**Updated: 01/26/2023**)
:widths: 20 25 25 25
:header-rows: 1

Expand Down Expand Up @@ -302,6 +302,18 @@ The pipeline parameters can be also be observed programatically using following
| jq '.description,.id,.parameter_openapi_schema' > schema.json
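For readers who prefer Python over `jq`, the same three fields can be pulled out of a deployment document with the standard library. This is only a sketch: the sample document below is invented for illustration, and the function name is hypothetical. Unlike the `jq` filter, which prints the three values one after another, this version keeps them in a dictionary keyed by field name.

```python
import json
from typing import Any, Dict

# The fields selected by the jq filter above.
FIELDS = ("description", "id", "parameter_openapi_schema")


def extract_schema_fields(deployment: Dict[str, Any]) -> Dict[str, Any]:
    """Keep only the selected fields; a missing field maps to None (jq prints null)."""
    return {name: deployment.get(name) for name in FIELDS}


# Invented sample; a real deployment document has many more keys.
sample = json.loads(
    '{"id": "0000", "name": "demo", "description": "demo deployment", '
    '"parameter_openapi_schema": {"type": "object", "properties": {}}}'
)
summary = extract_schema_fields(sample)
print(json.dumps(summary, indent=2))
```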


Pipeline Response
-----------------

All pipeline responses follow the same schema. Currently, all schemas are documented in YAML files, as shown:

.. literalinclude:: ../../api_schema/PipelineCallback.yaml
:language: yaml
:emphasize-lines: 1-5,13-21
:linenos:

You can explore the YAML files further in the `GitHub <https://github.com/niaid/image_portal_workflows/tree/main/api_schema>`_ repo.
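Because every pipeline response shares one schema, a consumer can check the structure without comparing against a full expected payload. The sketch below is illustrative only: the key names and the `success`/`error` status values are taken from the test payloads in this pull request, not from `PipelineCallback.yaml` itself, and `check_callback` is a hypothetical helper.

```python
from typing import Any, Dict, List

# Per-file keys observed in the test payloads of this pull request.
FILE_KEYS = {
    "primaryFilePath", "status", "message", "thumbnailIndex",
    "title", "fileMetadata", "imageSet",
}


def check_callback(response: Dict[str, Any]) -> List[str]:
    """Return a list of structural problems; an empty list means none were found."""
    files = response.get("files")
    if not isinstance(files, list):
        return ["'files' must be a list"]
    problems: List[str] = []
    for i, entry in enumerate(files):
        missing = FILE_KEYS - set(entry)
        if missing:
            problems.append(f"files[{i}] is missing keys: {sorted(missing)}")
        status = entry.get("status")
        if status not in ("success", "error"):
            problems.append(f"files[{i}] has unexpected status: {status!r}")
        if status == "error" and not entry.get("message"):
            problems.append(f"files[{i}] is an error without a message")
        for image in entry.get("imageSet") or []:
            for asset in image.get("assets", []):
                path = asset.get("path")
                if not isinstance(asset.get("type"), str) or not isinstance(path, str) or not path:
                    problems.append(f"files[{i}] has an asset without a valid type/path")
    return problems


ok_response = {
    "files": [
        {
            "primaryFilePath": "test/input_files/lrg_ROI_pngs/Projects/even_smaller.png",
            "status": "success",
            "message": None,
            "thumbnailIndex": 0,
            "title": "even_smaller",
            "fileMetadata": None,
            "imageSet": [
                {
                    "imageName": "even_smaller",
                    "imageMetadata": None,
                    "assets": [
                        {
                            "type": "thumbnail",
                            "path": "test/input_files/lrg_ROI_pngs/Assets/even_smaller/even_smaller_sm.jpeg",
                        }
                    ],
                }
            ],
        }
    ]
}
print(check_callback(ok_response))
```

Checks of this kind are less brittle than asserting equality with a complete expected response, since they keep passing when test data or asset paths change.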

CLI/SDK Submission
------------------

Expand Down
4 changes: 4 additions & 0 deletions em_workflows/brt/flow.py
Expand Up @@ -528,6 +528,10 @@ def get_callback_result(callback_data: list) -> list:
utils.notify_api_completion,
utils.copy_workdirs_and_cleanup_hook,
],
on_crashed=[
utils.notify_api_completion,
utils.copy_workdirs_and_cleanup_hook,
],
)
def brt_flow(
# This block of params map are for adoc file specification.
Expand Down
4 changes: 4 additions & 0 deletions em_workflows/czi/flow.py
Expand Up @@ -216,6 +216,10 @@ def update_file_metadata(file_path: FilePath, callback_with_zarr: Dict) -> Dict:
utils.notify_api_completion,
utils.copy_workdirs_and_cleanup_hook,
],
on_crashed=[
utils.notify_api_completion,
utils.copy_workdirs_and_cleanup_hook,
],
)
async def czi_flow(
file_share: str,
Expand Down
4 changes: 4 additions & 0 deletions em_workflows/dm_conversion/flow.py
Expand Up @@ -213,6 +213,10 @@ def scale_jpegs(file_path: FilePath, size: str) -> Optional[dict]:
utils.notify_api_completion,
utils.copy_workdirs_and_cleanup_hook,
],
on_crashed=[
utils.notify_api_completion,
utils.copy_workdirs_and_cleanup_hook,
],
)
def dm_flow(
file_share: str,
Expand Down
4 changes: 4 additions & 0 deletions em_workflows/lrg_2d_rgb/flow.py
Expand Up @@ -160,6 +160,10 @@ def gen_thumb(file_path: FilePath):
utils.notify_api_completion,
utils.copy_workdirs_and_cleanup_hook,
],
on_crashed=[
utils.notify_api_completion,
utils.copy_workdirs_and_cleanup_hook,
],
)
# run_config=LocalRun(labels=[utils.get_environment()]),
def lrg_2d_flow(
Expand Down
4 changes: 4 additions & 0 deletions em_workflows/sem_tomo/flow.py
Expand Up @@ -336,6 +336,10 @@ def gen_ng_metadata(fp_in: FilePath) -> Dict:
utils.notify_api_completion,
utils.copy_workdirs_and_cleanup_hook,
],
on_crashed=[
utils.notify_api_completion,
utils.copy_workdirs_and_cleanup_hook,
],
)
def sem_tomo_flow(
file_share: str,
Expand Down
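The five `flow.py` changes above all follow one pattern: the hooks that already ran for other terminal states are now also registered under `on_crashed`. The sketch below models why that matters with a tiny stand-in dispatcher. It is not Prefect code: `run_hooks` and the two-argument hook signature are hypothetical simplifications (real Prefect flow hooks receive `flow`, `flow_run`, and `state`), and the assumption that the previously covered states were COMPLETED and FAILED is inferred, since the diff context does not show the keyword names.

```python
from typing import Callable, Dict, List, Tuple

# Simplified hook signature: (flow_name, state_name).
Hook = Callable[[str, str], None]

# Records every hook invocation so the effect can be inspected.
calls: List[Tuple[str, str, str]] = []


def notify_api_completion(flow_name: str, state_name: str) -> None:
    """Stand-in for utils.notify_api_completion."""
    calls.append(("notify", flow_name, state_name))


def copy_workdirs_and_cleanup_hook(flow_name: str, state_name: str) -> None:
    """Stand-in for utils.copy_workdirs_and_cleanup_hook."""
    calls.append(("cleanup", flow_name, state_name))


def run_hooks(hooks_by_state: Dict[str, List[Hook]], flow_name: str, final_state: str) -> int:
    """Run the hooks registered for `final_state` and return how many ran."""
    hooks = hooks_by_state.get(final_state, [])
    for hook in hooks:
        hook(flow_name, final_state)
    return len(hooks)


shared_hooks: List[Hook] = [notify_api_completion, copy_workdirs_and_cleanup_hook]

# Assumed registration before this change: nothing is registered for CRASHED.
before = {"COMPLETED": shared_hooks, "FAILED": shared_hooks}
# Registration after this change: the same hooks also cover CRASHED.
after = {**before, "CRASHED": shared_hooks}

ran_before = run_hooks(before, "brt_flow", "CRASHED")
ran_after = run_hooks(after, "brt_flow", "CRASHED")
print(ran_before, ran_after, calls)
```

Under this model, a crashed run previously ended without notifying the API or cleaning up work directories; with the extra registration, both hooks run for crashed runs as well.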
11 changes: 0 additions & 11 deletions test/test_brt.py
Expand Up @@ -97,12 +97,6 @@ def test_brt_server_response(mock_nfs_mount, caplog, mock_callback_data):
assert len(set(asset_paths)) == len(
asset_paths
), "Asset paths should have been different"
# Note: There are many details to check further, but for now we are simply asserting
# that the entire response comes through. In the future, if the inner details change, this
# should be moved to assert checks
# Note: REMOVE ME if the test data changes frequently enough
expected_response = { "files": [ { "primaryFilePath": "test/input_files/brt/Projects/RT_TOMO/2013-1220-dA30_5-BSC-1_10.mrc", "status": "success", "message": None, "thumbnailIndex": 0, "title": "2013-1220-dA30_5-BSC-1_10", "fileMetadata": None, "imageSet": [ { "imageName": "2013-1220-dA30_5-BSC-1_10", "imageMetadata": None, "assets": [ { "type": "thumbnail", "path": "test/input_files/brt/Assets/RT_TOMO/2013-1220-dA30_5-BSC-1_10/keyimg_2013-1220-dA30_5-BSC-1_10_ali_ali.060_s.jpg", }, { "type": "keyImage", "path": "test/input_files/brt/Assets/RT_TOMO/2013-1220-dA30_5-BSC-1_10/2013-1220-dA30_5-BSC-1_10_ali_ali.060.jpg", }, { "type": "neuroglancerZarr", "path": "test/input_files/brt/Assets/RT_TOMO/2013-1220-dA30_5-BSC-1_10/2013-1220-dA30_5-BSC-1_10.zarr/0", "metadata": { "shader": "Grayscale", "dimensions": "XYZ", "shaderParameters": { "range": [-785, -419], "window": [-1242, -278], }, }, }, { "type": "volume", "path": "test/input_files/brt/Assets/RT_TOMO/2013-1220-dA30_5-BSC-1_10/ave_2013-1220-dA30_5-BSC-1_10_rec.mrc", }, { "type": "averagedVolume", "path": "test/input_files/brt/Assets/RT_TOMO/2013-1220-dA30_5-BSC-1_10/avebin8_ave_2013-1220-dA30_5-BSC-1_10_rec.mrc", }, { "type": "recMovie", "path": "test/input_files/brt/Assets/RT_TOMO/2013-1220-dA30_5-BSC-1_10/ave_2013-1220-dA30_5-BSC-1_10_rec_keyMov.mp4", }, { "type": "tiltMovie", "path": "test/input_files/brt/Assets/RT_TOMO/2013-1220-dA30_5-BSC-1_10/tiltMov_2013-1220-dA30_5-BSC-1_10_ali.mp4", }, ], } ], } ] } # noqa
assert response == expected_response


@pytest.mark.localdata
Expand Down Expand Up @@ -144,8 +138,6 @@ def test_brt_response_partial_failure(mock_nfs_mount, caplog, mock_callback_data
result1, result2 = response["files"]
assert result1["status"] != result2["status"], "One should have been error"

expected_response = { "files": [ { "primaryFilePath": "test/input_files/brt/Projects/RT_TOMO/Partly_Correct/2013-1220-dA30_5-BSC-1_10.mrc", "status": "success", "message": None, "thumbnailIndex": 0, "title": "2013-1220-dA30_5-BSC-1_10", "fileMetadata": None, "imageSet": [ { "imageName": "2013-1220-dA30_5-BSC-1_10", "imageMetadata": None, "assets": [ { "type": "thumbnail", "path": "test/input_files/brt/Assets/RT_TOMO/Partly_Correct/2013-1220-dA30_5-BSC-1_10/keyimg_2013-1220-dA30_5-BSC-1_10_ali_ali.060_s.jpg", }, { "type": "keyImage", "path": "test/input_files/brt/Assets/RT_TOMO/Partly_Correct/2013-1220-dA30_5-BSC-1_10/2013-1220-dA30_5-BSC-1_10_ali_ali.060.jpg", }, { "type": "neuroglancerZarr", "path": "test/input_files/brt/Assets/RT_TOMO/Partly_Correct/2013-1220-dA30_5-BSC-1_10/2013-1220-dA30_5-BSC-1_10.zarr/0", "metadata": { "shader": "Grayscale", "dimensions": "XYZ", "shaderParameters": { "range": [-785, -419], "window": [-1242, -278], }, }, }, { "type": "volume", "path": "test/input_files/brt/Assets/RT_TOMO/Partly_Correct/2013-1220-dA30_5-BSC-1_10/ave_2013-1220-dA30_5-BSC-1_10_rec.mrc", }, { "type": "averagedVolume", "path": "test/input_files/brt/Assets/RT_TOMO/Partly_Correct/2013-1220-dA30_5-BSC-1_10/avebin8_ave_2013-1220-dA30_5-BSC-1_10_rec.mrc", }, { "type": "recMovie", "path": "test/input_files/brt/Assets/RT_TOMO/Partly_Correct/2013-1220-dA30_5-BSC-1_10/ave_2013-1220-dA30_5-BSC-1_10_rec_keyMov.mp4", }, { "type": "tiltMovie", "path": "test/input_files/brt/Assets/RT_TOMO/Partly_Correct/2013-1220-dA30_5-BSC-1_10/tiltMov_2013-1220-dA30_5-BSC-1_10_ali.mp4", }, ], } ], }, { "primaryFilePath": "test/input_files/brt/Projects/RT_TOMO/Partly_Correct/2013-1220-dA30_5-BSC-1_10-Broken.mrc", "status": "error", "message": "Failure in pipeline step: Batchruntomo conversion", "thumbnailIndex": 0, "title": "2013-1220-dA30_5-BSC-1_10-Broken", "fileMetadata": None, "imageSet": [ { "imageName": "2013-1220-dA30_5-BSC-1_10-Broken", "imageMetadata": None, "assets": [], } ], }, ] } # noqa
assert response == expected_response
result_success, result_error = result1, result2
if result1["status"] == "error":
result_success, result_error = result2, result1
Expand Down Expand Up @@ -194,6 +186,3 @@ def test_brt_response_all_failure(mock_nfs_mount, caplog, mock_callback_data):
result = response["files"][0]
assert result["status"] == "error"
assert result["message"], "Error message is empty"
# Asserting an entire structure of the response to the api server
expected_response = { "files": [ { "primaryFilePath": "test/input_files/brt/Projects/RT_TOMO/Failure/2013-1220-dA30_5-BSC-1_10-Broken.mrc", "status": "error", "message": "Failure in pipeline step: Batchruntomo conversion", "thumbnailIndex": 0, "title": "2013-1220-dA30_5-BSC-1_10-Broken", "fileMetadata": None, "imageSet": [ { "imageName": "2013-1220-dA30_5-BSC-1_10-Broken", "imageMetadata": None, "assets": [], } ], } ] } # noqa
assert response == expected_response
6 changes: 5 additions & 1 deletion test/test_czi.py
Expand Up @@ -3,6 +3,8 @@
from pathlib import Path
import pytest

from prefect.exceptions import UnfinishedRun


@pytest.mark.slow
@pytest.mark.localdata
Expand All @@ -19,6 +21,7 @@ async def test_input_fname(mock_nfs_mount, caplog, mock_reuse_zarr):
assert state.is_completed()


@pytest.mark.asyncio
async def test_no_mount_point_flow_fails(mock_binaries, monkeypatch, caplog):
"""
If the mounted path doesn't exist, the flow should fail immediately
Expand All @@ -31,7 +34,7 @@ async def test_no_mount_point_flow_fails(mock_binaries, monkeypatch, caplog):

monkeypatch.setattr(config, "NFS_MOUNT", _mock_NFS_MOUNT)

with pytest.raises(RuntimeError):
with pytest.raises(UnfinishedRun):
await czi_flow(
file_share=share_name,
input_dir="test/input_files/IF_czi/Projects/Cropped_Image/",
Expand All @@ -40,6 +43,7 @@ async def test_no_mount_point_flow_fails(mock_binaries, monkeypatch, caplog):
assert f"{share_name} doesn't exist. Failing!" in caplog.text, caplog.text


@pytest.mark.asyncio
async def test_czi_workflow_server_response_structure(
mock_nfs_mount, caplog, mock_reuse_zarr, mock_callback_data
):
Expand Down
5 changes: 1 addition & 4 deletions test/test_lrg_2d.py
Expand Up @@ -45,9 +45,6 @@ def test_lrg_2d_flow_server_response(mock_nfs_mount, mock_callback_data):
), "not all asset.path is str"
assert all([asset["path"] for asset in assets]), "not all asset.path is valid"

expected_response = { "files": [ { "primaryFilePath": "test/input_files/lrg_ROI_pngs/Projects/even_smaller.png", "status": "success", "message": None, "thumbnailIndex": 0, "title": "even_smaller", "fileMetadata": None, "imageSet": [ { "imageName": "even_smaller", "imageMetadata": None, "assets": [ { "type": "thumbnail", "path": "test/input_files/lrg_ROI_pngs/Assets/even_smaller/even_smaller_sm.jpeg", }, { "type": "keyImage", "path": "test/input_files/lrg_ROI_pngs/Assets/even_smaller/even_smaller_lg.jpeg", }, { "type": "neuroglancerZarr", "path": "test/input_files/lrg_ROI_pngs/Assets/even_smaller/even_smaller.zarr/0", "metadata": { "shader": "RGB", "dimensions": "XY", "shaderParameters": {}, }, }, ], } ], } ] } # noqa
assert response == expected_response, "response and expected response don't match"


def test_lrg_2d_flow_failure_server_response(
monkeypatch, mock_nfs_mount, mock_callback_data
Expand Down Expand Up @@ -104,7 +101,7 @@ def fake_gen_zarr(file_path, input_fname):

state = lrg_2d_flow(
file_share="test",
input_dir="/test/input_files/lrg_ROI_pngs/Projects/Partial_Correct/",
input_dir="/test/input_files/lrg_ROI_pngs/Projects/",
x_no_api=True,
return_state=True,
)
Expand Down
7 changes: 1 addition & 6 deletions test/test_sem.py
Expand Up @@ -19,7 +19,7 @@ def test_sem_server_response(mock_nfs_mount, caplog, mock_callback_data):
x_no_api=True,
return_state=True,
)
assert state.is_completed()
assert state.is_completed(), "Flow run failed"

response = {}
with open(mock_callback_data) as fd:
Expand Down Expand Up @@ -53,11 +53,6 @@ def test_sem_server_response(mock_nfs_mount, caplog, mock_callback_data):
), "not all asset.path is str"
assert all([asset["path"] for asset in assets]), "not all asset.path is valid"

# Note: REMOVE ME if the test data changes frequently enough
expected_response = { "files": [ { "primaryFilePath": "test/input_files/sem_inputs/Projects/YFV-Asibi/YFV-Asibi-Copy", "status": "success", "message": None, "thumbnailIndex": 0, "title": "YFV-Asibi-Copy", "fileMetadata": None, "imageSet": [ { "imageName": "YFV-Asibi-Copy", "imageMetadata": None, "assets": [ { "type": "thumbnail", "path": "test/input_files/sem_inputs/Assets/YFV-Asibi/YFV-Asibi-Copy/keyimg_sm.jpg", }, { "type": "keyImage", "path": "test/input_files/sem_inputs/Assets/YFV-Asibi/YFV-Asibi-Copy/keyimg.jpg", }, { "type": "neuroglancerZarr", "path": "test/input_files/sem_inputs/Assets/YFV-Asibi/YFV-Asibi-Copy/YFV-Asibi-Copy.zarr/0", "metadata": { "shader": "Grayscale", "dimensions": "XYZ", "shaderParameters": { "range": [-47, 127], "window": [-47, 127], }, }, }, { "type": "averagedVolume", "path": "test/input_files/sem_inputs/Assets/YFV-Asibi/YFV-Asibi-Copy/adjusted.mrc", }, { "type": "recMovie", "path": "test/input_files/sem_inputs/Assets/YFV-Asibi/YFV-Asibi-Copy/YFV-Asibi-Copy_recMovie.mp4", }, ], } ], }, { "primaryFilePath": "test/input_files/sem_inputs/Projects/YFV-Asibi/YFV-Asibi", "status": "success", "message": None, "thumbnailIndex": 0, "title": "YFV-Asibi", "fileMetadata": None, "imageSet": [ { "imageName": "YFV-Asibi", "imageMetadata": None, "assets": [ { "type": "thumbnail", "path": "test/input_files/sem_inputs/Assets/YFV-Asibi/YFV-Asibi/keyimg_sm.jpg", }, { "type": "keyImage", "path": "test/input_files/sem_inputs/Assets/YFV-Asibi/YFV-Asibi/keyimg.jpg", }, { "type": "neuroglancerZarr", "path": "test/input_files/sem_inputs/Assets/YFV-Asibi/YFV-Asibi/YFV-Asibi.zarr/0", "metadata": { "shader": "Grayscale", "dimensions": "XYZ", "shaderParameters": { "range": [-47, 127], "window": [-47, 127], }, }, }, { "type": "averagedVolume", "path": "test/input_files/sem_inputs/Assets/YFV-Asibi/YFV-Asibi/adjusted.mrc", }, { "type": "recMovie", "path": "test/input_files/sem_inputs/Assets/YFV-Asibi/YFV-Asibi/YFV-Asibi_recMovie.mp4", }, ], } ], }, ] } # noqa

assert response == expected_response


@pytest.mark.localdata
@pytest.mark.slow
Expand Down
6 changes: 3 additions & 3 deletions test/test_utils.py
Expand Up @@ -85,9 +85,9 @@ def test_mount_config(mock_nfs_mount):
Limited checks of Config constants
:todo: Rewrite using @pytest.mark.parametrize to limit repetition
"""
proj_dir = Config.proj_dir("test")
assert "image_portal_workflows" in proj_dir
# assert env in proj_dir # Not true in GitHub Actions test
# proj_dir = Config.proj_dir("test")
# assert "image_portal_workflows" in proj_dir # disabled because in test_runner dir
# assert env in proj_dir # Not true in GitHub Actions test

assets_dir = Config.assets_dir(share_name="/mocked")
assert "image_portal_workflows" in assets_dir