diff --git a/src/dodal/beamlines/i22.py b/src/dodal/beamlines/i22.py index a72082a90a..00c939c36f 100644 --- a/src/dodal/beamlines/i22.py +++ b/src/dodal/beamlines/i22.py @@ -5,6 +5,7 @@ from dodal.common.beamlines.beamline_utils import ( device_factory, + device_factory3, device_instantiation, get_directory_provider, set_directory_provider, @@ -43,8 +44,7 @@ @device_factory(lazy=True) -def saxs( -) -> PilatusDetector: +def saxs() -> PilatusDetector: return NXSasPilatus( "-EA-PILAT-01:", name="saxs", @@ -374,3 +374,12 @@ def linkam( wait_for_connection, fake_with_ophyd_sim, ) + + +@device_factory3(lazy=True, fake=True, post_create=None, timeout=10) +def test_device(): + return AravisDetector( + prefix="TEST:", + name="test_device", + directory_provider=get_directory_provider(), + ) diff --git a/src/dodal/common/beamlines/beamline_utils.py b/src/dodal/common/beamlines/beamline_utils.py index dc52eb2093..a011c9ac68 100644 --- a/src/dodal/common/beamlines/beamline_utils.py +++ b/src/dodal/common/beamlines/beamline_utils.py @@ -1,6 +1,5 @@ import inspect from functools import wraps -from typing import Type, Callable, Optional, TypeVar from typing import ( Callable, Dict, @@ -9,6 +8,7 @@ List, Optional, Protocol, + Type, TypeVar, cast, ) @@ -90,7 +90,7 @@ def __call__( F = Callable[[], D] _factory_made_devices: Dict[DeviceFactory, OphydV2Device] = {} -_device_is_lazy: Dict[, bool] = {} +_device_is_lazy: Dict[DeviceFactory, bool] = {} def device_factory(lazy=False, set_name=True) -> Callable[[F], DeviceFactory[D]]: @@ -114,11 +114,11 @@ def factory(connect=False, timeout: float = DEFAULT_TIMEOUT): return wrapper_around_device_init +T2 = TypeVar("T2", bound=OphydV2Device) # Generic type for devices -T = TypeVar('T', bound=OphydV2Device) # Generic type for devices -def device_factory(lazy: bool = False, fake: bool = False, wait: bool = False): - def decorator(func: Callable[..., T]) -> Callable[[], T]: +def device_factory2(lazy: bool = False, fake: bool = False, wait: bool = False): + def decorator(func: Callable[..., T2]) -> Callable[[], T2]: _cache = None @wraps(func) @@ -129,7 +129,9 @@ def wrapper() -> T: device = func() if fake: - device = make_fake_device(device) # Assume make_fake_device modifies the device for simulation. + device = make_fake_device( + device + ) # Assume make_fake_device modifies the device for simulation. if wait: device.wait_for_connection() # Assume wait_for_connection is a method of the device. @@ -137,6 +139,7 @@ def wrapper() -> T: return device return wrapper + return decorator @@ -214,3 +217,61 @@ def get_directory_provider() -> UpdatingDirectoryProvider: "DirectoryProvider has not been set! Ophyd-async StandardDetectors will not be able to write!" ) return DIRECTORY_PROVIDER + + +@skip_device() +def device_factory3( + name: str, + prefix: str, + not_lazy: bool, + fake: bool, + post_create: Optional[Callable[[T], None]] = None, + timeout: float = DEFAULT_CONNECTION_TIMEOUT, + bl_prefix: bool = True, + **kwargs, +) -> T: + """Method to allow generic creation of singleton devices. Meant to be used to easily + define lists of devices in beamline files. Additional keyword arguments are passed + directly to the device constructor. + + Arguments: + device_factory: Callable the device class + name: str the name for ophyd + prefix: str the PV prefix for the most (usually all) components + wait: bool whether to run .wait_for_connection() + fake: bool whether to fake with ophyd.sim + post_create: Callable (optional) a function to be run on the device after + creation + bl_prefix: bool if true, add the beamline prefix when instantiating, if + false the complete PV prefix must be supplied. + Returns: + The instance of the device. + """ + already_existing_device: AnyDevice | None = ACTIVE_DEVICES.get(name) + if fake: + device_factory = cast(Callable[..., T], make_fake_device(device_factory)) + if already_existing_device is None: + device_instance = device_factory( + name=name, + prefix=( + f"{(BeamlinePrefix(BL).beamline_prefix)}{prefix}" + if bl_prefix + else prefix + ), + **kwargs, + ) + ACTIVE_DEVICES[name] = device_instance + if not_lazy: + call_in_bluesky_event_loop(device_instance.connect(timeout=timeout)) + + else: + if not active_device_is_same_type(already_existing_device, device_factory): + raise TypeError( + f"Can't instantiate device of type {device_factory} with the same " + f"name as an existing device. Device name '{name}' already used for " + f"a(n) {type(already_existing_device)}." + ) + device_instance = cast(T, already_existing_device) + if post_create: + post_create(device_instance) + return device_instance