Skip to content

Commit

Permalink
More stage config as I find them
Browse files Browse the repository at this point in the history
  • Loading branch information
ianohara committed Dec 10, 2024
1 parent a1ffce1 commit a415d54
Show file tree
Hide file tree
Showing 4 changed files with 95 additions and 17 deletions.
26 changes: 26 additions & 0 deletions software/squid/abc.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
import abc
import time
from typing import Optional

import pydantic

import squid.logging
from squid.config import AxisConfig, StageConfig
from squid.exceptions import SquidTimeout


class Pos(pydantic.BaseModel):
Expand All @@ -16,6 +19,10 @@ class StageStage(pydantic.BaseModel):
busy: bool

class AbstractStage(metaclass=abc.ABCMeta):
def __init__(self, stage_config: StageConfig):
self._config = stage_config
self._log = squid.logging.get_logger(self.__class__.__name__)

@abc.abstractmethod
def move_x(self, rel_mm: float, blocking: bool=True):
pass
Expand All @@ -40,6 +47,11 @@ def move_y_to(self, abs_mm: float, blocking: bool=True):
def move_z_to(self, abs_mm: float, blocking: bool=True):
pass

# TODO(imo): We need a stop or halt or something along these lines
# @abc.abstractmethod
# def stop(self, blocking: bool=True):
# pass

@abc.abstractmethod
def get_pos(self) -> Pos:
pass
Expand Down Expand Up @@ -70,3 +82,17 @@ def set_limits(self,

def get_config(self) -> StageConfig:
pass

def wait_for_idle(self, timeout_s):
start_time = time.time()
while time.time() < start_time + timeout_s:
if not self.get_state().busy:
return
# Sleep some small amount of time so we can yield to other threads if needed
# while waiting.
time.sleep(0.001)

error_message = f"Timed out waiting after {timeout_s:0.3f} [s]"
self._log.error(error_message)

raise SquidTimeout(error_message)
34 changes: 30 additions & 4 deletions software/squid/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,16 @@ class AxisConfig(pydantic.BaseModel):
# step and so will travel a distance of 2.
MICROSTEPS_PER_STEP: float

# The Max speed the axis is allowed to travel in denoted in its native units. This means mm/s for
# linear axes, and radians/s for rotary axes.
MAX_SPEED: float
MAX_ACCELERATION: float

# The min and maximum position of this axis in its native units. This means mm for linear axes, and
# radians for rotary. `inf` is allowed (for something like a continuous rotary axis)
MIN_POSITION: float
MAX_POSITION: float

def convert_to_real_units(self, usteps: float):
if self.USE_ENCODER:
# TODO(imo): Do we need ENCODER_SIGN here too?
Expand All @@ -53,7 +63,11 @@ class StageConfig(pydantic.BaseModel):
ENCODER_STEP_SIZE=_def.ENCODER_STEP_SIZE_X_MM,
FULL_STEPS_PER_REV=_def.FULLSTEPS_PER_REV_X,
SCREW_PITCH=_def.SCREW_PITCH_X_MM,
MICROSTEPS_PER_STEP=_def.MICROSTEPPING_DEFAULT_X
MICROSTEPS_PER_STEP=_def.MICROSTEPPING_DEFAULT_X,
MAX_SPEED=_def.MAX_VELOCITY_X_mm,
MAX_ACCELERATION=_def.MAX_ACCELERATION_X_mm,
MIN_POSITION=0, # NOTE(imo): Min and Max need adjusting. They are arbitrary right now!
MAX_POSITION=10
),
Y_AXIS=AxisConfig(
MOVEMENT_SIGN=_def.STAGE_MOVEMENT_SIGN_Y,
Expand All @@ -62,7 +76,11 @@ class StageConfig(pydantic.BaseModel):
ENCODER_STEP_SIZE=_def.ENCODER_STEP_SIZE_Y_MM,
FULL_STEPS_PER_REV=_def.FULLSTEPS_PER_REV_Y,
SCREW_PITCH=_def.SCREW_PITCH_Y_MM,
MICROSTEPS_PER_STEP=_def.MICROSTEPPING_DEFAULT_Y
MICROSTEPS_PER_STEP=_def.MICROSTEPPING_DEFAULT_Y,
MAX_SPEED=_def.MAX_VELOCITY_Y_mm,
MAX_ACCELERATION=_def.MAX_ACCELERATION_Y_mm,
MIN_POSITION=0, # NOTE(imo): Min and Max need adjusting. They are arbitrary right now!
MAX_POSITION=10
),
Z_AXIS=AxisConfig(
MOVEMENT_SIGN=_def.STAGE_MOVEMENT_SIGN_Z,
Expand All @@ -71,7 +89,11 @@ class StageConfig(pydantic.BaseModel):
ENCODER_STEP_SIZE=_def.ENCODER_STEP_SIZE_Z_MM,
FULL_STEPS_PER_REV=_def.FULLSTEPS_PER_REV_Z,
SCREW_PITCH=_def.SCREW_PITCH_Z_MM,
MICROSTEPS_PER_STEP=_def.MICROSTEPPING_DEFAULT_Z
MICROSTEPS_PER_STEP=_def.MICROSTEPPING_DEFAULT_Z,
MAX_SPEED=_def.MAX_VELOCITY_Z_mm,
MAX_ACCELERATION=_def.MAX_ACCELERATION_Z_mm,
MIN_POSITION=0, # NOTE(imo): Min and Max need adjusting. They are arbitrary right now!
MAX_POSITION=1
),
THETA_AXIS=AxisConfig(
MOVEMENT_SIGN=_def.STAGE_MOVEMENT_SIGN_THETA,
Expand All @@ -80,7 +102,11 @@ class StageConfig(pydantic.BaseModel):
ENCODER_STEP_SIZE=_def.ENCODER_STEP_SIZE_THETA,
FULL_STEPS_PER_REV=_def.FULLSTEPS_PER_REV_THETA,
SCREW_PITCH=2.0*math.pi/_def.FULLSTEPS_PER_REV_THETA ,
MICROSTEPS_PER_STEP=_def.MICROSTEPPING_DEFAULT_Y
MICROSTEPS_PER_STEP=_def.MICROSTEPPING_DEFAULT_Y,
MAX_SPEED=2.0 * math.pi / 4, # NOTE(imo): I arbitrarily guessed this at 4 sec / rev, so it probably needs adjustment.
MAX_ACCELERATION=_def.MAX_ACCELERATION_X_mm,
MIN_POSITION=0, # NOTE(imo): Min and Max need adjusting. They are arbitrary right now!
MAX_POSITION=2.0 * math.pi / 4
)
)

Expand Down
3 changes: 3 additions & 0 deletions software/squid/exceptions.py
Original file line number Diff line number Diff line change
@@ -1,2 +1,5 @@
class SquidError(RuntimeError):
pass

class SquidTimeout(SquidError, TimeoutError):
pass
49 changes: 36 additions & 13 deletions software/squid/stage/cephla.py
Original file line number Diff line number Diff line change
@@ -1,47 +1,59 @@
import math
from typing import Optional

import control.microcontroller
import control._def as _def
import squid.logging
from squid.abc import AbstractStage, Pos, StageStage
from squid.config import StageConfig


class CephlaStage(AbstractStage):
@staticmethod
def _calc_move_timeout(distance, max_speed):
# We arbitrarily guess that if a move takes 3x the naive "infinite acceleration" time, then it
# probably timed out. But always use a minimum timeout of at least 3 seconds.
#
# Also protect against divide by zero.
return max((3, 3 * abs(distance) / min(0.1, abs(max_speed))))

def __init__(self, microcontroller: control.microcontroller.Microcontroller, stage_config: StageConfig):
super().__init__(stage_config)
self._microcontroller = microcontroller
self._config = stage_config
self._log = squid.logging.get_logger(self.__class__.__name__)

def move_x(self, rel_mm: float, blocking: bool = True):
self._microcontroller.move_x_usteps(self._config.X_AXIS.convert_real_units_to_ustep(rel_mm))
if blocking:
self._microcontroller.wait_till_operation_is_completed()
self._microcontroller.wait_till_operation_is_completed(
self._calc_move_timeout(rel_mm, self.get_config().X_AXIS.MAX_SPEED))

def move_y(self, rel_mm: float, blocking: bool = True):
self._microcontroller.move_y_usteps(self._config.Y_AXIS.convert_real_units_to_ustep(rel_mm))
if blocking:
self._microcontroller.wait_till_operation_is_completed()
self._microcontroller.wait_till_operation_is_completed(
self._calc_move_timeout(rel_mm, self.get_config().Y_AXIS.MAX_SPEED))

def move_z(self, rel_mm: float, blocking: bool = True):
self._microcontroller.move_z_usteps(self._config.Z_AXIS.convert_real_units_to_ustep(rel_mm))
if blocking:
self._microcontroller.wait_till_operation_is_completed()
self._microcontroller.wait_till_operation_is_completed(
self._calc_move_timeout(rel_mm, self.get_config().Z_AXIS.MAX_SPEED))

def move_x_to(self, abs_mm: float, blocking: bool = True):
self._microcontroller.move_x_to_usteps(self._config.X_AXIS.convert_real_units_to_ustep(abs_mm))
if blocking:
self._microcontroller.wait_till_operation_is_completed()
self._microcontroller.wait_till_operation_is_completed(
self._calc_move_timeout(abs_mm - self.get_pos().x, self.get_config().X_AXIS.MAX_SPEED))

def move_y_to(self, abs_mm: float, blocking: bool = True):
self._microcontroller.move_y_to_usteps(self._config.Y_AXIS.convert_real_units_to_ustep(abs_mm))
if blocking:
self._microcontroller.wait_till_operation_is_completed()
self._microcontroller.wait_till_operation_is_completed(
self._calc_move_timeout(abs_mm - self.get_pos().y, self.get_config().Y_AXIS.MAX_SPEED))

def move_z_to(self, abs_mm: float, blocking: bool = True):
self._microcontroller.move_z_to_usteps(self._config.Z_AXIS.convert_real_units_to_ustep(abs_mm))
if blocking:
self._microcontroller.wait_till_operation_is_completed()
self._microcontroller.wait_till_operation_is_completed(
self._calc_move_timeout(abs_mm - self.get_pos().z, self.get_config().Z_AXIS.MAX_SPEED))

def get_pos(self) -> Pos:
pos_usteps = self._microcontroller.get_pos()
Expand All @@ -56,24 +68,35 @@ def get_state(self) -> StageStage:
return StageStage(busy=self._microcontroller.is_busy())

def home(self, x: bool, y: bool, z: bool, theta: bool, blocking: bool = True):
# NOTE(imo): Arbitrarily use max speed / 5 for homing speed. It'd be better to have it exactly!
x_timeout = self._calc_move_timeout(
self.get_config().X_AXIS.MAX_POSITION - self.get_config().X_AXIS.MIN_POSITION,
self.get_config().X_AXIS.MAX_SPEED / 5.0)
y_timeout = self._calc_move_timeout(
self.get_config().Y_AXIS.MAX_POSITION - self.get_config().Y_AXIS.MIN_POSITION,
self.get_config().Y_AXIS.MAX_SPEED / 5.0)
z_timeout = self._calc_move_timeout(
self.get_config().Z_AXIS.MAX_POSITION - self.get_config().Z_AXIS.MIN_POSITION,
self.get_config().Z_AXIS.MAX_SPEED / 5.0)
theta_timeout = self._calc_move_timeout(2.0 * math.pi, self.get_config().THETA_AXIS.MAX_SPEED / 5.0)
if x and y:
self._microcontroller.home_xy()
elif x:
self._microcontroller.home_x()
elif y:
self._microcontroller.home_y()
if blocking:
self._microcontroller.wait_till_operation_is_completed()
self._microcontroller.wait_till_operation_is_completed(max(x_timeout, y_timeout))

if z:
self._microcontroller.home_z()
if blocking:
self._microcontroller.wait_till_operation_is_completed()
self._microcontroller.wait_till_operation_is_completed(z_timeout)

if theta:
self._microcontroller.home_theta()
if blocking:
self._microcontroller.wait_till_operation_is_completed()
self._microcontroller.wait_till_operation_is_completed(theta_timeout)

def zero(self, x: bool, y: bool, z: bool, theta: bool, blocking: bool = True):
if x:
Expand Down

0 comments on commit a415d54

Please sign in to comment.