diff --git a/sbot/future/classless/__init__.py b/sbot/future/classless/__init__.py new file mode 100644 index 0000000..9164a88 --- /dev/null +++ b/sbot/future/classless/__init__.py @@ -0,0 +1,3 @@ +from .robot import arduino, comp, motor, power, servo, utils + +__all__ = ['arduino', 'comp', 'motor', 'power', 'servo', 'utils'] diff --git a/sbot/future/classless/arduinos.py b/sbot/future/classless/arduinos.py new file mode 100644 index 0000000..c1faad7 --- /dev/null +++ b/sbot/future/classless/arduinos.py @@ -0,0 +1,176 @@ +"""The Arduino module provides an interface to the Arduino firmware.""" +from enum import Enum, IntEnum + +from sbot.logging import log_to_debug +from sbot.serial_wrapper import SerialWrapper +from sbot.utils import map_to_float + +from .utils import BoardManager + + +class GPIOPinMode(str, Enum): + """The possible modes for a GPIO pin.""" + + INPUT = 'INPUT' + INPUT_PULLUP = 'INPUT_PULLUP' + OUTPUT = 'OUTPUT' + + +class AnalogPin(IntEnum): + """The analog pins on the Arduino.""" + + A0 = 14 + A1 = 15 + A2 = 16 + A3 = 17 + A4 = 18 + A5 = 19 + + +DISABLED_PINS = (0, 1) +AVAILABLE_PINS = range(0, max(AnalogPin) + 1) + +ADC_MAX = 1023 # 10 bit ADC +ADC_MIN = 0 + + +class Arduino: + """ + The Arduino board interface. + + This is intended to be used with Arduino Uno boards running the sbot firmware. + + :param boards: The BoardManager object containing the arduino board references. + """ + + __slots__ = ('_boards',) + + def __init__(self, boards: BoardManager): + # Obtain a reference to the arduino + # This is contained in a list to allow for it to be populated later + self._boards = boards.arduino + + @log_to_debug + def set_pin_mode(self, pin: int, mode: GPIOPinMode) -> None: + """ + Set the mode of the pin. + + To do analog or digital reads set the mode to INPUT or INPUT_PULLUP. + To do digital writes set the mode to OUTPUT. + + :param pin: The pin to set the mode of. + :param value: The mode to set the pin to. + :raises IOError: If the pin mode is not a GPIOPinMode. + :raises IOError: If this pin cannot be controlled. + """ + port = self._get_port() + self._validate_pin(pin) + if not isinstance(mode, GPIOPinMode): + raise IOError('Pin mode only supports being set to a GPIOPinMode') + port.write(f'PIN:{pin}:MODE:SET:{mode.value}') + + @log_to_debug + def digital_read(self, pin: int) -> bool: + """ + Perform a digital read on the pin. + + :param pin: The pin to read from. + :raises IOError: If the pin's current mode does not support digital read + :raises IOError: If this pin cannot be controlled. + :return: The digital value of the pin. + """ + port = self._get_port() + self._validate_pin(pin) + response = port.query(f'PIN:{pin}:DIGITAL:GET?') + return (response == '1') + + @log_to_debug + def digital_write(self, pin: int, value: bool) -> None: + """ + Write a digital value to the pin. + + :param pin: The pin to write to. + :param value: The value to write to the pin. + :raises IOError: If the pin's current mode does not support digital write. + :raises IOError: If this pin cannot be controlled. + """ + port = self._get_port() + self._validate_pin(pin) + try: + if value: + port.write(f'PIN:{pin}:DIGITAL:SET:1') + else: + port.write(f'PIN:{pin}:DIGITAL:SET:0') + except RuntimeError as e: + # The firmware returns a NACK if the pin is not in OUTPUT mode + if 'is not supported in' in str(e): + raise IOError(str(e)) + + @log_to_debug + def analog_read(self, pin: int) -> float: + """ + Get the analog voltage on the pin. + + This is returned in volts. Only pins A0-A5 support analog reads. + + :param pin: The pin to read from. + :raises IOError: If the pin or its current mode does not support analog read. + :raises IOError: If this pin cannot be controlled. + :return: The analog voltage on the pin, ranges from 0 to 5. + """ + port = self._get_port() + self._validate_pin(pin) + if pin not in AnalogPin: + raise IOError('Pin does not support analog read') + try: + response = port.query(f'PIN:{pin}:ANALOG:GET?') + except RuntimeError as e: + # The firmware returns a NACK if the pin is not in INPUT mode + if 'is not supported in' in str(e): + raise IOError(str(e)) + # map the response from the ADC range to the voltage range + return map_to_float(int(response), ADC_MIN, ADC_MAX, 0.0, 5.0) + + @log_to_debug + def measure_ultrasound_distance(self, pulse_pin: int, echo_pin: int) -> int: + """ + Measure the distance to an object using an ultrasound sensor. + + The sensor can only measure distances up to 4000mm. + + :param pulse_pin: The pin to send the ultrasound pulse from. + :param echo_pin: The pin to read the ultrasound echo from. + :raises ValueError: If either of the pins are invalid + :return: The distance measured by the ultrasound sensor in mm. + """ + port = self._get_port() + try: # bounds check + self._validate_pin(pulse_pin) + except (IndexError, IOError): + raise ValueError("Invalid pulse pin provided") from None + try: + self._validate_pin(echo_pin) + except (IndexError, IOError): + raise ValueError("Invalid echo pin provided") from None + + response = port.query(f'ULTRASOUND:{pulse_pin}:{echo_pin}:MEASURE?') + return int(response) + + def _validate_pin(self, pin: int) -> None: + if pin in DISABLED_PINS: + raise IOError('This pin cannot be controlled.') + if pin not in AVAILABLE_PINS: + raise IndexError(f'Pin {pin} is not available on the Arduino.') + + def _get_port(self) -> SerialWrapper: + if len(self._boards) == 0: + raise RuntimeError("No Arduino connected") + return self._boards[0] + + def __repr__(self) -> str: + try: + port = self._get_port() + except RuntimeError: + return f"<{self.__class__.__qualname__} no arduino connected>" + else: + return f"<{self.__class__.__qualname__} {port}>" diff --git a/sbot/future/classless/comp.py b/sbot/future/classless/comp.py new file mode 100644 index 0000000..aec8159 --- /dev/null +++ b/sbot/future/classless/comp.py @@ -0,0 +1,149 @@ +""" +Implementation of loading metadata. + +Metadata is a dictionary of information about the environment that the robot is running in. +It usually includes the starting zone and a flag indicating whether we are in +competition or development mode. Metadata is stored in a JSON file, typically on a +competition USB stick. The environment variable SBOT_METADATA_PATH specifies a directory +where it, and its children, are searched for the JSON file to load. + +Example metadata file: +```json +{ + "zone": 2, + "is_competition": true +} +``` +""" +from __future__ import annotations + +import json +import logging +import os +from pathlib import Path +from typing import TypedDict + +from sbot.exceptions import MetadataKeyError, MetadataNotReadyError + +logger = logging.getLogger(__name__) + +# The name of the environment variable that specifies the path to search +# for metadata USB sticks +METADATA_ENV_VAR = "SBOT_METADATA_PATH" +# The name of the metadata file +METADATA_NAME = "metadata.json" + + +class Metadata(TypedDict): + """ + The structure of the metadata dictionary. + + :param is_competition: Whether the robot is in competition mode + :param zone: The zone that the robot is in + """ + + is_competition: bool + zone: int + + +# The default metadata to use if no file is found +DEFAULT_METADATA: Metadata = { + "is_competition": False, + "zone": 0, +} + + +class Comp: + """ + A collection of the robot metadata. + + This class is used to load and access the metadata of the robot. + """ + + def __init__(self) -> None: + self._metadata: Metadata | None = None + + @property + def is_competition(self) -> bool: + """ + Whether the robot is in a competition environment. + + This value is not available until wait_start has been called. + + :raises MetadataNotReadyError: If the metadata has not been loaded + """ + if self._metadata is None: + raise MetadataNotReadyError() + return self._metadata["is_competition"] + + @property + def zone(self) -> int: + """ + The zone that the robot is in. + + This value is not available until wait_start has been called. + + :raises MetadataNotReadyError: If the metadata has not been loaded + """ + if self._metadata is None: + raise MetadataNotReadyError() + return self._metadata["zone"] + + def _load(self) -> None: + """ + Search for a metadata file and load it. + + Searches the path identified by SBOT_METADATA_PATH and its children for + metadata.json (set by METADATA_NAME) and reads it. + """ + search_path = os.environ.get(METADATA_ENV_VAR) + if search_path: + search_root = Path(search_path) + if not search_root.is_dir(): + logger.error(f"Metaddata path {search_path} does not exist") + return + for item in Path(search_path).iterdir(): + try: + if item.is_dir() and (item / METADATA_NAME).exists(): + self._metadata = _load_metadata(item / METADATA_NAME) + elif item.name == METADATA_NAME: + self._metadata = _load_metadata(item) + return + except PermissionError: + logger.debug(f"Unable to read {item}") + else: + logger.info(f"No JSON metadata files found in {search_path}") + else: + logger.info(f"{METADATA_ENV_VAR} not set, not loading metadata") + + self._metadata = DEFAULT_METADATA + + +def _load_metadata(path: Path) -> Metadata: + """ + Load the metadata from a JSON file, found by `load`. + + The file must be a JSON dictionary with the keys `is_competition` and `zone`. + + :param path: The path to the metadata file + :raises RuntimeError: If the metadata file is invalid JSON + :raises TypeError: If the metadata file is not a JSON dictionary + :raises MetadataKeyError: If the metadata file is missing a required key + :return: The metadata dictionary + """ + logger.info(f"Loading metadata from {path}") + with path.open() as file: + try: + obj: Metadata = json.load(file) + except json.decoder.JSONDecodeError as e: + raise RuntimeError("Unable to load metadata.") from e + + if not isinstance(obj, dict): + raise TypeError(f"Found metadata file, but format is invalid. Got: {obj}") + + # check required keys exist at runtime + for key in Metadata.__annotations__.keys(): + if key not in obj.keys(): + raise MetadataKeyError(key) + + return obj diff --git a/sbot/future/classless/motors.py b/sbot/future/classless/motors.py new file mode 100644 index 0000000..20b2801 --- /dev/null +++ b/sbot/future/classless/motors.py @@ -0,0 +1,165 @@ +"""The interface for a single motor board output over serial.""" +from __future__ import annotations + +from enum import IntEnum +from typing import ClassVar, NamedTuple + +from sbot.logging import log_to_debug +from sbot.serial_wrapper import SerialWrapper +from sbot.utils import float_bounds_check, map_to_float, map_to_int + +from .utils import BoardIdentifier, BoardManager + + +class MotorPower(IntEnum): + """Special values for motor power.""" + + BRAKE = 0 + COAST = -1024 # A value outside the allowable range + + +class MotorStatus(NamedTuple): + """A tuple representing the status of the motor board.""" + + output_faults: tuple[bool, ...] + input_voltage: float + other: ClassVar[list[str]] = [] + + @classmethod + def from_status_response(cls, response: str) -> MotorStatus: + """ + Create a MotorStatus object from the response to a status command. + + :param response: The response from a *STATUS? command. + :raise TypeError: If the response is invalid. + :return: A MotorStatus object. + """ + output_fault_str, input_voltage_mv, *other = response.split(':') + output_faults = tuple((port == '1') for port in output_fault_str.split(',')) + input_voltage = float(input_voltage_mv) / 1000 + return cls(output_faults, input_voltage, other) + + +class Motor: + """ + A class representing a single motor board output. + + This class is intended to be used to communicate with the motor board over serial + using the text-based protocol added in version 4.4 of the motor board firmware. + + :param boards: The BoardManager object containing the motor board references. + """ + + __slots__ = ('_outputs',) + + def __init__(self, boards: BoardManager): + # Obtain a reference to the list of output ports + # This may not have been populated yet + self._outputs = boards.motors + + @log_to_debug + def set_power(self, id: int, power: float) -> None: + """ + Set the power of the motor. + + Internally this method maps the power to an integer between + -1000 and 1000 so only 3 digits of precision are available. + + :param id: The ID of the motor. + :param value: The power of the motor as a float between -1.0 and 1.0 + or the special values MotorPower.COAST and MotorPower.BRAKE. + """ + output = self._find_output(id) + if power == MotorPower.COAST: + output.port.write(f'MOT:{output.idx}:DISABLE') + return + power = float_bounds_check( + power, -1.0, 1.0, + 'Motor power is a float between -1.0 and 1.0') + + setpoint = map_to_int(power, -1.0, 1.0, -1000, 1000) + output.port.write(f'MOT:{output.idx}:SET:{setpoint}') + + @log_to_debug + def get_power(self, id: int) -> float: + """ + Read the current power setting of the motor. + + :param id: The ID of the motor. + :return: The power of the motor as a float between -1.0 and 1.0 + or the special value MotorPower.COAST. + """ + output = self._find_output(id) + response = output.port.query(f'MOT:{output.idx}:GET?') + + data = response.split(':') + enabled = (data[0] == '1') + value = int(data[1]) + + if not enabled: + return MotorPower.COAST + return map_to_float(value, -1000, 1000, -1.0, 1.0, precision=3) + + @log_to_debug + def status(self, id: int) -> MotorStatus: + """ + The status of the board the motor is connected to. + + :param id: The ID of the motor. + :return: The status of the board. + """ + output = self._find_output(id) + response = output.port.query('*STATUS?') + return MotorStatus.from_status_response(response) + + @log_to_debug + def reset(self) -> None: + """ + Reset attached boards. + + This command disables the motors and clears all faults. + :raise RuntimeError: If no motor boards are connected. + """ + boards = self._get_boards() + for board in boards: + board.write('*RESET') + + @log_to_debug + def get_motor_current(self, id: int) -> float: + """ + Read the current draw of the motor. + + :param id: The ID of the motor. + :return: The current draw of the motor in amps. + """ + output = self._find_output(id) + response = output.port.query(f'MOT:{output.idx}:I?') + return float(response) / 1000 + + @log_to_debug + def in_fault(self, id: int) -> bool: + """ + Check if the motor is in a fault state. + + :param id: The ID of the motor. + :return: True if the motor is in a fault state, False otherwise. + """ + output = self._find_output(id) + response = output.port.query('*STATUS?') + return MotorStatus.from_status_response(response).output_faults[output.idx] + + def _find_output(self, id: int) -> BoardIdentifier: + try: + return self._outputs[id] + except IndexError: + raise ValueError(f"Output {id} does not exist") + + def _get_boards(self) -> list[SerialWrapper]: + return sorted( + set(output.port for output in self._outputs), + key=lambda x: x.identity.asset_tag, + ) + + def __repr__(self) -> str: + board_ports = self._get_boards() + return f"<{self.__class__.__qualname__} {board_ports}>" diff --git a/sbot/future/classless/power.py b/sbot/future/classless/power.py new file mode 100644 index 0000000..d7b1ef6 --- /dev/null +++ b/sbot/future/classless/power.py @@ -0,0 +1,207 @@ +"""The interface for a single power board output over serial.""" +from __future__ import annotations + +from enum import IntEnum +from typing import ClassVar, NamedTuple + +from sbot.logging import log_to_debug + +from .utils import BoardIdentifier, BoardManager + + +class PowerOutputPosition(IntEnum): + """ + A mapping of output name to number of the PowerBoard outputs. + + The numbers here are the same as used in communication protocol with the PowerBoard. + """ + + H0 = 0 + H1 = 1 + L0 = 2 + L1 = 3 + L2 = 4 + L3 = 5 + FIVE_VOLT = 6 + + +class BatteryData(NamedTuple): + """ + A container for the battery data. + + :param voltage: The voltage of the battery in volts. + :param current: The current draw of the battery in amps. + """ + + voltage: float + current: float + + +class PowerStatus(NamedTuple): + """ + A named tuple containing the values of the power status output. + + :param overcurrent: A tuple of booleans indicating whether each output is + in an overcurrent state. + :param temperature: The temperature of the power board in degrees Celsius. + :param fan_running: Whether the fan is running. + :param regulator_voltage: The voltage of the regulator in volts. + :param other: A list of any other values returned by the status command. + """ + + overcurrent: tuple[bool, ...] + temperature: int + fan_running: bool + regulator_voltage: float + other: ClassVar[list[str]] = [] + + @classmethod + def from_status_response(cls, response: str) -> PowerStatus: + """ + Create a PowerStatus object from the response to a status command. + + :param response: The response from a *STATUS? command. + :raise TypeError: If the response is invalid. + :return: A PowerStatus object. + """ + oc_flags, temp, fan_running, raw_voltage, *other = response.split(':') + return cls( + overcurrent=tuple((x == '1') for x in oc_flags.split(',')), + temperature=int(temp), + fan_running=(fan_running == '1'), + regulator_voltage=float(raw_voltage) / 1000, + other=other, + ) + + +# This output is always on, and cannot be controlled via the API. +BRAIN_OUTPUT = PowerOutputPosition.L2 + + +class Power: + """ + A class representing a single power board output. + + This class is intended to be used to communicate with the power board over serial + using the text-based protocol. + + :param boards: The BoardManager object containing the power board. + """ + + __slots__ = ('_outputs',) + + def __init__(self, boards: BoardManager): + # Obtain a reference to the list of output ports + # This may not have been populated yet + self._outputs = boards.power + + @log_to_debug + def set_output(self, id: int, on: bool) -> None: + """ + Set whether the output is enabled. + + Outputs that have been disabled due to overcurrent will not be enabled, + but will not raise an error. + + :param id: The ID of the output. + :param value: Whether the output should be enabled. + """ + output = self._find_output(id) + if id == BRAIN_OUTPUT: + # Changing the brain output will also raise a NACK from the firmware + raise RuntimeError("Brain output cannot be controlled via this API.") + if on: + output.port.write(f'OUT:{output.idx}:SET:1') + else: + output.port.write(f'OUT:{output.idx}:SET:0') + + @log_to_debug + def is_output_on(self, id: int) -> bool: + """ + Return whether the output is enabled. + + Outputs are enabled at startup, but will be disabled if the output draws + too much current or are switched off by the user. + + :param id: The ID of the output. + :return: Whether the output is enabled. + """ + output = self._find_output(id) + response = output.port.query(f'OUT:{output.idx}:GET?') + return response == '1' + + @log_to_debug + def get_output_current(self, id: int) -> float: + """ + Return the current draw of the output. + + This current measurement has a 10% tolerance. + + :param id: The ID of the output. + :return: The current draw of the output, in amps. + """ + output = self._find_output(id) + response = output.port.query(f'OUT:{output.idx}:I?') + return float(response) / 1000 + + @log_to_debug + def get_battery_data(self) -> BatteryData: + """ + Get the battery data from the power board. + + This is implemented using an INA219 current sensor on the power board. + + :return: A BatteryData object containing the voltage and current of the battery. + :raise RuntimeError: If no power boards are connected. + """ + output = self._get_power_board() + volt_response = output.port.query('BATT:V?') + curr_response = output.port.query('BATT:I?') + return BatteryData( + voltage=float(volt_response) / 1000, + current=float(curr_response) / 1000 + ) + + @log_to_debug + def status(self) -> PowerStatus: + """ + Return the status of the power board. + + :return: The status of the power board. + :raise RuntimeError: If no power boards are connected. + """ + output = self._get_power_board() + response = output.port.query('*STATUS?') + return PowerStatus.from_status_response(response) + + @log_to_debug + def reset(self) -> None: + """ + Reset the power board. + + This turns off all outputs except the brain output and stops any running tones. + :raise RuntimeError: If no power boards are connected. + """ + if len(self._outputs) == 0: + raise RuntimeError("No power boards connected") + output = self._outputs[0] + output.port.write('*RESET') + + def _find_output(self, id: int) -> BoardIdentifier: + try: + return self._outputs[id] + except IndexError: + raise ValueError(f"Output {id} does not exist") + + def _get_power_board(self) -> BoardIdentifier: + if len(self._outputs) == 0: + raise RuntimeError("No power boards connected") + return self._outputs[0] + + def __repr__(self) -> str: + try: + pb = self._get_power_board() + except RuntimeError: + return f"<{self.__class__.__qualname__} no power board>" + else: + return f"<{self.__class__.__qualname__} {pb.port}>" diff --git a/sbot/future/classless/robot.py b/sbot/future/classless/robot.py new file mode 100644 index 0000000..cc3c5bc --- /dev/null +++ b/sbot/future/classless/robot.py @@ -0,0 +1,21 @@ +from .arduinos import Arduino +from .comp import Comp +from .motors import Motor +from .power import Power +from .servos import Servo +from .utils import BoardManager, Utils + +boards = BoardManager() +power = Power(boards) +motor = Motor(boards) +servo = Servo(boards) +arduino = Arduino(boards) +comp = Comp() +utils = Utils(boards, comp) + +# TODO load overrides +# TODO overrides ENABLE_TRACE_LOGGING, ENABLE_DEBUG_LOGGING + +if True: # TODO override SKIP_WAIT_START + boards.load_boards() + utils.wait_start() diff --git a/sbot/future/classless/servos.py b/sbot/future/classless/servos.py new file mode 100644 index 0000000..0aca963 --- /dev/null +++ b/sbot/future/classless/servos.py @@ -0,0 +1,211 @@ +"""The interface for a single servo board output over serial.""" +from __future__ import annotations + +from collections import defaultdict +from typing import NamedTuple + +from sbot.logging import log_to_debug +from sbot.serial_wrapper import SerialWrapper +from sbot.utils import float_bounds_check, map_to_float, map_to_int + +from .utils import BoardIdentifier, BoardManager + +DUTY_MIN = 300 +DUTY_MAX = 4000 +START_DUTY_MIN = 350 +START_DUTY_MAX = 1980 +NUM_SERVOS = 8 + + +class ServoStatus(NamedTuple): + """A named tuple containing the values of the servo status output.""" + + watchdog_failed: bool + power_good: bool + + @classmethod + def from_status_response(cls, response: str) -> ServoStatus: + """ + Create a ServoStatus from a status response. + + :param response: The response from a *STATUS? command. + :return: The ServoStatus. + """ + data = response.split(':') + + return cls( + watchdog_failed=(data[0] == '1'), + power_good=(data[1] == '1'), + ) + + +class Servo: + """ + A class representing a single servo board output. + + This class is intended to be used to communicate with the servo board over serial + using the text-based protocol added in version 4.3 of the servo board firmware. + + :param boards: The BoardManager object containing the servo board references. + """ + + __slots__ = ('_duty_limits', '_outputs') + + def __init__(self, boards: BoardManager): + self._outputs = boards.servos + + self._duty_limits: dict[int, tuple[int, int]] = defaultdict( + lambda: (START_DUTY_MIN, START_DUTY_MAX), + ) + + @log_to_debug + def set_duty_limits(self, id: int, lower: int, upper: int) -> None: + """ + Set the pulse on-time limits of the servo. + + These limits are used to map the servo position to a pulse on-time. + + :param id: The ID of the servo. + :param lower: The lower limit of the servo pulse in μs. + :param upper: The upper limit of the servo pulse in μs. + :raises TypeError: If the limits are not ints. + :raises ValueError: If the limits are not in the range 300 to 4000. + """ + # Validate output exists + _output = self._find_output(id) + + if not (isinstance(lower, int) and isinstance(upper, int)): + raise TypeError( + f'Servo pulse limits are ints in μs, in the range {DUTY_MIN} to {DUTY_MAX}' + ) + if not (DUTY_MIN <= lower <= DUTY_MAX and DUTY_MIN <= upper <= DUTY_MAX): + raise ValueError( + f'Servo pulse limits are ints in μs, in the range {DUTY_MIN} to {DUTY_MAX}' + ) + + self._duty_limits[id] = (lower, upper) + + @log_to_debug + def get_duty_limits(self, id: int) -> tuple[int, int]: + """ + Get the current pulse on-time limits of the servo. + + The limits are specified in μs. + + :param id: The ID of the servo. + :return: A tuple of the lower and upper limits of the servo pulse in μs. + """ + return self._duty_limits[id] + + @log_to_debug + def set_position(self, id: int, position: float) -> None: + """ + Set the position of the servo. + + If the servo is disabled, this will enable it. + -1.0 to 1.0 may not be the full range of the servo, see set_duty_limits(). + + :param position: The position of the servo as a float between -1.0 and 1.0 + """ + output = self._find_output(id) + position = float_bounds_check( + position, -1.0, 1.0, + 'Servo position is a float between -1.0 and 1.0') + + duty_min, duty_max = self._duty_limits[id] + + setpoint = map_to_int(position, -1.0, 1.0, duty_min, duty_max) + output.port.write(f'SERVO:{output.idx}:SET:{setpoint}') + + @log_to_debug + def get_position(self, id: int) -> float | None: + """ + Get the position of the servo. + + If the servo is disabled, this will return None. + + :return: The position of the servo as a float between -1.0 and 1.0 or None if disabled. + """ + output = self._find_output(id) + response = output.port.query(f'SERVO:{output.idx}:GET?') + data = int(response) + if data == 0: + return None + duty_min, duty_max = self._duty_limits[id] + return map_to_float(data, duty_min, duty_max, -1.0, 1.0, precision=3) + + @log_to_debug + def disable(self, id: int) -> None: + """ + Disable the servo. + + This will cause this channel to output a 0% duty cycle. + + :param id: The ID of the servo. + """ + output = self._find_output(id) + output.port.write(f'SERVO:{output.idx}:DISABLE') + + @log_to_debug + def status(self, id: int) -> ServoStatus: + """ + The status of the board the servo is connected to. + + :param id: The ID of the servo. + :return: A named tuple of the watchdog fail and pgood status. + """ + output = self._find_output(id) + response = output.port.query('*STATUS?') + + return ServoStatus.from_status_response(response) + + @log_to_debug + def reset(self) -> None: + """ + Reset all servo boards. + + This will disable all servos. + """ + for board in self._get_boards(): + board.write('*RESET') + + @log_to_debug + def get_current(self, id: int) -> float: + """ + Get the current draw of the servo board the servo is connected to. + + This only includes the servos powered through the main port, not the aux port. + + :return: The current draw of the board in amps. + """ + output = self._find_output(id) + response = output.port.query('SERVO:I?') + return float(response) / 1000 + + @log_to_debug + def get_voltage(self, id: int) -> float: + """ + Get the voltage of the on-board regulator. + + :param id: The ID of the servo. + :return: The voltage of the on-board regulator in volts. + """ + output = self._find_output(id) + response = output.port.query('SERVO:V?') + return float(response) / 1000 + + def _find_output(self, id: int) -> BoardIdentifier: + try: + return self._outputs[id] + except IndexError: + raise ValueError(f"Output {id} does not exist") + + def _get_boards(self) -> list[SerialWrapper]: + return sorted( + set(output.port for output in self._outputs), + key=lambda x: x.identity.asset_tag, + ) + + def __repr__(self) -> str: + board_ports = self._get_boards() + return f"<{self.__class__.__qualname__} {board_ports}>" diff --git a/sbot/future/classless/utils.py b/sbot/future/classless/utils.py new file mode 100644 index 0000000..a064e5b --- /dev/null +++ b/sbot/future/classless/utils.py @@ -0,0 +1,49 @@ +from __future__ import annotations + +from typing import List, NamedTuple + +from sbot.serial_wrapper import SerialWrapper + +from .comp import Comp + + +class Utils: + def __init__(self, boards: BoardManager, comp: Comp): + self._boards = boards + # We need to trigger loading the metadata when wait_start is called + self._comp = comp + + def sleep(self, duration: float) -> None: + pass + + def wait_start(self) -> None: + pass + + def sound_buzzer(self, frequency: int, duration: int) -> None: + # Previously power_board.piezo.buzz + pass + + def load_boards(self) -> None: + self._boards.load_boards() + + +class BoardIdentifier(NamedTuple): + """An identifier for a single output on a board.""" + + port: SerialWrapper + idx: int + + +class BoardManager: + power: List[BoardIdentifier] + motors: List[BoardIdentifier] + servos: List[BoardIdentifier] + # Contains at most one arduino board + arduino: List[SerialWrapper] + # vision + leds: List[BoardIdentifier] + + # TODO all discovery and cleanup functions + + def load_boards(self) -> None: + pass