diff --git a/ledboard.py b/ledboard.py index d53a6cd..bba2720 100644 --- a/ledboard.py +++ b/ledboard.py @@ -40,6 +40,7 @@ import serial import serial.tools.list_ports as list_ports from lvp_logger import logger +import time class LEDBoard: @@ -84,12 +85,15 @@ def connect(self): timeout=self.timeout, write_timeout=self.write_timeout) - self.driver.close() - self.driver.open() + #self.driver.close() + #self.driver.open() # self.exchange_command('import main.py') # self.exchange_command('import main.py') logger.info('[LED Class ] LEDBoard.connect() succeeded') + #Sometimes the firmware fails to start (or the port has a \x00 left in the buffer), this forces MicroPython to reset, and the normal firmware just complains + self.driver.write(b'\x04\n') + logger.debug('[LED Class ] LEDBOARD.connect() port initial state: %r'%self.driver.readline()) except: self.driver = False logger.exception('[LED Class ] LEDBoard.connect() failed') @@ -98,17 +102,17 @@ def exchange_command(self, command): """ Exchange command through serial to LED board This should NOT be used in a script. It is intended for other functions to access""" - stream = command.encode('utf-8')+b"\r\n" + stream = command.encode('utf-8')+b"\n" if self.driver != False: try: - self.driver.close() - self.driver.open() self.driver.write(stream) response = self.driver.readline() + self.driver.flushInput() + self.driver.flush() response = response.decode("utf-8","ignore") - logger.info('[LED Class ] LEDBoard.exchange_command('+command+') succeeded') + logger.info('[LED Class ] LEDBoard.exchange_command('+command+') succeeded: %r'%response) return response[:-2] except serial.SerialTimeoutException: diff --git a/lumascope_api.py b/lumascope_api.py index 0964f2c..513fb44 100644 --- a/lumascope_api.py +++ b/lumascope_api.py @@ -43,15 +43,11 @@ # Import additional libraries from lvp_logger import logger -from threading import Timer import time +import os import cv2 import numpy as np -class RepeatTimer(Timer): - def run(self): - while not self.finished.wait(self.interval): - self.function(*self.args, **self.kwargs) class Lumascope(): @@ -160,6 +156,31 @@ def get_image(self): return self.camera.array else: return False + + def get_next_save_path(self, path): + """ GETS THE NEXT SAVE PATH GIVEN AN EXISTING SAVE PATH + + :param path of the format './{save_folder}/{well_label}_{color}_{file_id}.tiff' + :returns the next save path './{save_folder}/{well_label}_{color}_{file_id + 1}.tiff' + + """ + + # Extract file extension (.tiff) and file_id (00001) + dot_idx = path.rfind('.') + under_idx = path.rfind('_') + file_extension = path[dot_idx + 1:] + file_id = path[under_idx + 1:dot_idx] + + # Determine the next file_id + num_zeros = len(file_id) + number_str = str(int(file_id) + 1) + zeros_to_add = num_zeros - len(number_str) + if zeros_to_add <= 0: + new_file_id = number_str + else: + new_file_id = '0' * zeros_to_add + number_str + + return f'{path[:under_idx]}_{new_file_id}.{file_extension}' def save_image(self, array, save_folder = './capture', file_root = 'img_', append = 'ms', color = 'BF'): """CAMERA FUNCTIONS @@ -182,17 +203,22 @@ def save_image(self, array, save_folder = './capture', file_root = 'img_', appen img = np.flip(img, 0) # set filename options - if append == 'ms': - append = str(int(round(time.time() * 1000))) - elif append == 'time': - append = time.strftime("%Y%m%d_%H%M%S") - else: - append = '' - - # generate filename string - filename = file_root + append + '.tiff' + # if append == 'ms': + # append = str(int(round(time.time() * 1000))) + # elif append == 'time': + # append = time.strftime("%Y%m%d_%H%M%S") + # else: + # append = '' + + # generate filename and save path string + initial_id = '_000001' + filename = file_root + append + initial_id + '.tiff' path = save_folder + '/' + filename + # Obtain next save path if current directory already exists + if os.path.exists(path): + path = self.get_next_save_path(path) + try: cv2.imwrite(path, img.astype(np.uint8)) logger.info(f'[SCOPE API ] Saving Image to {path}') @@ -290,18 +316,20 @@ def set_auto_exposure_time(self, state = True): def zhome(self): """MOTION CONTROL FUNCTIONS Home the z-axis (i.e. focus)""" - if not self.motion: return + #if not self.motion: return self.motion.zhome() def xyhome(self): """MOTION CONTROL FUNCTIONS Home the xy-axes (i.e. stage). Note: z-axis and turret will always home first""" - if not self.motion: return + #if not self.motion: return self.is_homing = True self.motion.xyhome() + return - self.xyhome_timer = RepeatTimer(0.01, self.xyhome_iterate) - self.xyhome_timer.start() + #while self.is_moving(): + # time.sleep(0.01) + #self.is_homing = False def xyhome_iterate(self): if not self.is_moving(): @@ -312,37 +340,32 @@ def xycenter(self): """MOTION CONTROL FUNCTIONS Move Stage to the center.""" - if not self.motion: return + #if not self.motion: return self.motion.xycenter() def thome(self): """MOTION CONTROL FUNCTIONS Home the Turret""" - if not self.motion: - return + #if not self.motion: + # return self.motion.thome() def tmove(self, degrees): """MOTION CONTROL FUNCTIONS Move turret to position in degrees""" - if not self.motion: return # MUST home move objective home first to prevent crash - self.zhome() - #self.xyhome() - #self.xycenter() + #self.zhome() + #self.move_absolute_position('Z', self.z_min) + self.move_absolute_position('Z', 0) + self.is_turreting = True self.move_absolute_position('T', degrees) - self.tmove_timer = RepeatTimer(0.01, self.tmove_iterate, args=(degrees,)) - #self.tmove_timer = RepeatTimer(2, self.tmove_iterate, args=(degrees,)) - self.tmove_timer.start() - - def tmove_iterate(self, degrees): - if not self.is_moving(): - self.is_turreting = False - self.tmove_timer.cancel() + #while not self.is_moving(): + # time.sleep(0.1) + self.is_turreting = False def get_target_position(self, axis): """MOTION CONTROL FUNCTIONS @@ -368,21 +391,21 @@ def move_absolute_position(self, axis, pos): """MOTION CONTROL FUNCTIONS Move to absolute position (in um) of axis""" - if not self.motion: return + #if not self.motion: return self.motion.move_abs_pos(axis, pos) def move_relative_position(self, axis, um): """MOTION CONTROL FUNCTIONS Move to relative distance (in um) of axis""" - if not self.motion: return + #if not self.motion: return self.motion.move_rel_pos(axis, um) def get_home_status(self, axis): """MOTION CONTROL FUNCTIONS Return True if axis is in home position or motionboard is """ - if not self.motion: return True + #if not self.motion: return True status = self.motion.home_status(axis) return status @@ -390,7 +413,7 @@ def get_target_status(self, axis): """MOTION CONTROL FUNCTIONS Return True if axis is at target position""" - if not self.motion: return True + #if not self.motion: return True status = self.motion.target_status(axis) return status @@ -399,7 +422,7 @@ def get_reference_status(self, axis): """MOTION CONTROL FUNCTIONS Get all reference status register bits as 32 character string (32-> 0) """ - if not self.motion: return + #if not self.motion: return status = self.motion.reference_status(axis) return status @@ -407,7 +430,7 @@ def get_overshoot(self): """MOTION CONTROL FUNCTIONS Is z-axis (focus) currently in overshoot mode?""" - if not self.motion: return False + #if not self.motion: return False return self.motion.overshoot def is_moving(self): @@ -533,23 +556,23 @@ def autofocus(self, AF_min, AF_max, AF_range): # Start the autofocus process at z-minimum self.move_absolute_position('Z', self.z_min) - # Start thread to wait until autofocus is complete - self.autofocus_timer = RepeatTimer(0.01, self.autofocus_iterate, args=(AF_min,)) - self.autofocus_timer.start() + while not self.autofocus_iterate(AF_min): + time.sleep(0.01) def autofocus_iterate(self, AF_min): """INTEGRATED SCOPE FUNCTIONS iterate autofocus functionality""" + done=False # Ignore steps until conditions are met - if self.is_moving(): return # needs to be in position - if self.is_capturing: return # needs to have completed capture with illumination + if self.is_moving(): return done # needs to be in position + if self.is_capturing: return done # needs to have completed capture with illumination # Is there a previous capture result to pull? if self.capture_return is False: # No -> start a capture event self.capture() - return + return done else: # Yes -> pull the capture result and clear @@ -558,8 +581,8 @@ def autofocus_iterate(self, AF_min): if image is False: # Stop thread image can't be acquired - self.autofocus_timer.cancel() - return + done = True + return done # observe the image rows, cols = image.shape @@ -581,7 +604,7 @@ def autofocus_iterate(self, AF_min): if next_target <= self.z_max: self.move_relative_position('Z', self.resolution) - return + return done # Adjust future steps if next_target went out of bounds # Calculate new step size for resolution @@ -616,7 +639,8 @@ def autofocus_iterate(self, AF_min): self.is_focusing = False # Stop thread image when autofocus is complete - self.autofocus_timer.cancel() + done=True + return done # Algorithms for estimating the quality of the focus def focus_function(self, image, algorithm = 'vollath4'): diff --git a/lumaviewpro.py b/lumaviewpro.py index 02655fe..41c45f8 100644 --- a/lumaviewpro.py +++ b/lumaviewpro.py @@ -34,7 +34,7 @@ Gerard Decker, The Earthineering Company MODIFIED: -May 24, 2023 +May 31, 2023 ''' # General @@ -108,6 +108,8 @@ global focus_round focus_round = 0 + + def focus_log(positions, values): global focus_round if False: @@ -177,14 +179,35 @@ class CompositeCapture(FloatLayout): def __init__(self, **kwargs): super(CompositeCapture,self).__init__(**kwargs) + # Gets the current well label (ex. A1, C2, ...) + def get_well_label(self): + current_labware = WellPlate() + current_labware.load_plate(settings['protocol']['labware']) + protocol_settings = lumaview.ids['motionsettings_id'].ids['protocol_settings_id'] + + # Get target position + try: + x_target = lumaview.scope.get_target_position('X') + y_target = lumaview.scope.get_target_position('Y') + except: + logger.exception('[LVP Main ] Error talking to Motor board.') + raise + + x_target, y_target = protocol_settings.stage_to_plate(x_target, y_target) + + well_x, well_y = current_labware.get_well_index(x_target, y_target) + letter = chr(ord('A') + well_y) + return f'{letter}{well_x + 1}' + def live_capture(self): logger.info('[LVP Main ] CompositeCapture.live_capture()') global lumaview save_folder = settings['live_folder'] file_root = 'live_' - append = 'ms' color = 'BF' + well_label = self.get_well_label() + append = f'{well_label}_{color}' layers = ['BF', 'PC', 'EP', 'Blue', 'Green', 'Red'] for layer in layers: @@ -209,7 +232,8 @@ def custom_capture(self, channel, illumination, gain, exposure, false_color = Tr color = lumaview.scope.ch2color(channel) save_folder = settings[color]['save_folder'] file_root = settings[color]['file_root'] - append = 'ms' + well_label = self.get_well_label() + append = f'{well_label}_{color}' # Illuminate if lumaview.scope.led: @@ -322,9 +346,21 @@ def composite_capture(self): save_folder = settings['live_folder'] file_root = 'composite_' - append = str(int(round(time.time() * 1000))) - filename = file_root + append + '.tiff' - cv2.imwrite(save_folder+'/'+filename, img.astype(np.uint8)) + + # append = str(int(round(time.time() * 1000))) + well_label = self.get_well_label() + append = f'{well_label}' + + # generate filename and save path string + initial_id = '_000001' + filename = file_root + append + initial_id + '.tiff' + path = save_folder + '/' + filename + + # Obtain next save path if current directory already exists + if os.path.exists(path): + path = lumaview.scope.get_next_save_path(path) + + cv2.imwrite(path, img.astype(np.uint8)) # ------------------------------------------------------------------------- # MAIN DISPLAY of LumaViewPro App @@ -1394,6 +1430,7 @@ def stage_to_pixel(self, sx, sy, scale_x, scale_y): pixel_x, pixel_y = self.plate_to_pixel(px, py, scale_x, scale_y) return pixel_x, pixel_y + # Create New Protocol diff --git a/lvp_logger.py b/lvp_logger.py index cb843e4..22e7fff 100644 --- a/lvp_logger.py +++ b/lvp_logger.py @@ -30,7 +30,7 @@ Gerard Decker, The Earthineering Company MODIFIED: -May 24, 2023 +May 31, 2023 ''' ''' @@ -63,7 +63,7 @@ def format(self, record): logger = logging.getLogger(__name__) # determines lowest level of messages to log (DEBUG < INFO < WARNING < ERROR < CRITICAL) -logger.setLevel(logging.DEBUG) +logger.setLevel(logging.INFO) # obtains name of the module (file) importing lvp_logger filename = '%s' % __file__ diff --git a/motorboard.py b/motorboard.py index 6e4a255..63004bd 100644 --- a/motorboard.py +++ b/motorboard.py @@ -33,7 +33,7 @@ Gerard Decker, The Earthineering Company MODIFIED: -May 24, 2023 +May 31, 2023 ''' #import threading @@ -57,6 +57,7 @@ def __init__(self, **kwargs): self.found = False self.overshoot = False self.backlash = 25 # um of additional downlaod travel in z for drive hysterisis + self.has_turret = False for port in ports: if (port.vid == 0x2E8A) and (port.pid == 0x0005): @@ -69,8 +70,8 @@ def __init__(self, **kwargs): self.bytesize=serial.EIGHTBITS self.parity=serial.PARITY_NONE self.stopbits=serial.STOPBITS_ONE - self.timeout=0.01 # seconds - self.write_timeout=0.01 # seconds + self.timeout=None # seconds + self.write_timeout=None # seconds self.driver = False try: logger.info('[XYZ Class ] Found motor controller and about to establish connection.') @@ -93,15 +94,17 @@ def connect(self): self.driver.close() #print([comport.device for comport in serial.tools.list_ports.comports()]) self.driver.open() - + logger.info('[XYZ Class ] MotorBoard.connect() succeeded') # After powering on the scope, the first command seems to be ignored. # This is to ensure the following commands are followed # Dev 2023-MAY-16 the above 2 comments are suspect - doesn't seem to matter - self.exchange_command('INFO') - - ## self.check_firmware() - IN PROGRESS, LEAVE COMMENTED + #Sometimes the firmware fails to start (or the port has a \x00 left in the buffer), this forces MicroPython to reset, and the normal firmware just complains + self.driver.write(b'\x04\n') + logger.debug('[XYZ Class ] MotorBoard.connect() port initial state: %r'%self.driver.readline()) + # Fullinfo checks to see if it has a turret, so call that here + self.fullinfo() except: self.driver = False logger.exception('[XYZ Class ] MotorBoard.connect() failed') @@ -109,41 +112,39 @@ def connect(self): #---------------------------------------------------------- # Define Communication #---------------------------------------------------------- - def exchange_command(self, command): + def exchange_command(self, command, response_numlines=1): """ Exchange command through serial to SPI to the motor boards This should NOT be used in a script. It is intended for other functions to access""" - stream = command.encode('utf-8')+b"\r\n" + stream = command.encode('utf-8')+b"\n" #print(stream) - if self.driver != False: - try: - self.driver.close() - self.driver.open() - self.driver.write(stream) - #if (command)=='HOME': # ESW to increase homing reliability - # CRLF = command.encode('utf-8')+b"\r\n" - # self.driver.write(CRLF) - response = self.driver.readline() - response = response.decode("utf-8","ignore") - #print(response[:-2]) - - # (too often) logger.info('[XYZ Class ] MotorBoard.exchange_command('+command+') succeeded') - return response[:-2] - - except serial.SerialTimeoutException: - self.driver = False - logger.exception('[XYZ Class ] MotorBoard.exchange_command('+command+') Serial Timeout Occurred') - - except: - self.driver = False - logger.exception('[XYZ Class ] MotorBoard.exchange_command('+command+') failed') - - else: + if not self.driver: try: self.connect() except: return + try: + self.driver.write(stream) + #if (command)=='HOME': # ESW to increase homing reliability + # CRLF = command.encode('utf-8')+b"\r\n" + # self.driver.write(CRLF) + + resp_lines = [self.driver.readline() for _ in range(response_numlines)] + response = [r.decode("utf-8","ignore").strip() for r in resp_lines] + if response_numlines == 1: + response = response[0] + logger.debug('[XYZ Class ] MotorBoard.exchange_command('+command+') %r'%response) + return response + + except serial.SerialTimeoutException: + self.driver = False + logger.exception('[XYZ Class ] MotorBoard.exchange_command('+command+') Serial Timeout Occurred') + + except: + self.driver = False + logger.exception('[XYZ Class ] MotorBoard.exchange_command('+command+') failed') + # Firmware 1-14-2023 commands include # 'QUIT' @@ -164,6 +165,15 @@ def exchange_command(self, command): def infomation(self): self.exchange_command('INFO') + def fullinfo(self): + info = self.exchange_command("FULLINFO") + logger.info('[XYZ Class ] MotorBoard.fullinfo(): %s', info) + info = info.split() + model = info[info.index("Model:")+1] + if model[-1] == "T": + self.has_turret = True + # an option here is to set the current model in the LumaView object as well so the user doesn't need to + #---------------------------------------------------------- # Z (Focus) Functions #---------------------------------------------------------- @@ -173,13 +183,13 @@ def z_ustep2um(self, ustep): return um def z_um2ustep(self, um): - # logger.info('[XYZ Class ] MotorBoard.z_um2ustep('+str(um)+')') + # logger.info('[XYZ Class ] MotorBoard.z_um2ustep('+str(um)+')') ustep = int( um / 0.00586 ) # 0.00586 um/ustep Olympus Z return ustep def zhome(self): """ Home the objective """ - logger.info('[XYZ Class ] MotorBoard.zhome()') + logger.info('[XYZ Class ] MotorBoard.zhome()') self.exchange_command('ZHOME') #---------------------------------------------------------- @@ -197,15 +207,14 @@ def xy_um2ustep(self, um): def xyhome(self): """ Home the stage which also homes the objective first """ - logger.info('[XYZ Class ] MotorBoard.xyhome()') - if self.found: - self.exchange_command('HOME') + logger.info('[XYZ Class ] MotorBoard.xyhome()') + self.exchange_command('HOME') def xycenter(self): """ Home the stage which also homes the objective first """ logger.info('[XYZ Class ] MotorBoard.xycenter()') self.exchange_command('CENTER') - + #---------------------------------------------------------- # T (Turret) Functions #---------------------------------------------------------- @@ -228,7 +237,7 @@ def thome(self): #---------------------------------------------------------- # Motion Functions #---------------------------------------------------------- - + def move(self, axis, steps): """ Move the axis to an absolute position (in usteps) compared to Home """ @@ -323,8 +332,8 @@ def move_abs_pos(self, axis, pos): overshoot = max(1, overshoot) #self.SPI_write (self.chip_pin[axis], self.write_target[axis], overshoot) self.move(axis, overshoot) - while not self.target_status('Z'): - time.sleep(0.001) + #while not self.target_status('Z'): + # time.sleep(0.001) # complete overshoot self.overshoot = False