diff --git a/software/control/utils.py b/software/control/utils.py index 0e8d6913..8e3b1d7e 100644 --- a/software/control/utils.py +++ b/software/control/utils.py @@ -1,3 +1,4 @@ +import enum import inspect import os import pathlib @@ -62,9 +63,16 @@ def unsigned_to_signed(unsigned_array, N): return signed -def rotate_and_flip_image(image, rotate_image_angle, flip_image): +class FlipVariant(enum.Enum): + # The mixed case is a historical artifact. + VERTICAL = "Vertical" + HORIZONTAL = "Horizontal" + BOTH = "Both" + + +def rotate_and_flip_image(image, rotate_image_angle: Optional[float], flip_image: Optional[FlipVariant]): ret_image = image.copy() - if rotate_image_angle != 0: + if rotate_image_angle and rotate_image_angle != 0: """ # ROTATE_90_CLOCKWISE # ROTATE_90_COUNTERCLOCKWISE @@ -75,18 +83,15 @@ def rotate_and_flip_image(image, rotate_image_angle, flip_image): ret_image = cv2.rotate(ret_image, cv2.ROTATE_90_COUNTERCLOCKWISE) elif rotate_image_angle == 180: ret_image = cv2.rotate(ret_image, cv2.ROTATE_180) + else: + raise ValueError(f"Unhandled rotation: {rotate_image_angle}") if flip_image is not None: - """ - flipcode = 0: flip vertically - flipcode > 0: flip horizontally - flipcode < 0: flip vertically and horizontally - """ - if flip_image == "Vertical": + if flip_image == FlipVariant.VERTICAL: ret_image = cv2.flip(ret_image, 0) - elif flip_image == "Horizontal": + elif flip_image == FlipVariant.HORIZONTAL: ret_image = cv2.flip(ret_image, 1) - elif flip_image == "Both": + elif flip_image == FlipVariant.BOTH: ret_image = cv2.flip(ret_image, -1) return ret_image diff --git a/software/squid/abc.py b/software/squid/abc.py index 0a1e916c..58a6400d 100644 --- a/software/squid/abc.py +++ b/software/squid/abc.py @@ -1,5 +1,18 @@ +import dataclasses from abc import ABC, abstractmethod -from typing import Tuple +from typing import Callable, Optional, Tuple, Sequence, List +import abc +import enum +import time + +import pydantic +import numpy as np +from dataclasses 
import dataclass + +import squid.logging +from squid.config import AxisConfig, StageConfig, CameraConfig, CameraPixelFormat +from squid.exceptions import SquidTimeout +import control.utils class LightSource(ABC): @@ -112,17 +125,6 @@ def shut_down(self): pass -import abc -import time -from typing import Optional - -import pydantic - -import squid.logging -from squid.config import AxisConfig, StageConfig -from squid.exceptions import SquidTimeout - - class Pos(pydantic.BaseModel): x_mm: float y_mm: float @@ -215,3 +217,422 @@ def wait_for_idle(self, timeout_s): self._log.error(error_message) raise SquidTimeout(error_message) + + +class CameraAcquisitionMode(enum.Enum): + SOFTWARE_TRIGGER = "SOFTWARE_TRIGGER" + HARDWARE_TRIGGER = "HARDWARE_TRIGGER" + CONTINUOUS = "CONTINUOUS" + + +class CameraFrameFormat(enum.Enum): + """ + This is all known camera frame formats in the Cephla world, but not all cameras will + support all of these. + """ + + RAW = "RAW" + RGB = "RGB" + + +class CameraGainRange(pydantic.BaseModel): + min_gain: float + max_gain: float + gain_step: float + + +# NOTE(imo): Dataclass because pydantic does not like the np.array since there's no reasonable default +# we can provide it. +@dataclass +class CameraFrame: + frame_id: int + timestamp: float + frame: np.array + frame_format: CameraFrameFormat + frame_pixel_format: CameraPixelFormat + + def is_color(self): + return CameraPixelFormat.is_color_format(self.frame_pixel_format) + + +class AbstractCamera(metaclass=abc.ABCMeta): + def __init__( + self, + camera_config: CameraConfig, + hw_trigger_fn: Optional[Callable[[Optional[float]], bool]], + hw_set_strobe_delay_ms_fn: Optional[Callable[[float], bool]], + ): + """ + Init should open the camera, configure it as needed based on camera_config and reasonable + defaults, and make it immediately available for use in grabbing frames. 
+ + The hw_trigger_fn arguments are: Optional[float] = illumination time in ms (if None, do not control illumination) + The hw_set_strobe_delay_ms_fn arguments are: float = hardware strobe delay in ms. + + If you plan on using the HARDWARE acquisition mode, you *must* provide the hw_trigger_fn and hw_set_strobe_delay_ms_fn. + Not doing so will result in failure later on when trying to switch acquisition modes. + """ + self._config = camera_config + self._log = squid.logging.get_logger(self.__class__.__name__) + self._hw_trigger_fn: Optional[Callable[[Optional[float]], bool]] = hw_trigger_fn + self._hw_set_strobe_delay_ms_fn: Optional[Callable[[float], bool]] = hw_set_strobe_delay_ms_fn + + # Frame callbacks is a list of (id, callback) managed by add_frame_callback and remove_frame_callback. + # Your frame receiving functions should call self._send_frame_to_callbacks(frame), and doesn't need + # to do more than that. + self._frame_callbacks: List[Tuple[int, Callable[[CameraFrame], None]]] = [] + self._frame_callbacks_enabled = True + + def enable_callbacks(self, enabled: bool): + """ + This enables or disables propagation of frames to all the registered callbacks. This should be used + sparingly since any read_frame() with enable_callbacks = False will be lost to all callbacks. Valid + use cases are things like during-acquisition auto focus (whereby we need to capture a bunch of frames + that aren't a part of the acquisition). This is inherently fragile, though, so all effort should be + made to design a system that has enabled_callbacks(True) as the default! + """ + self._log.debug(f"enable_callbacks: {enabled=}") + self._frame_callbacks_enabled = enabled + + def get_callbacks_enabled(self) -> bool: + return self._frame_callbacks_enabled + + def add_frame_callback(self, frame_callback: Callable[[CameraFrame], None]) -> int: + """ + Adds a new callback that will be called with the receipt of every new frame. 
This callback + should not block for a long time because it will be called in the frame receiving hot path! + + This np.ndarray is shared with all callbacks, so you should make a copy if you need to modify it. + + Returns the callback ID that can be used to remove the callback later if needed. + """ + try: + next_id = max(t[0] for t in self._frame_callbacks) + 1 + except ValueError: + next_id = 1 + + self._frame_callbacks.append((next_id, frame_callback)) + + return next_id + + def remove_frame_callback(self, callback_id): + try: + idx_to_remove = [t[0] for t in self._frame_callbacks].index(callback_id) + self._log.debug(f"Removing callback with id={callback_id} at idx={idx_to_remove}.") + del self._frame_callbacks[idx_to_remove] + except ValueError: + self._log.warning(f"No callback with id={callback_id}, cannot remove it.") + + def _propogate_frame(self, raw_frame: CameraFrame): + """ + Implementations can call this to propogate a new frame to all registered callbacks. The frame + will be rotated/cropped/etc based on our config, so the callbacks don't need to do that. + """ + if not self._frame_callbacks_enabled: + return + camera_frame = dataclasses.replace( + raw_frame, + frame=control.utils.rotate_and_flip_image( + raw_frame.frame, rotate_image_angle=self._config.rotate_image_angle, flip_image=self._config.flip + ), + ) + for _, cb in self._frame_callbacks: + cb(camera_frame) + + @abc.abstractmethod + def set_exposure_time(self, exposure_time_ms: float): + """ + Sets the exposure time in ms. This should also take care of setting the strobe delay (if needed). If in + HARDWARE acquisition mode, you're guaranteed to have a self._hw_set_strobe_delay_ms_fn to help with this. + """ + pass + + @abc.abstractmethod + def get_exposure_time(self) -> float: + """ + Returns the current exposure time in milliseconds. 
+ """ + pass + + @abc.abstractmethod + def get_exposure_limits(self) -> Tuple[float, float]: + """ + Return the valid range of exposure times in inclusive milliseconds. + """ + pass + + @abc.abstractmethod + def get_strobe_time(self) -> float: + """ + Given the current exposure time we are using, what is the strobe time such that + get_strobe_time() + get_exposure_time() == total frame time. In milliseconds. + """ + + def get_total_frame_time(self) -> float: + """ + The total sensor time for a single frame. This is strobe time + exposure time in ms. + """ + return self.get_exposure_time() + self.get_strobe_time() + + @abc.abstractmethod + def set_frame_format(self, frame_format: CameraFrameFormat): + """ + If this camera supports the given frame format, set it and make sure that + all subsequent frames are in this format. + + If not, throw a ValueError. + """ + pass + + @abc.abstractmethod + def get_frame_format(self) -> CameraFrameFormat: + pass + + @abc.abstractmethod + def set_pixel_format(self, pixel_format: squid.config.CameraPixelFormat): + """ + If this camera supports the given pixel format, enable it and make sure that all + subsequent captures use this pixel format. + + If not, throw a ValueError. + """ + pass + + @abc.abstractmethod + def get_pixel_format(self) -> squid.config.CameraPixelFormat: + pass + + @abc.abstractmethod + def set_resolution(self, width: int, height: int): + """ + If the camera supports this width x height pixel format, set it and make sure + all subsequent frames are of this resolution. + + If not, throw a ValueError. + """ + pass + + @abc.abstractmethod + def get_resolution(self) -> Tuple[int, int]: + """ + Return the (width, height) resolution of captures made by the camera right now. + """ + pass + + @abc.abstractmethod + def get_resolutions(self) -> Sequence[Tuple[int, int]]: + """ + Return all the (width, height) resolutions supported by this camera. 
+ """ + pass + + @abc.abstractmethod + def set_analog_gain(self, analog_gain: float): + """ + Set analog gain as an input multiple. EG 1 = no gain, 100 = 100x gain. + """ + pass + + @abc.abstractmethod + def get_analog_gain(self) -> float: + """ + Returns gain in the same units as set_analog_gain. + """ + pass + + @abc.abstractmethod + def get_gain_range(self) -> CameraGainRange: + """ + Returns the gain range, and minimum gain step, for this camera. + """ + pass + + @abc.abstractmethod + def start_streaming(self): + """ + This starts camera frame streaming. Whether this results in frames immediately depends + on the current triggering mode. If frames require triggering, no frames will come until + triggers are sent. If the camera is in continuous mode, frames will start immediately. + + This should be a noop if the camera is already streaming. + """ + pass + + @abc.abstractmethod + def stop_streaming(self): + """ + Stops camera frame streaming, which means frames will only come in with a call go get_frame + """ + pass + + @abc.abstractmethod + def get_is_streaming(self): + pass + + def _process_raw_frame(self, raw_frame: np.array) -> np.array: + """ + Takes a raw nd array from a camera, and processes it such that it can be used directly in a + CameraFrame as the frame field. This takes care of rotating, resizing, etc the raw frame such that + it respects this camera's settings. + + Your camera's image callback should use this. + """ + return control.utils.rotate_and_flip_image( + raw_frame, rotate_image_angle=self._config.rotate_image_angle, flip_image=self._config.flip + ) + + def read_frame(self) -> np.ndarray: + """ + If needed, send a trigger to request a frame. Then block and wait until the next frame comes in, + and return it. The frame that comes back will be rotated/flipped/etc based on this cameras config, + so the caller can assume all that is done for them. + + These frames will be sent to registered callbacks as well. 
+ + NOTE(imo): We might change this to get_frame to be consistent with everything else here, but + since cameras previously used read_frame this decreases line change noise. + """ + return self.read_camera_frame().frame + + @abc.abstractmethod + def read_camera_frame(self) -> CameraFrame: + """ + This calls read_frame, but also fills in all the information such that you get a CameraFrame. The + frame in the CameraFrame will have had _process_raw_frame called on it already. + """ + pass + + @abc.abstractmethod + def get_frame_id(self) -> int: + """ + Returns the frame id of the current frame. This should increase by 1 with every frame received + from the camera + """ + pass + + @abc.abstractmethod + def get_white_balance_gains(self) -> Tuple[float, float, float]: + """ + Returns the (R, G, B) white balance gains + """ + pass + + @abc.abstractmethod + def set_white_balance_gains(self, red_gain: float, green_gain: float, blue_gain: float): + """ + Set the (R, G, B) white balance gains. + """ + pass + + @abc.abstractmethod + def set_auto_white_balance_gains(self) -> Tuple[float, float, float]: + """ + Runs auto white balance, then returns the resulting updated gains. + """ + pass + + @abc.abstractmethod + def set_black_level(self, black_level: float): + """ + Sets the black level of captured images. + """ + pass + + @abc.abstractmethod + def get_black_level(self) -> float: + """ + Gets the black level set on the camera. + """ + pass + + def set_acquisition_mode(self, acquisition_mode: CameraAcquisitionMode): + """ + Sets the acquisition mode. If you are specifying hardware trigger, and an external + system needs to send the trigger, you must specify a hw_trigger_fn. This function must be callable in such + a way that it immediately sends a hardware trigger, and only returns when the trigger has been sent. 
+ + hw_trigger_fn and hw_set_strobe_delay_ms_fn to the __init__ must have been valid for the duration of this + camera's acquisition mode being set to HARDWARE + """ + if acquisition_mode is CameraAcquisitionMode.HARDWARE_TRIGGER: + if not self._hw_trigger_fn: + raise ValueError( + "Cannot set HARDWARE_TRIGGER camera acquisition mode without a hw_trigger_fn. You must provide one when constructing the camera." + ) + if not self._hw_set_strobe_delay_ms_fn: + raise ValueError( + "Cannot set HARDWARE_TRIGGER camera acquisition mode without a hw_set_strobe_delay_ms_fn. You must provide one when constructing the camera." + ) + + return self._set_acquisition_mode_imp(acquisition_mode=acquisition_mode) + + @abc.abstractmethod + def _set_acquisition_mode_imp(self, acquisition_mode: CameraAcquisitionMode): + """ + Your subclass must implement this such that it switches the camera to this acquisition mode. The top level + set_acquisition_mode handles storing the self._hw_trigger_fn for you so you are guaranteed to have a valid + callable self._hw_trigger_fn if in hardware trigger mode. + + If things like setting a remote strobe, or other settings, are needed when you change the mode you must + handle that here. + """ + pass + + @abc.abstractmethod + def get_acquisition_mode(self) -> CameraAcquisitionMode: + """ + Returns the current acquisition mode. + """ + pass + + @abc.abstractmethod + def send_trigger(self, illumination_time: Optional[float] = None): + """ + If in an acquisition mode that needs triggering, send a trigger. If in HARDWARE_TRIGGER mode, you are + guaranteed to have a self._hw_trigger_fn and should call that. If in CONTINUOUS mode, this can be + a no-op. + + The illumination_time argument can be used for HARDWARE_TRIGGER cases where the hardware trigger mechanism + knows how to control illumination (and may take into account a strobe delay). 
If not using a hardware + trigger system that controls illumination, a non-None illumination_time is allowed (but will be ignored) + + When this returns, it does not mean it is safe to immediately send another trigger. + """ + pass + + @abc.abstractmethod + def get_ready_for_trigger(self) -> bool: + """ + Returns true if the camera is ready for another trigger, false otherwise. Calling + send_trigger when this is False will result in an exception from send_trigger. + """ + pass + + @abc.abstractmethod + def set_region_of_interest(self, offset_x: int, offset_y: int, width: int, height: int): + """ + Set the region of interest of the camera so that returned frames only contain this subset of the full sensor image. + """ + pass + + @abc.abstractmethod + def get_region_of_interest(self) -> Tuple[int, int, int, int]: + """ + Returns the region of interest as a tuple of (x corner, y corner, width, height) + """ + pass + + @abc.abstractmethod + def set_temperature(self, temperature_deg_c: Optional[float]): + """ + Set the desired temperature of the camera in degrees C. If None is given as input, use + a sane default for the camera. + """ + pass + + @abc.abstractmethod + def get_temperature(self) -> float: + """ + Get the current temperature of the camera in deg C. 
+ """ + pass diff --git a/software/squid/camera/__init__.py b/software/squid/camera/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/software/squid/camera/utils.py b/software/squid/camera/utils.py new file mode 100644 index 00000000..e809f8e6 --- /dev/null +++ b/software/squid/camera/utils.py @@ -0,0 +1,331 @@ +import functools +import threading +import time +from typing import Optional, Tuple, Sequence, Callable + +import numpy as np + +import squid.logging +from squid.config import CameraConfig, CameraPixelFormat, CameraVariant +from squid.abc import AbstractCamera, CameraAcquisitionMode, CameraFrameFormat, CameraFrame, CameraGainRange + +_log = squid.logging.get_logger("squid.camera.utils") + + +def get_camera( + config: CameraConfig, + simulated: bool = False, + hw_trigger_fn: Optional[Callable[[Optional[float]], bool]] = None, + hw_set_strobe_delay_ms_fn: Optional[Callable[[float], bool]] = None, +) -> AbstractCamera: + """ + Try to import, and then build, the requested camera. We import on a case-by-case basis + because some cameras require system level installations, and so in many cases camera + driver imports will fail. + + If you're using a camera implementation with hardware trigger mode, you'll need to provide the functions for + sending a hardware trigger and setting the strobe delay. + + NOTE(imo): While we transition to AbstractCamera, we need to do some hacks here to make the non-transitioned + drivers still work. Hence the embedded helpers here. 
+ """ + + def open_if_needed(camera): + try: + camera.open() + except AttributeError: + pass + + if simulated: + return SimulatedCamera(config, hw_trigger_fn=hw_trigger_fn, hw_set_strobe_delay_ms_fn=hw_set_strobe_delay_ms_fn) + + try: + if config.camera_type == CameraVariant.TOUPCAM: + import control.camera_toupcam + + camera = control.camera_toupcam.Camera(config) + elif config.camera_type == CameraVariant.FLIR: + import control.camera_flir + + camera = control.camera_flir.Camera(config) + elif config.camera_type == CameraVariant.HAMAMATSU: + import control.camera_hamamatsu + + camera = control.camera_hamamatsu.Camera(config) + elif config.camera_type == CameraVariant.IDS: + import control.camera_ids + + camera = control.camera_ids.Camera(config) + elif config.camera_type == CameraVariant.TUCSEN: + import control.camera_tucsen + + camera = control.camera_ids.Camera(config) + elif config.camera_type == CameraVariant.TIS: + import control.camera_TIS + + camera = control.camera_TIS.Camera(config) + else: + import control.camera + + camera = control.camera.Camera(config) + + # NOTE(imo): All of these things are hacks before complete migration to AbstractCamera impls. They can + # be removed once all the cameras conform to the AbstractCamera interface. + open_if_needed(camera) + except ImportError as e: + _log.warning(f"Camera of type: '{config.camera_type}' failed to import. 
Falling back to default camera impl.") + _log.warning(e) + + import control.camera + + return control.camera.Camera(config) + + return camera + + +class SimulatedCamera(AbstractCamera): + @staticmethod + def debug_log(method): + import inspect + + @functools.wraps(method) + def _logged_method(self, *args, **kwargs): + kwargs_pairs = tuple(f"{k}={v}" for (k, v) in kwargs.items()) + args_str = tuple(str(a) for a in args) + self._log.debug( + f"{inspect.getouterframes(inspect.currentframe())[1][3]}({','.join(args_str + kwargs_pairs)})" + ) + return method(self, *args, **kwargs) + + return _logged_method + + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self._frame_id = 1 + self._current_raw_frame = None + self._current_frame = None + + self._exposure_time = None + self._frame_format = CameraFrameFormat.RAW + self._pixel_format = None + self.set_pixel_format(self._config.default_pixel_format) + self._resolution = None + self.set_resolution(self._config.default_resolution[0], self._config.default_resolution[1]) + self._analog_gain = None + self._white_balance_gains = None + self._black_level = None + self._acquisition_mode = None + self._roi = (0, 0, self.get_resolution()[0], self.get_resolution()[1]) + self._temperature_setpoint = None + self._continue_streaming = False + self._streaming_thread: Optional[threading.Thread] = None + self._last_trigger_timestamp = 0 + + # This is for the migration to AbstractCamera. It helps us find methods/properties that + # some cameras had in the pre-AbstractCamera days.
+ self._missing_methods = {} + + class MissingAttribImpl: + name_to_val = {} + + def __init__(self, name): + self._log = squid.logging.get_logger(f"MissingAttribImpl({name})") + self._val = self.name_to_val.get(name, None) + + def __get__(self, instance, owner): + self._log.debug("Get") + return self._val + + def __set__(self, instance, value): + self._log.debug(f"Set={value}") + self._val = value + + def __call__(self, *args, **kwargs): + kwarg_pairs = [f"{k}={v}" for (k, v) in kwargs.items()] + args_str = [str(a) for a in args] + self._log.debug(f"Called(*args, **kwargs) -> Called({','.join(args_str)}, {','.join(kwarg_pairs)}") + return self._val + + def __getattr__(self, item): + self._log.warning(f"Creating placeholder missing method: {item}") + return self._missing_methods.get(item, SimulatedCamera.MissingAttribImpl(item)) + + @debug_log + def set_exposure_time(self, exposure_time_ms: float): + self._exposure_time = exposure_time_ms + + @debug_log + def get_exposure_time(self) -> float: + return self._exposure_time + + @debug_log + def get_strobe_time(self): + return 3 # Just some arbitrary non-zero number so we test code that relies on this. 
+ + @debug_log + def get_exposure_limits(self) -> Tuple[float, float]: + return 1, 1000 + + @debug_log + def set_frame_format(self, frame_format: CameraFrameFormat): + self._frame_format = frame_format + + @debug_log + def get_frame_format(self) -> CameraFrameFormat: + return self._frame_format + + @debug_log + def set_pixel_format(self, pixel_format: CameraPixelFormat): + self._pixel_format = pixel_format + + @debug_log + def get_pixel_format(self) -> CameraPixelFormat: + return self._pixel_format + + @debug_log + def set_resolution(self, width: int, height: int): + self._resolution = (width, height) + + @debug_log + def get_resolution(self) -> Tuple[int, int]: + return self._resolution + + @debug_log + def get_resolutions(self) -> Sequence[Tuple[int, int]]: + return [(1920, 1080), (2000, 2000), (3000, 2000)] + + @debug_log + def set_analog_gain(self, analog_gain: float): + valid_range = self.get_gain_range() + if analog_gain > valid_range.max_gain or analog_gain < valid_range.min_gain: + raise ValueError("Gain outside valid range.") + + self._analog_gain = analog_gain + + @debug_log + def get_analog_gain(self) -> float: + return self._analog_gain + + @debug_log + def get_gain_range(self) -> CameraGainRange: + # Arbitrary, just something to test with + return CameraGainRange(min_gain=0.0, max_gain=100.0, gain_step=2.0) + + def _start_streaming_thread(self): + def stream_fn(): + self._log.info("Starting streaming thread...") + last_frame_time = time.time() + while self._continue_streaming: + time_since = time.time() - last_frame_time + if self.get_exposure_time() - time_since > 0: + time.sleep(self.get_exposure_time() - time_since) + self.send_trigger() + last_frame_time = time.time() + self._log.info("Stopping streaming...") + + self._streaming_thread = threading.Thread(target=stream_fn, daemon=True) + self._streaming_thread.start() + + @debug_log + def start_streaming(self): + self._continue_streaming = True + self._start_streaming_thread() + + @debug_log + def 
stop_streaming(self): + self._continue_streaming = False + + @debug_log + def get_is_streaming(self): + return self._streaming_thread is not None and self._streaming_thread.is_alive() + + @debug_log + def read_camera_frame(self) -> CameraFrame: + self.send_trigger() + return self._current_frame + + @debug_log + def get_white_balance_gains(self) -> Tuple[float, float, float]: + return self._white_balance_gains + + @debug_log + def set_white_balance_gains(self, red_gain: float, green_gain: float, blue_gain: float): + self._white_balance_gains = (red_gain, green_gain, blue_gain) + + @debug_log + def set_auto_white_balance_gains(self) -> Tuple[float, float, float]: + self.set_white_balance_gains(1.0, 1.0, 1.0) + + return self.get_white_balance_gains() + + @debug_log + def set_black_level(self, black_level: float): + self._black_level = black_level + + @debug_log + def get_black_level(self) -> float: + return self._black_level + + @debug_log + def _set_acquisition_mode_imp(self, acquisition_mode: CameraAcquisitionMode): + self._acquisition_mode = acquisition_mode + + @debug_log + def get_acquisition_mode(self) -> CameraAcquisitionMode: + return self._acquisition_mode + + @debug_log + def send_trigger(self, illumination_time: Optional[float] = None): + (height, width) = self.get_resolution() + if self.get_frame_id() == 1: + if self.get_pixel_format() == CameraPixelFormat.MONO8: + self._current_raw_frame = np.random.randint(255, size=(height, width), dtype=np.uint8) + self._current_raw_frame[height // 2 - 99: height // 2 + 100, width // 2 - 99: width // 2 + 100] = 200 + elif self.get_pixel_format() == CameraPixelFormat.MONO12: + self._current_raw_frame = np.random.randint(4095, size=(height, width), dtype=np.uint16) + self._current_raw_frame[height // 2 - 99: height // 2 + 100, width // 2 - 99: width // 2 + 100] = 200 * 16 + self._current_raw_frame = self._current_raw_frame << 4 + elif self.get_pixel_format() == CameraPixelFormat.MONO16: + self._current_raw_frame = np.random.randint(65535, size=(height,
width), dtype=np.uint16) + self._current_raw_frame[height // 2 - 99: height // 2 + 100, width // 2 - 99: width // 2 + 100] = ( + 200 * 256 + ) + else: + raise NotImplementedError(f"Simulated camera does not support pixel_format={self.get_pixel_format()}") + else: + self._current_raw_frame = np.roll(self._current_raw_frame, 10, axis=0) + + self._frame_id += 1 + + self._current_frame = CameraFrame( + frame_id=self._frame_id, + timestamp=time.time(), + frame=self._process_raw_frame(self._current_raw_frame), + frame_format=self.get_frame_format(), + frame_pixel_format=self.get_pixel_format(), + ) + + self._propogate_frame(self._current_frame) + + @debug_log + def get_ready_for_trigger(self) -> bool: + return time.time() - self._last_trigger_timestamp > self.get_exposure_time() + + @debug_log + def set_region_of_interest(self, offset_x: int, offset_y: int, width: int, height: int): + self._roi = (offset_x, offset_y, width, height) + + @debug_log + def get_region_of_interest(self) -> Tuple[int, int, int, int]: + return self._roi + + @debug_log + def set_temperature(self, temperature_deg_c: Optional[float]): + self._temperature_setpoint = temperature_deg_c + + @debug_log + def get_temperature(self) -> float: + return self._temperature_setpoint + + def get_frame_id(self) -> int: + return self._frame_id diff --git a/software/squid/config.py b/software/squid/config.py index b4c58068..725ec51b 100644 --- a/software/squid/config.py +++ b/software/squid/config.py @@ -1,10 +1,11 @@ import enum import math -from typing import Optional +from typing import Optional, Tuple import pydantic import control._def as _def +from control.utils import FlipVariant class DirectionSign(enum.IntEnum): @@ -139,10 +140,117 @@ class StageConfig(pydantic.BaseModel): ), ) -""" -Returns the StageConfig that existed at process startup. -""" - -def get_stage_config(): +def get_stage_config() -> StageConfig: + """ + Returns the StageConfig that existed at process startup. 
+ """ return _stage_config + + +class CameraVariant(enum.Enum): + TOUPCAM = "TOUPCAM" + FLIR = "FLIR" + HAMAMATSU = "HAMAMATSU" + IDS = "IDS" + TUCSEN = "TUCSEN" + TIS = "TIS" + GXIPY = "GXIPY" + + +class CameraPixelFormat(enum.Enum): + """ + This is all known Pixel Formats in the Cephla world, but not all cameras will support + all of these. + """ + + MONO8 = "MONO8" + MONO12 = "MONO12" + MONO14 = "MONO14" + MONO16 = "MONO16" + RGB24 = "RGB24" + RGB32 = "RGB32" + RGB48 = "RGB48" + + @staticmethod + def is_color_format(pixel_format): + return pixel_format in (CameraPixelFormat.RGB24, CameraPixelFormat.RGB32, CameraPixelFormat.RGB48) + + +# TODO/NOTE(imo): We may need to add a model attrib here. +class CameraConfig(pydantic.BaseModel): + """ + Most camera parameters are runtime configurable, so CameraConfig is more about defining what + camera must be available and used for a particular function in the system. + + If we want to capture the settings a camera used for a particular capture, another model called + CameraState, or something, might be more appropriate. + """ + + # NOTE(imo): Not "type" because that's a python builtin and can cause confusion + camera_type: CameraVariant + + default_resolution: Tuple[int, int] + + default_pixel_format: CameraPixelFormat + + # The angle the camera should rotate this image right as it comes off the camera, + # and before giving it to the rest of the system. + # + # NOTE(imo): As of 2025-feb-17, this feature is inconsistently implemented! + rotate_image_angle: Optional[float] + + # After rotation, the flip we should do to the image. + # + # NOTE(imo): As of 2025-feb-17, this feature is inconsistently implemented! 
+ flip: Optional[FlipVariant] + + +def _old_camera_variant_to_enum(old_string) -> CameraVariant: + if old_string == "Toupcam": + return CameraVariant.TOUPCAM + elif old_string == "FLIR": + return CameraVariant.FLIR + elif old_string == "Hamamatsu": + return CameraVariant.HAMAMATSU + elif old_string == "iDS": + return CameraVariant.IDS + elif old_string == "TIS": + return CameraVariant.TIS + elif old_string == "Tucsen": + return CameraVariant.TUCSEN + elif old_string == "Default": + return CameraVariant.GXIPY + raise ValueError(f"Unknown old camera type {old_string=}") + + +_camera_config = CameraConfig( + camera_type=_old_camera_variant_to_enum(_def.CAMERA_TYPE), + default_resolution=(_def.CAMERA_CONFIG.ROI_WIDTH_DEFAULT, _def.CAMERA_CONFIG.ROI_HEIGHT_DEFAULT), + default_pixel_format=_def.DEFAULT_PIXEL_FORMAT, + rotate_image_angle=_def.ROTATE_IMAGE_ANGLE, + flip=None, +) + + +def get_camera_config() -> CameraConfig: + """ + Returns the CameraConfig that existed at process startup. + """ + return _camera_config + + +_autofocus_camera_config = CameraConfig( + camera_type=_old_camera_variant_to_enum(_def.FOCUS_CAMERA_TYPE), + default_resolution=(_def.LASER_AF_CROP_WIDTH, _def.LASER_AF_CROP_HEIGHT), + default_pixel_format=CameraPixelFormat.MONO8, + rotate_image_angle=None, + flip=None, +) + + +def get_autofocus_camera_config() -> CameraConfig: + """ + Returns the CameraConfig that existed at startup for the laser autofocus system. 
+ """ + return _autofocus_camera_config diff --git a/software/tests/squid/test_camera.py b/software/tests/squid/test_camera.py new file mode 100644 index 00000000..d834c811 --- /dev/null +++ b/software/tests/squid/test_camera.py @@ -0,0 +1,23 @@ +import squid.camera.utils +import squid.config + + +def test_create_simulated_camera(): + sim_cam = squid.camera.utils.get_camera(squid.config.get_camera_config(), simulated=True) + + +def test_simulated_camera(): + sim_cam = squid.camera.utils.get_camera(squid.config.get_camera_config(), simulated=True) + + # Really basic tests to make sure the simulated camera does what is expected. + assert sim_cam.read_frame() is not None + frame_id = sim_cam.get_frame_id() + assert sim_cam.read_frame() is not None + assert sim_cam.get_frame_id() != frame_id + + frame = sim_cam.read_frame() + (frame_width, frame_height, *_) = frame.shape + (res_width, res_height) = sim_cam.get_resolution() + + assert frame_width == res_width + assert frame_height == res_height