From 8c995747c180049560f342aa0ed5f84693f15be3 Mon Sep 17 00:00:00 2001 From: You Yan Date: Mon, 6 Jan 2025 20:38:31 -0800 Subject: [PATCH 1/3] Prior stage with abstract stage class (untested) --- software/control/gui_hcs.py | 17 +-- software/squid/stage/prior.py | 196 +++++++++++++++++++++++++++++++--- 2 files changed, 191 insertions(+), 22 deletions(-) diff --git a/software/control/gui_hcs.py b/software/control/gui_hcs.py index b99ad209..5e4ee675 100644 --- a/software/control/gui_hcs.py +++ b/software/control/gui_hcs.py @@ -78,9 +78,6 @@ else: import control.camera as camera_fc -if USE_PRIOR_STAGE: - from control.stage_prior import PriorStage - import control.core.core as core import control.microcontroller as microcontroller import control.serial_peripherals as serial_peripherals @@ -199,9 +196,17 @@ def loadObjects(self, is_simulation): self.liveController = core.LiveController( self.camera, self.microcontroller, self.configurationManager, self.illuminationController, parent=self ) - self.stage: squid.abc.AbstractStage = squid.stage.cephla.CephlaStage( - microcontroller=self.microcontroller, stage_config=squid.config.get_stage_config() - ) + + if USE_PRIOR_STAGE: + self.stage: squid.abc.AbstractStage = squid.stage.prior.PriorStage( + sn=PRIOR_STAGE_SN + ) + + else: + self.stage: squid.abc.AbstractStage = squid.stage.cephla.CephlaStage( + microcontroller=self.microcontroller, stage_config=squid.config.get_stage_config() + ) + self.slidePositionController = core.SlidePositionController( self.stage, self.liveController, is_for_wellplate=True ) diff --git a/software/squid/stage/prior.py b/software/squid/stage/prior.py index 03805444..413c0fcc 100644 --- a/software/squid/stage/prior.py +++ b/software/squid/stage/prior.py @@ -8,42 +8,206 @@ # implemented the cephla stage. As soon as we roll the interface out and get past the point of major refactors # to use it (we want to get past that point as fast as possible!), we'll come back to implement this. class PriorStage(AbstractStage): - def __init__(self, stage_config: StageConfig): - self._not_impl() - - def _not_impl(self): - raise NotImplementedError("The Prior Stage is not yet implemented!") + def __init__(self, sn: str, baudrate: int = 115200, stage_config: StageConfig = None): + # We are not using StageConfig for Prior stage now. Waiting for further update/clarification of this part + super().__init__(stage_config) + + port = [p.device for p in serial.tools.list_ports.comports() if sn == p.serial_number] + self.serial = serial.Serial(port[0], baudrate=baudrate, timeout=timeout) + self.current_baudrate = baudrate + + # Position information + self.x_pos = 0 + self.y_pos = 0 + self.z_pos = 0 # Always 0 for Prior stage + self.theta_pos = 0 # Always 0 for Prior stage + + # Button and switch state + self.button_and_switch_state = 0 + self.joystick_button_pressed = 0 + self.signal_joystick_button_pressed_event = False + self.switch_state = 0 + self.joystick_enabled = False + + # Prior-specific properties + self.stage_microsteps_per_mm = 100000 # Stage property + self.user_unit = None + self.stage_model = None + self.stage_limits = None + self.resolution = 0.1 + self.x_direction = 1 # 1 or -1 + self.y_direction = 1 # 1 or -1 + self.speed = 200 # Default value + self.acceleration = 500 # Default value + + self.serial_lock = threading.Lock() + + self.set_baudrate(baudrate) + + self._initialize() + + def set_baudrate(self, baud: int): + allowed_baudrates = {9600: "96", 19200: "19", 38400: "38", 115200: "115"} + if baud not in allowed_baudrates: + print("Baudrate not allowed. Setting baudrate to 9600") + baud_command = "BAUD 96" + else: + baud_command = "BAUD " + allowed_baudrates[baud] + print(baud_command) + + for bd in allowed_baudrates: + self.serial.baudrate = bd + self.serial.write(b"\r") + time.sleep(0.1) + self.serial.flushInput() + + self._send_command(baud_command) + + self.serial.baudrate = baud + + try: + test_response = self._send_command("$") # Send a simple query command + if not test_response: + raise Exception("No response received after changing baud rate") + else: + self.current_baudrate = baud + print(f"Baud rate successfully changed to {baud}") + return + except Exception as e: + # If verification fails, try to revert to the original baud rate + self.serial.baudrate = self.current_baudrate + print(f"Serial baudrate: {bd}") + print(f"Failed to verify communication at new baud rate: {e}") + + raise Exception("Failed to set baudrate.") + + def _initialize(self): + self._send_command("COMP 0") # Set to standard mode + self._send_command("BLSH 1") # Enable backlash correction + self._send_command("RES,S," + str(self.resolution)) # Set resolution + response = self._send_command("H 0") # Joystick enabled + self.joystick_enabled = True + self.user_unit = self.stage_microsteps_per_mm * self.resolution + self.get_stage_info() + self.set_acceleration(self.acceleration) + self.set_max_speed(self.speed) + + def _send_command(self, command: str) -> str: + with self.serial_lock: + self.serial.write(f"{command}\r".encode()) + response = self.serial.readline().decode().strip() + if response.startswith("E"): + raise Exception(f"Error from controller: {response}") + return response + + def get_stage_info(self): + stage_info = self._send_command("STAGE") + self.stage_model = re.search(r"STAGE\s*=\s*(\S+)", stage_info).group(1) + print("Stage model: ", self.stage_model) + + def mm_to_steps(self, mm: float): + return int(mm * self.user_unit) + + def steps_to_mm(self, steps: int): + return steps / self.user_unit def move_x(self, rel_mm: float, blocking: bool = True): - self._not_impl() + steps = self.mm_to_steps(rel_mm) + steps = steps * self.x_direction + self._send_command(f"GR {steps},0") + if blocking: + self.wait_for_stop() + else: + threading.Thread(target=self.wait_for_stop, daemon=True).start() def move_y(self, rel_mm: float, blocking: bool = True): - self._not_impl() - + steps = self.mm_to_steps(rel_mm) + steps = steps * self.y_direction + self._send_command(f"GR 0,{steps}") + if blocking: + self.wait_for_stop() + else: + threading.Thread(target=self.wait_for_stop, daemon=True).start() + def move_z(self, rel_mm: float, blocking: bool = True): - self._not_impl() + pass def move_x_to(self, abs_mm: float, blocking: bool = True): - self._not_impl() + steps = self.mm_to_steps(abs_mm) + steps = steps * self.x_direction + self._send_command(f"GX {steps}") + if blocking: + self.wait_for_stop() + else: + threading.Thread(target=self.wait_for_stop, daemon=True).start() def move_y_to(self, abs_mm: float, blocking: bool = True): - self._not_impl() + steps = self.mm_to_steps(abs_mm) + steps = steps * self.y_direction + self._send_command(f"GY {steps}") + if blocking: + self.wait_for_stop() + else: + threading.Thread(target=self.wait_for_stop, daemon=True).start() def move_z_to(self, abs_mm: float, blocking: bool = True): - self._not_impl() + pass def get_pos(self) -> Pos: - self._not_impl() + response = self._send_command("P") + x, y, z = map(int, response.split(",")) + self.x_pos = x + self.y_pos = y + x_mm = self.steps_to_mm(x) + y_mm = self.steps_to_mm(y) + return Pos(x_mm=x_mm, y_mm=y_mm, z_mm=0, theta_rad=0) def get_state(self) -> StageStage: - self._not_impl() + if int(self._send_command("$,S")) == 0: + return StageStage(busy=False) + else: + return StageStage(busy=True) def home(self, x: bool, y: bool, z: bool, theta: bool, blocking: bool = True): - self._not_impl() + if x and y: + self._send_command("M") # 'M' command moves stage to (0,0,0) + self.wait_for_stop() + self.x_pos = 0 + self.y_pos = 0 + elif x: + self.move_x(-self.steps_to_mm(self.x_pos)) + self.x_pos = 0 + elif y: + self.move_y(-self.steps_to_mm(self.y_pos)) + self.y_pos = 0 + if blocking: + self.wait_for_stop() + + # We are not using the following for Prior stage yet + ''' + if z: + self._microcontroller.home_z() + if blocking: + self._microcontroller.wait_till_operation_is_completed(z_timeout) + + if theta: + self._microcontroller.home_theta() + if blocking: + self._microcontroller.wait_till_operation_is_completed(theta_timeout) + ''' def zero(self, x: bool, y: bool, z: bool, theta: bool, blocking: bool = True): self._not_impl() + def wait_for_stop(self): + while True: + status = int(self._send_command("$,S")) + if status == 0: + self.get_pos() + print("xy position: ", self.x_pos, self.y_pos) + break + time.sleep(0.05) + def set_limits( self, x_pos_mm: Optional[float] = None, @@ -55,7 +219,7 @@ def set_limits( theta_pos_rad: Optional[float] = None, theta_neg_rad: Optional[float] = None, ): - self._not_impl() + pass def get_config(self) -> StageConfig: return super().get_config() From d2aed0fb640c2d468892f02ab74c5d52afef7bbb Mon Sep 17 00:00:00 2001 From: Alpaca233 Date: Tue, 7 Jan 2025 13:09:25 -0800 Subject: [PATCH 2/3] Prior stage tested --- software/control/gui_hcs.py | 13 ++--- software/control/widgets.py | 6 +-- software/squid/stage/cephla.py | 9 ++++ software/squid/stage/prior.py | 94 +++++++++++++++++++++++++--------- software/squid/stage/utils.py | 21 ++++---- 5 files changed, 96 insertions(+), 47 deletions(-) diff --git a/software/control/gui_hcs.py b/software/control/gui_hcs.py index 5e4ee675..368b4089 100644 --- a/software/control/gui_hcs.py +++ b/software/control/gui_hcs.py @@ -19,8 +19,6 @@ import squid.abc import squid.logging import squid.config -import squid.stage.prior -import squid.stage.cephla import squid.stage.utils import control.microscope from control.microscope import LightSourceType, IntensityControlMode, ShutterControlMode @@ -29,6 +27,10 @@ import control.filterwheel as filterwheel +if USE_PRIOR_STAGE: + import squid.stage.prior +else: + import squid.stage.cephla if CAMERA_TYPE == "Toupcam": try: @@ -414,13 +416,6 @@ def loadHardwareObjects(self): self.log.error("Error initializing Optospin Emission Filter Wheel") raise - if USE_PRIOR_STAGE: - try: - self.priorstage = PriorStage(PRIOR_STAGE_SN, parent=self) - except Exception: - self.log.error("Error initializing Prior Stage") - raise - def setupHardware(self): # Setup hardware components if USE_ZABER_EMISSION_FILTER_WHEEL: diff --git a/software/control/widgets.py b/software/control/widgets.py index deb7fa1f..e74a8173 100644 --- a/software/control/widgets.py +++ b/software/control/widgets.py @@ -1541,17 +1541,17 @@ def move_z_backward(self): self.stage.move_z(-self.entry_dZ.value() / 1000) def set_deltaX(self, value): - mm_per_ustep = 1.0 / self.stage.get_config().X_AXIS.convert_real_units_to_ustep(1.0) + mm_per_ustep = 1.0 / self.stage.x_mm_to_usteps(1.0) deltaX = round(value / mm_per_ustep) * mm_per_ustep self.entry_dX.setValue(deltaX) def set_deltaY(self, value): - mm_per_ustep = 1.0 / self.stage.get_config().Y_AXIS.convert_real_units_to_ustep(1.0) + mm_per_ustep = 1.0 / self.stage.y_mm_to_usteps(1.0) deltaY = round(value / mm_per_ustep) * mm_per_ustep self.entry_dY.setValue(deltaY) def set_deltaZ(self, value): - mm_per_ustep = 1.0 / self.stage.get_config().Z_AXIS.convert_real_units_to_ustep(1.0) + mm_per_ustep = 1.0 / self.stage.z_mm_to_usteps(1.0) deltaZ = round(value / 1000 / mm_per_ustep) * mm_per_ustep * 1000 self.entry_dZ.setValue(deltaZ) diff --git a/software/squid/stage/cephla.py b/software/squid/stage/cephla.py index 90cf9948..b8f306be 100644 --- a/software/squid/stage/cephla.py +++ b/software/squid/stage/cephla.py @@ -38,6 +38,15 @@ def _configure_axis(self, microcontroller_axis_number: int, axis_config: AxisCon ) self._microcontroller.turn_on_stage_pid(microcontroller_axis_number) + def x_mm_to_usteps(self, mm: float): + return self._config.X_AXIS.convert_real_units_to_ustep(mm) + + def y_mm_to_usteps(self, mm: float): + return self._config.Y_AXIS.convert_real_units_to_ustep(mm) + + def z_mm_to_usteps(self, mm: float): + return self._config.Z_AXIS.convert_real_units_to_ustep(mm) + def move_x(self, rel_mm: float, blocking: bool = True): self._microcontroller.move_x_usteps(self._config.X_AXIS.convert_real_units_to_ustep(rel_mm)) if blocking: diff --git a/software/squid/stage/prior.py b/software/squid/stage/prior.py index 413c0fcc..0303cc6e 100644 --- a/software/squid/stage/prior.py +++ b/software/squid/stage/prior.py @@ -1,4 +1,8 @@ from typing import Optional +import serial +import threading +import time +import re from squid.abc import AbstractStage, Pos, StageStage from squid.config import StageConfig @@ -13,7 +17,7 @@ def __init__(self, sn: str, baudrate: int = 115200, stage_config: StageConfig = super().__init__(stage_config) port = [p.device for p in serial.tools.list_ports.comports() if sn == p.serial_number] - self.serial = serial.Serial(port[0], baudrate=baudrate, timeout=timeout) + self.serial = serial.Serial(port[0], baudrate=baudrate, timeout=0.1) self.current_baudrate = baudrate # Position information @@ -37,8 +41,8 @@ def __init__(self, sn: str, baudrate: int = 115200, stage_config: StageConfig = self.resolution = 0.1 self.x_direction = 1 # 1 or -1 self.y_direction = 1 # 1 or -1 - self.speed = 200 # Default value - self.acceleration = 500 # Default value + self.speed = 100 + self.acceleration = 100 self.serial_lock = threading.Lock() @@ -84,11 +88,13 @@ def set_baudrate(self, baud: int): def _initialize(self): self._send_command("COMP 0") # Set to standard mode self._send_command("BLSH 1") # Enable backlash correction + self._send_command('XD -1') # Set direction of X axis move + self._send_command('YD -1') # Set direction of Y axis move self._send_command("RES,S," + str(self.resolution)) # Set resolution response = self._send_command("H 0") # Joystick enabled self.joystick_enabled = True self.user_unit = self.stage_microsteps_per_mm * self.resolution - self.get_stage_info() + # self.get_stage_info() self.set_acceleration(self.acceleration) self.set_max_speed(self.speed) @@ -105,14 +111,60 @@ def get_stage_info(self): self.stage_model = re.search(r"STAGE\s*=\s*(\S+)", stage_info).group(1) print("Stage model: ", self.stage_model) - def mm_to_steps(self, mm: float): + def set_max_speed(self, speed=1000): + """Set the maximum speed of the stage. Range is 1 to 1000.""" + if 1 <= speed <= 1000: + response = self._send_command(f"SMS {speed}") + print(f"Maximum speed set to {speed}. Response: {response}") + else: + raise ValueError("Speed must be between 1 and 1000") + + def get_max_speed(self): + """Get the current maximum speed setting.""" + response = self._send_command("SMS") + print(f"Current maximum speed: {response}") + return int(response) + + def set_acceleration(self, acceleration=1000): + """Set the acceleration of the stage. Range is 1 to 1000.""" + if 1 <= acceleration <= 1000: + response = self._send_command(f"SAS {acceleration}") + self.acceleration = acceleration + print(f"Acceleration set to {acceleration}. Response: {response}") + else: + raise ValueError("Acceleration must be between 1 and 1000") + + def enable_joystick(self): + self._send_command("J") + self.joystick_enabled = True + + def disable_joystick(self): + self._send_command("H") + self.joystick_enabled = False + + def get_acceleration(self): + """Get the current acceleration setting.""" + response = self.send_command("SAS") + print(f"Current acceleration: {response}") + return int(response) + + def _mm_to_steps(self, mm: float): return int(mm * self.user_unit) - def steps_to_mm(self, steps: int): + def _steps_to_mm(self, steps: int): return steps / self.user_unit + def x_mm_to_usteps(self, mm: float): + return self._mm_to_steps(mm) + + def y_mm_to_usteps(self, mm: float): + return self._mm_to_steps(mm) + + def z_mm_to_usteps(self, mm: float): + return 0 + def move_x(self, rel_mm: float, blocking: bool = True): - steps = self.mm_to_steps(rel_mm) + steps = self._mm_to_steps(rel_mm) steps = steps * self.x_direction self._send_command(f"GR {steps},0") if blocking: @@ -121,7 +173,7 @@ def move_x(self, rel_mm: float, blocking: bool = True): threading.Thread(target=self.wait_for_stop, daemon=True).start() def move_y(self, rel_mm: float, blocking: bool = True): - steps = self.mm_to_steps(rel_mm) + steps = self._mm_to_steps(rel_mm) steps = steps * self.y_direction self._send_command(f"GR 0,{steps}") if blocking: @@ -133,7 +185,7 @@ def move_z(self, rel_mm: float, blocking: bool = True): pass def move_x_to(self, abs_mm: float, blocking: bool = True): - steps = self.mm_to_steps(abs_mm) + steps = self._mm_to_steps(abs_mm) steps = steps * self.x_direction self._send_command(f"GX {steps}") if blocking: @@ -142,7 +194,7 @@ def move_x_to(self, abs_mm: float, blocking: bool = True): threading.Thread(target=self.wait_for_stop, daemon=True).start() def move_y_to(self, abs_mm: float, blocking: bool = True): - steps = self.mm_to_steps(abs_mm) + steps = self._mm_to_steps(abs_mm) steps = steps * self.y_direction self._send_command(f"GY {steps}") if blocking: @@ -158,8 +210,8 @@ def get_pos(self) -> Pos: x, y, z = map(int, response.split(",")) self.x_pos = x self.y_pos = y - x_mm = self.steps_to_mm(x) - y_mm = self.steps_to_mm(y) + x_mm = self._steps_to_mm(x) + y_mm = self._steps_to_mm(y) return Pos(x_mm=x_mm, y_mm=y_mm, z_mm=0, theta_rad=0) def get_state(self) -> StageStage: @@ -169,17 +221,7 @@ def get_state(self) -> StageStage: return StageStage(busy=True) def home(self, x: bool, y: bool, z: bool, theta: bool, blocking: bool = True): - if x and y: - self._send_command("M") # 'M' command moves stage to (0,0,0) - self.wait_for_stop() - self.x_pos = 0 - self.y_pos = 0 - elif x: - self.move_x(-self.steps_to_mm(self.x_pos)) - self.x_pos = 0 - elif y: - self.move_y(-self.steps_to_mm(self.y_pos)) - self.y_pos = 0 + self._send_command('SIS') if blocking: self.wait_for_stop() @@ -197,14 +239,16 @@ def home(self, x: bool, y: bool, z: bool, theta: bool, blocking: bool = True): ''' def zero(self, x: bool, y: bool, z: bool, theta: bool, blocking: bool = True): - self._not_impl() + self.send_command("Z") + self.x_pos = 0 + self.y_pos = 0 def wait_for_stop(self): while True: status = int(self._send_command("$,S")) if status == 0: self.get_pos() - print("xy position: ", self.x_pos, self.y_pos) + # print("xy position: ", self.x_pos, self.y_pos) break time.sleep(0.05) diff --git a/software/squid/stage/utils.py b/software/squid/stage/utils.py index c3b32609..0b599b09 100644 --- a/software/squid/stage/utils.py +++ b/software/squid/stage/utils.py @@ -36,16 +36,17 @@ def get_cached_position(cache_path=_DEFAULT_CACHE_PATH) -> Optional[Pos]: def cache_position(pos: Pos, stage_config: StageConfig, cache_path=_DEFAULT_CACHE_PATH): - x_min = stage_config.X_AXIS.MIN_POSITION - x_max = stage_config.X_AXIS.MAX_POSITION - y_min = stage_config.Y_AXIS.MIN_POSITION - y_max = stage_config.Y_AXIS.MAX_POSITION - z_min = stage_config.Z_AXIS.MIN_POSITION - z_max = stage_config.Z_AXIS.MAX_POSITION - if not (x_min <= pos.x_mm <= x_max and y_min <= pos.y_mm <= y_max and z_min <= pos.z_mm <= z_max): - raise ValueError( - f"Position {pos} is not cacheable because it is outside of the min/max of at least one axis. x_range=({x_min}, {x_max}), y_range=({y_min}, {y_max}), z_range=({z_min}, {z_max})" - ) + if stage_config is not None: # StageConfig not implemented for Prior stage + x_min = stage_config.X_AXIS.MIN_POSITION + x_max = stage_config.X_AXIS.MAX_POSITION + y_min = stage_config.Y_AXIS.MIN_POSITION + y_max = stage_config.Y_AXIS.MAX_POSITION + z_min = stage_config.Z_AXIS.MIN_POSITION + z_max = stage_config.Z_AXIS.MAX_POSITION + if not (x_min <= pos.x_mm <= x_max and y_min <= pos.y_mm <= y_max and z_min <= pos.z_mm <= z_max): + raise ValueError( + f"Position {pos} is not cacheable because it is outside of the min/max of at least one axis. x_range=({x_min}, {x_max}), y_range=({y_min}, {y_max}), z_range=({z_min}, {z_max})" + ) with open(cache_path, "w") as f: _log.debug(f"Writing position={pos} to cache path='{cache_path}'") f.write(",".join([str(pos.x_mm), str(pos.y_mm), str(pos.z_mm)])) From 33d906463f966cb00f551c619ff7e2e891809476 Mon Sep 17 00:00:00 2001 From: You Yan Date: Tue, 7 Jan 2025 13:15:34 -0800 Subject: [PATCH 3/3] Remove old Prior stage class --- software/control/stage_prior.py | 272 -------------------------------- 1 file changed, 272 deletions(-) delete mode 100644 software/control/stage_prior.py diff --git a/software/control/stage_prior.py b/software/control/stage_prior.py deleted file mode 100644 index 2739949a..00000000 --- a/software/control/stage_prior.py +++ /dev/null @@ -1,272 +0,0 @@ -import serial -import time -import re -import threading - - -class PriorStage: - def __init__(self, sn, baudrate=115200, timeout=0.1, parent=None): - port = [p.device for p in serial.tools.list_ports.comports() if sn == p.serial_number] - self.serial = serial.Serial(port[0], baudrate=baudrate, timeout=timeout) - self.current_baudrate = baudrate - - # Position information - self.x_pos = 0 - self.y_pos = 0 - self.z_pos = 0 # Always 0 for Prior stage - self.theta_pos = 0 # Always 0 for Prior stage - - # Button and switch state - self.button_and_switch_state = 0 - self.joystick_button_pressed = 0 - self.signal_joystick_button_pressed_event = False - self.switch_state = 0 - self.joystick_enabled = False - - # Prior-specific properties - self.stage_microsteps_per_mm = 100000 # Stage property - self.user_unit = None - self.stage_model = None - self.stage_limits = None - self.resolution = 0.1 - self.x_direction = 1 # 1 or -1 - self.y_direction = 1 # 1 or -1 - self.speed = 200 # Default value - self.acceleration = 500 # Default value - - # Position updating callback - self.pos_callback_external = None - self.serial_lock = threading.Lock() - self.position_updating_event = threading.Event() - self.position_updating_thread = threading.Thread(target=self.return_position_info, daemon=True) - - self.set_baudrate(baudrate) - - self.initialize() - self.position_updating_thread.start() - - def set_baudrate(self, baud): - allowed_baudrates = {9600: "96", 19200: "19", 38400: "38", 115200: "115"} - if baud not in allowed_baudrates: - print("Baudrate not allowed. Setting baudrate to 9600") - baud_command = "BAUD 96" - else: - baud_command = "BAUD " + allowed_baudrates[baud] - print(baud_command) - - for bd in allowed_baudrates: - self.serial.baudrate = bd - self.serial.write(b"\r") - time.sleep(0.1) - self.serial.flushInput() - - self.send_command(baud_command) - - self.serial.baudrate = baud - - try: - test_response = self.send_command("$") # Send a simple query command - if not test_response: - raise Exception("No response received after changing baud rate") - else: - self.current_baudrate = baud - print(f"Baud rate successfully changed to {baud}") - return - except Exception as e: - # If verification fails, try to revert to the original baud rate - self.serial.baudrate = self.current_baudrate - print(f"Serial baudrate: {bd}") - print(f"Failed to verify communication at new baud rate: {e}") - - raise Exception("Failed to set baudrate.") - - def initialize(self): - self.send_command("COMP 0") # Set to standard mode - self.send_command("BLSH 1") # Enable backlash correction - self.send_command("RES,S," + str(self.resolution)) # Set resolution - response = self.send_command("H 0") # Joystick enabled - self.joystick_enabled = True - self.user_unit = self.stage_microsteps_per_mm * self.resolution - self.get_stage_info() - self.set_acceleration(self.acceleration) - self.set_max_speed(self.speed) - - def send_command(self, command): - with self.serial_lock: - self.serial.write(f"{command}\r".encode()) - response = self.serial.readline().decode().strip() - if response.startswith("E"): - raise Exception(f"Error from controller: {response}") - return response - - def get_stage_info(self): - stage_info = self.send_command("STAGE") - self.stage_model = re.search(r"STAGE\s*=\s*(\S+)", stage_info).group(1) - print("Stage model: ", self.stage_model) - - def mm_to_steps(self, mm): - return int(mm * self.user_unit) - - def steps_to_mm(self, steps): - return steps / self.user_unit - - def set_max_speed(self, speed=1000): - """Set the maximum speed of the stage. Range is 1 to 1000.""" - if 1 <= speed <= 1000: - response = self.send_command(f"SMS {speed}") - print(f"Maximum speed set to {speed}. Response: {response}") - else: - raise ValueError("Speed must be between 1 and 1000") - - def get_max_speed(self): - """Get the current maximum speed setting.""" - response = self.send_command("SMS") - print(f"Current maximum speed: {response}") - return int(response) - - def set_acceleration(self, acceleration=1000): - """Set the acceleration of the stage. Range is 1 to 1000.""" - if 1 <= acceleration <= 1000: - response = self.send_command(f"SAS {acceleration}") - self.acceleration = acceleration - print(f"Acceleration set to {acceleration}. Response: {response}") - else: - raise ValueError("Acceleration must be between 1 and 1000") - - def get_acceleration(self): - """Get the current acceleration setting.""" - response = self.send_command("SAS") - print(f"Current acceleration: {response}") - return int(response) - - def set_callback(self, function): - self.pos_callback_external = function - - def return_position_info(self): - while not self.position_updating_event.is_set(): - if self.pos_callback_external is not None: - self.pos_callback_external(self) - - def home_xy(self): - """Home the XY stage.""" - self.send_command("M") # 'M' command moves stage to (0,0,0) - self.wait_for_stop() - self.x_pos = 0 - self.y_pos = 0 - print("finished homing") - - def home_x(self): - self.move_relative(-self.x_pos, 0) - self.x_pos = 0 - - def home_y(self): - self.move_relative(0, -self.y_pos) - self.y_pos = 0 - - def zero_xy(self): - self.send_command("Z") - self.x_pos = 0 - self.y_pos = 0 - - def zero_x(self): - self.set_pos(0, self.y_pos) - - def zero_y(self): - self.set_pos(self.x_pos, 0) - - def get_pos(self): - response = self.send_command("P") - x, y, z = map(int, response.split(",")) - self.x_pos = x - self.y_pos = y - return x, y, 0, 0 # Z and theta are 0 - - def set_pos(self, x, y, z=0): - self.send_command(f"P {x},{y},{z}") - self.x_pos = x - self.y_pos = y - - def move_relative_mm(self, x_mm, y_mm): - x_steps = self.mm_to_steps(x_mm) - y_steps = self.mm_to_steps(y_mm) - return self.move_relative(x_steps, y_steps) - - def move_absolute_mm(self, x_mm, y_mm): - x_steps = self.mm_to_steps(x_mm) - y_steps = self.mm_to_steps(y_mm) - return self.move_absolute(x_steps, y_steps) - - def move_absolute_x_mm(self, x_mm): - x_steps = self.mm_to_steps(x_mm) - return self.move_absolute_x(x_steps) - - def move_absolute_y_mm(self, y_mm): - y_steps = self.mm_to_steps(y_mm) - return self.move_absolute_y(y_steps) - - def move_relative(self, x, y, blocking=True): - x = x * self.x_direction - y = y * self.y_direction - self.send_command(f"GR {x},{y}") - if blocking: - self.wait_for_stop() - else: - threading.Thread(target=self.wait_for_stop, daemon=True).start() - - def move_absolute(self, x, y, blocking=True): - x = x * self.x_direction - y = y * self.y_direction - self.send_command(f"G {x},{y}") - if blocking: - self.wait_for_stop() - else: - threading.Thread(target=self.wait_for_stop, daemon=True).start() - - def move_absolute_x(self, x, blocking=True): - x = x * self.x_direction - self.send_command(f"GX {x}") - if blocking: - self.wait_for_stop() - else: - threading.Thread(target=self.wait_for_stop, daemon=True).start() - - def move_absolute_y(self, y, blocking=True): - y = y * self.y_direction - self.send_command(f"GY {y}") - if blocking: - self.wait_for_stop() - else: - threading.Thread(target=self.wait_for_stop, daemon=True).start() - - def enable_joystick(self): - self.send_command("J") - self.joystick_enabled = True - - def disable_joystick(self): - self.send_command("H") - self.joystick_enabled = False - - def wait_for_stop(self): - while True: - status = int(self.send_command("$,S")) - if status == 0: - self.get_pos() - print("xy position: ", self.x_pos, self.y_pos) - break - time.sleep(0.05) - - def stop(self): - return self.send_command("K") - - def close(self): - self.disable_joystick() - self.position_updating_event.set() - self.position_updating_thread.join() - self.serial.close() - print("Stage closed") - - def home_z(self): - pass - - def zero_z(self): - pass