From 933616402aa74c7caf562645289cdd0b296ab6ed Mon Sep 17 00:00:00 2001 From: Todd Parsons Date: Wed, 25 Oct 2023 11:54:15 +0100 Subject: [PATCH] RF: Update TPad infrastructure to work with DeviceManager --- psychopy_bbtk/tpad.py | 118 ++++++++++++++++++++++++++---------------- 1 file changed, 74 insertions(+), 44 deletions(-) diff --git a/psychopy_bbtk/tpad.py b/psychopy_bbtk/tpad.py index 73cd3cc..39eace3 100644 --- a/psychopy_bbtk/tpad.py +++ b/psychopy_bbtk/tpad.py @@ -1,4 +1,5 @@ -from psychopy.hardware import manager as mgr, serialdevice as sd, photodiode, button +from psychopy.hardware import base, serialdevice as sd, photodiode, button +from psychopy.hardware.manager import deviceManager, DeviceManager, DeviceMethod from psychopy import logging, layout from psychopy.tools import systemtools as st import serial @@ -46,11 +47,11 @@ def splitTPadMessage(message): return re.match(messageFormat, message).groups() -class TPadManagerPlugin(mgr.DeviceManager): +class TPadManagerPlugin(DeviceManager): """ Class which plugs in to DeviceManager and adds methods for managing BBTK TPad devices """ - @mgr.DeviceMethod("tpad", "add") + @DeviceMethod("tpad", "add") def addTPad(self, name=None, port=None, pauseDuration=1/240): """ Add a BBTK TPad. @@ -71,14 +72,14 @@ def addTPad(self, name=None, port=None, pauseDuration=1/240): """ # make unique name if none given if name is None: - name = mgr.DeviceManager.makeUniqueName("tpad") + name = DeviceManager.makeUniqueName(self, "tpad") self._assertDeviceNameUnique(name) # create and store TPad - self._devices['tpad'][name] = TPad(port=port, pauseDuration=pauseDuration) + self._devices['tpad'][name] = TPadDevice(port=port, pauseDuration=pauseDuration) # return created TPad return self._devices['tpad'][name] - @mgr.DeviceMethod("tpad", "remove") + @DeviceMethod("tpad", "remove") def removeTPad(self, name): """ Remove a TPad. @@ -90,7 +91,7 @@ def removeTPad(self, name): """ del self._devices['tpad'][name] - @mgr.DeviceMethod("tpad", "get") + @DeviceMethod("tpad", "get") def getTPad(self, name): """ Get a TPad by name. @@ -107,7 +108,7 @@ def getTPad(self, name): """ return self._devices['tpad'].get(name, None) - @mgr.DeviceMethod("tpad", "getall") + @DeviceMethod("tpad", "getall") def getTPads(self): """ Get a mapping of TPads that have been initialized. @@ -122,15 +123,36 @@ def getTPads(self): """ return self._devices['tpad'] + @DeviceMethod("tpad", "available") + def getAvailableTPads(self): + """ + Get details of all available TPad devices. + + Returns + ------- + dict + Dictionary of information about available TPads connected to the system. + """ + foundDevices = [] + # look for all serial devices + for device in st.systemProfilerWindowsOS(connected=True, classname="Ports"): + # skip non-TPads + if "BBTKTPAD" not in device['Instance ID']: + continue + # get port + port = device['Device Description'].split("(")[1].split(")")[0] + # append + foundDevices.append( + {'port': port} + ) + + return foundDevices + class TPadPhotodiode(photodiode.BasePhotodiode): - def __init__(self, port, number): - # if no TPad device present, try to create one - if sd.ports[port] is None: - pad = TPad(port=port) - pad.photodiodes[number] = self + def __init__(self, pad, number): # initialise base class - photodiode.BasePhotodiode.__init__(self, port) + photodiode.BasePhotodiode.__init__(self, pad) # store number self.number = number @@ -176,13 +198,7 @@ def findPhotodiode(self, win): class TPadButton(button.BaseButton): - def __init__(self, port, number): - # if no TPad device present, try to create one - if sd.ports[port] is None: - pad = TPad(port=port) - pad.buttons[number] = self - else: - pad = sd.ports[port] + def __init__(self, pad, number): # initialise base class button.BaseButton.__init__(self, device=pad) # store number @@ -213,19 +229,27 @@ def __init__(self, *args, **kwargs): pass -class TPad(sd.SerialDevice): - def __init__(self, port=None, pauseDuration=1/240): +class TPad: + def __init__(self, name=None, port=None, pauseDuration=1/240): # get port if not given if port is None: port = self._detectComPort() - # initialise as a SerialDevice - sd.SerialDevice.__init__(self, port=port, baudrate=115200, pauseDuration=pauseDuration) + # get/make device + if deviceManager.checkDeviceNameAvailable(name): + # if no matching device is in DeviceManager, make a new one + self.device = deviceManager.addTPad( + name=name, port=port, pauseDuration=pauseDuration + ) + else: + # otherwise, use the existing device + self.device = deviceManager.getTPad(name) + # dict of responses by timestamp self.messages = {} # inputs - self.photodiodes = {i+1: TPadPhotodiode(port, i+1) for i in range(2)} - self.buttons = {i+1: TPadButton(port, i+1) for i in range(10)} - self.voicekeys = {i+1: TPadVoicekey(port, i+1) for i in range(1)} + self.photodiodes = {i + 1: TPadPhotodiode(self, i + 1) for i in range(2)} + self.buttons = {i + 1: TPadButton(self, i + 1) for i in range(10)} + self.voicekeys = {i + 1: TPadVoicekey(self, i + 1) for i in range(1)} # reset timer self._lastTimerReset = None self.resetTimer() @@ -290,33 +314,33 @@ def setMode(self, mode): self.dispatchMessages() # exit out of whatever mode we're in (effectively set it to 0) try: - self.sendMessage("X") - self.pause() + self.device.sendMessage("X") + self.device.pause() except serial.serialutil.SerialException: pass # set mode - self.sendMessage(f"MOD{mode}") - self.pause() + self.device.sendMessage(f"MOD{mode}") + self.device.pause() # clear messages - self.getResponse() + self.device.getResponse() def resetTimer(self, clock=logging.defaultClock): # enter settings mode self.setMode(0) # send reset command - self.sendMessage(f"REST") + self.device.sendMessage(f"REST") # store time self._lastTimerReset = clock.getTime(format=float) # allow time to process - self.pause() + self.device.pause() # reset mode self.setMode(3) def isAwake(self): self.setMode(0) # call help and get response - self.sendMessage("HELP") - resp = self.getResponse() + self.device.sendMessage("HELP") + resp = self.device.getResponse() # set to mode 3 self.setMode(3) @@ -325,11 +349,11 @@ def isAwake(self): def dispatchMessages(self, timeout=None): # if timeout is None, use pause duration if timeout is None: - timeout = self.pauseDuration + timeout = self.device.pauseDuration # get data from box - self.pause() - data = sd.SerialDevice.getResponse(self, length=2, timeout=timeout) - self.pause() + self.device.pause() + data = sd.SerialDevice.getResponse(self.device, length=2, timeout=timeout) + self.device.pause() # parse lines for line in data: if re.match(messageFormat, line): @@ -360,8 +384,14 @@ def calibratePhotodiode(self, level=127): # set to mode 0 self.setMode(0) # call help and get response - self.sendMessage(f"AAO1 {level}") - self.sendMessage(f"AAO2 {level}") - self.getResponse() + self.device.sendMessage(f"AAO1 {level}") + self.device.sendMessage(f"AAO2 {level}") + self.device.getResponse() # set to mode 3 self.setMode(3) + + +class TPadDevice(sd.SerialDevice, base.BaseDevice): + def __init__(self, port=None, pauseDuration=1/240): + # initialise as a SerialDevice + sd.SerialDevice.__init__(self, port=port, baudrate=115200, pauseDuration=pauseDuration)