diff --git a/.gitignore b/.gitignore index b6e4761..1da13b3 100644 --- a/.gitignore +++ b/.gitignore @@ -127,3 +127,6 @@ dmypy.json # Pyre type checker .pyre/ + +# vscode +.vscode \ No newline at end of file diff --git a/rn42.py b/rn42.py new file mode 100755 index 0000000..20c3837 --- /dev/null +++ b/rn42.py @@ -0,0 +1,501 @@ +#!/usr/bin/env python3 + +# Copyright 2016 The Chromium OS Authors. All rights reserved. +# Use of this source code is governed by a BSD-style license that can be +# found in the LICENSE file. + +"""This module provides an abstraction of RN-42 bluetooth chip.""" + +import logging +import serial + +import serial_utils + + +class RN42Exception(Exception): + """A dummpy exception class for RN42 class.""" + pass + + +class RN42(object): + """This is an abstraction of Roving Network's RN-42 bluetooth evaluation kit. + RN-42 supports SPP and HID protocols. The primary use case is to + configure it to use the HID protocol to emulate a keyboard, a mouse, + or a combo of both. + For user guide about serial control of the kit, refer to + http://ww1.microchip.com/downloads/en/DeviceDoc/50002325A.pdf + For advanced information about configuring HID profile, refer to + http://ww1.microchip.com/downloads/en/DeviceDoc/bluetooth_cr_UG-v1.0r.pdf + """ + # Serial port settings + BAUDRATE = 115200 + BYTESIZE = serial.EIGHTBITS + PARITY = serial.PARITY_NONE + STOPBITS = serial.STOPBITS_ONE + DRIVER = 'ftdi_sio' + CHIP_NAME = 'RNBT' + RETRY = 2 # Try (RETRY + 1) times in total. + + # A newline is a carriage return '\r' followed by line feed '\n'. + NEWLINE = '\r\n' + + # Response status + AOK = 'AOK' # Acknowledge OK + UNKNOWN = '?' # Unknown command + + # basic chip operations + CMD_ENTER_COMMAND_MODE = '$$$' + CMD_LEAVE_COMMAND_MODE = '---' + CMD_REBOOT = 'R,1' + + # chip basic information + CMD_GET_CHIP_NAME = 'GN' + CMD_GET_FIRMWARE_VERSION = 'V' + + # operation modes: master or slave + CMD_GET_OPERATION_MODE = 'GM' + CMD_SET_MASTER_MODE = 'SM,1' + CMD_SET_SLAVE_MODE = 'SM,0' + + # authentication mode + CMD_GET_AUTHENTICATION_MODE = 'GA' + CMD_SET_AUTHENTICATION_OPEN_MODE = 'SA,0' # no authentication + CMD_SET_AUTHENTICATION_PIN_MODE = 'SA,4' # host to send a matching pin + + # bluetooth service profiles + PROFILE_SPP = '0' + PROFILE_HID = '6' + CMD_GET_SERVICE_PROFILE = 'G~' + CMD_SET_SERVICE_PROFILE_SPP = 'S~,' + PROFILE_SPP + CMD_SET_SERVICE_PROFILE_HID = 'S~,' + PROFILE_HID + + # bluetooth mac address and connection + CMD_GET_RN42_BLUETOOTH_MAC = 'GB' + CMD_GET_CONNECTION_STATUS = 'GK' + CMD_GET_REMOTE_CONNECTED_BLUETOOTH_MAC = 'GF' + + # HID device classes + CMD_GET_HID = 'GH' + CMD_SET_HID_KEYBOARD = 'SH,0000' + CMD_SET_HID_GAMEPAD = 'SH,0010' + CMD_SET_HID_MOUSE = 'SH,0020' + CMD_SET_HID_COMBO = 'SH,0030' + CMD_SET_HID_JOYSTICK = 'SH,0040' + + # the operation mode + OPERATION_MODE = { + 'Slav': 'SLAVE', # slave mode + 'Mstr': 'MASTER', # master mode + 'Trig': 'TRIGGER', # trigger mode + 'Auto': 'AUTO', # auto connect master mode + 'DTR': 'DTR', # auto connect DTR mode + 'Any': 'ANY', # auto connect any mode + 'Pair': 'PAIR' # paring mode + } + + # the authentication mode + AUTHENTICATION_MODE = { + '0': 'OPEN', + '1': 'SSP_KEYBOARD', + '2': 'SSP_JUST_WORK', + '4': 'PIN_CODE' + } + + # the service profile + SERVICE_PROFILE = { + '0': 'SPP', + '1': 'DUN_DCE', + '2': 'DUN_DTE', + '3': 'MDM_SPP', + '4': 'SPP_AND_DUN_DCE', + '5': 'APL', + '6': 'HID' + } + + # HID device types + HID_DEVICE_TYPE = { + '0200': 'KEYBOARD', + '0210': 'GAMEPAD', + '0220': 'MOUSE', + '0230': 'COMBO', + '0240': 'JOYSTICK', + '0250': 'DIGITIZER', + '0260': 'SENSOR', + '0270': 'USECFG', + } + + def __init__(self): + self._command_mode = False + self._closed = False + try: + self._serial = serial_utils.SerialDevice() + except Exception: + raise RN42Exception('Fail to create a serial device.') + try: + self._serial.Connect( + # driver=self.DRIVER, + port="/dev/tty.usbserial-AU045GW9", + baudrate=self.BAUDRATE, + bytesize=self.BYTESIZE, + parity=self.PARITY, + stopbits=self.STOPBITS) + logging.info('Connect to the serial port successfully.') + except Exception: + raise RN42Exception('Fail to connect to the serial device.') + + def __del__(self): + self.Close() + + def Close(self): + """ + Close the device gracefully. + """ + if not self._closed: + self.LeaveCommandMode() + self._serial.Disconnect() + self._closed = True + + def SerialSendReceive(self, command, expect='', expect_in='', msg='serial SendReceive()'): + """A wrapper of SerialDevice.SendReceive(). + + Args: + command: the serial command + expect: expect the exact string matching the response + expect_in: expect the string in the response + msg: the message to log + + Returns: + the result received from the serial console + + Raises: + RN42Exception if there is an error in serial communication or + if the response is not expected. + """ + try: + # All commands must end with a newline. + # size=0 means to receive all waiting characters. + # Retry a few times since sometimes the serial communication + # may not be reliable. + # Strip the result which ends with a newline too. + result = self._serial.SendReceive( + command + self.NEWLINE, size=0, + retry=self.RETRY).decode().strip() + + if ((expect and expect != result) + or (expect_in and expect_in not in result)): + error_msg = 'Failulre in %s: %s' % (msg, result) + raise RN42Exception(error_msg) + except Exception: + error_msg = 'Failulre in %s' % msg + raise RN42Exception(error_msg) + + logging.info('Success in %s: %s', msg, result) + return result + + def EnterCommandMode(self): + """Make the chip enter command mode. + + Returns: + True if entering the command mode successfully. + + Raises: + RN42Exception if there is an error in serial communication or + if the response is not expected. + """ + try: + # The command to enter command mode is special. It does not end + # with a newline. + # The result is something like '...CMD\r\n' where '...' means + # some possible random characters in the serial buffer. + result = self._serial.SendReceive( + self.CMD_ENTER_COMMAND_MODE, size=0, + retry=self.RETRY).decode().strip() + except serial.SerialTimeoutException: + raise RN42Exception('Failure in entering command mode.') + + if 'CMD' in result: + logging.info('Enter command mode successfully.') + self._command_mode = True + return True + elif result == '': + # If the chip is already in command mode, this would cause timeout + # and returns an empty string. So let's check if we could get the + # chip name to make sure that it is indeed in command mode. + try: + chip_name = self.GetChipName() + if chip_name.startswith(self.CHIP_NAME): + msg = 'Correct chip name when entering command mode: %s' + logging.info(msg, chip_name) + self._command_mode = True + return True + else: + msg = 'Incorrect chip name when entering command mode: %s' + raise RN42Exception(msg % chip_name) + except Exception: + msg = 'Failure to get chip name in entering command mode.' + raise RN42Exception(msg) + else: + msg = 'Incorrect response in entering command mode: %s' + raise RN42Exception(msg % result) + + def LeaveCommandMode(self): + """Make the chip leave command mode. + Returns: + True if the kit left the command mode successfully. + """ + if self._command_mode: + self.SerialSendReceive(self.CMD_LEAVE_COMMAND_MODE, + expect='END', + msg='leaving command mode') + self._command_mode = False + return True + + def Reboot(self): + """Reboot the chip. + Reboot is required to make some settings take effect when the settings are changed. + + Returns: + True if the kit rebooted successfully. + """ + self.SerialSendReceive(self.CMD_REBOOT, + expect='Reboot', + msg='rebooting RN-42') + return True + + def GetChipName(self): + """Get the chip name. + The chip returns something like 'RNBT-A955\\r\\n' + where 'RN' means Roving Network, 'BT' bluetooth, and + 'A955' the last four digits of its MAC address. + + Returns: + the chip name + """ + return self.SerialSendReceive(self.CMD_GET_CHIP_NAME, + msg='getting chip name') + + def GetFirmwareVersion(self): + """Get the firmware version of the chip. + The chip returns something like + 'Ver 6.15 04/26/2013\\r\\n(c) Roving Networks\\r\\n' + Note that the version must be higher than 6.11 to support HID profile. + + Returns: + the firmware version + """ + return self.SerialSendReceive(self.CMD_GET_FIRMWARE_VERSION, + expect_in='Ver', + msg='getting firmware version') + + def GetOperationMode(self): + """Get the operation mode. + + Returns: + the operation mode + """ + result = self.SerialSendReceive(self.CMD_GET_OPERATION_MODE, + msg='getting operation mode') + return self.OPERATION_MODE.get(result) + + def SetMasterMode(self): + """Set the chip to master mode. + + Returns: + True if setting master mode successfully. + """ + self.SerialSendReceive(self.CMD_SET_MASTER_MODE, + expect=self.AOK, + msg='setting master mode') + return True + + def SetSlaveMode(self): + """Set the chip to slave mode. + + Returns: + True if setting slave mode successfully. + """ + self.SerialSendReceive(self.CMD_SET_SLAVE_MODE, + expect=self.AOK, + msg='setting slave mode') + return True + + def GetAuthenticationMode(self): + """Get the authentication mode. + Returns: + a string representing the authentication mode + """ + result = self.SerialSendReceive(self.CMD_GET_AUTHENTICATION_MODE, + msg='getting authentication mode') + return self.AUTHENTICATION_MODE.get(result) + + def SetAuthenticationOpenMode(self): + """Set the authentication to open mode (no authentication). + + Returns: + True if setting open mode successfully. + """ + self.SerialSendReceive(self.CMD_SET_AUTHENTICATION_OPEN_MODE, + expect=self.AOK, + msg='setting authentication open mode') + return True + + def SetAuthenticationPinMode(self): + """Set the authentication to pin mode (host to send a matching pin). + Returns: + True if setting pin code mode successfully. + """ + self.SerialSendReceive(self.CMD_SET_AUTHENTICATION_PIN_MODE, + expect=self.AOK, + msg='setting authentication pin mode') + return True + + def GetServiceProfile(self): + """Get the service profile. + Returns: + a string representing the service profile + """ + result = self.SerialSendReceive(self.CMD_GET_SERVICE_PROFILE, + msg='getting service profile') + return self.SERVICE_PROFILE.get(result) + + def SetServiceProfileSPP(self): + """Set SPP as service profile. + + Returns: + True if setting SPP profile successfully. + """ + self.SerialSendReceive(self.CMD_SET_SERVICE_PROFILE_SPP, + expect=self.AOK, + msg='setting SPP as service profile') + return True + + def SetServiceProfileHID(self): + """Set HID as service profile. + + Returns: + True if setting HID profile successfully. + """ + self.SerialSendReceive(self.CMD_SET_SERVICE_PROFILE_HID, + expect=self.AOK, + msg='setting HID as service profile') + return True + + def GetLocalBluetoothAddress(self): + """Get the local RN-42 bluetooth mac address. + Returns: + the bluetooth mac address of the kit + """ + return self.SerialSendReceive(self.CMD_GET_RN42_BLUETOOTH_MAC, + msg='getting local bluetooth address') + + def GetConnectionStatus(self): + """Get the connection status. + the connection status returned from the kit could be + '0,0,0': not connected + '1,0,0': connected + Returns: + Ture if RN-42 is connected. + """ + result = self.SerialSendReceive(self.CMD_GET_CONNECTION_STATUS, + msg='getting connection status') + connection = result.split(',')[0] + return connection == '1' + + def GetRemoteConnectedBluetoothAddress(self): + """Get the bluetooth mac address of the current connected remote host. + Returns: + the bluetooth mac address of the remote connected device if applicable, + or None if there is no remote connected device. + """ + result = self.SerialSendReceive( + self.CMD_GET_REMOTE_CONNECTED_BLUETOOTH_MAC, + msg='getting local bluetooth address') + # result is '000000000000' if there is no remote connected device + return None if result == '000000000000' else result + + def GetHIDDeviceType(self): + """Get the HID device type. + Returns: + a string representing the HID device type + """ + result = self.SerialSendReceive(self.CMD_GET_HID, + msg='getting HID device type') + return self.HID_DEVICE_TYPE.get(result) + + def SetHIDKeyboard(self): + """Set keyboard as the HID device. + Returns: + True if setting keyboard as the HID device successfully. + """ + self.SerialSendReceive(self.CMD_SET_HID_KEYBOARD, + expect=self.AOK, + msg='setting keyboard as HID device') + return True + + def SetHIDGamepad(self): + """Set game pad as the HID device. + Returns: + True if setting game pad as the HID device successfully. + """ + self.SerialSendReceive(self.CMD_SET_HID_GAMEPAD, + expect=self.AOK, + msg='setting gamepad as HID device') + return True + + def SetHIDMouse(self): + """Set mouse as the HID device. + + Returns: + True if setting mouse as the HID device successfully. + """ + self.SerialSendReceive( + self.CMD_SET_HID_MOUSE, + expect=self.AOK, + msg='setting mouse as HID device') + return True + + def SetHIDCombo(self): + """Set combo as the HID device. + + Returns: + True if setting combo as the HID device successfully. + """ + self.SerialSendReceive( + self.CMD_SET_HID_COMBO, + expect=self.AOK, + msg='setting combo as HID device') + return True + + def SetHIDJoystick(self): + """Set joystick as the HID device. + + Returns: + True if setting joystick as the HID device successfully. + """ + self.SerialSendReceive( + self.CMD_SET_HID_JOYSTICK, + expect=self.AOK, + msg='setting joystick as HID device') + return True + + +def GetRN42Info(): + """A simple demo of getting RN-42 information.""" + print('Hello RN-42-EK') + + rn42 = RN42() + print('enter:', rn42.EnterCommandMode()) + print('chip name:', rn42.GetChipName()) + print('firmware version:', rn42.GetFirmwareVersion()) + print('operation mode:', rn42.GetOperationMode()) + print('authentication mode:', rn42.GetAuthenticationMode()) + print('service profile:', rn42.GetServiceProfile()) + print('local bluetooth address:', rn42.GetLocalBluetoothAddress()) + print('connection status:', rn42.GetConnectionStatus()) + print('remote bluetooth address:', rn42.GetRemoteConnectedBluetoothAddress()) + print('HID device type:', rn42.GetHIDDeviceType()) + print('leave:', rn42.LeaveCommandMode()) + + +if __name__ == '__main__': + GetRN42Info() diff --git a/serial_utils.py b/serial_utils.py new file mode 100644 index 0000000..7fef92e --- /dev/null +++ b/serial_utils.py @@ -0,0 +1,488 @@ +# Lint as: python2, python3 +# Copyright 2016 The Chromium OS Authors. All rights reserved. +# Use of this source code is governed by a BSD-style license that can be +# found in the LICENSE file. + +"""Utilities for serial port communication. +For some test cases, DUT needs to communicates with fixuture via USB-Serial +dungle. We provides FindTtyByDriver() to help finding the right +/dev/tty* path for the given driver; and OpenSerial() to open a serial port. + +Provides an interface to communicate w/ a serial device: SerialDevice. See +class comment for details. +""" + +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function +import glob +import logging +import os +import re +# site-packages: dev-python/pyserial +import serial +import time +# from six.moves import range + + +def OpenSerial(**kwargs): + """Tries to open a serial port. + + Args: + kwargs: a dict of parameters for a serial connection. Should contain + 'port'. For other parameters, like 'baudrate', 'bytesize', 'parity' + , 'stopbits' and 'timeout', please refer pySerial documentation. + + Returns: + serial object if successful. + + Raises: + ValueError if kwargs is invalid; otherwise, serial.SerialException. + """ + port = kwargs.get('port') + if not port: + raise ValueError('Missing parameter "port".') + ser = serial.Serial(**kwargs) + if not ser.isOpen(): + raise serial.SerialException('Failed to open serial: %r' % port) + return ser + + +def FindTtyByDriver(driver_name, + interface_protocol=None, + multiple_ports=False): + """Finds the tty terminal matched to driver_name and interface protocol. + Checks the interface protocol if specified. In some situations where there + may exist multiple ports with the same driver, use the interface_protocol + to distinguish between them. An example is Arduino DUE board with a + Programming Port and a Native USB Port. + In some cases there may be multiple ports with the same driver and with the + same interface_protocol, set multiple_ports to True and all matched paths + found will be returned in a list. + + Args: + driver_name: driver name for the target TTY device. + interface_protocol: the interface protocol for the target TTY device. + multiple_ports: determines whether it returns all matched paths by list, or + just return the first found one. + + Returns: + If multiple_ports is True, return /dev/tty path if driver_name is matched; + None if not found. + If multiple_ports is False, return a list of all matched /dev/tty path; An + empty list if not found. + """ + matched_candidates = [] + for candidate in glob.glob('/dev/tty*'): + device_path = '/sys/class/tty/%s/device' % os.path.basename(candidate) + driver_path = os.path.realpath(os.path.join(device_path, 'driver')) + # Check if driver_name exist at the tail of driver_path. + if re.search(driver_name + '$', driver_path): + if (interface_protocol is None or interface_protocol + == DeviceInterfaceProtocol(device_path)): + if multiple_ports: + matched_candidates.append(candidate) + else: + return candidate + if multiple_ports: + return matched_candidates + else: + return None + + +def FindTtyByPortIndex(port_index, driver_name=None): + """Finds serial port path tty* with given port index. + Port index is fixed as the layout of physical ports. + Example: if serial path is ttyUSB0 for port_index = 1-1, the system path + /sys/class/tty/ttyUSB0/device will be linked to + /sys/devices/pci0000..../..../usb1/1-1/... + + Args: + port_index: String for serial connection port index. + driver_name: String for serial connection driver. + + Returns: + matched /dev/tty path. Return None if no port has been detected. + """ + for candidate in glob.glob('/dev/tty*'): + device_path = '/sys/class/tty/%s/device' % os.path.basename(candidate) + driver_path = os.path.realpath(os.path.join(device_path, 'driver')) + # If driver_name is given, check if driver_name exists at the tail of + # driver_path. + if driver_name and not driver_path.endswith(driver_name): + continue + device_path = os.path.realpath(device_path) + # Check if port_index exists in device_path. + if '/%s/' % port_index in device_path: + logging.info('Find serial path : %s', candidate) + return candidate + return None + + +def FindTtyByUsbVidPid(usb_vid, usb_pid, driver_name=None): + """Finds the tty for the usb device with given vid, pid, and driver. + This is more useful than port index or driver alone, but only works if your + devices have different VID/PID/(optionally driver) values. + Otherwise, it won't help, and you'll have to probe something else. + + Args: + usb_vid: The USB VID (Vendor ID) as a hexadecimal string + usb_pid: The USB PID (Product ID) as a hexadecimal string + driver_name: String for serial connection driver. + Returns: + matched /dev/tty path. Return None if no port has been detected. + """ + try: + import pyudev + except ImportError: + logging.error("Failed to import pyudev") + return None + port = None + context = pyudev.Context() + for device in context.list_devices(subsystem='tty'): + if 'ID_VENDOR' not in device: + continue + if usb_vid is not None: + if device['ID_VENDOR_ID'] != usb_vid: + continue + if usb_pid is not None: + if device['ID_MODEL_ID'] != usb_pid: + continue + if driver_name is not None: + if device['ID_USB_DRIVER'] != driver_name: + continue + port = device.device_node + break + logging.info('Found USB serial tty: %s', port) + return port + + +def FindTtyListByUsbVidPid(usb_vid, usb_pid): + """Returns list of TTYs matching vid/pid and driver (if provided). + There may be more than one attached serial peripheral with matching + {vid,pid,driver}. To distinguish between peripherals in this case, + give caller an opportunity to select specific port to connect on. + Args: + usb_vid: The USB VID (Vendor ID) as a hexadecimal string + usb_pid: The USB PID (Product ID) as a hexadecimal string + driver_name: String for serial connection driver. + Returns: + List of serial devices with additional attributes + """ + serial_devices = [] + # TODO(yuhsuan): There is no pyudev package in chameleon and pip is broken + # now. Add a handler here to avoid chameleon crash until we get new image + # of chameleon. (crbug.com/951703) + try: + import pyudev + except ImportError: + logging.error("Failed to import pyudev") + return serial_devices + context = pyudev.Context() + for device in context.list_devices(subsystem='tty'): + if 'ID_VENDOR' not in device: + continue + if usb_vid is not None: + if device['ID_VENDOR_ID'] != usb_vid: + continue + if usb_pid is not None: + if device['ID_MODEL_ID'] != usb_pid: + continue + if 'ID_SERIAL_SHORT' not in device: + continue + # (vid,pid) match. Append device to list for caller to validate + # serial number. + serial_devices.append({ + 'vid': device['ID_VENDOR_ID'], + 'pid': device['ID_MODEL_ID'], + 'serial': device['ID_SERIAL_SHORT'], + 'port': device.device_node + }) + return serial_devices + + +def ReadSysfsFile(path): + """Extracts the contents of the given sysfs file. + Intended for use on the one-line files in sysfs that contain small amounts of + data we want to know. + Args: + path: The path to the sysfs file to read. + Returns: + The file if found else '' + """ + try: + with open(path) as f: + return f.read().strip() + except IOError: + return '' + + +def DeviceInterfaceProtocol(device_path): + """Extracts the interface protocol of the specified device path. + Args: + device_path: The tty device path in the sysfs. + Returns: + The interface protocol if found else '' + """ + interface_protocol_path = os.path.join(device_path, 'bInterfaceProtocol') + try: + with open(interface_protocol_path) as f: + return f.read().strip() + except IOError: + return '' + + +class SerialDevice(object): + """Interface to communicate with a serial device. + Instead of giving a fixed port, it can look up port by driver name. + It has several handy methods, like SendRecv() and SendExpectRecv(), + which support fail retry. + + Property: + log: True to enable logging. + + Usage: + fixture = SerialDevice() + fixture.Connect(driver='pl2303') + # Send 'P' for ping fixture and expect an 'OK' response. + # Allow to retry twice. + fixture.SendExpectReceive('P', 'OK', retry=2) + # Send 'FID' for getting fixture ID. Return received result. No retry. + fixture_id = fixture.SendRecv('FID') + """ + + def __init__(self, + send_receive_interval_secs=0.2, + retry_interval_secs=0.5, + log=False): + """Constructor. + Sets intervals between send/receive and between retries. + Also, setting log to True to emit actions to logging.info. + + Args: + send_receive_interval_secs: interval (seconds) between send-receive. + retry_interval_secs: interval (seconds) between retrying command. + log: True to enable logging. + """ + self._serial = None + self.port = '' + self.send_receive_interval_secs = send_receive_interval_secs + self.retry_interval_secs = retry_interval_secs + self.log = log + + def __del__(self): + self.Disconnect() + + def Connect(self, + driver=None, + port=None, + usb_vid=None, + usb_pid=None, + known_device_set=None, + baudrate=9600, + bytesize=serial.EIGHTBITS, + parity=serial.PARITY_NONE, + stopbits=serial.STOPBITS_ONE, + timeout=0.5, + writeTimeout=0.5): + """Opens a serial connection by port, by device driver name, or by VID/PID. + All three of driver, usb_vid, and usb_pid must be specified to lookup by + VID/PID. + + Args: + driver: driver name of the target serial connection. used to look up port if port is not specified. + usb_vid: USB VID of the target serial connection. used to look up port if port is not specified. + usb_pid: USB PID of the target serial connection. used to look up port if port is not specified. + port: See serial.Serial(). + baudrate: See serial.Serial(). + bytesize: See serial.Serial(). + parity: See serial.Serial(). + stopbits: See serial.Serial(). + timeout: See serial.Serial(). + writeTimeout: See serial.Serial(). + + Raises: + SerialException on errors. + """ + if driver and not port: + if usb_vid and usb_pid: + if known_device_set: + devices = FindTtyListByUsbVidPid(usb_vid, usb_pid) + for device in devices: + if device['serial'] in known_device_set: + port = device['port'] + break + else: + port = FindTtyByUsbVidPid(usb_vid, + usb_pid, + driver_name=driver) + else: + port = FindTtyByDriver(driver) + if not port: + raise serial.SerialException( + 'Serial device with driver %r not found' % driver) + self.port = port + self._serial = OpenSerial(port=port, + baudrate=baudrate, + bytesize=bytesize, + parity=parity, + stopbits=stopbits, + timeout=timeout, + writeTimeout=writeTimeout) + if self.log: + logging.info('Serial port %r opened', port) + + def Disconnect(self): + """Closes the connection if it exists.""" + if self._serial: + self._serial.close() + + def SetTimeout(self, read_timeout, write_timeout): + """Overrides read/write timeout. + Args: + read_timeout: read timeout. + write_timeout: write timeout. + """ + self._serial.setTimeout(read_timeout) + self._serial.setWriteTimeout(write_timeout) + + def GetTimeout(self): + """Returns (read timeout, write timeout).""" + return (self._serial.getTimeout(), self._serial.getWriteTimeout()) + + def Send(self, command, flush=True): + """Sends a command. It blocks at most write_timeout seconds. + Args: + command: command to send. + flush: call flush() after write(). Default True. + + Raises: + SerialTimeoutException if it is timeout and fails to send the command. + SerialException if it is disconnected during sending. + """ + try: + start_time = time.time() + self._serial.write(command.encode()) + if flush: + self._serial.flush() + if self.log: + duration = time.time() - start_time + logging.info('Successfully sent %r. Took %.3f seconds', command, duration) + except serial.SerialTimeoutException: + error_message = 'Send %r timeout after %.2f seconds' % ( + command, self._serial.getWriteTimeout()) + if self.log: + logging.warning(error_message) + raise serial.SerialTimeoutException(error_message) + except serial.SerialException: + raise serial.SerialException('Serial disconnected') + + def Receive(self, size=1): + """Receives N bytes. It blocks at most timeout seconds. + Args: + size: number of bytes to receive. 0 means receiving what already in the input buffer. + + Returns: + Received N bytes. + + Raises: + SerialTimeoutException if it fails to receive N bytes. + """ + start_time = time.time() + if size == 0: + size = self._serial.inWaiting() + response = self._serial.read(size) + + if len(response) == size: + if self.log: + duration = time.time() - start_time + logging.info('Successfully received %r. Took %.3f seconds', response, duration) + return response + else: + error_message = 'Receive %d bytes timeout after %.2f seconds' % (size, self._serial.getTimeout()) + if self.log: + logging.warning(error_message) + raise serial.SerialTimeoutException(error_message) + + def ReceiveLine(self): + """Receives one line. It blocks at most timeout seconds. + + Returns: + Received data of one line. + """ + return self._serial.readline() + + def FlushBuffer(self): + """Flushes input/output buffer.""" + self._serial.flushInput() + self._serial.flushOutput() + + def SendReceive(self, command, size=1, retry=0, interval_secs=None, suppress_log=False): + """Sends a command and returns a N bytes response. + Args: + command: command to send + size: number of bytes to receive. 0 means receiving what already in the input buffer. + retry: number of retry. + interval_secs: #seconds to wait between send and receive. If specified, overrides self.send_receive_interval_secs. + suppress_log: True to disable log regardless of self.log value. + + Returns: + Received N bytes. + + Raises: + SerialTimeoutException if it fails to receive N bytes. + """ + for nth_run in range(retry + 1): + self.FlushBuffer() + try: + self.Send(command) + if interval_secs is None: + time.sleep(self.send_receive_interval_secs) + else: + time.sleep(interval_secs) + + response = self.Receive(size) + if not suppress_log and self.log: + logging.info('Successfully sent %r and received %r', + command, response) + return response + except serial.SerialTimeoutException: + if nth_run < retry: + time.sleep(self.retry_interval_secs) + error_message = 'Timeout receiving %d bytes for command %r' % (size, command) + + if not suppress_log and self.log: + logging.warning(error_message) + raise serial.SerialTimeoutException(error_message) + + def SendExpectReceive(self, command, expect_response, retry=0, interval_secs=None): + """Sends a command and expects to receive a response. + Args: + command: command to send + expect_response: expected response received + retry: number of retry. + interval_secs: #seconds to wait between send and receive. If specified, overrides self.send_receive_interval_secs. + + Returns: + True if command is sent and expected response received. + """ + try: + response = self.SendReceive( + command, + len(expect_response), + retry=retry, + interval_secs=interval_secs, + suppress_log=True) + except serial.SerialTimeoutException: + if self.log: + logging.warning('SendReceive timeout for command %r', command) + return False + if self.log: + if response == expect_response: + logging.info( + 'Successfully sent %r and received expected response %r', + command, expect_response) + else: + logging.warning('Sent %r but received %r (expected: %r)', + command, response, expect_response) + return response == expect_response