Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Speed up communications #352

Draft
wants to merge 5 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 12 additions & 4 deletions sbot/arduino.py
Original file line number Diff line number Diff line change
Expand Up @@ -358,11 +358,19 @@ def analog_value(self) -> float:
ADC_MIN = 0

self._check_if_disabled()
if self.mode not in ANALOG_READ_MODES:
raise IOError(f'Analog read is not supported in {self.mode}')
if not self._supports_analog:
raise IOError('Pin does not support analog read')
response = self._serial.query(f'PIN:{self._index}:ANALOG:GET?')
raise IOError(f'Analog read is not supported on pin {self._index}')

# Combine the mode and response queries into a single pipeline
mode, response = self._serial.query_multi([
f'PIN:{self._index}:MODE:GET?',
f'PIN:{self._index}:ANALOG:GET?',
])
mode = GPIOPinMode(mode)

if mode not in ANALOG_READ_MODES:
raise IOError(f'Analog read is not supported in {self.mode}')

# map the response from the ADC range to the voltage range
return map_to_float(int(response), ADC_MIN, ADC_MAX, 0.0, 5.0)

Expand Down
166 changes: 129 additions & 37 deletions sbot/serial_wrapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
"""
from __future__ import annotations

import itertools
import logging
import sys
import threading
Expand Down Expand Up @@ -122,47 +123,102 @@ def stop(self) -> None:
"""
self._disconnect()

def _connect_if_needed(self) -> None:
if not self.serial.is_open:
if not self._connect():
# If the serial port cannot be opened raise an error,
# this will be caught by the retry decorator
raise BoardDisconnectionError((
f'Connection to board {self.identity.board_type}:'
f'{self.identity.asset_tag} could not be established',
))

@retry(times=3, exceptions=(BoardDisconnectionError, UnicodeDecodeError))
def query(self, data: str) -> str:
def query_multi(self, commands: list[str]) -> list[str]:
"""
Send a command to the board and return the response.

This method will automatically reconnect to the board and retry the command
This method will automatically reconnect to the board and retry the commands
up to 3 times on serial errors.

:param data: The data to write to the board.
:param commands: The commands to write to the board.
:raises BoardDisconnectionError: If the serial connection fails during the transaction,
including failing to respond to the command.
:return: The response from the board with the trailing newline removed.
:return: The responses from the board with the trailing newlines removed.
"""
# Verify no command has a newline in it, and build a command `bytes` from the
# list of commands
encoded_commands: list[bytes] = []
invalid_commands: list[tuple[str, str]] = []

for command in commands:
if '\n' in command:
invalid_commands.append(("contains newline", command))
else:
try:
byte_form = command.encode(encoding='utf-8')
except UnicodeEncodeError as e:
invalid_commands.append((str(e), command))
else:
encoded_commands.append(byte_form)
encoded_commands.append(b'\n')

if invalid_commands:
invalid_commands.sort()

invalid_command_groups = dict(itertools.groupby(
invalid_commands,
key=lambda x: x[0],
))

error_message = "\n".join(
["Invalid commands:"] +
[
f" {reason}: " + ", ".join(
repr(command)
for _, command in grouped_commands
)
for reason, grouped_commands in invalid_command_groups.items()
],
)
raise ValueError(error_message)

full_commands = b''.join(encoded_commands)

with self._lock:
if not self.serial.is_open:
if not self._connect():
# If the serial port cannot be opened raise an error,
# this will be caught by the retry decorator
raise BoardDisconnectionError((
f'Connection to board {self.identity.board_type}:'
f'{self.identity.asset_tag} could not be established',
))
# If the serial port is not open, try to connect
self._connect_if_needed() # TODO: Write me

# Contain all the serial IO in a try-catch; on error, disconnect and raise an error
try:
logger.log(TRACE, f'Serial write - {data!r}')
cmd = data + '\n'
self.serial.write(cmd.encode())

response = self.serial.readline()
try:
response_str = response.decode().rstrip('\n')
except UnicodeDecodeError as e:
logger.warning(
f"Board {self.identity.board_type}:{self.identity.asset_tag} "
f"returned invalid characters: {response!r}")
raise e
logger.log(
TRACE, f'Serial read - {response_str!r}')

if b'\n' not in response:
# If readline times out no error is raised, it returns an incomplete string
# Send the commands to the board
self.serial.write(full_commands)

# Log the commands
for command in commands:
logger.log(TRACE, f"Serial write - {command!r}")

# Read as many lines as there are commands
responses_binary = [
self.serial.readline()
for _ in range(len(commands))
]

# Log the responses. For backwards compatibility reasons, we decode
# these separately here before any error processing, so that the
# logs are correct even if an error occurs.
for response_binary in responses_binary:
response_decoded = response_binary.decode(
"utf-8",
errors="replace",
).rstrip('\n')
logger.log(TRACE, f"Serial read - {response_decoded!r}")

# Check all responses have a trailing newline (an incomplete
# response will not).
# This is within the lock and try-catch to ensure the serial port
# is closed on error.
if not all(response.endswith(b'\n') for response in responses_binary):
logger.warning((
f'Connection to board {self.identity.board_type}:'
f'{self.identity.asset_tag} timed out waiting for response'
Expand All @@ -176,15 +232,51 @@ def query(self, data: str) -> str:
'disconnected during transaction'
))

if response_str.startswith('NACK'):
_, error_msg = response_str.split(':', maxsplit=1)
logger.error((
f'Board {self.identity.board_type}:{self.identity.asset_tag} '
f'returned NACK on write command: {error_msg}'
))
raise RuntimeError(error_msg)
# Decode all the responses as UTF-8
try:
responses_decoded = [
response.decode("utf-8").rstrip('\n')
for response in responses_binary
]
except UnicodeDecodeError as e:
logger.warning(
f"Board {self.identity.board_type}:{self.identity.asset_tag} "
f"returned invalid characters: {responses_binary!r}")
raise e

# Collect any NACK responses; if any, raise an error
nack_prefix = 'NACK:'
nack_responses = [
response
for response in responses_decoded
if response.startswith(nack_prefix)
]

if nack_responses:
errors = [response[len(nack_prefix):] for response in nack_responses]
# We can't use exception groups due to needing to support Python 3.8
raise (
RuntimeError(errors[0])
if len(errors) == 1
else RuntimeError("Multiple errors: " + ", ".join(errors))
)

# Return the list of responses
return responses_decoded

def query(self, data: str) -> str:
"""
Send a command to the board and return the response.

return response_str
This method will automatically reconnect to the board and retry the command
up to 3 times on serial errors.

:param data: The data to write to the board.
:raises BoardDisconnectionError: If the serial connection fails during the transaction,
including failing to respond to the command.
:return: The response from the board with the trailing newline removed.
"""
return self.query_multi([data])[0]

def write(self, data: str) -> None:
"""
Expand Down
7 changes: 6 additions & 1 deletion tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,14 +65,19 @@ def query(self, request: str) -> str:
"""
# Assert that we have not run out of responses
# and that the request is the next one we expect
assert self.request_index < len(self.responses)
if self.request_index >= len(self.responses):
raise AssertionError(f"Unexpected request: {request}")
assert request == self.responses[self.request_index][0]

# Fetch the response and increment the request index
response = self.responses[self.request_index][1]
self.request_index += 1
return response

def query_multi(self, commands: list[str]) -> list[str]:
"""Send multiple commands and return the responses."""
return [self.query(command) for command in commands]

def write(self, request: str) -> None:
"""Send a command without waiting for a response."""
_ = self.query(request)
Expand Down
Loading
Loading