Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding driver and controller classes for Kinetix and Vimba cameras #216

Merged
merged 16 commits into from
Apr 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions src/ophyd_async/epics/areadetector/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from .aravis import AravisDetector
from .kinetix import KinetixDetector
from .pilatus import PilatusDetector
from .single_trigger_det import SingleTriggerDet
from .utils import (
Expand All @@ -9,9 +10,12 @@
ad_r,
ad_rw,
)
from .vimba import VimbaDetector

__all__ = [
"AravisDetector",
"KinetixDetector",
"VimbaDetector",
"SingleTriggerDet",
"FileWriteMode",
"ImageMode",
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
import asyncio
from typing import Optional

from ophyd_async.core import AsyncStatus, DetectorControl, DetectorTrigger
from ophyd_async.epics.areadetector.drivers.ad_base import (
start_acquiring_driver_and_ensure_status,
)

from ..drivers.kinetix_driver import KinetixDriver, KinetixTriggerMode
from ..utils import ImageMode, stop_busy_record

KINETIX_TRIGGER_MODE_MAP = {
DetectorTrigger.internal: KinetixTriggerMode.internal,
DetectorTrigger.constant_gate: KinetixTriggerMode.gate,
DetectorTrigger.variable_gate: KinetixTriggerMode.gate,
DetectorTrigger.edge_trigger: KinetixTriggerMode.edge,
}


class KinetixController(DetectorControl):
def __init__(
self,
driver: KinetixDriver,
) -> None:
self._drv = driver

def get_deadtime(self, exposure: float) -> float:
return 0.001

async def arm(
self,
num: int,
trigger: DetectorTrigger = DetectorTrigger.internal,
exposure: Optional[float] = None,
) -> AsyncStatus:
await asyncio.gather(
self._drv.trigger_mode.set(KINETIX_TRIGGER_MODE_MAP[trigger]),
self._drv.num_images.set(num),
self._drv.image_mode.set(ImageMode.multiple),
)
if exposure is not None and trigger not in [
DetectorTrigger.variable_gate,
DetectorTrigger.constant_gate,
]:
await self._drv.acquire_time.set(exposure)
return await start_acquiring_driver_and_ensure_status(self._drv)

async def disarm(self):
await stop_busy_record(self._drv.acquire, False, timeout=1)
66 changes: 66 additions & 0 deletions src/ophyd_async/epics/areadetector/controllers/vimba_controller.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
import asyncio
from typing import Optional

from ophyd_async.core import AsyncStatus, DetectorControl, DetectorTrigger
from ophyd_async.epics.areadetector.drivers.ad_base import (
start_acquiring_driver_and_ensure_status,
)

from ..drivers.vimba_driver import (
VimbaDriver,
VimbaExposeOutMode,
VimbaOnOff,
VimbaTriggerSource,
)
from ..utils import ImageMode, stop_busy_record

TRIGGER_MODE = {
DetectorTrigger.internal: VimbaOnOff.off,
DetectorTrigger.constant_gate: VimbaOnOff.on,
DetectorTrigger.variable_gate: VimbaOnOff.on,
DetectorTrigger.edge_trigger: VimbaOnOff.on,
}

EXPOSE_OUT_MODE = {
DetectorTrigger.internal: VimbaExposeOutMode.timed,
DetectorTrigger.constant_gate: VimbaExposeOutMode.trigger_width,
DetectorTrigger.variable_gate: VimbaExposeOutMode.trigger_width,
DetectorTrigger.edge_trigger: VimbaExposeOutMode.timed,
}


class VimbaController(DetectorControl):
def __init__(
self,
driver: VimbaDriver,
) -> None:
self._drv = driver

def get_deadtime(self, exposure: float) -> float:
return 0.001

async def arm(
self,
num: int,
trigger: DetectorTrigger = DetectorTrigger.internal,
exposure: Optional[float] = None,
) -> AsyncStatus:
await asyncio.gather(
self._drv.trigger_mode.set(TRIGGER_MODE[trigger]),
self._drv.expose_mode.set(EXPOSE_OUT_MODE[trigger]),
self._drv.num_images.set(num),
self._drv.image_mode.set(ImageMode.multiple),
)
if exposure is not None and trigger not in [
DetectorTrigger.variable_gate,
DetectorTrigger.constant_gate,
]:
await self._drv.acquire_time.set(exposure)
if trigger != DetectorTrigger.internal:
self._drv.trig_source.set(VimbaTriggerSource.line1)
else:
self._drv.trig_source.set(VimbaTriggerSource.freerun)
return await start_acquiring_driver_and_ensure_status(self._drv)

async def disarm(self):
await stop_busy_record(self._drv.acquire, False, timeout=1)
4 changes: 4 additions & 0 deletions src/ophyd_async/epics/areadetector/drivers/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,17 @@
start_acquiring_driver_and_ensure_status,
)
from .aravis_driver import AravisDriver
from .kinetix_driver import KinetixDriver
from .pilatus_driver import PilatusDriver
from .vimba_driver import VimbaDriver

__all__ = [
"ADBase",
"ADBaseShapeProvider",
"PilatusDriver",
"AravisDriver",
"KinetixDriver",
"VimbaDriver",
"start_acquiring_driver_and_ensure_status",
"DetectorState",
]
24 changes: 24 additions & 0 deletions src/ophyd_async/epics/areadetector/drivers/kinetix_driver.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
from enum import Enum

from ..utils import ad_rw
from .ad_base import ADBase


class KinetixTriggerMode(str, Enum):
internal = "Internal"
edge = "Rising Edge"
gate = "Exp. Gate"


class KinetixReadoutMode(str, Enum):
sensitivity = 1
speed = 2
dynamic_range = 3


class KinetixDriver(ADBase):
def __init__(self, prefix: str, name: str = "") -> None:
# self.pixel_format = ad_rw(PixelFormat, prefix + "PixelFormat")
self.trigger_mode = ad_rw(KinetixTriggerMode, prefix + "TriggerMode")
self.mode = ad_rw(KinetixReadoutMode, prefix + "ReadoutPortIdx")
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For areaDetector I've tended to name the attribute after the PV suffix, is there a reason to name this mode rather than readout_port_idx or readout_mode?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I had ReadoutMode originally, but changed it because I couldn't make the linter happy actually...

I didn't want to use ReadoutModeIdx because I think treating it as a ReadoutMode selection rather than an index in the list is better.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you point me to the corresponding record in the template please?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could these enum strings be pushed down to the template? Then having a ReadoutMode PV would make more sense...

I've tried not to add any translation in the "bucket of PVs" layer, and put the logic in the Controller, but I guess that is difficult here as the attribute isn't used in the controller.

Either way, I think readout_mode for the attribute and KinetixReadoutMode are reasonable names, is the linter happy with those?

super().__init__(prefix, name)
58 changes: 58 additions & 0 deletions src/ophyd_async/epics/areadetector/drivers/vimba_driver.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
from enum import Enum

from ..utils import ad_rw
from .ad_base import ADBase


class VimbaPixelFormat(str, Enum):
internal = "Mono8"
ext_enable = "Mono12"
ext_trigger = "Ext. Trigger"
mult_trigger = "Mult. Trigger"
alignment = "Alignment"


class VimbaConvertFormat(str, Enum):
none = "None"
mono8 = "Mono8"
mono16 = "Mono16"
rgb8 = "RGB8"
rgb16 = "RGB16"


class VimbaTriggerSource(str, Enum):
freerun = "Freerun"
line1 = "Line1"
line2 = "Line2"
fixed_rate = "FixedRate"
software = "Software"
action0 = "Action0"
action1 = "Action1"
DiamondJoseph marked this conversation as resolved.
Show resolved Hide resolved


class VimbaOverlap(str, Enum):
off = "Off"
prev_frame = "PreviousFrame"


class VimbaOnOff(str, Enum):
on = "On"
off = "Off"


class VimbaExposeOutMode(str, Enum):
timed = "Timed" # Use ExposureTime PV
trigger_width = "TriggerWidth" # Expose for length of high signal


class VimbaDriver(ADBase):
def __init__(self, prefix: str, name: str = "") -> None:
# self.pixel_format = ad_rw(PixelFormat, prefix + "PixelFormat")
self.convert_format = ad_rw(
VimbaConvertFormat, prefix + "ConvertPixelFormat"
) # Pixel format of data outputted to AD
self.trig_source = ad_rw(VimbaTriggerSource, prefix + "TriggerSource")
self.trigger_mode = ad_rw(VimbaOnOff, prefix + "TriggerMode")
self.overlap = ad_rw(VimbaOverlap, prefix + "TriggerOverlap")
self.expose_mode = ad_rw(VimbaExposeOutMode, prefix + "ExposureMode")
super().__init__(prefix, name)
48 changes: 48 additions & 0 deletions src/ophyd_async/epics/areadetector/kinetix.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
from bluesky.protocols import HasHints, Hints

from ophyd_async.core import DirectoryProvider, StandardDetector
from ophyd_async.epics.areadetector.controllers.kinetix_controller import (
KinetixController,
)
from ophyd_async.epics.areadetector.drivers import ADBaseShapeProvider
from ophyd_async.epics.areadetector.drivers.kinetix_driver import KinetixDriver
from ophyd_async.epics.areadetector.writers import HDFWriter, NDFileHDF


class KinetixDetector(StandardDetector, HasHints):
"""
Ophyd-async implementation of an ADKinetix Detector.
https://github.com/NSLS-II/ADKinetix
"""

_controller: KinetixController
_writer: HDFWriter

def __init__(
self,
name: str,
directory_provider: DirectoryProvider,
driver: KinetixDriver,
hdf: NDFileHDF,
**scalar_sigs: str,
):
# Must be child of Detector to pick up connect()
self.drv = driver
self.hdf = hdf

super().__init__(
KinetixController(self.drv),
HDFWriter(
self.hdf,
directory_provider,
lambda: self.name,
ADBaseShapeProvider(self.drv),
**scalar_sigs,
),
config_sigs=(self.drv.acquire_time, self.drv.acquire),
name=name,
)

@property
def hints(self) -> Hints:
return self._writer.hints
45 changes: 45 additions & 0 deletions src/ophyd_async/epics/areadetector/vimba.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
from bluesky.protocols import HasHints, Hints

from ophyd_async.core import DirectoryProvider, StandardDetector
from ophyd_async.epics.areadetector.controllers.vimba_controller import VimbaController
from ophyd_async.epics.areadetector.drivers import ADBaseShapeProvider
from ophyd_async.epics.areadetector.drivers.vimba_driver import VimbaDriver
from ophyd_async.epics.areadetector.writers import HDFWriter, NDFileHDF


class VimbaDetector(StandardDetector, HasHints):
"""
Ophyd-async implementation of an ADVimba Detector.
"""

_controller: VimbaController
_writer: HDFWriter

def __init__(
self,
name: str,
directory_provider: DirectoryProvider,
driver: VimbaDriver,
hdf: NDFileHDF,
**scalar_sigs: str,
):
# Must be child of Detector to pick up connect()
self.drv = driver
self.hdf = hdf

super().__init__(
VimbaController(self.drv),
HDFWriter(
self.hdf,
directory_provider,
lambda: self.name,
ADBaseShapeProvider(self.drv),
**scalar_sigs,
),
config_sigs=(self.drv.acquire_time, self.drv.acquire),
name=name,
)

@property
def hints(self) -> Hints:
return self._writer.hints
Loading