diff --git a/src/dodal/beamlines/i10.py b/src/dodal/beamlines/i10.py new file mode 100644 index 0000000000..f95d97f509 --- /dev/null +++ b/src/dodal/beamlines/i10.py @@ -0,0 +1,257 @@ +from pathlib import Path + +from dodal.common.beamlines.beamline_utils import device_instantiation +from dodal.common.beamlines.beamline_utils import set_beamline as set_utils_beamline +from dodal.devices.apple2_undulator import ( + UndulatorGap, + UndulatorJawPhase, + UndulatorPhaseAxes, +) +from dodal.devices.i10.i10_apple2 import ( + I10Apple2, + I10Apple2PGM, + I10Apple2Pol, + LinearArbitraryAngle, +) +from dodal.devices.i10.i10_setting_data import I10Grating +from dodal.devices.pgm import PGM +from dodal.log import set_beamline as set_log_beamline +from dodal.utils import BeamlinePrefix, get_beamline_name + +BL = get_beamline_name("i10") +set_log_beamline(BL) +set_utils_beamline(BL) + +LOOK_UPTABLE_DIR = "/dls_sw/i10/software/gda/workspace_git/gda-diamond.git/configurations/i10-shared/lookupTables/" +""" +I10 has two insertion devices one up(idu) and one down stream(idd). +It is worth noting that the down stream device is slightly longer, + so it can reach Mn edge for linear arbitrary. + idd == id1 + and + idu == id2. +""" + + +def idd_gap( + wait_for_connection: bool = True, fake_with_ophyd_sim: bool = False +) -> UndulatorGap: + return device_instantiation( + device_factory=UndulatorGap, + name="idd_gap", + prefix=f"{BeamlinePrefix(BL).insertion_prefix}-MO-SERVC-01:", + wait=wait_for_connection, + fake=fake_with_ophyd_sim, + bl_prefix=False, + ) + + +def idd_phase_axes( + wait_for_connection: bool = True, fake_with_ophyd_sim: bool = False +) -> UndulatorPhaseAxes: + return device_instantiation( + device_factory=UndulatorPhaseAxes, + name="idd_phase_axes", + prefix=f"{BeamlinePrefix(BL).insertion_prefix}-MO-SERVC-01:", + top_outer="RPQ1", + top_inner="RPQ2", + btm_inner="RPQ3", + btm_outer="RPQ4", + wait=wait_for_connection, + fake=fake_with_ophyd_sim, + bl_prefix=False, + ) + + +def idd_jaw( + wait_for_connection: bool = True, fake_with_ophyd_sim: bool = False +) -> UndulatorJawPhase: + return device_instantiation( + device_factory=UndulatorJawPhase, + name="idd_jaw", + prefix=f"{BeamlinePrefix(BL).insertion_prefix}-MO-SERVC-01:", + move_pv="RPQ1", + wait=wait_for_connection, + fake=fake_with_ophyd_sim, + bl_prefix=False, + ) + + +def idu_gap( + wait_for_connection: bool = True, fake_with_ophyd_sim: bool = False +) -> UndulatorGap: + return device_instantiation( + device_factory=UndulatorGap, + name="idu_gap", + prefix=f"{BeamlinePrefix(BL).insertion_prefix}-MO-SERVC-21:", + wait=wait_for_connection, + fake=fake_with_ophyd_sim, + bl_prefix=False, + ) + + +def idu_phase_axes( + wait_for_connection: bool = True, fake_with_ophyd_sim: bool = False +) -> UndulatorPhaseAxes: + return device_instantiation( + device_factory=UndulatorPhaseAxes, + name="idu_phase_axes", + prefix=f"{BeamlinePrefix(BL).insertion_prefix}-MO-SERVC-21:", + top_outer="RPQ1", + top_inner="RPQ2", + btm_inner="RPQ3", + btm_outer="RPQ4", + wait=wait_for_connection, + fake=fake_with_ophyd_sim, + bl_prefix=False, + ) + + +def idu_jaw( + wait_for_connection: bool = True, fake_with_ophyd_sim: bool = False +) -> UndulatorJawPhase: + return device_instantiation( + device_factory=UndulatorJawPhase, + name="idu_jaw", + prefix=f"{BeamlinePrefix(BL).insertion_prefix}-MO-SERVC-21:", + move_pv="RPQ1", + wait=wait_for_connection, + fake=fake_with_ophyd_sim, + bl_prefix=False, + ) + + +def pgm(wait_for_connection: bool = True, fake_with_ophyd_sim: bool = False) -> PGM: + return device_instantiation( + device_factory=PGM, + name="pgm", + prefix="-OP-PGM-01:", + grating=I10Grating, + gratingPv="NLINES2", + wait=wait_for_connection, + fake=fake_with_ophyd_sim, + ) + + +def idu_gap_phase( + wait_for_connection: bool = True, fake_with_ophyd_sim: bool = False +) -> I10Apple2: + return device_instantiation( + device_factory=I10Apple2, + id_gap=idu_gap(wait_for_connection, fake_with_ophyd_sim), + id_phase=idu_phase_axes(wait_for_connection, fake_with_ophyd_sim), + id_jaw_phase=idu_jaw(wait_for_connection, fake_with_ophyd_sim), + energy_gap_table_path=Path( + LOOK_UPTABLE_DIR + "IDEnergy2GapCalibrations.csv", + ), + energy_phase_table_path=Path( + LOOK_UPTABLE_DIR + "IDEnergy2PhaseCalibrations.csv", + ), + source=("Source", "idu"), + name="idu_gap_phase", + prefix="", + wait=wait_for_connection, + fake=fake_with_ophyd_sim, + ) + + +def idd_gap_phase( + wait_for_connection: bool = True, fake_with_ophyd_sim: bool = False +) -> I10Apple2: + return device_instantiation( + device_factory=I10Apple2, + id_gap=idd_gap(wait_for_connection, fake_with_ophyd_sim), + id_phase=idd_phase_axes(wait_for_connection, fake_with_ophyd_sim), + id_jaw_phase=idd_jaw(wait_for_connection, fake_with_ophyd_sim), + energy_gap_table_path=Path( + LOOK_UPTABLE_DIR + "IDEnergy2GapCalibrations.csv", + ), + energy_phase_table_path=Path( + LOOK_UPTABLE_DIR + "IDEnergy2PhaseCalibrations.csv", + ), + source=("Source", "idd"), + name="idd_gap_phase", + prefix="", + wait=wait_for_connection, + fake=fake_with_ophyd_sim, + ) + + +def idu_pol( + wait_for_connection: bool = True, fake_with_ophyd_sim: bool = False +) -> I10Apple2Pol: + return device_instantiation( + device_factory=I10Apple2Pol, + prefix="", + id=idu_gap_phase(wait_for_connection, fake_with_ophyd_sim), + name="idu_pol", + wait=wait_for_connection, + fake=fake_with_ophyd_sim, + ) + + +def idd_pol( + wait_for_connection: bool = True, fake_with_ophyd_sim: bool = False +) -> I10Apple2Pol: + return device_instantiation( + device_factory=I10Apple2Pol, + prefix="", + id=idd_gap_phase(wait_for_connection, fake_with_ophyd_sim), + name="idd_pol", + wait=wait_for_connection, + fake=fake_with_ophyd_sim, + ) + + +def idu( + wait_for_connection: bool = True, fake_with_ophyd_sim: bool = False +) -> I10Apple2PGM: + return device_instantiation( + device_factory=I10Apple2PGM, + prefix="", + id=idu_gap_phase(wait_for_connection, fake_with_ophyd_sim), + pgm=pgm(wait_for_connection, fake_with_ophyd_sim), + name="idu", + wait=wait_for_connection, + fake=fake_with_ophyd_sim, + ) + + +def idd( + wait_for_connection: bool = True, fake_with_ophyd_sim: bool = False +) -> I10Apple2PGM: + return device_instantiation( + device_factory=I10Apple2PGM, + prefix="", + id=idd_gap_phase(wait_for_connection, fake_with_ophyd_sim), + pgm=pgm(wait_for_connection, fake_with_ophyd_sim), + name="idd", + wait=wait_for_connection, + fake=fake_with_ophyd_sim, + ) + + +def idu_la_angle( + wait_for_connection: bool = True, fake_with_ophyd_sim: bool = False +) -> LinearArbitraryAngle: + return device_instantiation( + device_factory=LinearArbitraryAngle, + prefix="", + id=idu(wait_for_connection, fake_with_ophyd_sim), + name="idu_la_angle", + wait=wait_for_connection, + fake=fake_with_ophyd_sim, + ) + + +def idd_la_angle( + wait_for_connection: bool = True, fake_with_ophyd_sim: bool = False +) -> LinearArbitraryAngle: + return device_instantiation( + device_factory=LinearArbitraryAngle, + prefix="", + id=idu(wait_for_connection, fake_with_ophyd_sim), + name="idd_la_angle", + wait=wait_for_connection, + fake=fake_with_ophyd_sim, + ) diff --git a/src/dodal/devices/apple2_undulator.py b/src/dodal/devices/apple2_undulator.py new file mode 100644 index 0000000000..7baa9a1f86 --- /dev/null +++ b/src/dodal/devices/apple2_undulator.py @@ -0,0 +1,602 @@ +import abc +import asyncio +from dataclasses import dataclass +from enum import Enum +from typing import Any + +import numpy as np +from bluesky.protocols import Movable +from ophyd_async.core import ( + AsyncStatus, + ConfigSignal, + HintedSignal, + StandardReadable, + soft_signal_r_and_setter, + wait_for_value, +) +from ophyd_async.epics.signal import epics_signal_r, epics_signal_rw, epics_signal_w +from pydantic import BaseModel, ConfigDict, RootModel + +from dodal.log import LOGGER + + +class UndulatorGateStatus(str, Enum): + open = "Open" + close = "Closed" + + +@dataclass +class Apple2PhasesVal: + top_outer: str + top_inner: str + btm_inner: str + btm_outer: str + + +@dataclass +class Apple2Val: + gap: str + top_outer: str + top_inner: str + btm_inner: str + btm_outer: str + + +class EnergyMinMax(BaseModel): + Minimum: float + Maximum: float + + +class EnergyCoverageEntry(BaseModel): + model_config = ConfigDict(arbitrary_types_allowed=True) + Low: float + High: float + Poly: np.poly1d + + +class EnergyCoverage(RootModel): + root: dict[str, EnergyCoverageEntry] + + +class LookupTableEntries(BaseModel): + Energies: EnergyCoverage + Limit: EnergyMinMax + + +class Lookuptable(RootModel): + """ + BaseModel class for the lookup table. + Apple2 lookup table should be in this format. + + {mode: {'Energies': {Any: {'Low': float, + 'High': float, + 'Poly':np.poly1d + } + } + 'Limit': {'Minimum': float, + 'Maximum': float + } + } + } + """ + + root: dict[str, LookupTableEntries] + + +ROW_PHASE_MOTOR_TOLERANCE = 0.004 +MAXIMUM_ROW_PHASE_MOTOR_POSITION = 24.0 +MAXIMUM_GAP_MOTOR_POSITION = 100 + + +class UndulatorGap(StandardReadable, Movable): + """A device with a collection of epics signals to set Apple 2 undulator gap motion. + Only PV used by beamline are added the full list is here: + /dls_sw/work/R3.14.12.7/support/insertionDevice/db/IDGapVelocityControl.template + /dls_sw/work/R3.14.12.7/support/insertionDevice/db/IDPhaseSoftMotor.template + """ + + def __init__(self, prefix: str, name: str = ""): + """ + + Parameters + ---------- + prefix : str + Beamline specific part of the PV + name : str + Name of the Id device + + """ + + # Gap demand set point and readback + self.user_setpoint = epics_signal_rw( + str, prefix + "GAPSET.B", prefix + "BLGSET" + ) + # Nothing move until this is set to 1 and it will return to 0 when done + self.set_move = epics_signal_rw(int, prefix + "BLGSETP") + # Gate keeper open when move is requested, closed when move is completed + self.gate = epics_signal_r(UndulatorGateStatus, prefix + "BLGATE") + # These are gap velocity limit. + self.max_velocity = epics_signal_r(float, prefix + "BLGSETVEL.HOPR") + self.min_velocity = epics_signal_r(float, prefix + "BLGSETVEL.LOPR") + # These are gap limit. + self.high_limit_travel = epics_signal_r(float, prefix + "BLGAPMTR.HLM") + self.low_limit_travel = epics_signal_r(float, prefix + "BLGAPMTR.LLM") + split_pv = prefix.split("-") + self.fault = epics_signal_r( + float, + f"{split_pv[0]}-{split_pv[1]}-STAT-{split_pv[3]}ANYFAULT", + ) + # This is calculated acceleration from speed + self.acceleration_time = epics_signal_r(float, prefix + "IDGSETACC") + with self.add_children_as_readables(ConfigSignal): + # Unit + self.motor_egu = epics_signal_r(str, prefix + "BLGAPMTR.EGU") + # Gap velocity + self.velocity = epics_signal_rw(float, prefix + "BLGSETVEL") + with self.add_children_as_readables(HintedSignal): + # Gap readback value + self.user_readback = epics_signal_r(float, prefix + "CURRGAPD") + super().__init__(name) + + @AsyncStatus.wrap + async def set(self, value) -> None: + LOGGER.info(f"Setting {self.name} to {value}") + await self.check_id_status() + await self.user_setpoint.set(value=str(value)) + timeout = await self._cal_timeout() + LOGGER.info(f"Moving {self.name} to {value} with timeout = {timeout}") + await self.set_move.set(value=1, timeout=timeout) + await wait_for_value(self.gate, UndulatorGateStatus.close, timeout=timeout) + + async def _cal_timeout(self) -> float: + vel = await self.velocity.get_value() + cur_pos = await self.user_readback.get_value() + target_pos = float(await self.user_setpoint.get_value()) + return abs((target_pos - cur_pos) * 2.0 / vel) + 1 + + async def check_id_status(self) -> None: + if await self.fault.get_value() != 0: + raise RuntimeError(f"{self.name} is in fault state") + if await self.gate.get_value() == UndulatorGateStatus.open: + raise RuntimeError(f"{self.name} is already in motion.") + + async def get_timeout(self) -> float: + return await self._cal_timeout() + + +class UndulatorPhaseMotor(StandardReadable): + """A collection of epics signals for ID phase motion. + Only PV used by beamline are added the full list is here: + /dls_sw/work/R3.14.12.7/support/insertionDevice/db/IDPhaseSoftMotor.template + """ + + def __init__(self, prefix: str, infix: str, name: str = ""): + """ + Parameters + ---------- + + prefix : str + The setting prefix PV. + infix: str + Collection of pv that are different between beamlines + name : str + Name of the Id phase device + """ + fullPV = f"{prefix}BL{infix}" + self.user_setpoint = epics_signal_w(str, fullPV + "SET") + self.user_setpoint_demand_readback = epics_signal_r(float, fullPV + "DMD") + + fullPV = fullPV + "MTR" + with self.add_children_as_readables(HintedSignal): + self.user_setpoint_readback = epics_signal_r(float, fullPV + ".RBV") + + with self.add_children_as_readables(ConfigSignal): + self.motor_egu = epics_signal_r(str, fullPV + ".EGU") + self.velocity = epics_signal_rw(float, fullPV + ".VELO") + + self.max_velocity = epics_signal_r(float, fullPV + ".VMAX") + self.acceleration_time = epics_signal_rw(float, fullPV + ".ACCL") + self.precision = epics_signal_r(int, fullPV + ".PREC") + self.deadband = epics_signal_r(float, fullPV + ".RDBD") + self.motor_done_move = epics_signal_r(int, fullPV + ".DMOV") + self.low_limit_travel = epics_signal_rw(float, fullPV + ".LLM") + self.high_limit_travel = epics_signal_rw(float, fullPV + ".HLM") + super().__init__(name=name) + + +class UndulatorPhaseAxes(StandardReadable, Movable): + """ + A collection of 4 phase Motor to make up the full id phase motion. We are using the diamond pv convention. + e.g. top_outer == Q1 + top_inner == Q2 + btm_inner == q3 + btm_outer == q4 + + """ + + def __init__( + self, + prefix: str, + top_outer: str, + top_inner: str, + btm_inner: str, + btm_outer: str, + name: str = "", + ): + # Gap demand set point and readback + with self.add_children_as_readables(): + self.top_outer = UndulatorPhaseMotor(prefix=prefix, infix=top_outer) + self.top_inner = UndulatorPhaseMotor(prefix=prefix, infix=top_inner) + self.btm_inner = UndulatorPhaseMotor(prefix=prefix, infix=btm_inner) + self.btm_outer = UndulatorPhaseMotor(prefix=prefix, infix=btm_outer) + # Nothing move until this is set to 1 and it will return to 0 when done. + self.set_move = epics_signal_rw(int, f"{prefix}BL{top_outer}" + "MOVE") + self.gate = epics_signal_r(UndulatorGateStatus, prefix + "BLGATE") + split_pv = prefix.split("-") + temp_pv = f"{split_pv[0]}-{split_pv[1]}-STAT-{split_pv[3]}ANYFAULT" + self.fault = epics_signal_r(float, temp_pv) + super().__init__(name=name) + + @AsyncStatus.wrap + async def set(self, value: Apple2PhasesVal) -> None: + LOGGER.info(f"Setting {self.name} to {value}") + + await self.check_id_status() + + await asyncio.gather( + self.top_outer.user_setpoint.set(value=value.top_outer), + self.top_inner.user_setpoint.set(value=value.top_inner), + self.btm_inner.user_setpoint.set(value=value.btm_inner), + self.btm_outer.user_setpoint.set(value=value.btm_outer), + ) + timeout = await self._cal_timeout() + await self.set_move.set(value=1, timeout=timeout) + await wait_for_value(self.gate, UndulatorGateStatus.close, timeout=timeout) + + async def _cal_timeout(self) -> float: + """ + Get all four motor speed, current positions and target positions to calculate required timeout. + """ + velos = await asyncio.gather( + self.top_outer.velocity.get_value(), + self.top_inner.velocity.get_value(), + self.btm_inner.velocity.get_value(), + self.btm_outer.velocity.get_value(), + ) + target_pos = await asyncio.gather( + self.top_outer.user_setpoint_demand_readback.get_value(), + self.top_inner.user_setpoint_demand_readback.get_value(), + self.btm_inner.user_setpoint_demand_readback.get_value(), + self.btm_outer.user_setpoint_demand_readback.get_value(), + ) + cur_pos = await asyncio.gather( + self.top_outer.user_setpoint_readback.get_value(), + self.top_inner.user_setpoint_readback.get_value(), + self.btm_inner.user_setpoint_readback.get_value(), + self.btm_outer.user_setpoint_readback.get_value(), + ) + move_distances = tuple(np.subtract(target_pos, cur_pos)) + move_times = np.abs(np.divide(move_distances, velos)) + longest_move_time = np.max(move_times) + return longest_move_time * 2 + 1 + + async def check_id_status(self) -> None: + if await self.fault.get_value() != 0: + raise RuntimeError(f"{self.name} is in fault state") + if await self.gate.get_value() == UndulatorGateStatus.open: + raise RuntimeError(f"{self.name} is already in motion.") + + async def get_timeout(self) -> float: + return await self._cal_timeout() + + +class UndulatorJawPhase(StandardReadable, Movable): + """ + A JawPhase movable, this is use for moving the jaw phase which is use to control the + linear arbitrary polarisation but only one some of the beamline. + """ + + def __init__( + self, + prefix: str, + move_pv: str, + jaw_phase: str = "JAW", + name: str = "", + ): + # Gap demand set point and readback + with self.add_children_as_readables(): + self.jaw_phase = UndulatorPhaseMotor(prefix=prefix, infix=jaw_phase) + # Nothing move until this is set to 1 and it will return to 0 when done + self.set_move = epics_signal_rw(int, f"{prefix}BL{move_pv}" + "MOVE") + self.gate = epics_signal_r(UndulatorGateStatus, prefix + "BLGATE") + split_pv = prefix.split("-") + temp_pv = f"{split_pv[0]}-{split_pv[1]}-STAT-{split_pv[3]}ANYFAULT" + self.fault = epics_signal_r(float, temp_pv) + super().__init__(name=name) + + @AsyncStatus.wrap + async def set(self, value: float) -> None: + LOGGER.info(f"Setting {self.name} to {value}") + + await self.check_id_status() + + await asyncio.gather( + self.jaw_phase.user_setpoint.set(value=str(value)), + ) + timeout = await self._cal_timeout() + await self.set_move.set(value=1, timeout=timeout) + await wait_for_value(self.gate, UndulatorGateStatus.close, timeout=timeout) + + async def _cal_timeout(self) -> float: + """ + Get motor speed, current position and target position to calculate required timeout. + """ + velo, target_pos, cur_pos = await asyncio.gather( + self.jaw_phase.velocity.get_value(), + self.jaw_phase.user_setpoint_demand_readback.get_value(), + self.jaw_phase.user_setpoint_readback.get_value(), + ) + + move_distances = target_pos - cur_pos + move_times = np.abs(move_distances / velo) + + return move_times * 2 + 1 + + async def check_id_status(self) -> None: + if await self.fault.get_value() != 0: + raise RuntimeError(f"{self.name} is in fault state") + if await self.gate.get_value() == UndulatorGateStatus.open: + raise RuntimeError(f"{self.name} is already in motion.") + + async def get_timeout(self) -> float: + return await self._cal_timeout() + + +class Apple2(StandardReadable, Movable): + """ + Apple 2 ID/undulator has 4 extra degrees of freedom compare to the standard Undulator, + each bank of magnet can move independently to each other, + which allow the production of different x-ray polarisation as well as energy. + This type of ID is use on I10, I21, I09, I17 and I06 for soft x-ray. + + A pair of look up tables are needed to provide the conversion between motor position + and energy. + This conversion (update_lookuptable) and the way the id move (set) are two abstract + methods that are beamline specific and need to be implemented. + """ + + def __init__( + self, + id_gap: UndulatorGap, + id_phase: UndulatorPhaseAxes, + prefix: str = "", + name: str = "", + ) -> None: + """ + Parameters + ---------- + id_gap: + An UndulatorGap device. + id_phase: + An UndulatorPhaseAxes device. + prefix: + Not in use but needed for device_instantiation. + name: + Name of the device. + """ + super().__init__(name) + + # Attributes are set after super call so they are not renamed to + # -undulator, etc. + with self.add_children_as_readables(): + self.gap = id_gap + self.phase = id_phase + with self.add_children_as_readables(HintedSignal): + # Store the polarisation for readback. + self.polarisation, self._polarisation_set = soft_signal_r_and_setter( + str, initial_value=None + ) + # Store the set energy for readback. + self.energy, self._energy_set = soft_signal_r_and_setter( + float, initial_value=None + ) + # This store two lookup tables, Gap and Phase in the Lookuptable format + self.lookup_tables: dict[str, dict[str | None, dict[str, dict[str, Any]]]] = { + "Gap": {}, + "Phase": {}, + } + # List of available polarisation according to the lookup table. + self._available_pol = [] + # The polarisation state of the id that are use for internal checking before setting. + self._pol = None + """ + Abstract method that run at start up to load lookup tables into self.lookup_tables + and set available_pol. + """ + self.update_lookuptable() + + @property + def pol(self): + return self._pol + + @pol.setter + def pol(self, pol: str): + # This set the polarisation but does not actually move hardware. + if pol in self._available_pol: + self._pol = pol + else: + raise ValueError( + f"Polarisation {pol} is not available:" + + f"/n Polarisations available: {self._available_pol}" + ) + + async def _set(self, value: Apple2Val, energy: float) -> None: + """ + Check ID is in a movable state and set all the demand value before moving. + + """ + + # Only need to check gap as the phase motors share both fault and gate with gap. + await self.gap.check_id_status() + await asyncio.gather( + self.phase.top_outer.user_setpoint.set(value=value.top_outer), + self.phase.top_inner.user_setpoint.set(value=value.top_inner), + self.phase.btm_inner.user_setpoint.set(value=value.btm_inner), + self.phase.btm_outer.user_setpoint.set(value=value.btm_outer), + self.gap.user_setpoint.set(value=value.gap), + ) + timeout = np.max( + await asyncio.gather(self.gap.get_timeout(), self.phase.get_timeout()) + ) + LOGGER.info( + f"Moving f{self.name} energy and polorisation to {energy}, {self.pol}" + + f"with motor position {value}, timeout = {timeout}" + ) + + await asyncio.gather( + self.gap.set_move.set(value=1, timeout=timeout), + self.phase.set_move.set(value=1, timeout=timeout), + ) + await wait_for_value(self.gap.gate, UndulatorGateStatus.close, timeout=timeout) + self._energy_set(energy) # Update energy for after move for readback. + + def _get_id_gap_phase(self, energy: float) -> tuple[float, float]: + """ + Converts energy and polarisation to gap and phase. + """ + gap_poly = self._get_poly( + lookup_table=self.lookup_tables["Gap"], new_energy=energy + ) + phase_poly = self._get_poly( + lookup_table=self.lookup_tables["Phase"], new_energy=energy + ) + return gap_poly(energy), phase_poly(energy) + + def _get_poly( + self, + new_energy: float, + lookup_table: dict[str | None, dict[str, dict[str, Any]]], + ) -> np.poly1d: + """ + Get the correct polynomial for a given energy form lookuptable + for any given polarisation. + """ + + if ( + new_energy < lookup_table[self.pol]["Limit"]["Minimum"] + or new_energy > lookup_table[self.pol]["Limit"]["Maximum"] + ): + raise ValueError( + "Demanding energy must lie between {} and {} eV!".format( + lookup_table[self.pol]["Limit"]["Minimum"], + lookup_table[self.pol]["Limit"]["Maximum"], + ) + ) + else: + for energy_range in lookup_table[self.pol]["Energies"].values(): + if ( + new_energy >= energy_range["Low"] + and new_energy < energy_range["High"] + ): + return energy_range["Poly"] + + raise ValueError( + """Cannot find polynomial coefficients for your requested energy. + There might be gap in the calibration lookup table.""" + ) + + @abc.abstractmethod + def update_lookuptable(self) -> None: + """ + Abstract method to update the stored lookup tabled from file. + This function should include check to ensure the lookuptable is in the correct format: + # ensure the importing lookup table is the correct format + Lookuptable.model_validate() + + """ + + async def determinePhaseFromHardware(self) -> tuple[str | None, float]: + """ + Try to determine polarisation and phase value using row phase motor position pattern. + However there is no way to return lh3 polarisation or higher harmonic setting. + (May be for future one can use the inverse poly to work out the energy and try to match it with the current energy + to workout the polarisation but during my test the inverse poly is too unstable for general use.) + """ + cur_loc = await self.read() + top_outer = cur_loc[self.phase.top_outer.user_setpoint_readback.name]["value"] + top_inner = cur_loc[self.phase.top_inner.user_setpoint_readback.name]["value"] + btm_inner = cur_loc[self.phase.btm_inner.user_setpoint_readback.name]["value"] + btm_outer = cur_loc[self.phase.btm_outer.user_setpoint_readback.name]["value"] + gap = cur_loc[self.gap.user_readback.name]["value"] + if gap > MAXIMUM_GAP_MOTOR_POSITION: + raise RuntimeError( + f"{self.name} is not in use, close gap or set polarisation to use this ID" + ) + + if all( + motor_position_equal(x, 0.0) + for x in [top_outer, top_inner, btm_inner, btm_outer] + ): + # Linear Horizontal + polarisation = "lh" + phase = 0.0 + return polarisation, phase + if ( + motor_position_equal(top_outer, MAXIMUM_ROW_PHASE_MOTOR_POSITION) + and motor_position_equal(top_inner, 0.0) + and motor_position_equal(btm_inner, MAXIMUM_ROW_PHASE_MOTOR_POSITION) + and motor_position_equal(btm_outer, 0.0) + ): + # Linear Vertical + polarisation = "lv" + phase = MAXIMUM_ROW_PHASE_MOTOR_POSITION + return polarisation, phase + if ( + motor_position_equal(top_outer, btm_inner) + and top_outer > 0.0 + and motor_position_equal(top_inner, 0.0) + and motor_position_equal(btm_outer, 0.0) + ): + # Positive Circular + polarisation = "pc" + phase = top_outer + return polarisation, phase + if ( + motor_position_equal(top_outer, btm_inner) + and top_outer < 0.0 + and motor_position_equal(top_inner, 0.0) + and motor_position_equal(btm_outer, 0.0) + ): + # Negative Circular + polarisation = "nc" + phase = top_outer + return polarisation, phase + if ( + motor_position_equal(top_outer, -btm_inner) + and motor_position_equal(top_inner, 0.0) + and motor_position_equal(btm_outer, 0.0) + ): + # Positive Linear Arbitrary + polarisation = "la" + phase = top_outer + return polarisation, phase + if ( + motor_position_equal(top_inner, -btm_outer) + and motor_position_equal(top_outer, 0.0) + and motor_position_equal(btm_inner, 0.0) + ): + # Negative Linear Arbitrary + polarisation = "la" + phase = top_inner + return polarisation, phase + # UNKNOWN default + polarisation = None + phase = 0.0 + return (polarisation, phase) + + +def motor_position_equal(a, b) -> bool: + """ + Check motor is within tolerance. + """ + return abs(a - b) < ROW_PHASE_MOTOR_TOLERANCE diff --git a/src/dodal/devices/i10/i10_apple2.py b/src/dodal/devices/i10/i10_apple2.py new file mode 100644 index 0000000000..8c750dee40 --- /dev/null +++ b/src/dodal/devices/i10/i10_apple2.py @@ -0,0 +1,398 @@ +import asyncio +import csv +from dataclasses import dataclass +from pathlib import Path +from typing import Any + +import numpy as np +from bluesky.protocols import Movable +from ophyd_async.core import ( + AsyncStatus, + HintedSignal, + StandardReadable, + soft_signal_r_and_setter, + soft_signal_rw, +) + +from dodal.devices.apple2_undulator import ( + Apple2, + Apple2Val, + Lookuptable, + UndulatorGap, + UndulatorJawPhase, + UndulatorPhaseAxes, +) +from dodal.devices.pgm import PGM +from dodal.log import LOGGER + +ROW_PHASE_MOTOR_TOLERANCE = 0.004 +MAXIMUM_ROW_PHASE_MOTOR_POSITION = 24.0 +MAXIMUM_GAP_MOTOR_POSITION = 100 +DEFAULT_JAW_PHASE_POLY_PARAMS = [1.0 / 7.5, -120.0 / 7.5] +ALPHA_OFFSET = 180 + + +# data class to store the lookup table configuration that is use in convert_csv_to_lookup +@dataclass +class LookupPath: + Gap: Path + Phase: Path + + +@dataclass +class LookupTableConfig: + path: LookupPath + source: tuple[str, str] + mode: str | None + min_energy: str | None + max_energy: str | None + poly_deg: list | None + + +class I10Apple2(Apple2): + """ + I10Apple2 is the i10 version of Apple2 ID. + The set and update_lookuptable should be the only part that is I10 specific. + + A pair of look up tables are needed to provide the conversion + between motor position and energy. + Set is in energy(eV). + """ + + def __init__( + self, + id_gap: UndulatorGap, + id_phase: UndulatorPhaseAxes, + id_jaw_phase: UndulatorJawPhase, + energy_gap_table_path: Path, + energy_phase_table_path: Path, + source: tuple[str, str], + prefix: str = "", + mode: str = "Mode", + min_energy: str = "MinEnergy", + max_energy: str = "MaxEnergy", + poly_deg: list | None = None, + name: str = "", + ) -> None: + """ + Parameters + ---------- + id_gap: + An UndulatorGap device. + id_phase: + An UndulatorPhaseAxes device. + energy_gap_table_path: + The path to id gap look up table. + energy_phase_table_path: + The path to id phase look up table. + source: + The column name and the name of the source in look up table. e.g. ("source", "idu") + mode: + The column name of the mode in look up table. + min_energy: + The column name that contain the maximum energy in look up table. + max_energy: + The column name that contain the maximum energy in look up table. + poly_deg: + The column names for the parameters for the energy conversion polynomial, starting with the least significant. + prefix: + Not in use but needed for device_instantiation. + Name: + Name of the device + """ + + # A dataclass contains the path to the look up table and the expected column names. + self.lookup_table_config = LookupTableConfig( + path=LookupPath(Gap=energy_gap_table_path, Phase=energy_phase_table_path), + source=source, + mode=mode, + min_energy=min_energy, + max_energy=max_energy, + poly_deg=poly_deg, + ) + + super().__init__( + id_gap=id_gap, + id_phase=id_phase, + prefix=prefix, + name=name, + ) + with self.add_children_as_readables(): + self.id_jaw_phase = id_jaw_phase + + @AsyncStatus.wrap + async def set(self, value: float) -> None: + """ + Check polarisation state and use it together with the energy(value) + to calculate the required gap and phases before setting it. + """ + if self.pol is None: + LOGGER.warning("Polarisation not set attempting to read from hardware") + pol, phase = await self.determinePhaseFromHardware() + if pol is None: + raise ValueError(f"Pol is not set for {self.name}") + self.pol = pol + + self._polarisation_set(self.pol) + gap, phase = self._get_id_gap_phase(value) + phase3 = phase * (-1 if self.pol == "la" else (1)) + id_set_val = Apple2Val( + top_outer=str(phase), + top_inner="0.0", + btm_inner=str(phase3), + btm_outer="0.0", + gap=str(gap), + ) + LOGGER.info(f"Setting polarisation to {self.pol}, with {id_set_val}") + await self._set(value=id_set_val, energy=value) + if self.pol != "la": + await self.id_jaw_phase.set(0) + await self.id_jaw_phase.set_move.set(1) + + def update_lookuptable(self): + """ + Update the stored lookup tabled from file. + + """ + LOGGER.info("Updating lookup dictionary from file.") + for key, path in self.lookup_table_config.path.__dict__.items(): + if path.exists(): + self.lookup_tables[key] = convert_csv_to_lookup( + file=path, + source=self.lookup_table_config.source, + mode=self.lookup_table_config.mode, + min_energy=self.lookup_table_config.min_energy, + max_energy=self.lookup_table_config.max_energy, + poly_deg=self.lookup_table_config.poly_deg, + ) + # ensure the importing lookup table is the correct format + Lookuptable.model_validate(self.lookup_tables[key]) + else: + raise FileNotFoundError(f"{key} look up table is not in path: {path}") + + self._available_pol = list(self.lookup_tables["Gap"].keys()) + + +class I10Apple2PGM(StandardReadable, Movable): + """ + Compound device to set both ID and PGM energy at the sample time,poly_deg + + """ + + def __init__( + self, id: I10Apple2, pgm: PGM, prefix: str = "", name: str = "" + ) -> None: + """ + Parameters + ---------- + id: + An Apple2 device. + pgm: + A PGM/mono device. + prefix: + Not in use but needed for device_instantiation. + name: + New device name. + """ + super().__init__(name=name) + with self.add_children_as_readables(): + self.id = id + self.pgm = pgm + with self.add_children_as_readables(HintedSignal): + self.energy_offset = soft_signal_rw(float, initial_value=0) + + @AsyncStatus.wrap + async def set(self, value: float) -> None: + LOGGER.info(f"Moving f{self.name} energy to {value}.") + await asyncio.gather( + self.id.set(value=value + await self.energy_offset.get_value()), + self.pgm.energy.set(value), + ) + + +class I10Apple2Pol(StandardReadable, Movable): + """ + Compound device to set polorisation of ID. + """ + + def __init__(self, id: I10Apple2, prefix: str = "", name: str = "") -> None: + """ + Parameters + ---------- + id: + An I10Apple2 device. + prefix: + Not in use but needed for device_instantiation. + name: + New device name. + """ + super().__init__(name=name) + with self.add_children_as_readables(): + self.id = id + + @AsyncStatus.wrap + async def set(self, value: str) -> None: + self.id.pol = value # change polarisation. + LOGGER.info(f"Changing f{self.name} polarisation to {value}.") + await self.id.set( + await self.id.energy.get_value() + ) # Move id to new polarisation + + +class LinearArbitraryAngle(StandardReadable, Movable): + """ + Device to set polorisation angle of the ID. Linear Arbitrary Angle (laa) + is the direction of the magnetic field which can be change by varying the jaw_phase + in (linear arbitrary (la) mode, + The angle of 0 is equivalent to linear horizontal "lh" (sigma) and + 90 is linear vertical "lv" (pi). + This device require a jaw_phase to angle conversion which is done via a polynomial. + """ + + def __init__( + self, + id: I10Apple2, + prefix: str = "", + name: str = "", + jaw_phase_limit: float = 12.0, + jaw_phase_poly_param: list[float] = DEFAULT_JAW_PHASE_POLY_PARAMS, + angle_threshold_deg=30.0, + ) -> None: + """ + Parameters + ---------- + id: I10Apple2 + An I10Apple2 device. + prefix: str + Not in use but needed for device_instantiation. + name: str + New device name. + jaw_phase_limit: float + The maximum allowed jaw_phase movement. + jaw_phase_poly_param: list + polynomial parameters highest power first. + """ + super().__init__(name=name) + with self.add_children_as_readables(): + self.id = id + self.jaw_phase_from_angle = np.poly1d(jaw_phase_poly_param) + self.angle_threshold_deg = angle_threshold_deg + self.jaw_phase_limit = jaw_phase_limit + with self.add_children_as_readables(HintedSignal): + self.angle, self._angle_set = soft_signal_r_and_setter( + float, initial_value=None + ) + + @AsyncStatus.wrap + async def set(self, value: float) -> None: + pol = self.id.pol + if pol != "la": + raise RuntimeError( + f"Angle control is not available in polarisation {pol} with {self.id.name}" + ) + # Moving to real angle which is 210 to 30. + alpha_real = value if value > self.angle_threshold_deg else value + ALPHA_OFFSET + jaw_phase = self.jaw_phase_from_angle(alpha_real) + if abs(jaw_phase) > self.jaw_phase_limit: + raise RuntimeError( + f"jaw_phase position for angle ({value}) is outside permitted range" + f" [-{self.jaw_phase_limit}, {self.jaw_phase_limit}]" + ) + await self.id.id_jaw_phase.set(jaw_phase) + self._angle_set(value) + + +def convert_csv_to_lookup( + file: str, + source: tuple[str, str], + mode: str | None = "Mode", + min_energy: str | None = "MinEnergy", + max_energy: str | None = "MaxEnergy", + poly_deg: list | None = None, +) -> dict[str | None, dict[str, dict[str, dict[str, Any]]]]: + """ + Convert csv to a dictionary that can be read by Apple2 ID device. + + Parameters + ----------- + file: str + File path. + source: tuple[str, str] + Tuple(column name, source name) + e.g. ("Source", "idu"). + mode: str = "Mode" + Column name for the available modes, "lv","lh","pc","nc" etc + min_energy: str = "MinEnergy": + Column name for min energy for the polynomial. + max_energy: str = "MaxEnergy", + Column name for max energy for the polynomial. + poly_deg: list | None = None, + Column names for the parameters for the polynomial, starting with the least significant. + + return + ------ + return a dictionary that conform to Apple2 lookup table format: + + {mode: {'Energies': {Any: {'Low': float, + 'High': float, + 'Poly':np.poly1d + } + } + 'Limit': {'Minimum': float, + 'Maximum': float + } + } + } + """ + if poly_deg is None: + poly_deg = [ + "7th-order", + "6th-order", + "5th-order", + "4th-order", + "3rd-order", + "2nd-order", + "1st-order", + "b", + ] + look_up_table = {} + pol = [] + + def data2dict(row) -> None: + # logic for the conversion for each row of data. + if row[mode] not in pol: + pol.append(row[mode]) + look_up_table[row[mode]] = {} + look_up_table[row[mode]] = { + "Energies": {}, + "Limit": { + "Minimum": float(row[min_energy]), + "Maximum": float(row[max_energy]), + }, + } + + # create polynomial object for energy to gap/phase + cof = [float(row[x]) for x in poly_deg] + poly = np.poly1d(cof) + + look_up_table[row[mode]]["Energies"][row[min_energy]] = { + "Low": float(row[min_energy]), + "High": float(row[max_energy]), + "Poly": poly, + } + look_up_table[row[mode]]["Limit"]["Minimum"] = min( + look_up_table[row[mode]]["Limit"]["Minimum"], float(row[min_energy]) + ) + look_up_table[row[mode]]["Limit"]["Maximum"] = max( + look_up_table[row[mode]]["Limit"]["Maximum"], float(row[max_energy]) + ) + + with open(file, newline="") as csvfile: + reader = csv.DictReader(csvfile) + for row in reader: + # If there are multiple source only convert requested. + if row[source[0]] == source[1]: + data2dict(row=row) + if not look_up_table: + raise RuntimeError(f"Unable to convert lookup table:/n/t{file}") + return look_up_table diff --git a/src/dodal/devices/i10/i10_setting_data.py b/src/dodal/devices/i10/i10_setting_data.py new file mode 100644 index 0000000000..eadbb98fb5 --- /dev/null +++ b/src/dodal/devices/i10/i10_setting_data.py @@ -0,0 +1,7 @@ +from enum import Enum + + +class I10Grating(str, Enum): + Au400 = "400 line/mm Au" + Si400 = "400 line/mm Si" + Au1200 = "1200 line/mm Au" diff --git a/src/dodal/devices/pgm.py b/src/dodal/devices/pgm.py new file mode 100644 index 0000000000..a83ceccbae --- /dev/null +++ b/src/dodal/devices/pgm.py @@ -0,0 +1,41 @@ +from enum import Enum + +from ophyd_async.core import ( + ConfigSignal, + StandardReadable, +) +from ophyd_async.epics.motor import Motor +from ophyd_async.epics.signal import epics_signal_rw + + +class PGM(StandardReadable): + """ + Plane grating monochromator, it is use in soft x-ray beamline to generate monochromic beam. + """ + + def __init__( + self, + prefix: str, + grating: type[Enum], + gratingPv: str, + name: str = "", + ) -> None: + """ + Parameters + ---------- + prefix: + Beamline specific part of the PV + grating: + The Enum for the grating table. + gratingPv: + The suffix pv part of grating Pv + name: + Name of the device + """ + with self.add_children_as_readables(): + self.energy = Motor(prefix + "ENERGY") + with self.add_children_as_readables(ConfigSignal): + self.grating = epics_signal_rw(grating, prefix + gratingPv) + self.cff = epics_signal_rw(float, prefix + "CFF") + + super().__init__(name=name) diff --git a/tests/devices/i10/lookupTables/IDEnergy2GapCalibrations.csv b/tests/devices/i10/lookupTables/IDEnergy2GapCalibrations.csv new file mode 100644 index 0000000000..b32b29af41 --- /dev/null +++ b/tests/devices/i10/lookupTables/IDEnergy2GapCalibrations.csv @@ -0,0 +1,53 @@ +Source,Mode,MinEnergy,MaxEnergy,b,1st-order,2nd-order,3rd-order,4th-order,5th-order,6th-order,7th-order +idu,lh,255.3,513.28,3.88755,0.0641791,-7.52562e-05,4.33435e-08,0,0,0,0 +idu,lh,513.28,771.545,8.1496,0.0393435,-2.61385e-05,1.04394e-08,0,0,0,0 +idu,lh,771.545,1027.52,7.9205,0.0393892,-2.51296e-05,9.55013e-09,0,0,0,0 +idu,lh,1027.52,1282.59,6.60845,0.0430451,-2.84144e-05,1.04932e-08,0,0,0,0 +idu,lh,1282.59,1700.0,-160.378,0.421009,-0.000314196,8.26909e-08,0,0,0,0 +idu,lv,521.99,734.983,6.96008,0.0202971,-6.57463e-06,1.67544e-09,0,0,0,0 +idu,lv,734.983,932.69,7.8749,0.0181492,-5.82037e-06,2.32314e-09,0,0,0,0 +idu,lv,932.69,1129.16,3.24578,0.0331944,-2.22386e-05,8.33737e-09,0,0,0,0 +idu,lv,1129.16,1325.74,-14.1739,0.0777255,-6.01726e-05,1.91056e-08,0,0,0,0 +idu,lv,1325.74,1700.0,-172.027,0.423051,-0.000312082,8.03864e-08,0,0,0,0 +idu,pc,418.566,637.695,5.13046,0.0368795,-2.99885e-05,1.35966e-08,0,0,0,0 +idu,pc,637.695,855.483,6.88328,0.0285808,-1.67934e-05,6.54757e-09,0,0,0,0 +idu,pc,855.483,1071.32,6.03968,0.0309807,-1.90157e-05,7.2133e-09,0,0,0,0 +idu,pc,1071.32,1287.03,-5.94187,0.0630626,-4.76693e-05,1.57511e-08,0,0,0,0 +idu,pc,1287.03,1700.0,-97.3851,0.270486,-0.00020465,5.53938e-08,0,0,0,0 +idu,nc,426.626,646.014,5.26037,0.0361496,-2.86867e-05,1.28076e-08,0,0,0,0 +idu,nc,646.014,863.674,6.39722,0.030442,-1.92019e-05,7.57955e-09,0,0,0,0 +idu,nc,863.674,1079.84,7.21445,0.0273263,-1.52587e-05,5.92247e-09,0,0,0,0 +idu,nc,1079.84,1295.74,-7.52454,0.0668457,-5.06815e-05,1.65398e-08,0,0,0,0 +idu,nc,1295.74,1700.0,-104.872,0.28583,-0.000215111,5.77565e-08,0,0,0,0 +idu,lh3,769.119,1028.01,1.83429,0.0274816,-1.4457e-05,3.62736e-09,0,0,0,0 +idu,lh3,1028.01,1286.16,3.87065,0.0214904,-8.56945e-06,1.69557e-09,0,0,0,0 +idu,lh3,1286.16,1544.17,6.38981,0.0157398,-4.20572e-06,5.94806e-10,0,0,0,0 +idu,lh3,1544.17,1794.62,-11.6837,0.0499948,-2.58804e-05,5.17381e-09,0,0,0,0 +idu,lh3,1794.62,2100,-156.01,0.268796,-0.000135379,2.32221e-08,0,0,0,0 +idu,la,654,1700.0,-43.829175,0.368501,-0.0010666011,1.7879222e-06,-1.7983774e-09,1.0820994e-12,-3.6007756e-16,5.1239781e-20 +idd,lh,249.772,487.644,4.12064,0.0680339,-8.56005e-05,5.23532e-08,0,0,0,0 +idd,lh,487.644,731.037,6.81039,0.0492164,-4.20426e-05,1.89432e-08,0,0,0,0 +idd,lh,731.037,980.599,12.0757,0.027081,-1.05292e-05,3.77061e-09,0,0,0,0 +idd,lh,980.599,1236.31,5.79522,0.0473828,-3.24713e-05,1.16963e-08,0,0,0,0 +idd,lh,1236.31,1700.0,-61.7239,0.206534,-0.000157637,4.4545e-08,0,0,0,0 +idd,lv,496.652,689.325,3.53496,0.0400226,-3.86443e-05,1.90295e-08,0,0,0,0 +idd,lv,689.325,886.548,6.65066,0.0247265,-1.35913e-05,5.35915e-09,0,0,0,0 +idd,lv,886.548,1088.57,1.29591,0.0412981,-3.06327e-05,1.1181e-08,0,0,0,0 +idd,lv,1088.57,1294.24,-8.58906,0.0655113,-5.01767e-05,1.63633e-08,0,0,0,0 +idd,lv,1294.24,1700.0,-91.2008,0.250993,-0.00018901,5.10085e-08,0,0,0,0 +idd,pc,385.505,596.042,4.67408,0.0433433,-4.29176e-05,2.21358e-08,0,0,0,0 +idd,pc,596.042,810.876,6.97271,0.0307363,-1.97097e-05,7.83367e-09,0,0,0,0 +idd,pc,810.876,1030.87,8.69789,0.0244055,-1.19019e-05,4.5975e-09,0,0,0,0 +idd,pc,1030.87,1255.31,6.55313,0.0324129,-2.13844e-05,8.21876e-09,0,0,0,0 +idd,pc,1255.31,1700.0,-49.1463,0.164373,-0.000125734,3.57615e-08,0,0,0,0 +idd,nc,394.958,605.487,4.82301,0.0422041,-4.02467e-05,2.01446e-08,0,0,0,0 +idd,nc,605.487,820.328,7.1421,0.0300373,-1.87797e-05,7.4329e-09,0,0,0,0 +idd,nc,820.328,1040.05,8.04047,0.0266286,-1.44209e-05,5.55491e-09,0,0,0,0 +idd,nc,1040.05,1264.12,-2.79321,0.0572147,-4.32987e-05,1.46731e-08,0,0,0,0 +idd,nc,1264.12,1700.0,-75.8958,0.22423,-0.00017034,4.68429e-08,0,0,0,0 +idd,lh3,710.953,951.552,1.71019,0.0304018,-1.78548e-05,4.92026e-09,0,0,0,0 +idd,lh3,951.552,1193.71,4.05097,0.0230031,-1.00382e-05,2.15916e-09,0,0,0,0 +idd,lh3,1193.71,1438.26,7.78066,0.013952,-2.70169e-06,1.72396e-10,0,0,0,0 +idd,lh3,1438.26,1684.34,6.84516,0.0161913,-4.51536e-06,6.65626e-10,0,0,0,0 +idd,lh3,1684.34,2100,49.5656,-0.0582239,3.85587e-05,-7.61825e-09,0,0,0,0 +idd,la,610,1001.28640795282,800.865724,-7.60298294,0.0309135438,-6.88548924e-05,9.10074539e-08,-7.14462774e-11,3.08703041e-14,-5.66653297e-18 diff --git a/tests/devices/i10/lookupTables/IDEnergy2PhaseCalibrations.csv b/tests/devices/i10/lookupTables/IDEnergy2PhaseCalibrations.csv new file mode 100644 index 0000000000..d9dc679a4a --- /dev/null +++ b/tests/devices/i10/lookupTables/IDEnergy2PhaseCalibrations.csv @@ -0,0 +1,29 @@ +Source,Mode,MinEnergy,MaxEnergy,b,1st-order,2nd-order,3rd-order,4th-order,5th-order,6th-order,7th-order +idu,pc,855.483,1071.32,14.27,0.00321593,-1.46775E-06,5.00523E-10,0,0,0,0 +idu,pc,1071.32,1287.03,15.739,-0.000289772,1.28573E-06,-2.10213E-10,0,0,0,0 +idu,pc,1287.03,1700,11.4359,0.0105901,-7.83598E-06,2.32768E-09,0,0,0,0 +idu,nc,426.626,646.014,-13.8004,-0.00463626,2.71849E-06,-7.28072E-10,0,0,0,0 +idu,nc,646.014,863.674,-14.0729,-0.00364046,1.64441E-06,-4.38739E-10,0,0,0,0 +idu,nc,863.674,1079.84,-13.0961,-0.00679536,5.09164E-06,-1.71735E-09,0,0,0,0 +idu,nc,1079.84,1295.74,-11.888,-0.0095199,7.02495E-06,-2.13009E-09,0,0,0,0 +idu,nc,1295.74,1700,-7.59802,-0.018822,1.37127E-05,-3.72318E-09,0,0,0,0 +idu,la,654,1700,3.1537295,-0.11906434,0.00034602416,-5.6632637E-07,5.5036409E-10,-3.1630855E-13,9.9437208E-17,-1.3207099E-20 +idd,pc,385.505,596.042,13.2841,0.00827853,-9.91799E-06,5.3993E-09,0,0,0,0 +idd,pc,596.042,810.876,14.0914,0.00400033,-2.22442E-06,7.20919E-10,0,0,0,0 +idd,pc,810.876,1030.87,14.8875,0.00143318,5.1847E-07,-2.49769E-10,0,0,0,0 +idd,pc,1030.87,1255.31,15.4992,0.000445666,7.26373E-07,-8.06145E-11,0,0,0,0 +idd,pc,1255.31,1700,14.46,0.00382958,-2.6632E-06,9.97774E-10,0,0,0,0 +idd,nc,394.958,605.487,-13.6686,-0.00588126,4.99786E-06,-2.07139E-09,0,0,0,0 +idd,nc,605.487,820.328,-14.0886,-0.00394451,2.05578E-06,-6.01609E-10,0,0,0,0 +idd,nc,820.328,1040.05,-13.3465,-0.00643408,4.87638E-06,-1.68506E-09,0,0,0,0 +idd,nc,1040.05,1264.12,-12.8173,-0.0075141,5.46726E-06,-1.72449E-09,0,0,0,0 +idd,nc,1264.12,1700,-9.07942,-0.0157672,1.14767E-05,-3.16431E-09,0,0,0,0 +idd,la,610,1001.28640795282,1400.13845,-13.0099589,0.0518055008,-0.000113471444,1.47749738E-07,-1.14420814E-10,4.88195989E-14,-8.85680201E-18 +idd,lh,249.772,1700,0,0,0,0,0,0,0,0 +idu,lh,255.3,1700,0,0,0,0,0,0,0,0 +idd,lv,496.652,1700,24,0,0,0,0,0,0,0 +idu,lv,521.99,1700,24,0,0,0,0,0,0,0 +idd,lh3,710.953,2100,0,0,0,0,0,0,0,0 +idu,lh3,769.119,2100,0,0,0,0,0,0,0,0 +idu,pc,418.566,637.695,13.4873,0.00640997,-6.03096E-06,2.77583E-09,0,0,0,0 +idu,pc,637.695,855.483,14.4496,0.00216711,2.76266E-07,-3.9213E-10,0,0,0,0 diff --git a/tests/devices/i10/lookupTables/expectedIDEnergy2GapCalibrationsIdd.pkl b/tests/devices/i10/lookupTables/expectedIDEnergy2GapCalibrationsIdd.pkl new file mode 100644 index 0000000000..03843b0276 Binary files /dev/null and b/tests/devices/i10/lookupTables/expectedIDEnergy2GapCalibrationsIdd.pkl differ diff --git a/tests/devices/i10/lookupTables/expectedIDEnergy2GapCalibrationsIdu.pkl b/tests/devices/i10/lookupTables/expectedIDEnergy2GapCalibrationsIdu.pkl new file mode 100644 index 0000000000..048b211abe Binary files /dev/null and b/tests/devices/i10/lookupTables/expectedIDEnergy2GapCalibrationsIdu.pkl differ diff --git a/tests/devices/i10/lookupTables/expectedIDEnergy2PhaseCalibrationsidd.pkl b/tests/devices/i10/lookupTables/expectedIDEnergy2PhaseCalibrationsidd.pkl new file mode 100644 index 0000000000..4410b4b7d1 Binary files /dev/null and b/tests/devices/i10/lookupTables/expectedIDEnergy2PhaseCalibrationsidd.pkl differ diff --git a/tests/devices/i10/lookupTables/expectedIDEnergy2PhaseCalibrationsidu.pkl b/tests/devices/i10/lookupTables/expectedIDEnergy2PhaseCalibrationsidu.pkl new file mode 100644 index 0000000000..939b0ef473 Binary files /dev/null and b/tests/devices/i10/lookupTables/expectedIDEnergy2PhaseCalibrationsidu.pkl differ diff --git a/tests/devices/i10/test_i10Apple2.py b/tests/devices/i10/test_i10Apple2.py new file mode 100644 index 0000000000..a02c4c6e4d --- /dev/null +++ b/tests/devices/i10/test_i10Apple2.py @@ -0,0 +1,485 @@ +import pickle +from collections import defaultdict +from pathlib import Path +from unittest import mock +from unittest.mock import Mock + +import numpy as np +import pytest +from bluesky.plans import scan +from bluesky.run_engine import RunEngine +from numpy import poly1d +from ophyd_async.core import ( + DeviceCollector, + assert_emitted, + callback_on_mock_put, + get_mock_put, + set_mock_value, +) + +from dodal.devices.apple2_undulator import ( + UndulatorGap, + UndulatorGateStatus, + UndulatorJawPhase, + UndulatorPhaseAxes, +) +from dodal.devices.i10.i10_apple2 import ( + DEFAULT_JAW_PHASE_POLY_PARAMS, + I10Apple2, + I10Apple2PGM, + I10Apple2Pol, + LinearArbitraryAngle, + convert_csv_to_lookup, +) +from dodal.devices.i10.i10_setting_data import I10Grating +from dodal.devices.pgm import PGM + +ID_GAP_LOOKUP_TABLE = "tests/devices/i10/lookupTables/IDEnergy2GapCalibrations.csv" +ID_PHASE_LOOKUP_TABLE = "tests/devices/i10/lookupTables/IDEnergy2PhaseCalibrations.csv" + + +@pytest.fixture +async def mock_id_gap(prefix: str = "BLXX-EA-DET-007:") -> UndulatorGap: + async with DeviceCollector(mock=True): + mock_id_gap = UndulatorGap(prefix, "mock_id_gap") + assert mock_id_gap.name == "mock_id_gap" + set_mock_value(mock_id_gap.gate, UndulatorGateStatus.close) + set_mock_value(mock_id_gap.velocity, 1) + set_mock_value(mock_id_gap.user_readback, 20) + set_mock_value(mock_id_gap.user_setpoint, "20") + set_mock_value(mock_id_gap.fault, 0) + return mock_id_gap + + +@pytest.fixture +async def mock_phaseAxes(prefix: str = "BLXX-EA-DET-007:") -> UndulatorPhaseAxes: + async with DeviceCollector(mock=True): + mock_phaseAxes = UndulatorPhaseAxes( + prefix=prefix, + top_outer="RPQ1", + top_inner="RPQ2", + btm_outer="RPQ3", + btm_inner="RPQ4", + ) + assert mock_phaseAxes.name == "mock_phaseAxes" + set_mock_value(mock_phaseAxes.gate, UndulatorGateStatus.close) + set_mock_value(mock_phaseAxes.top_outer.velocity, 2) + set_mock_value(mock_phaseAxes.top_inner.velocity, 2) + set_mock_value(mock_phaseAxes.btm_outer.velocity, 2) + set_mock_value(mock_phaseAxes.btm_inner.velocity, 2) + set_mock_value(mock_phaseAxes.top_outer.user_setpoint_readback, 0) + set_mock_value(mock_phaseAxes.top_inner.user_setpoint_readback, 0) + set_mock_value(mock_phaseAxes.btm_outer.user_setpoint_readback, 0) + set_mock_value(mock_phaseAxes.btm_inner.user_setpoint_readback, 0) + set_mock_value(mock_phaseAxes.top_outer.user_setpoint_demand_readback, 0) + set_mock_value(mock_phaseAxes.top_inner.user_setpoint_demand_readback, 0) + set_mock_value(mock_phaseAxes.btm_outer.user_setpoint_demand_readback, 0) + set_mock_value(mock_phaseAxes.btm_inner.user_setpoint_demand_readback, 0) + set_mock_value(mock_phaseAxes.fault, 0) + return mock_phaseAxes + + +@pytest.fixture +async def mock_pgm(prefix: str = "BLXX-EA-DET-007:") -> PGM: + async with DeviceCollector(mock=True): + mock_pgm = PGM(prefix=prefix, grating=I10Grating, gratingPv="NLINES2") + return mock_pgm + + +@pytest.fixture +async def mock_jaw_phase(prefix: str = "BLXX-EA-DET-007:") -> UndulatorJawPhase: + async with DeviceCollector(mock=True): + mock_jaw_phase = UndulatorJawPhase( + prefix=prefix, move_pv="RPQ1", jaw_phase="JAW" + ) + set_mock_value(mock_jaw_phase.gate, UndulatorGateStatus.close) + set_mock_value(mock_jaw_phase.jaw_phase.velocity, 2) + set_mock_value(mock_jaw_phase.jaw_phase.user_setpoint_readback, 0) + set_mock_value(mock_jaw_phase.fault, 0) + return mock_jaw_phase + + +@pytest.fixture +async def mock_id( + mock_phaseAxes: UndulatorPhaseAxes, + mock_id_gap: UndulatorGap, + mock_jaw_phase: UndulatorJawPhase, +) -> I10Apple2: + async with DeviceCollector(mock=True): + mock_id = I10Apple2( + id_gap=mock_id_gap, + id_phase=mock_phaseAxes, + id_jaw_phase=mock_jaw_phase, + energy_gap_table_path=Path(ID_GAP_LOOKUP_TABLE), + energy_phase_table_path=Path(ID_PHASE_LOOKUP_TABLE), + source=("Source", "idu"), + ) + return mock_id + + +@pytest.fixture +async def mock_id_pgm(mock_id: I10Apple2, mock_pgm: PGM) -> I10Apple2PGM: + async with DeviceCollector(mock=True): + mock_id_pgm = I10Apple2PGM(id=mock_id, pgm=mock_pgm) + set_mock_value(mock_id_pgm.pgm.energy.velocity, 1) + return mock_id_pgm + + +@pytest.fixture +async def mock_id_pol(mock_id: I10Apple2) -> I10Apple2Pol: + async with DeviceCollector(mock=True): + mock_id_pol = I10Apple2Pol(id=mock_id) + + return mock_id_pol + + +@pytest.fixture +async def mock_linear_arbitrary_angle( + mock_id: I10Apple2, prefix: str = "BLXX-EA-DET-007:" +) -> LinearArbitraryAngle: + async with DeviceCollector(mock=True): + mock_linear_arbitrary_angle = LinearArbitraryAngle(id=mock_id) + return mock_linear_arbitrary_angle + + +@pytest.mark.parametrize( + "pol, top_outer_phase,top_inner_phase,btm_inner_phase, btm_outer_phase", + [ + ("lh", 0, 0, 0, 0), + ("lv", 24.0, 0, 24.0, 0), + ("pc", 12, 0, 12, 0), + ("nc", -12, 0, -12, 0), + ("la", 12, 0, -12, 0), + ("la", 0, 12, 0, -12), + ("la", -11, 0, 11, 0), + (None, 8, 12, 2, -12), + (None, 11, 0, 10, 0), + ], +) +async def test_I10Apple2_determine_pol( + mock_id: I10Apple2, + pol: None | str, + top_inner_phase: float, + top_outer_phase: float, + btm_inner_phase: float, + btm_outer_phase: float, +): + set_mock_value(mock_id.phase.top_inner.user_setpoint_readback, top_inner_phase) + set_mock_value(mock_id.phase.top_outer.user_setpoint_readback, top_outer_phase) + set_mock_value(mock_id.phase.btm_inner.user_setpoint_readback, btm_inner_phase) + set_mock_value(mock_id.phase.btm_outer.user_setpoint_readback, btm_outer_phase) + + if pol is None: + with pytest.raises(ValueError): + await mock_id.set(800) + else: + await mock_id.set(800) + + assert mock_id.pol == pol + + +async def test_fail_I10Apple2_no_lookup( + mock_phaseAxes: UndulatorPhaseAxes, + mock_id_gap: UndulatorGap, + mock_jaw_phase: UndulatorJawPhase, +): + wrong_path = Path("fnslkfndlsnf") + with pytest.raises(FileNotFoundError) as e: + I10Apple2( + id_gap=mock_id_gap, + id_phase=mock_phaseAxes, + id_jaw_phase=mock_jaw_phase, + energy_gap_table_path=wrong_path, + energy_phase_table_path=Path(ID_PHASE_LOOKUP_TABLE), + source=("Source", "idu"), + ) + assert str(e.value) == f"Gap look up table is not in path: {wrong_path}" + + +@pytest.mark.parametrize("energy", [(100), (2500), (-299)]) +async def test_fail_I10Apple2_set_outside_energy_limits( + mock_id: I10Apple2, energy: float +): + with pytest.raises(ValueError) as e: + await mock_id.set(energy) + assert str(e.value) == "Demanding energy must lie between {} and {} eV!".format( + mock_id.lookup_tables["Gap"][mock_id.pol]["Limit"]["Minimum"], + mock_id.lookup_tables["Gap"][mock_id.pol]["Limit"]["Maximum"], + ) + + +async def test_fail_I10Apple2_set_lookup_gap_pol(mock_id: I10Apple2): + # make gap in energy + mock_id.lookup_tables["Gap"]["lh"]["Energies"] = { + "1": { + "Low": 255.3, + "High": 500, + "Poly": poly1d([4.33435e-08, -7.52562e-05, 6.41791e-02, 3.88755e00]), + } + } + mock_id.lookup_tables["Gap"]["lh"]["Energies"] = { + "2": { + "Low": 600, + "High": 1000, + "Poly": poly1d([4.33435e-08, -7.52562e-05, 6.41791e-02, 3.88755e00]), + } + } + with pytest.raises(ValueError) as e: + await mock_id.set(555) + assert ( + str(e.value) + == """Cannot find polynomial coefficients for your requested energy. + There might be gap in the calibration lookup table.""" + ) + + +async def test_fail_I10Apple2_set_undefined_pol(mock_id: I10Apple2): + set_mock_value(mock_id.gap.user_readback, 101) + with pytest.raises(RuntimeError) as e: + await mock_id.set(600) + assert ( + str(e.value) + == mock_id.name + " is not in use, close gap or set polarisation to use this ID" + ) + + +async def test_fail_I10Apple2_set_id_not_ready(mock_id: I10Apple2): + set_mock_value(mock_id.gap.fault, 1) + with pytest.raises(RuntimeError) as e: + await mock_id.set(600) + assert str(e.value) == mock_id.gap.name + " is in fault state" + set_mock_value(mock_id.gap.fault, 0) + set_mock_value(mock_id.gap.gate, UndulatorGateStatus.open) + with pytest.raises(RuntimeError) as e: + await mock_id.set(600) + assert str(e.value) == mock_id.gap.name + " is already in motion." + + +async def test_I10Apple2_RE_scan(mock_id: I10Apple2, RE: RunEngine): + docs = defaultdict(list) + + def capture_emitted(name, doc): + docs[name].append(doc) + + RE(scan([mock_id], mock_id, 500, 600, num=11), capture_emitted) + assert_emitted(docs, start=1, descriptor=1, event=11, stop=1) + + +async def test_I10Apple2_pgm_RE_scan(mock_id_pgm: I10Apple2PGM, RE: RunEngine): + docs = defaultdict(list) + + def capture_emitted(name, doc): + docs[name].append(doc) + + mock_id_pgm.id.pol = "lh3" + RE(scan([mock_id_pgm], mock_id_pgm, 1700, 1800, num=11), capture_emitted) + assert_emitted(docs, start=1, descriptor=1, event=11, stop=1) + # with enevery offset + docs = defaultdict(list) + await mock_id_pgm.energy_offset.set(20) + rbv_mocks = Mock() + rbv_mocks.get.side_effect = range(1700, 1810, 10) + callback_on_mock_put( + mock_id_pgm.pgm.energy.user_setpoint, + lambda *_, **__: set_mock_value( + mock_id_pgm.pgm.energy.user_readback, rbv_mocks.get() + ), + ) + RE(scan([mock_id_pgm], mock_id_pgm, 1700, 1800, num=11), capture_emitted) + for cnt, data in enumerate(docs["event"]): + assert data["data"]["mock_id_pgm-id-energy"] == 1700 + cnt * 10 + 20 + assert data["data"]["mock_id_pgm-pgm-energy"] == 1700 + cnt * 10 + + +@pytest.mark.parametrize( + "pol,energy, expect_top_outer, expect_top_inner, expect_btm_inner,expect_btm_outer, expect_gap", + [ + ("lh", 500, 0.0, 0.0, 0.0, 0.0, 23.0), + ("lh", 700, 0.0, 0.0, 0.0, 0.0, 26.0), + ("lh", 1000, 0.0, 0.0, 0.0, 0.0, 32.0), + ("lh3", 1000, 0.0, 0.0, 0.0, 0.0, 18.0), + ("lh3", 1400, 0.0, 0.0, 0.0, 0.0, 22.0), + ("lh3", 1900, 0.0, 0.0, 0.0, 0.0, 25.0), + ("lv", 600, 24.0, 0.0, 24.0, 0.0, 17.0), + ("lv", 900, 24.0, 0.0, 24.0, 0.0, 21.0), + ("lv", 1200, 24.0, 0.0, 24.0, 0.0, 25.0), + ("pc", 500, 15.5, 0.0, 15.5, 0.0, 17.0), + ("pc", 700, 16, 0.0, 16, 0.0, 21.0), + ("pc", 1000, 16.5, 0.0, 16.5, 0.0, 25.0), + ("nc", 500, -15.5, 0.0, -15.5, 0.0, 17.0), + ("nc", 800, -16, 0.0, -16, 0.0, 22.0), + ("nc", 1000, -16.5, 0.0, -16.5, 0.0, 25.0), + ("la", 700, -15.2, 0.0, 15.2, 0.0, 16.5), + ("la", 900, -15.6, 0.0, 15.6, 0.0, 19.0), + ("la", 1300, -16.4, 0.0, 16.4, 0.0, 25.0), + ("dsf", 0.0, 0.0, 0.0, 0.0, 0.0, 0.0), + ], +) +async def test_I10Apple2_pol_set( + mock_id_pol: I10Apple2Pol, + pol: str, + energy: float, + expect_top_inner: float, + expect_top_outer: float, + expect_btm_inner: float, + expect_btm_outer: float, + expect_gap: float, +): + mock_id_pol.id._energy_set(energy) + if pol == "dsf": + with pytest.raises(ValueError): + await mock_id_pol.set(pol) + else: + await mock_id_pol.set(pol) + assert mock_id_pol.id.pol == pol + top_inner = get_mock_put(mock_id_pol.id.phase.top_inner.user_setpoint) + top_inner.assert_called_once() + assert float(top_inner.call_args[0][0]) == pytest.approx(expect_top_inner, 0.01) + + top_outer = get_mock_put(mock_id_pol.id.phase.top_outer.user_setpoint) + top_outer.assert_called_once() + assert float(top_outer.call_args[0][0]) == pytest.approx(expect_top_outer, 0.01) + + btm_inner = get_mock_put(mock_id_pol.id.phase.btm_inner.user_setpoint) + btm_inner.assert_called_once() + assert float(btm_inner.call_args[0][0]) == pytest.approx(expect_btm_inner, 0.01) + + btm_outer = get_mock_put(mock_id_pol.id.phase.btm_outer.user_setpoint) + btm_outer.assert_called_once() + assert float(btm_outer.call_args[0][0]) == pytest.approx(expect_btm_outer, 0.01) + + gap = get_mock_put(mock_id_pol.id.gap.user_setpoint) + gap.assert_called_once() + assert float(gap.call_args[0][0]) == pytest.approx(expect_gap, 0.05) + + +async def test_linear_arbitrary_pol_fail( + mock_linear_arbitrary_angle: LinearArbitraryAngle, +): + mock_linear_arbitrary_angle.id.pol = "lh" + with pytest.raises(RuntimeError) as e: + await mock_linear_arbitrary_angle.set(20) + assert str(e.value) == ( + f"Angle control is not available in polarisation" + f" {mock_linear_arbitrary_angle.id.pol} with {mock_linear_arbitrary_angle.id.name}" + ) + + +@pytest.mark.parametrize( + "poly", + [18, -18, 12.01, -12.01], +) +async def test_linear_arbitrary_limit_fail( + mock_linear_arbitrary_angle: LinearArbitraryAngle, poly: float +): + mock_linear_arbitrary_angle.id.pol = "la" + mock_linear_arbitrary_angle.jaw_phase_from_angle = poly1d([poly]) + with pytest.raises(RuntimeError) as e: + await mock_linear_arbitrary_angle.set(20) + assert ( + str(e.value) == f"jaw_phase position for angle (20) is outside permitted range" + f" [-{mock_linear_arbitrary_angle.jaw_phase_limit}, {mock_linear_arbitrary_angle.jaw_phase_limit}]" + ) + + +@pytest.mark.parametrize( + "start, stop, num_point", + [ + (0, 180, 11), + (-20, 170, 31), + (-90, -25, 18), + ], +) +async def test_linear_arbitrary_RE_scan( + mock_linear_arbitrary_angle: LinearArbitraryAngle, + RE: RunEngine, + start: float, + stop: float, + num_point: int, +): + angles = np.linspace(start, stop, num_point, endpoint=True) + docs = defaultdict(list) + + def capture_emitted(name, doc): + docs[name].append(doc) + + mock_linear_arbitrary_angle.id.pol = "la" + + RE( + scan( + [mock_linear_arbitrary_angle], + mock_linear_arbitrary_angle, + start, + stop, + num=num_point, + ), + capture_emitted, + ) + assert_emitted(docs, start=1, descriptor=1, event=num_point, stop=1) + + jaw_phase = get_mock_put( + mock_linear_arbitrary_angle.id.id_jaw_phase.jaw_phase.user_setpoint + ) + + poly = poly1d( + DEFAULT_JAW_PHASE_POLY_PARAMS + ) # default setting for i10 jaw phase to angle + for cnt, data in enumerate(docs["event"]): + temp_angle = angles[cnt] + assert data["data"]["mock_linear_arbitrary_angle-angle"] == temp_angle + alpha_real = ( + temp_angle + if temp_angle > mock_linear_arbitrary_angle.angle_threshold_deg + else temp_angle + 180.0 + ) # convert angle to jawphase. + assert jaw_phase.call_args_list[cnt] == mock.call( + str(poly(alpha_real)), wait=True, timeout=mock.ANY + ) + + +@pytest.mark.parametrize( + "fileName, expected_dict_file_name, source", + [ + ( + ID_GAP_LOOKUP_TABLE, + "expectedIDEnergy2GapCalibrationsIdu.pkl", + ("Source", "idu"), + ), + ( + ID_GAP_LOOKUP_TABLE, + "expectedIDEnergy2GapCalibrationsIdd.pkl", + ("Source", "idd"), + ), + ( + ID_PHASE_LOOKUP_TABLE, + "expectedIDEnergy2PhaseCalibrationsidu.pkl", + ("Source", "idu"), + ), + ( + ID_PHASE_LOOKUP_TABLE, + "expectedIDEnergy2PhaseCalibrationsidd.pkl", + ("Source", "idd"), + ), + ], +) +def test_convert_csv_to_lookup_success( + fileName: str, + expected_dict_file_name: str, + source: tuple[str, str], +): + data = convert_csv_to_lookup( + file=fileName, + source=source, + ) + path = "tests/devices/i10/lookupTables/" + with open(path + expected_dict_file_name, "rb") as f: + loaded_dict = pickle.load(f) + assert data == loaded_dict + + +def test_convert_csv_to_lookup_failed(): + with pytest.raises(RuntimeError): + convert_csv_to_lookup( + file=ID_GAP_LOOKUP_TABLE, + source=("Source", "idw"), + ) diff --git a/tests/devices/unit_tests/test_apple2_undulator.py b/tests/devices/unit_tests/test_apple2_undulator.py new file mode 100644 index 0000000000..9a205e7191 --- /dev/null +++ b/tests/devices/unit_tests/test_apple2_undulator.py @@ -0,0 +1,359 @@ +import asyncio +from collections import defaultdict +from unittest.mock import ANY + +import bluesky.plan_stubs as bps +import pytest +from bluesky.plans import scan +from bluesky.run_engine import RunEngine +from ophyd_async.core import ( + DeviceCollector, + assert_emitted, + callback_on_mock_put, + get_mock_put, + set_mock_value, +) + +from dodal.devices.apple2_undulator import ( + Apple2PhasesVal, + UndulatorGap, + UndulatorGateStatus, + UndulatorJawPhase, + UndulatorPhaseAxes, +) + + +@pytest.fixture +async def mock_id_gap(prefix: str = "BLXX-EA-DET-007:") -> UndulatorGap: + async with DeviceCollector(mock=True): + mock_id_gap = UndulatorGap(prefix, "mock_id_gap") + assert mock_id_gap.name == "mock_id_gap" + set_mock_value(mock_id_gap.gate, UndulatorGateStatus.close) + set_mock_value(mock_id_gap.velocity, 1) + set_mock_value(mock_id_gap.user_readback, 1) + set_mock_value(mock_id_gap.user_setpoint, "1") + set_mock_value(mock_id_gap.fault, 0) + return mock_id_gap + + +@pytest.fixture +async def mock_phaseAxes(prefix: str = "BLXX-EA-DET-007:") -> UndulatorPhaseAxes: + async with DeviceCollector(mock=True): + mock_phaseAxes = UndulatorPhaseAxes( + prefix=prefix, + top_outer="RPQ1", + top_inner="RPQ2", + btm_outer="RPQ3", + btm_inner="RPQ4", + ) + assert mock_phaseAxes.name == "mock_phaseAxes" + set_mock_value(mock_phaseAxes.gate, UndulatorGateStatus.close) + set_mock_value(mock_phaseAxes.top_outer.velocity, 2) + set_mock_value(mock_phaseAxes.top_inner.velocity, 2) + set_mock_value(mock_phaseAxes.btm_outer.velocity, 2) + set_mock_value(mock_phaseAxes.btm_inner.velocity, 2) + set_mock_value(mock_phaseAxes.top_outer.user_setpoint_readback, 2) + set_mock_value(mock_phaseAxes.top_inner.user_setpoint_readback, 2) + set_mock_value(mock_phaseAxes.btm_outer.user_setpoint_readback, 2) + set_mock_value(mock_phaseAxes.btm_inner.user_setpoint_readback, 2) + set_mock_value(mock_phaseAxes.top_outer.user_setpoint_demand_readback, 2) + set_mock_value(mock_phaseAxes.top_inner.user_setpoint_demand_readback, 2) + set_mock_value(mock_phaseAxes.btm_outer.user_setpoint_demand_readback, 2) + set_mock_value(mock_phaseAxes.btm_inner.user_setpoint_demand_readback, 2) + set_mock_value(mock_phaseAxes.fault, 0) + return mock_phaseAxes + + +@pytest.fixture +async def mock_jaw_phase(prefix: str = "BLXX-EA-DET-007:") -> UndulatorJawPhase: + async with DeviceCollector(mock=True): + mock_jaw_phase = UndulatorJawPhase( + prefix=prefix, move_pv="RPQ1", jaw_phase="JAW" + ) + set_mock_value(mock_jaw_phase.gate, UndulatorGateStatus.close) + set_mock_value(mock_jaw_phase.jaw_phase.velocity, 2) + set_mock_value(mock_jaw_phase.jaw_phase.user_setpoint_readback, 0) + set_mock_value(mock_jaw_phase.jaw_phase.user_setpoint_demand_readback, 0) + set_mock_value(mock_jaw_phase.fault, 0) + return mock_jaw_phase + + +async def test_in_motion_error( + mock_id_gap: UndulatorGap, + mock_phaseAxes: UndulatorPhaseAxes, + mock_jaw_phase: UndulatorJawPhase, +): + set_mock_value(mock_id_gap.gate, UndulatorGateStatus.open) + with pytest.raises(RuntimeError): + await mock_id_gap.set("2") + set_mock_value(mock_phaseAxes.gate, UndulatorGateStatus.open) + setValue = Apple2PhasesVal("3", "2", "5", "7") + with pytest.raises(RuntimeError): + await mock_phaseAxes.set(setValue) + set_mock_value(mock_jaw_phase.gate, UndulatorGateStatus.open) + with pytest.raises(RuntimeError): + await mock_jaw_phase.set(2) + + +@pytest.mark.parametrize( + "velocity, readback,target, expected_timeout", + [ + (0.7, 20.1, 5.2, 42.5), + (0.2, 2, 8, 60.0), + (-0.2, 2, 8, 60.0), + ], +) +async def test_gap_cal_timout( + mock_id_gap: UndulatorGap, + velocity: float, + readback: float, + target: float, + expected_timeout: float, +): + set_mock_value(mock_id_gap.velocity, velocity) + set_mock_value(mock_id_gap.user_readback, readback) + set_mock_value(mock_id_gap.user_setpoint, str(target)) + + assert await mock_id_gap.get_timeout() == pytest.approx(expected_timeout, rel=0.1) + + +async def test_gap_time_out_error(mock_id_gap: UndulatorGap, RE: RunEngine): + callback_on_mock_put( + mock_id_gap.user_setpoint, + lambda *_, **__: set_mock_value(mock_id_gap.gate, UndulatorGateStatus.open), + ) + set_mock_value(mock_id_gap.velocity, 1000) + with pytest.raises(asyncio.TimeoutError): + await mock_id_gap.set("2") + + +async def test_gap_status_error(mock_id_gap: UndulatorGap, RE: RunEngine): + setValue = Apple2PhasesVal("3", "2", "5", "7") + set_mock_value(mock_id_gap.fault, 1.0) + with pytest.raises(RuntimeError): + await mock_id_gap.set(setValue) + + +async def test_gap_success_scan(mock_id_gap: UndulatorGap, RE: RunEngine): + callback_on_mock_put( + mock_id_gap.user_setpoint, + lambda *_, **__: set_mock_value(mock_id_gap.gate, UndulatorGateStatus.open), + ) + output = range(0, 11, 1) + + def new_pos(): + yield from output + + pos = new_pos() + + def set_complete_move(): + set_mock_value(mock_id_gap.user_readback, next(pos)) + set_mock_value(mock_id_gap.gate, UndulatorGateStatus.close) + + callback_on_mock_put(mock_id_gap.set_move, lambda *_, **__: set_complete_move()) + docs = defaultdict(list) + + def capture_emitted(name, doc): + docs[name].append(doc) + + RE(scan([mock_id_gap], mock_id_gap, 0, 10, 11), capture_emitted) + assert_emitted(docs, start=1, descriptor=1, event=11, stop=1) + for i in output: + assert docs["event"][i]["data"]["mock_id_gap-user_readback"] == i + + +async def test_phase_time_out_error(mock_phaseAxes: UndulatorPhaseAxes, RE: RunEngine): + setValue = Apple2PhasesVal("3", "2", "5", "7") + + callback_on_mock_put( + mock_phaseAxes.top_outer.user_setpoint, + lambda *_, **__: set_mock_value(mock_phaseAxes.gate, UndulatorGateStatus.open), + ) + set_mock_value(mock_phaseAxes.top_inner.velocity, 1000) + with pytest.raises(asyncio.TimeoutError): + await mock_phaseAxes.set(setValue) + + +async def test_phase_status_error(mock_phaseAxes: UndulatorPhaseAxes, RE: RunEngine): + setValue = Apple2PhasesVal("3", "2", "5", "7") + set_mock_value(mock_phaseAxes.fault, 1.0) + with pytest.raises(RuntimeError): + await mock_phaseAxes.set(setValue) + + +@pytest.mark.parametrize( + "velocity, readback,target, expected_timeout", + [ + ([-1, 2, 3, 4], [5, 2, 3, 4], [-2, 2, 3, 4], 14.0), + ([-1, 0.8, 3, 4], [5, -8.5, 3, 4], [-2, 0, 3, 4], 21.2), + ([-1, 0.8, 0.6, 4], [5, -8.5, 2, 4], [-2, 0, -5.5, 4], 25.0), + ([-1, 0.8, 0.6, 2.7], [5, -8.5, 2, 30], [-2, 0, -5.5, -8.8], 28.7), + ], +) +async def test_phase_cal_timout( + mock_phaseAxes: UndulatorPhaseAxes, + velocity: list, + readback: list, + target: list, + expected_timeout: float, +): + set_mock_value(mock_phaseAxes.top_inner.velocity, velocity[0]) + set_mock_value(mock_phaseAxes.top_outer.velocity, velocity[1]) + set_mock_value(mock_phaseAxes.btm_inner.velocity, velocity[2]) + set_mock_value(mock_phaseAxes.btm_outer.velocity, velocity[3]) + + set_mock_value(mock_phaseAxes.top_inner.user_setpoint_readback, readback[0]) + set_mock_value(mock_phaseAxes.top_outer.user_setpoint_readback, readback[1]) + set_mock_value(mock_phaseAxes.btm_inner.user_setpoint_readback, readback[2]) + set_mock_value(mock_phaseAxes.btm_outer.user_setpoint_readback, readback[3]) + + set_mock_value(mock_phaseAxes.top_inner.user_setpoint_demand_readback, target[0]) + set_mock_value(mock_phaseAxes.top_outer.user_setpoint_demand_readback, target[1]) + set_mock_value(mock_phaseAxes.btm_inner.user_setpoint_demand_readback, target[2]) + set_mock_value(mock_phaseAxes.btm_outer.user_setpoint_demand_readback, target[3]) + + assert await mock_phaseAxes.get_timeout() == pytest.approx( + expected_timeout, rel=0.1 + ) + + +async def test_phase_success_set(mock_phaseAxes: UndulatorPhaseAxes, RE: RunEngine): + set_value = Apple2PhasesVal( + top_inner="3", top_outer="2", btm_inner="5", btm_outer="7" + ) + callback_on_mock_put( + mock_phaseAxes.top_inner.user_setpoint, + lambda *_, **__: set_mock_value(mock_phaseAxes.gate, UndulatorGateStatus.open), + ) + + def set_complete_move(): + set_mock_value( + mock_phaseAxes.top_inner.user_setpoint_readback, + 3, + ) + set_mock_value( + mock_phaseAxes.top_outer.user_setpoint_readback, + 2, + ) + set_mock_value( + mock_phaseAxes.btm_inner.user_setpoint_readback, + 5, + ) + set_mock_value( + mock_phaseAxes.btm_outer.user_setpoint_readback, + 7, + ) + set_mock_value(mock_phaseAxes.gate, UndulatorGateStatus.close) + + callback_on_mock_put(mock_phaseAxes.set_move, lambda *_, **__: set_complete_move()) + RE(bps.abs_set(mock_phaseAxes, set_value, wait=True)) + get_mock_put(mock_phaseAxes.set_move).assert_called_once_with( + 1, wait=True, timeout=ANY + ) + get_mock_put(mock_phaseAxes.top_inner.user_setpoint).assert_called_once_with( + set_value.top_inner, wait=True, timeout=ANY + ) + get_mock_put(mock_phaseAxes.top_outer.user_setpoint).assert_called_once_with( + set_value.top_outer, wait=True, timeout=ANY + ) + get_mock_put(mock_phaseAxes.btm_inner.user_setpoint).assert_called_once_with( + set_value.btm_inner, wait=True, timeout=ANY + ) + get_mock_put(mock_phaseAxes.btm_outer.user_setpoint).assert_called_once_with( + set_value.btm_outer, wait=True, timeout=ANY + ) + + assert await mock_phaseAxes.read() == { + "mock_phaseAxes-top_inner-user_setpoint_readback": { + "value": 3, + "timestamp": ANY, + "alarm_severity": 0, + }, + "mock_phaseAxes-top_outer-user_setpoint_readback": { + "value": 2, + "timestamp": ANY, + "alarm_severity": 0, + }, + "mock_phaseAxes-btm_inner-user_setpoint_readback": { + "value": 5, + "timestamp": ANY, + "alarm_severity": 0, + }, + "mock_phaseAxes-btm_outer-user_setpoint_readback": { + "value": 7, + "timestamp": ANY, + "alarm_severity": 0, + }, + } + + +async def test_jaw_phase_time_out_error(mock_jaw_phase: UndulatorJawPhase): + callback_on_mock_put( + mock_jaw_phase.jaw_phase.user_setpoint, + lambda *_, **__: set_mock_value(mock_jaw_phase.gate, UndulatorGateStatus.open), + ) + set_mock_value(mock_jaw_phase.jaw_phase.velocity, 1000) + with pytest.raises(asyncio.TimeoutError): + await mock_jaw_phase.set(2) + + +async def test_jaw_phase_status_error(mock_jaw_phase: UndulatorJawPhase): + setValue = 5 + set_mock_value(mock_jaw_phase.fault, 1.0) + with pytest.raises(RuntimeError): + await mock_jaw_phase.set(setValue) + + +@pytest.mark.parametrize( + "velocity, readback,target, expected_timeout", + [ + (0.7, 20.1, 5.2, 42.5), + (0.2, 2, 8, 60.0), + (-0.2, 2, 8, 60.0), + ], +) +async def test_jaw_phase_cal_timout( + mock_jaw_phase: UndulatorJawPhase, + velocity: float, + readback: float, + target: float, + expected_timeout: float, +): + set_mock_value(mock_jaw_phase.jaw_phase.velocity, velocity) + set_mock_value(mock_jaw_phase.jaw_phase.user_setpoint_readback, readback) + set_mock_value(mock_jaw_phase.jaw_phase.user_setpoint_demand_readback, target) + + assert await mock_jaw_phase.get_timeout() == pytest.approx( + expected_timeout, rel=0.1 + ) + + +async def test_jaw_phase_success_scan(mock_jaw_phase: UndulatorJawPhase, RE: RunEngine): + callback_on_mock_put( + mock_jaw_phase.jaw_phase.user_setpoint, + lambda *_, **__: set_mock_value(mock_jaw_phase.gate, UndulatorGateStatus.open), + ) + output = range(0, 11, 1) + + def new_pos(): + yield from output + + pos = new_pos() + + def set_complete_move(): + set_mock_value(mock_jaw_phase.jaw_phase.user_setpoint_readback, next(pos)) + set_mock_value(mock_jaw_phase.gate, UndulatorGateStatus.close) + + callback_on_mock_put(mock_jaw_phase.set_move, lambda *_, **__: set_complete_move()) + docs = defaultdict(list) + + def capture_emitted(name, doc): + docs[name].append(doc) + + RE(scan([mock_jaw_phase], mock_jaw_phase, 0, 10, 11), capture_emitted) + assert_emitted(docs, start=1, descriptor=1, event=11, stop=1) + for i in output: + assert ( + docs["event"][i]["data"]["mock_jaw_phase-jaw_phase-user_setpoint_readback"] + == i + )