Skip to content

Commit

Permalink
Merge branch 'main' into 511_check_undulator_gap_before_collection
Browse files Browse the repository at this point in the history
  • Loading branch information
shihab-dls authored Jan 8, 2025
2 parents 03d191d + 35a64a2 commit 0e9be45
Showing 1 changed file with 25 additions and 15 deletions.
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from pathlib import Path
from typing import Any, Literal
from unittest.mock import DEFAULT, AsyncMock, MagicMock, patch

Expand Down Expand Up @@ -88,11 +89,11 @@ def fake_devices(
yield composite, mock_save_image


def do_grid_and_edge_detect(composite, parameters):
def do_grid_and_edge_detect(composite, parameters, tmp_dir):
yield from grid_detection_plan(
composite,
parameters=parameters,
snapshot_dir="tmp",
snapshot_dir=f"{tmp_dir}",
snapshot_template="test_{angle}",
grid_width_microns=161.2,
box_size_um=20,
Expand All @@ -108,13 +109,14 @@ def test_grid_detection_plan_runs_and_triggers_snapshots(
RE: RunEngine,
test_config_files: dict[str, str],
fake_devices: tuple[OavGridDetectionComposite, MagicMock],
tmp_path: Path,
):
params = OAVParameters("loopCentring", test_config_files["oav_config_json"])
composite, image_save = fake_devices

composite.oav.grid_snapshot._save_image = (mock_save := AsyncMock())

RE(bpp.run_wrapper(do_grid_and_edge_detect(composite, params)))
RE(bpp.run_wrapper(do_grid_and_edge_detect(composite, params, tmp_path)))

assert image_save.await_count == 4
assert mock_save.call_count == 2
Expand All @@ -129,6 +131,7 @@ async def test_grid_detection_plan_gives_warning_error_if_tip_not_found(
RE: RunEngine,
test_config_files: dict[str, str],
fake_devices: tuple[OavGridDetectionComposite, MagicMock],
tmp_path: Path,
):
composite, _ = fake_devices

Expand All @@ -145,7 +148,7 @@ async def test_grid_detection_plan_gives_warning_error_if_tip_not_found(
params = OAVParameters("loopCentring", test_config_files["oav_config_json"])

with pytest.raises(WarningException) as excinfo:
RE(do_grid_and_edge_detect(composite, params))
RE(do_grid_and_edge_detect(composite, params, tmp_path))

assert "No pin found" in excinfo.value.args[0]

Expand All @@ -159,6 +162,7 @@ async def test_given_when_grid_detect_then_start_position_as_expected(
fake_devices: tuple[OavGridDetectionComposite, MagicMock],
RE: RunEngine,
test_config_files: dict[str, str],
tmp_path: Path,
):
params = OAVParameters("loopCentring", test_config_files["oav_config_json"])
box_size_um = 0.2
Expand All @@ -174,7 +178,7 @@ def decorated():
yield from grid_detection_plan(
composite,
parameters=params,
snapshot_dir="tmp",
snapshot_dir=f"{tmp_path}",
snapshot_template="test_{angle}",
grid_width_microns=161.2,
box_size_um=box_size_um,
Expand Down Expand Up @@ -204,13 +208,14 @@ def test_when_grid_detection_plan_run_twice_then_values_do_not_persist_in_callba
fake_devices: tuple[OavGridDetectionComposite, MagicMock],
RE: RunEngine,
test_config_files: dict[str, str],
tmp_path: Path,
):
params = OAVParameters("loopCentring", test_config_files["oav_config_json"])

composite, _ = fake_devices

for _ in range(2):
RE(bpp.run_wrapper(do_grid_and_edge_detect(composite, params)))
RE(bpp.run_wrapper(do_grid_and_edge_detect(composite, params, tmp_path)))


@patch(
Expand All @@ -223,6 +228,7 @@ async def test_when_grid_detection_plan_run_then_ispyb_callback_gets_correct_val
RE: RunEngine,
test_config_files: dict[str, str],
test_fgs_params: HyperionThreeDGridScan,
tmp_path: Path,
):
params = OAVParameters("loopCentring", test_config_files["oav_config_json"])
composite, _ = fake_devices
Expand All @@ -232,7 +238,7 @@ async def test_when_grid_detection_plan_run_then_ispyb_callback_gets_correct_val
with patch.multiple(cb, activity_gated_start=DEFAULT, activity_gated_event=DEFAULT):
RE(
ispyb_activation_wrapper(
do_grid_and_edge_detect(composite, params), test_fgs_params
do_grid_and_edge_detect(composite, params, tmp_path), test_fgs_params
)
)

Expand All @@ -250,9 +256,9 @@ async def test_when_grid_detection_plan_run_then_ispyb_callback_gets_correct_val
"oav-grid_snapshot-box_width": pytest.approx(12, abs=1),
"oav-microns_per_pixel_x": 1.58,
"oav-microns_per_pixel_y": 1.58,
"oav-grid_snapshot-last_path_full_overlay": "tmp/test_0_grid_overlay.png",
"oav-grid_snapshot-last_path_outer": "tmp/test_0_outer_overlay.png",
"oav-grid_snapshot-last_saved_path": "tmp/test_0.png",
"oav-grid_snapshot-last_path_full_overlay": f"{tmp_path}/test_0_grid_overlay.png",
"oav-grid_snapshot-last_path_outer": f"{tmp_path}/test_0_outer_overlay.png",
"oav-grid_snapshot-last_saved_path": f"{tmp_path}/test_0.png",
},
)
assert_event(
Expand All @@ -265,9 +271,9 @@ async def test_when_grid_detection_plan_run_then_ispyb_callback_gets_correct_val
"oav-grid_snapshot-box_width": pytest.approx(12, abs=1),
"oav-microns_per_pixel_x": 1.58,
"oav-microns_per_pixel_y": 1.58,
"oav-grid_snapshot-last_path_full_overlay": "tmp/test_90_grid_overlay.png",
"oav-grid_snapshot-last_path_outer": "tmp/test_90_outer_overlay.png",
"oav-grid_snapshot-last_saved_path": "tmp/test_90.png",
"oav-grid_snapshot-last_path_full_overlay": f"{tmp_path}/test_90_grid_overlay.png",
"oav-grid_snapshot-last_path_outer": f"{tmp_path}/test_90_outer_overlay.png",
"oav-grid_snapshot-last_saved_path": f"{tmp_path}/test_90.png",
},
)

Expand All @@ -282,6 +288,7 @@ def test_when_grid_detection_plan_run_then_grid_detection_callback_gets_correct_
RE: RunEngine,
test_config_files: dict[str, str],
test_fgs_params: HyperionThreeDGridScan,
tmp_path: Path,
):
params = OAVParameters("loopCentring", test_config_files["oav_config_json"])
composite, _ = fake_devices
Expand All @@ -291,7 +298,7 @@ def test_when_grid_detection_plan_run_then_grid_detection_callback_gets_correct_

RE(
ispyb_activation_wrapper(
do_grid_and_edge_detect(composite, params), test_fgs_params
do_grid_and_edge_detect(composite, params, tmp_path), test_fgs_params
)
)

Expand Down Expand Up @@ -327,6 +334,7 @@ async def test_when_detected_grid_has_odd_y_steps_then_add_a_y_step_and_shift_gr
sim_run_engine: RunEngineSimulator,
test_config_files: dict[str, str],
odd: bool,
tmp_path: Path,
):
composite, _ = fake_devices
params = OAVParameters("loopCentring", test_config_files["oav_config_json"])
Expand Down Expand Up @@ -367,7 +375,9 @@ def record_set(msg: Msg):
sim_run_engine.add_read_handler_for(composite.oav.microns_per_pixel_x, 1.58)
sim_run_engine.add_read_handler_for(composite.oav.microns_per_pixel_y, 1.58)

msgs = sim_run_engine.simulate_plan(do_grid_and_edge_detect(composite, params))
msgs = sim_run_engine.simulate_plan(
do_grid_and_edge_detect(composite, params, tmp_path)
)

expected_min_y = initial_min_y - box_size_y_pixels / 2 if odd else initial_min_y
expected_y_steps = 2
Expand Down

0 comments on commit 0e9be45

Please sign in to comment.