Skip to content

Commit

Permalink
microcontroller: remove is_simulated at Microcontroller level, and in…
Browse files Browse the repository at this point in the history
…stead implement simulated firmware at Serial level (#33)

We added back in an is_simulated flag in the Microcontroller
implementation. This isn't ideal because it meant we had to go and put
`if self.is_simulated:` in all the `Microcontroller` methods and take
custom action in simulation mode. This meant that none of our regular
path code was tested when running in simulation, and that the
`Microcontroller` code had a lot of noise in it to handle the simulation
case.

This moves the simulation handling down to the Serial / simulated
firmware level by handling and responding to `CMD_SET` commands (like
our firmware does) in our simulated serial device.

Tested by: Unit tests
  • Loading branch information
ianohara authored Dec 9, 2024
1 parent 90a3532 commit 352141b
Show file tree
Hide file tree
Showing 3 changed files with 198 additions and 55 deletions.
2 changes: 1 addition & 1 deletion software/control/gui_hcs.py
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ def loadSimulationObjects(self):
self.emission_filter_wheel = serial_peripherals.FilterController_Simulation(115200, 8, serial.PARITY_NONE, serial.STOPBITS_ONE)
if USE_OPTOSPIN_EMISSION_FILTER_WHEEL:
self.emission_filter_wheel = serial_peripherals.Optospin_Simulation(SN=None)
self.microcontroller = microcontroller.Microcontroller(existing_serial=microcontroller.SimSerial(),is_simulation=True)
self.microcontroller = microcontroller.Microcontroller(existing_serial=microcontroller.SimSerial())

def loadHardwareObjects(self):
# Initialize hardware objects
Expand Down
106 changes: 52 additions & 54 deletions software/control/microcontroller.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,11 @@
import platform
import serial
import serial.tools.list_ports
import struct
import threading
import time

import numpy as np
import threading
import serial
import serial.tools.list_ports
from crc import CrcCalculator, Crc8
import struct

from qtpy.QtCore import QTimer

import squid.logging
from control._def import *
Expand Down Expand Up @@ -68,10 +66,49 @@ def __init__(self):

self.closed = False

@staticmethod
def unpack_position(pos_bytes):
return Microcontroller._payload_to_int(pos_bytes, len(pos_bytes))

def respond_to(self, write_bytes):
# Just immediately respond that the command was successful. We can
# implement specific command handlers here in the future for eg:
# correct position tracking and such.
# NOTE: As we need more and more microcontroller simulator functionality, add
# CMD_SET handlers here. Prefer this over adding checks for simulated mode in
# the Microcontroller!
command_byte = write_bytes[1]
# If this is a position related command, these are our position bytes.
position_bytes = write_bytes[2:6]
if command_byte == CMD_SET.MOVE_X:
self.x += self.unpack_position(position_bytes)
elif command_byte == CMD_SET.MOVE_Y:
self.y += self.unpack_position(position_bytes)
elif command_byte == CMD_SET.MOVE_Z:
self.z += self.unpack_position(position_bytes)
elif command_byte == CMD_SET.MOVE_THETA:
self.theta += self.unpack_position(position_bytes)
elif command_byte == CMD_SET.MOVETO_X:
self.x = self.unpack_position(position_bytes)
elif command_byte == CMD_SET.MOVETO_Y:
self.y = self.unpack_position(position_bytes)
elif command_byte == CMD_SET.MOVETO_Z:
self.z = self.unpack_position(position_bytes)
elif command_byte == CMD_SET.HOME_OR_ZERO:
axis = write_bytes[2]
# NOTE: write_bytes[3] might indicate that we only want to "ZERO", but
# in the simulated case zeroing is the same as homing. So don't check
# that here. If we want to simulate the homing motion in the future
# we'd need to do that here.
if axis == AXIS.X:
self.x = 0
elif axis == AXIS.Y:
self.y = 0
elif axis == AXIS.Z:
self.z = 0
elif axis == AXIS.THETA:
self.theta = 0
elif axis == AXIS.XY:
self.x = 0
self.y = 0

self.response_buffer.extend(SimSerial.response_bytes_for(
write_bytes[0],
CMD_EXECUTION_STATUS.COMPLETED_WITHOUT_ERRORS,
Expand Down Expand Up @@ -110,9 +147,7 @@ class Microcontroller:
LAST_COMMAND_ACK_TIMEOUT = 0.5
MAX_RETRY_COUNT = 5

def __init__(self, version='Arduino Due', sn=None, existing_serial=None, is_simulation=False):
self.is_simulation = is_simulation

def __init__(self, version='Arduino Due', sn=None, existing_serial=None):
self.log = squid.logging.get_logger(self.__class__.__name__)

self.tx_buffer_length = MicrocontrollerDef.CMD_LENGTH
Expand Down Expand Up @@ -275,13 +310,9 @@ def _move_axis_usteps(self, usteps, axis_command_code, axis_direction_sign):
self.send_command(cmd)

def move_x_usteps(self,usteps):
if self.is_simulation:
self.x_pos = self.x_pos + STAGE_MOVEMENT_SIGN_X*usteps
self._move_axis_usteps(usteps, CMD_SET.MOVE_X, STAGE_MOVEMENT_SIGN_X)

def move_x_to_usteps(self,usteps):
if self.is_simulation:
self.x_pos = usteps
payload = self._int_to_payload(usteps,4)
cmd = bytearray(self.tx_buffer_length)
cmd[1] = CMD_SET.MOVETO_X
Expand All @@ -292,13 +323,9 @@ def move_x_to_usteps(self,usteps):
self.send_command(cmd)

def move_y_usteps(self,usteps):
if self.is_simulation:
self.y_pos = self.y_pos + STAGE_MOVEMENT_SIGN_Y*usteps
self._move_axis_usteps(usteps, CMD_SET.MOVE_Y, STAGE_MOVEMENT_SIGN_Y)

def move_y_to_usteps(self,usteps):
if self.is_simulation:
self.y_pos = usteps
payload = self._int_to_payload(usteps,4)
cmd = bytearray(self.tx_buffer_length)
cmd[1] = CMD_SET.MOVETO_Y
Expand All @@ -309,13 +336,9 @@ def move_y_to_usteps(self,usteps):
self.send_command(cmd)

def move_z_usteps(self,usteps):
if self.is_simulation:
self.z_pos = self.z_pos + STAGE_MOVEMENT_SIGN_Z*usteps
self._move_axis_usteps(usteps, CMD_SET.MOVE_Z, STAGE_MOVEMENT_SIGN_Z)

def move_z_to_usteps(self,usteps):
if self.is_simulation:
self.z_pos = usteps
payload = self._int_to_payload(usteps,4)
cmd = bytearray(self.tx_buffer_length)
cmd[1] = CMD_SET.MOVETO_Z
Expand All @@ -326,8 +349,6 @@ def move_z_to_usteps(self,usteps):
self.send_command(cmd)

def move_theta_usteps(self,usteps):
if self.is_simulation:
self.theta_pos = self.theta_pos + usteps
self._move_axis_usteps(usteps, CMD_SET.MOVE_THETA, STAGE_MOVEMENT_SIGN_THETA)

def set_off_set_velocity_x(self,off_set_velocity):
Expand Down Expand Up @@ -356,46 +377,34 @@ def set_off_set_velocity_y(self,off_set_velocity):
self.send_command(cmd)

def home_x(self):
if self.is_simulation:
self.x_pos = 0
cmd = bytearray(self.tx_buffer_length)
cmd[1] = CMD_SET.HOME_OR_ZERO
cmd[2] = AXIS.X
cmd[3] = int((STAGE_MOVEMENT_SIGN_X+1)/2) # "move backward" if SIGN is 1, "move forward" if SIGN is -1
self.send_command(cmd)

def home_y(self):
if self.is_simulation:
self.y_pos = 0
cmd = bytearray(self.tx_buffer_length)
cmd[1] = CMD_SET.HOME_OR_ZERO
cmd[2] = AXIS.Y
cmd[3] = int((STAGE_MOVEMENT_SIGN_Y+1)/2) # "move backward" if SIGN is 1, "move forward" if SIGN is -1
self.send_command(cmd)

def home_z(self):
if self.is_simulation:
self.z_pos = 0
cmd = bytearray(self.tx_buffer_length)
cmd[1] = CMD_SET.HOME_OR_ZERO
cmd[2] = AXIS.Z
cmd[3] = int((STAGE_MOVEMENT_SIGN_Z+1)/2) # "move backward" if SIGN is 1, "move forward" if SIGN is -1
self.send_command(cmd)

def home_theta(self):
if self.is_simulation:
self.theta_pos = 0
cmd = bytearray(self.tx_buffer_length)
cmd[1] = CMD_SET.HOME_OR_ZERO
cmd[2] = 3
cmd[3] = int((STAGE_MOVEMENT_SIGN_THETA+1)/2) # "move backward" if SIGN is 1, "move forward" if SIGN is -1
self.send_command(cmd)

def home_xy(self):
if self.is_simulation:
self.x_pos = 0
self.y_pos = 0
self.y_pos = 0
cmd = bytearray(self.tx_buffer_length)
cmd[1] = CMD_SET.HOME_OR_ZERO
cmd[2] = AXIS.XY
Expand All @@ -404,35 +413,27 @@ def home_xy(self):
self.send_command(cmd)

def zero_x(self):
if self.is_simulation:
self.x_pos = 0
cmd = bytearray(self.tx_buffer_length)
cmd[1] = CMD_SET.HOME_OR_ZERO
cmd[2] = AXIS.X
cmd[3] = HOME_OR_ZERO.ZERO
self.send_command(cmd)

def zero_y(self):
if self.is_simulation:
self.y_pos = 0
cmd = bytearray(self.tx_buffer_length)
cmd[1] = CMD_SET.HOME_OR_ZERO
cmd[2] = AXIS.Y
cmd[3] = HOME_OR_ZERO.ZERO
self.send_command(cmd)

def zero_z(self):
if self.is_simulation:
self.z_pos = 0
cmd = bytearray(self.tx_buffer_length)
cmd[1] = CMD_SET.HOME_OR_ZERO
cmd[2] = AXIS.Z
cmd[3] = HOME_OR_ZERO.ZERO
self.send_command(cmd)

def zero_theta(self):
if self.is_simulation:
self.theta_pos = 0
cmd = bytearray(self.tx_buffer_length)
cmd[1] = CMD_SET.HOME_OR_ZERO
cmd[2] = AXIS.THETA
Expand Down Expand Up @@ -698,13 +699,10 @@ def read_received_packet(self):
self.log.error("cmd checksum error, resending command")
self.resend_last_command()

if self.is_simulation:
pass
else:
self.x_pos = self._payload_to_int(msg[2:6],MicrocontrollerDef.N_BYTES_POS) # unit: microstep or encoder resolution
self.y_pos = self._payload_to_int(msg[6:10],MicrocontrollerDef.N_BYTES_POS) # unit: microstep or encoder resolution
self.z_pos = self._payload_to_int(msg[10:14],MicrocontrollerDef.N_BYTES_POS) # unit: microstep or encoder resolution
self.theta_pos = self._payload_to_int(msg[14:18],MicrocontrollerDef.N_BYTES_POS) # unit: microstep or encoder resolution
self.x_pos = self._payload_to_int(msg[2:6],MicrocontrollerDef.N_BYTES_POS) # unit: microstep or encoder resolution
self.y_pos = self._payload_to_int(msg[6:10],MicrocontrollerDef.N_BYTES_POS) # unit: microstep or encoder resolution
self.z_pos = self._payload_to_int(msg[10:14],MicrocontrollerDef.N_BYTES_POS) # unit: microstep or encoder resolution
self.theta_pos = self._payload_to_int(msg[14:18],MicrocontrollerDef.N_BYTES_POS) # unit: microstep or encoder resolution

self.button_and_switch_state = msg[18]
# joystick button
Expand Down
145 changes: 145 additions & 0 deletions software/tests/control/test_microcontroller.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
import pytest
import control._def
import control.microcontroller

def assert_pos_almost_equal(expected, actual):
assert len(actual) == len(expected)
for (e, a) in zip(expected, actual):
assert a == pytest.approx(e)

def test_create_simulated_microcontroller():
micro = control.microcontroller.Microcontroller(existing_serial=control.microcontroller.SimSerial())

def test_microcontroller_simulated_positions():
micro = control.microcontroller.Microcontroller(existing_serial=control.microcontroller.SimSerial())

micro.move_x_to_usteps(1000)
micro.wait_till_operation_is_completed()
micro.move_y_to_usteps(2000)
micro.wait_till_operation_is_completed()
micro.move_z_to_usteps(3000)
micro.wait_till_operation_is_completed()
micro.move_theta_usteps(4000)
micro.wait_till_operation_is_completed()

assert_pos_almost_equal((1000, 2000, 3000, 4000), micro.get_pos())

micro.home_x()
micro.wait_till_operation_is_completed()
assert_pos_almost_equal((0, 2000, 3000, 4000), micro.get_pos())

micro.home_y()
micro.wait_till_operation_is_completed()
assert_pos_almost_equal((0, 0, 3000, 4000), micro.get_pos())

micro.home_z()
micro.wait_till_operation_is_completed()
assert_pos_almost_equal((0, 0, 0, 4000), micro.get_pos())

micro.home_theta()
micro.wait_till_operation_is_completed()
assert_pos_almost_equal((0, 0, 0, 0), micro.get_pos())

micro.move_x_to_usteps(1000)
micro.wait_till_operation_is_completed()
micro.move_y_to_usteps(2000)
micro.wait_till_operation_is_completed()
micro.move_z_to_usteps(3000)
micro.wait_till_operation_is_completed()
micro.move_theta_usteps(4000)
micro.wait_till_operation_is_completed()
assert_pos_almost_equal((1000, 2000, 3000, 4000), micro.get_pos())

# Multiply by the sign so we get a positive move. The way these relative move helpers work right now
# is that they multiply by the sign, but the read back is not multiplied by the sign. So if
# the movement sign is -1, doing a relative move of 100 will result in the get_pos() value being -100.
#
# NOTE(imo): This seems probably not right, so this might get fixed and this comment might be out of date.
micro.move_x_usteps(control._def.STAGE_MOVEMENT_SIGN_X * 1)
micro.wait_till_operation_is_completed()
micro.move_y_usteps(control._def.STAGE_MOVEMENT_SIGN_Y * 2)
micro.wait_till_operation_is_completed()
micro.move_z_usteps(control._def.STAGE_MOVEMENT_SIGN_Z * 3)
micro.wait_till_operation_is_completed()
micro.move_theta_usteps(control._def.STAGE_MOVEMENT_SIGN_THETA * 4)
micro.wait_till_operation_is_completed()
assert_pos_almost_equal((1001, 2002, 3003, 4004), micro.get_pos())

micro.zero_x()
micro.wait_till_operation_is_completed()
assert_pos_almost_equal((0, 2002, 3003, 4004), micro.get_pos())

micro.zero_y()
micro.wait_till_operation_is_completed()
assert_pos_almost_equal((0, 0, 3003, 4004), micro.get_pos())

micro.zero_z()
micro.wait_till_operation_is_completed()
assert_pos_almost_equal((0, 0, 0, 4004), micro.get_pos())

micro.zero_theta()
micro.wait_till_operation_is_completed()
assert_pos_almost_equal((0, 0, 0, 0), micro.get_pos())

micro.move_x_to_usteps(1000)
micro.wait_till_operation_is_completed()
micro.move_y_to_usteps(2000)
micro.wait_till_operation_is_completed()
micro.move_z_to_usteps(3000)
micro.wait_till_operation_is_completed()
# There's no move_theta_to_usteps.
assert_pos_almost_equal((1000, 2000, 3000, 0), micro.get_pos())

micro.home_xy()
micro.wait_till_operation_is_completed()
assert_pos_almost_equal((0, 0, 3000, 0), micro.get_pos())

@pytest.mark.skip(reason="This is likely a bug, but I'm not sure yet. Tracking in https://linear.app/cephla/issue/S-115/microcontroller-relative-and-absolute-position-sign-mismatch")
def test_microcontroller_absolute_and_relative_match():
micro = control.microcontroller.Microcontroller(existing_serial=control.microcontroller.SimSerial())

def wait():
micro.wait_till_operation_is_completed()

micro.home_x()
wait()

micro.home_y()
wait()

micro.home_z()
wait()

micro.home_theta()
wait()

# For all our axes, we'd expect that moving to an absolute position from zero brings us to that position.
# Then doing a relative move of the negative of the absolute position should bring us back to zero.
abs_position = 1234

# X
micro.move_x_to_usteps(abs_position)
wait()
assert_pos_almost_equal((abs_position, 0, 0, 0), micro.get_pos())

micro.move_x_usteps(-abs_position)
wait()
assert_pos_almost_equal((0, 0, 0, 0), micro.get_pos())

# Y
micro.move_y_to_usteps(abs_position)
wait()
assert_pos_almost_equal((0, abs_position, 0, 0), micro.get_pos())

micro.move_y_usteps(-abs_position)
wait()
assert_pos_almost_equal((0, 0, 0, 0), micro.get_pos())

# Z
micro.move_z_to_usteps(abs_position)
wait()
assert_pos_almost_equal((0, 0, abs_position, 0), micro.get_pos())

micro.move_z_usteps(-abs_position)
wait()
assert_pos_almost_equal((0, 0, 0, 0), micro.get_pos())

0 comments on commit 352141b

Please sign in to comment.