Skip to content

Commit

Permalink
Initial parts of new API
Browse files Browse the repository at this point in the history
Adds API surface for most bits, still needs discovery
  • Loading branch information
WillB97 committed Jan 1, 2025
1 parent 1f39ac1 commit a3efc88
Show file tree
Hide file tree
Showing 8 changed files with 981 additions and 0 deletions.
3 changes: 3 additions & 0 deletions sbot/future/classless/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
from .robot import arduino, comp, motor, power, servo, utils

__all__ = ['arduino', 'comp', 'motor', 'power', 'servo', 'utils']
176 changes: 176 additions & 0 deletions sbot/future/classless/arduinos.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,176 @@
"""The Arduino module provides an interface to the Arduino firmware."""
from enum import Enum, IntEnum

from sbot.logging import log_to_debug
from sbot.serial_wrapper import SerialWrapper
from sbot.utils import map_to_float

from .utils import BoardManager


class GPIOPinMode(str, Enum):
"""The possible modes for a GPIO pin."""

INPUT = 'INPUT'
INPUT_PULLUP = 'INPUT_PULLUP'
OUTPUT = 'OUTPUT'


class AnalogPin(IntEnum):
"""The analog pins on the Arduino."""

A0 = 14
A1 = 15
A2 = 16
A3 = 17
A4 = 18
A5 = 19


DISABLED_PINS = (0, 1)
AVAILABLE_PINS = range(0, max(AnalogPin) + 1)

ADC_MAX = 1023 # 10 bit ADC
ADC_MIN = 0


class Arduino:
"""
The Arduino board interface.
This is intended to be used with Arduino Uno boards running the sbot firmware.
:param boards: The BoardManager object containing the arduino board references.
"""

__slots__ = ('_boards',)

def __init__(self, boards: BoardManager):
# Obtain a reference to the arduino
# This is contained in a list to allow for it to be populated later
self._boards = boards.arduino

@log_to_debug
def set_pin_mode(self, pin: int, mode: GPIOPinMode) -> None:
"""
Set the mode of the pin.
To do analog or digital reads set the mode to INPUT or INPUT_PULLUP.
To do digital writes set the mode to OUTPUT.
:param pin: The pin to set the mode of.
:param value: The mode to set the pin to.
:raises IOError: If the pin mode is not a GPIOPinMode.
:raises IOError: If this pin cannot be controlled.
"""
port = self._get_port()
self._validate_pin(pin)
if not isinstance(mode, GPIOPinMode):
raise IOError('Pin mode only supports being set to a GPIOPinMode')
port.write(f'PIN:{pin}:MODE:SET:{mode.value}')

@log_to_debug
def digital_read(self, pin: int) -> bool:
"""
Perform a digital read on the pin.
:param pin: The pin to read from.
:raises IOError: If the pin's current mode does not support digital read
:raises IOError: If this pin cannot be controlled.
:return: The digital value of the pin.
"""
port = self._get_port()
self._validate_pin(pin)
response = port.query(f'PIN:{pin}:DIGITAL:GET?')
return (response == '1')

@log_to_debug
def digital_write(self, pin: int, value: bool) -> None:
"""
Write a digital value to the pin.
:param pin: The pin to write to.
:param value: The value to write to the pin.
:raises IOError: If the pin's current mode does not support digital write.
:raises IOError: If this pin cannot be controlled.
"""
port = self._get_port()
self._validate_pin(pin)
try:
if value:
port.write(f'PIN:{pin}:DIGITAL:SET:1')
else:
port.write(f'PIN:{pin}:DIGITAL:SET:0')
except RuntimeError as e:
# The firmware returns a NACK if the pin is not in OUTPUT mode
if 'is not supported in' in str(e):
raise IOError(str(e))

@log_to_debug
def analog_read(self, pin: int) -> float:
"""
Get the analog voltage on the pin.
This is returned in volts. Only pins A0-A5 support analog reads.
:param pin: The pin to read from.
:raises IOError: If the pin or its current mode does not support analog read.
:raises IOError: If this pin cannot be controlled.
:return: The analog voltage on the pin, ranges from 0 to 5.
"""
port = self._get_port()
self._validate_pin(pin)
if pin not in AnalogPin:
raise IOError('Pin does not support analog read')
try:
response = port.query(f'PIN:{pin}:ANALOG:GET?')
except RuntimeError as e:
# The firmware returns a NACK if the pin is not in INPUT mode
if 'is not supported in' in str(e):
raise IOError(str(e))
# map the response from the ADC range to the voltage range
return map_to_float(int(response), ADC_MIN, ADC_MAX, 0.0, 5.0)

@log_to_debug
def measure_ultrasound_distance(self, pulse_pin: int, echo_pin: int) -> int:
"""
Measure the distance to an object using an ultrasound sensor.
The sensor can only measure distances up to 4000mm.
:param pulse_pin: The pin to send the ultrasound pulse from.
:param echo_pin: The pin to read the ultrasound echo from.
:raises ValueError: If either of the pins are invalid
:return: The distance measured by the ultrasound sensor in mm.
"""
port = self._get_port()
try: # bounds check
self._validate_pin(pulse_pin)
except (IndexError, IOError):
raise ValueError("Invalid pulse pin provided") from None
try:
self._validate_pin(echo_pin)
except (IndexError, IOError):
raise ValueError("Invalid echo pin provided") from None

response = port.query(f'ULTRASOUND:{pulse_pin}:{echo_pin}:MEASURE?')
return int(response)

def _validate_pin(self, pin: int) -> None:
if pin in DISABLED_PINS:
raise IOError('This pin cannot be controlled.')
if pin not in AVAILABLE_PINS:
raise IndexError(f'Pin {pin} is not available on the Arduino.')

def _get_port(self) -> SerialWrapper:
if len(self._boards) == 0:
raise RuntimeError("No Arduino connected")
return self._boards[0]

def __repr__(self) -> str:
try:
port = self._get_port()
except RuntimeError:
return f"<{self.__class__.__qualname__} no arduino connected>"
else:
return f"<{self.__class__.__qualname__} {port}>"
149 changes: 149 additions & 0 deletions sbot/future/classless/comp.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
"""
Implementation of loading metadata.
Metadata is a dictionary of information about the environment that the robot is running in.
It usually includes the starting zone and a flag indicating whether we are in
competition or development mode. Metadata is stored in a JSON file, typically on a
competition USB stick. The environment variable SBOT_METADATA_PATH specifies a directory
where it, and its children, are searched for the JSON file to load.
Example metadata file:
```json
{
"zone": 2,
"is_competition": true
}
```
"""
from __future__ import annotations

import json
import logging
import os
from pathlib import Path
from typing import TypedDict

from sbot.exceptions import MetadataKeyError, MetadataNotReadyError

logger = logging.getLogger(__name__)

# The name of the environment variable that specifies the path to search
# for metadata USB sticks
METADATA_ENV_VAR = "SBOT_METADATA_PATH"
# The name of the metadata file
METADATA_NAME = "metadata.json"


class Metadata(TypedDict):
"""
The structure of the metadata dictionary.
:param is_competition: Whether the robot is in competition mode
:param zone: The zone that the robot is in
"""

is_competition: bool
zone: int


# The default metadata to use if no file is found
DEFAULT_METADATA: Metadata = {
"is_competition": False,
"zone": 0,
}


class Comp:
"""
A collection of the robot metadata.
This class is used to load and access the metadata of the robot.
"""

def __init__(self) -> None:
self._metadata: Metadata | None = None

@property
def is_competition(self) -> bool:
"""
Whether the robot is in a competition environment.
This value is not available until wait_start has been called.
:raises MetadataNotReadyError: If the metadata has not been loaded
"""
if self._metadata is None:
raise MetadataNotReadyError()
return self._metadata["is_competition"]

@property
def zone(self) -> int:
"""
The zone that the robot is in.
This value is not available until wait_start has been called.
:raises MetadataNotReadyError: If the metadata has not been loaded
"""
if self._metadata is None:
raise MetadataNotReadyError()
return self._metadata["zone"]

def _load(self) -> None:
"""
Search for a metadata file and load it.
Searches the path identified by SBOT_METADATA_PATH and its children for
metadata.json (set by METADATA_NAME) and reads it.
"""
search_path = os.environ.get(METADATA_ENV_VAR)
if search_path:
search_root = Path(search_path)
if not search_root.is_dir():
logger.error(f"Metaddata path {search_path} does not exist")
return
for item in Path(search_path).iterdir():
try:
if item.is_dir() and (item / METADATA_NAME).exists():
self._metadata = _load_metadata(item / METADATA_NAME)
elif item.name == METADATA_NAME:
self._metadata = _load_metadata(item)
return
except PermissionError:
logger.debug(f"Unable to read {item}")
else:
logger.info(f"No JSON metadata files found in {search_path}")
else:
logger.info(f"{METADATA_ENV_VAR} not set, not loading metadata")

self._metadata = DEFAULT_METADATA


def _load_metadata(path: Path) -> Metadata:
"""
Load the metadata from a JSON file, found by `load`.
The file must be a JSON dictionary with the keys `is_competition` and `zone`.
:param path: The path to the metadata file
:raises RuntimeError: If the metadata file is invalid JSON
:raises TypeError: If the metadata file is not a JSON dictionary
:raises MetadataKeyError: If the metadata file is missing a required key
:return: The metadata dictionary
"""
logger.info(f"Loading metadata from {path}")
with path.open() as file:
try:
obj: Metadata = json.load(file)
except json.decoder.JSONDecodeError as e:
raise RuntimeError("Unable to load metadata.") from e

if not isinstance(obj, dict):
raise TypeError(f"Found metadata file, but format is invalid. Got: {obj}")

# check required keys exist at runtime
for key in Metadata.__annotations__.keys():
if key not in obj.keys():
raise MetadataKeyError(key)

return obj
Loading

0 comments on commit a3efc88

Please sign in to comment.