Skip to content

Commit

Permalink
check for camera trigger
Browse files Browse the repository at this point in the history
  • Loading branch information
bimac committed Aug 21, 2023
1 parent 8b16805 commit 3cb2db0
Showing 1 changed file with 34 additions and 24 deletions.
58 changes: 34 additions & 24 deletions scripts/hardware_validation/verify_hardware.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
from glob import glob
from pathlib import Path
import time
from struct import unpack

import usb.core

from serial import Serial
Expand All @@ -12,20 +14,18 @@

from iblutil.util import setup_logger
import iblrig.base_tasks
from pybpodapi.protocol import Bpod # StateMachine
from pybpodapi.protocol import Bpod, StateMachine

# set up logging
log = setup_logger('iblrig', level='DEBUG')

issues = 0


# function for querying a serial device
def query(s_obj, req, n=1, end: bool = False):
s_obj.write(req)
return s.read(n)


def log_fun(msg_type: str = 'info', msg: str = '', last: bool = False):
global issues
tree = '└' if last else '├'
Expand Down Expand Up @@ -75,7 +75,7 @@ def log_fun(msg_type: str = 'info', msg: str = '', last: bool = False):
# check for duplicate serial ports
seen = set()
for p in [x for x in ports.values() if x in seen or seen.add(x)]:
log_fun('fail', f'Duplicate serial port: "{p}"')
log_fun('fail', f'duplicate serial port: "{p}"')
else:
log_fun('pass', 'no duplicate serial ports found')

Expand All @@ -90,7 +90,7 @@ def log_fun(msg_type: str = 'info', msg: str = '', last: bool = False):

# check for invalid port-strings
for p in [(k, v) for k, v in ports.items() if v not in valid_ports]:
log_fun('fail', f'Invalid serial port: "{p[1]}"', last=True)
log_fun('fail', f'invalid serial port: "{p[1]}"', last=True)
else:
log_fun('pass', 'no invalid port-strings found', last=True)

Expand All @@ -112,7 +112,7 @@ def log_fun(msg_type: str = 'info', msg: str = '', last: bool = False):
try:
s = Serial(port, timeout=1, writeTimeout=1)
except serial.SerialException:
log_fun('fail', f'Cannot connect to {port} ({description}) - is another process using the port?', last=True)
log_fun('fail', f'cannot connect to {port} ({description}) - is another process using the port?', last=True)
continue
else:
log_fun('pass', 'serial port can be connected to')
Expand Down Expand Up @@ -146,9 +146,9 @@ def log_fun(msg_type: str = 'info', msg: str = '', last: bool = False):
s.close()

if ok:
log_fun('pass', f'Device on port {port} seems to be a {device_name}', last=True)
log_fun('pass', f'device on port {port} seems to be a {device_name}', last=True)
else:
log_fun('fail', f'Device on port {port} does not appear to be a {device_name}', last=True)
log_fun('fail', f'device on port {port} does not appear to be a {device_name}', last=True)

# To Do: Look into this required delay
time.sleep(.02)
Expand All @@ -163,7 +163,7 @@ def log_fun(msg_type: str = 'info', msg: str = '', last: bool = False):
if len(module) == 0:
log_fun('fail', 'Rotary Encoder Module is not connected to the Bpod')
elif len(module) > 1:
log_fun('fail', 'More than one Rotary Encoder Module connected to the Bpod')
log_fun('fail', 'more than one Rotary Encoder Module connected to the Bpod')
else:
log_fun('pass', f'module "{module[0].name}" is connected to the Bpod\'s module port #{module[0].serial_port}')

Expand Down Expand Up @@ -194,22 +194,21 @@ def log_fun(msg_type: str = 'info', msg: str = '', last: bool = False):

dev = usb.core.find(idVendor=0x04D8, idProduct=0xEE6A)
if not dev:
log.critical(' ├ ✘ Cannot find Harp Sound Card')
log_fun('fail', Cannot find Harp Sound Card')
else:
log_fun('pass', 'found USB device {:04X}:{:04X} (Harp Sound Card)'.format(dev.idVendor, dev.idProduct))

dev = next((p for p in serial.tools.list_ports.comports() if (p.vid == 1027 and p.pid == 24577)), None)
if not dev:
log.critical(
' ├ ✘ Cannot find Harp Sound Card\'s Serial port - did you plug in *both* USB ports of the device?')
log_fun('fail', cannot find Harp Sound Card\'s Serial port - did you plug in *both* USB ports of the device?')
else:
log_fun('pass', 'found USB device {:04X}:{:04X} (FT232 UART), serial port: {}'.format(dev.vid, dev.pid, dev.name))

module = [m for m in modules if m.name.startswith('SoundCard')]
if len(module) == 0:
log_fun('fail', 'Harp Sound Card is not connected to the Bpod', last=True)
elif len(module) > 1:
log_fun('fail', 'More than one Harp Sound Card connected to the Bpod', last=True)
log_fun('fail', 'more than one Harp Sound Card connected to the Bpod', last=True)
else:
log_fun('pass', f'module "{module[0].name}" is connected to the Bpod\'s module port #{module[0].serial_port}',
last=True)
Expand All @@ -223,26 +222,37 @@ def log_fun(msg_type: str = 'info', msg: str = '', last: bool = False):
log_fun('info', f'firmware version: {module.firmware_version}')
module.start_module_relay()
bpod.bpod_modules.module_write(module, "R")
r = bpod.bpod_modules.module_read(module, 12)
(t, p, h) = unpack('3f', bytes(bpod.bpod_modules.module_read(module, 12)))
module.stop_module_relay()
v = {
"Temperature_C": np.frombuffer(bytes(r[:4]), np.float32)[0],
"AirPressure_mb": np.frombuffer(bytes(r[4:8]), np.float32)[0] / 100,
"RelativeHumidity": np.frombuffer(bytes(r[8:]), np.float32)[0],
}
log_fun('info', f'temperature: {v["Temperature_C"]:.1f} °C')
log_fun('info', f'air pressure: {v["AirPressure_mb"]:.1f} mbar')
log_fun('info', f'rel. humidity: {v["RelativeHumidity"]:.1f}%')
log_fun('info', f'temperature: {t:.1f} °C')
log_fun('info', f'air pressure: {p/100:.1f} mbar')
log_fun('info', f'rel. humidity: {h:.1f}%')
else:
log_fun('fail', 'Could not find Ambient Module', last=True)

if "device_cameras" in hw_settings and isinstance(hw_settings["device_cameras"], dict):
log_fun('head', 'Checking Camera Trigger:')
sma = StateMachine(bpod)
sma.add_state(
state_name="collect",
state_timer=1,
state_change_conditions={"Tup": "exit"},
)
bpod.send_state_machine(sma)
bpod.run_state_machine(sma)
triggers = [i.host_timestamp for i in bpod.session.current_trial.events_occurrences if i.content == 'Port1In']
np.mean(np.diff(triggers))
if len(triggers) == 0:
log_fun('fail', 'could not read camera trigger.', last=True)
else:
log_fun('pass', 'successfully read camera trigger.')
log_fun('info', f'average frame-rate: {np.mean(np.diff(triggers)) * 1E3:.3f} Hz', last=True)

bpod.close()

# TO DO: bpod
# TO DO: BNC connections
# TO DO: camera
# TO DO: sound output
# TO DO: ambient module


if issues:
Expand Down

0 comments on commit 3cb2db0

Please sign in to comment.