Skip to content

Commit

Permalink
Update for new board manager
Browse files Browse the repository at this point in the history
  • Loading branch information
WillB97 committed Jan 12, 2025
1 parent 8f3f4b0 commit 0794985
Show file tree
Hide file tree
Showing 7 changed files with 311 additions and 119 deletions.
53 changes: 53 additions & 0 deletions sbot/future/board_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -344,3 +344,56 @@ def _custom_sort(self, identifier: str, sort_order: list[str]) -> None:
}

self.boards[identifier] = boards_sorted

def find_output(self, identifier: str, idx: int) -> OutputIdentifier:
"""
Find an output on a board.
:param identifier: The identifier of the board.
:param idx: The index of the output on the board.
:return: The OutputIdentifier for the output.
:raises ValueError: If the output does not exist on the board.
:raises KeyError: If no board with the given identifier is registered.
"""
try:
return self.outputs[identifier][idx]
except IndexError:
name = self._name_from_identifier(identifier)
raise ValueError(f"Output {idx} does not exist on {name}")
except KeyError:
raise KeyError(f"No board with identifier {identifier!r}")

def get_boards(self, identifier: str) -> dict[str, SerialWrapper]:
"""
Get all boards of a given type.
:param identifier: The identifier of the board type.
:return: A dictionary of asset tags to SerialWrapper objects.
:raises KeyError: If no board with the given identifier is registered.
"""
try:
return self.boards[identifier]
except KeyError:
raise KeyError(f"No board with identifier {identifier!r}")

def get_first_board(self, identifier: str) -> SerialWrapper:
"""
Get the first board of a given type.
:param identifier: The identifier of the board type.
:return: The SerialWrapper object for the first board.
:raises KeyError: If no board with the given identifier is registered.
"""
try:
return next(iter(self.boards[identifier].values()))
except KeyError:
raise KeyError(f"No board with identifier {identifier!r}") from None
except StopIteration:
name = self._name_from_identifier(identifier)
raise ValueError(f"No {name}s connected") from None

def _name_from_identifier(self, identifier: str) -> str:
for template in self._regisered_templates:
if template.identifier == identifier:
return template.name
return identifier
49 changes: 32 additions & 17 deletions sbot/future/classless/arduinos.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,10 @@
"""The Arduino module provides an interface to the Arduino firmware."""
from enum import Enum, IntEnum

from sbot.future.board_manager import BoardManager, DiscoveryTemplate
from sbot.logging import log_to_debug
from sbot.serial_wrapper import SerialWrapper
from sbot.utils import map_to_float

from .utils import BoardManager


class GPIOPinMode(str, Enum):
"""The possible modes for a GPIO pin."""
Expand All @@ -33,6 +31,14 @@ class AnalogPin(IntEnum):
ADC_MAX = 1023 # 10 bit ADC
ADC_MIN = 0

SUPPORTED_VID_PIDS = {
(0x2341, 0x0043), # Arduino Uno rev 3
(0x2A03, 0x0043), # Arduino Uno rev 3
(0x1A86, 0x7523), # Uno
(0x10C4, 0xEA60), # Ruggeduino
(0x16D0, 0x0613), # Ruggeduino
}


class Arduino:
"""
Expand All @@ -43,11 +49,25 @@ class Arduino:
:param boards: The BoardManager object containing the arduino board references.
"""

__slots__ = ('_boards',)
__slots__ = ('_boards', '_identifier')

def __init__(self, boards: BoardManager):
# Obtain a reference to the arduino
# This is contained in a list to allow for it to be populated later
self._identifier = 'arduino'
template = DiscoveryTemplate(
identifier=self._identifier,
name='Arduino',
vid=0, # Populated later
pid=0,
board_type='Arduino',
sim_board_type='Arduino',
use_usb_serial=True,
delay_after_connect=2,
max_boards=1,
)
# Register all the possible Arduino USB IDs
for vid, pid in SUPPORTED_VID_PIDS:
BoardManager.register_board(template._replace(vid=vid, pid=pid))

self._boards = boards

@log_to_debug
Expand All @@ -63,7 +83,7 @@ def set_pin_mode(self, pin: int, mode: GPIOPinMode) -> None:
:raises IOError: If the pin mode is not a GPIOPinMode.
:raises IOError: If this pin cannot be controlled.
"""
port = self._get_port()
port = self._boards.get_first_board(self._identifier)
self._validate_pin(pin)
if not isinstance(mode, GPIOPinMode):
raise IOError('Pin mode only supports being set to a GPIOPinMode')
Expand All @@ -79,7 +99,7 @@ def digital_read(self, pin: int) -> bool:
:raises IOError: If this pin cannot be controlled.
:return: The digital value of the pin.
"""
port = self._get_port()
port = self._boards.get_first_board(self._identifier)
self._validate_pin(pin)
response = port.query(f'PIN:{pin}:DIGITAL:GET?')
return (response == '1')
Expand All @@ -94,7 +114,7 @@ def digital_write(self, pin: int, value: bool) -> None:
:raises IOError: If the pin's current mode does not support digital write.
:raises IOError: If this pin cannot be controlled.
"""
port = self._get_port()
port = self._boards.get_first_board(self._identifier)
self._validate_pin(pin)
try:
if value:
Expand All @@ -118,7 +138,7 @@ def analog_read(self, pin: int) -> float:
:raises IOError: If this pin cannot be controlled.
:return: The analog voltage on the pin, ranges from 0 to 5.
"""
port = self._get_port()
port = self._boards.get_first_board(self._identifier)
self._validate_pin(pin)
if pin not in AnalogPin:
raise IOError('Pin does not support analog read')
Expand All @@ -143,7 +163,7 @@ def measure_ultrasound_distance(self, pulse_pin: int, echo_pin: int) -> int:
:raises ValueError: If either of the pins are invalid
:return: The distance measured by the ultrasound sensor in mm.
"""
port = self._get_port()
port = self._boards.get_first_board(self._identifier)
try: # bounds check
self._validate_pin(pulse_pin)
except (IndexError, IOError):
Expand All @@ -162,14 +182,9 @@ def _validate_pin(self, pin: int) -> None:
if pin not in AVAILABLE_PINS:
raise IndexError(f'Pin {pin} is not available on the Arduino.')

def _get_port(self) -> SerialWrapper:
if self._boards.arduino is None:
raise RuntimeError("No Arduino connected")
return self._boards.arduino

def __repr__(self) -> str:
try:
port = self._get_port()
port = self._boards.get_first_board(self._identifier)
except RuntimeError:
return f"<{self.__class__.__qualname__} no arduino connected>"
else:
Expand Down
50 changes: 29 additions & 21 deletions sbot/future/classless/motors.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,16 @@
"""The interface for a single motor board output over serial."""
from __future__ import annotations

import logging
from enum import IntEnum
from typing import ClassVar, NamedTuple

from sbot.future.board_manager import BoardManager, DiscoveryTemplate
from sbot.logging import log_to_debug
from sbot.serial_wrapper import SerialWrapper
from sbot.utils import float_bounds_check, map_to_float, map_to_int

from .utils import BoardIdentifier, BoardManager
logger = logging.getLogger(__name__)


class MotorPower(IntEnum):
Expand Down Expand Up @@ -50,12 +52,24 @@ class Motor:
:param boards: The BoardManager object containing the motor board references.
"""

__slots__ = ('_outputs',)
__slots__ = ('_boards', '_identifier')

def __init__(self, boards: BoardManager):
self._identifier = 'motor'
template = DiscoveryTemplate(
identifier=self._identifier,
name='motor board',
vid=0x0403,
pid=0x6001,
board_type='MCv4B',
num_outputs=2,
cleanup=self._cleanup,
sim_board_type='MotorBoard',
)
BoardManager.register_board(template)
# Obtain a reference to the list of output ports
# This may not have been populated yet
self._outputs = boards.motors
self._boards = boards

@log_to_debug
def set_power(self, id: int, power: float) -> None:
Expand All @@ -69,7 +83,7 @@ def set_power(self, id: int, power: float) -> None:
:param value: The power of the motor as a float between -1.0 and 1.0
or the special values MotorPower.COAST and MotorPower.BRAKE.
"""
output = self._find_output(id)
output = self._boards.find_output(self._identifier, id)
if power == MotorPower.COAST:
output.port.write(f'MOT:{output.idx}:DISABLE')
return
Expand All @@ -89,7 +103,7 @@ def get_power(self, id: int) -> float:
:return: The power of the motor as a float between -1.0 and 1.0
or the special value MotorPower.COAST.
"""
output = self._find_output(id)
output = self._boards.find_output(self._identifier, id)
response = output.port.query(f'MOT:{output.idx}:GET?')

data = response.split(':')
Expand All @@ -108,7 +122,7 @@ def status(self, id: int) -> MotorStatus:
:param id: The ID of the motor.
:return: The status of the board.
"""
output = self._find_output(id)
output = self._boards.find_output(self._identifier, id)
response = output.port.query('*STATUS?')
return MotorStatus.from_status_response(response)

Expand All @@ -120,7 +134,7 @@ def reset(self) -> None:
This command disables the motors and clears all faults.
:raise RuntimeError: If no motor boards are connected.
"""
boards = self._get_boards()
boards = self._boards.get_boards(self._identifier).values()
for board in boards:
board.write('*RESET')

Expand All @@ -132,7 +146,7 @@ def get_motor_current(self, id: int) -> float:
:param id: The ID of the motor.
:return: The current draw of the motor in amps.
"""
output = self._find_output(id)
output = self._boards.find_output(self._identifier, id)
response = output.port.query(f'MOT:{output.idx}:I?')
return float(response) / 1000

Expand All @@ -144,23 +158,17 @@ def in_fault(self, id: int) -> bool:
:param id: The ID of the motor.
:return: True if the motor is in a fault state, False otherwise.
"""
output = self._find_output(id)
output = self._boards.find_output(self._identifier, id)
response = output.port.query('*STATUS?')
return MotorStatus.from_status_response(response).output_faults[output.idx]

def _find_output(self, id: int) -> BoardIdentifier:
@staticmethod
def _cleanup(port: SerialWrapper) -> None:
try:
return self._outputs[id]
except IndexError:
raise ValueError(f"Output {id} does not exist")

def _get_boards(self) -> list[SerialWrapper]:
unique_boards = []
for output in self._outputs:
if output.port not in unique_boards:
unique_boards.append(output.port)
return unique_boards
port.write('*RESET')
except Exception:
logger.warning(f"Failed to cleanup motor board {port.identity.asset_tag}.")

def __repr__(self) -> str:
board_ports = self._get_boards()
board_ports = ", ".join(self._boards.get_boards(self._identifier).keys())
return f"<{self.__class__.__qualname__} {board_ports}>"
Loading

0 comments on commit 0794985

Please sign in to comment.