diff --git a/README.md b/README.md index ff0eb52..866026b 100644 --- a/README.md +++ b/README.md @@ -111,6 +111,21 @@ their `received` signals and accept raw AX.25 frames via the `send` method. Any object passed to `send` is wrapped in a `bytes` call -- this will implicitly call the `__bytes__` method on the object you pass in. +#### Exception handling on the KISS device + +There are a couple of exception cases that are emitted via a signal, so that +any consumer of the KISS device can react to issues, such as the port failing +to open, transmission failures, or failures to shut down. + +```python +def _on_fail(action, exc_info, **kwargs): + # Put your error handling code here + # action is a string: "open", "send" or "close" + # exc_info is the output of sys.exc_info() at the time of the error + pass +kissdev.failed.connect(_on_fail) +``` + ### Setting up an AX.25 Interface The AX.25 interface is a logical routing and queueing layer which decodes the diff --git a/aioax25/kiss.py b/aioax25/kiss.py index f943976..b48f345 100644 --- a/aioax25/kiss.py +++ b/aioax25/kiss.py @@ -13,6 +13,7 @@ from binascii import b2a_hex import time import logging +from sys import exc_info # Constants @@ -45,12 +46,15 @@ class KISSDeviceState(Enum): - OPEN: Serial port is open, TNC in KISS mode. - CLOSING: Close instruction just received. Putting TNC back into TNC2-mode if requested then closing the port. + - FAILED: A critical error has occurred and the port is now no longer + functional. """ CLOSED = 0 OPENING = 1 OPEN = 2 CLOSING = 3 + FAILED = -1 # Command classes @@ -257,6 +261,13 @@ def __init__( self._send_block_size = send_block_size self._send_block_delay = send_block_delay + # Signal fired when the device enters the FAILED state + # Keyword arguments: + # - action: the action being performed at the time of failure + # ('open', 'send', 'close') + # - exc_info: the exception trace information for debugging + self.failed = Signal() + def _receive(self, data): """ Handle incoming data by appending to our receive buffer. The @@ -381,13 +392,13 @@ def _send_data(self): if self._log.isEnabledFor(logging.DEBUG): self._log.debug("XMIT RAW %r", b2a_hex(data).decode()) - self._send_raw_data(data) + self._try_send_raw_data(data) # If we are closing, wait for this to be sent if (self._state == KISSDeviceState.CLOSING) and ( len(self._tx_buffer) == 0 ): - self._close() + self._try_close() return if self._tx_buffer: @@ -413,9 +424,9 @@ def _send_kiss_cmd(self): command = command.encode("US-ASCII") self._rx_buffer = bytearray() for bv in command: - self._send_raw_data(bytes([bv])) + self._try_send_raw_data(bytes([bv])) time.sleep(0.1) - self._send_raw_data(b"\r") + self._try_send_raw_data(b"\r") self._loop.call_later(0.5, self._check_open) def _check_open(self): @@ -424,6 +435,35 @@ def _check_open(self): """ self._loop.call_soon(self._send_kiss_cmd) + def _try_open(self): + try: + self._open() + except: + self._on_fail("open", exc_info()) + raise + + def _try_send_raw_data(self, data): + try: + self._send_raw_data(data) + except: + self._on_fail("send", exc_info()) + raise + + def _try_close(self): + try: + self._close() + except: + self._on_fail("close", exc_info()) + raise + + def _on_fail(self, action, exc_info): + (ex_t, ex_v, _) = exc_info + self._log.warning( + "KISS device has failed: %s: %s", ex_t.__name__, ex_v + ) + self._state = KISSDeviceState.FAILED + self.failed.emit(action=action, exc_info=exc_info) + def __getitem__(self, port): """ Retrieve an instance of a specified port. @@ -446,7 +486,7 @@ def open(self): assert self.state == KISSDeviceState.CLOSED, "Device is not closed" self._log.debug("Opening device") self._state = KISSDeviceState.OPENING - self._open() + self._try_open() def close(self): assert self.state == KISSDeviceState.OPEN, "Device is not open" @@ -455,7 +495,12 @@ def close(self): if self._reset_on_close: self._send(KISSCmdReturn()) else: - self._close() + self._try_close() + + def reset(self): + assert self.state == KISSDeviceState.FAILED, "Device has not failed" + self._log.warning("Resetting device") + self._state = KISSDeviceState.CLOSED class BaseTransportDevice(BaseKISSDevice): @@ -510,6 +555,19 @@ def _on_close(self, exc=None): def _send_raw_data(self, data): self._transport.write(data) + def reset(self): + super(BaseTransportDevice, self).reset() + + try: + if self._transport: + self._transport.close() + except: + self._log.warning( + "Failed to close transport, ignoring!", exc_info=1 + ) + + self._transport = None + class SerialKISSDevice(BaseTransportDevice): """ @@ -536,22 +594,28 @@ def __init__(self, device, baudrate, *args, **kwargs): self._baudrate = baudrate async def _open_connection(self): - self._log.debug("Delegating to KISS serial device %r", self._device) - await create_serial_connection( - self._loop, - self._make_protocol, - self._device, - baudrate=self._baudrate, - bytesize=EIGHTBITS, - parity=PARITY_NONE, - stopbits=STOPBITS_ONE, - timeout=None, - xonxoff=False, - rtscts=False, - write_timeout=None, - dsrdtr=False, - inter_byte_timeout=None, - ) + try: + self._log.debug( + "Delegating to KISS serial device %r", self._device + ) + await create_serial_connection( + self._loop, + self._make_protocol, + self._device, + baudrate=self._baudrate, + bytesize=EIGHTBITS, + parity=PARITY_NONE, + stopbits=STOPBITS_ONE, + timeout=None, + xonxoff=False, + rtscts=False, + write_timeout=None, + dsrdtr=False, + inter_byte_timeout=None, + ) + except: + self._log.warning("Failed to open serial connection", exc_info=1) + self._on_fail("open", exc_info()) class TCPKISSDevice(BaseTransportDevice): @@ -621,9 +685,13 @@ def __init__( ) async def _open_connection(self): - await self._loop.create_connection( - self._make_protocol, **self._conn_args - ) + try: + await self._loop.create_connection( + self._make_protocol, **self._conn_args + ) + except: + self._log.warning("Failed to open TCP connection", exc_info=1) + self._on_fail("open", exc_info()) class SubprocKISSDevice(BaseTransportDevice): @@ -661,14 +729,18 @@ def _make_protocol(self): ) async def _open_connection(self): - if self._shell: - await self._loop.subprocess_shell( - self._make_protocol, " ".join(self._command) - ) - else: - await self._loop.subprocess_exec( - self._make_protocol, *self._command - ) + try: + if self._shell: + await self._loop.subprocess_shell( + self._make_protocol, " ".join(self._command) + ) + else: + await self._loop.subprocess_exec( + self._make_protocol, *self._command + ) + except: + self._log.warning("Failed to call subprocess", exc_info=1) + self._on_fail("open", exc_info()) def _send_raw_data(self, data): self._transport.get_pipe_transport(0).write(data) @@ -692,6 +764,8 @@ def __init__(self, device, port, log): self._log = log # Signal for receiving packets + # Keyword arguments: + # - frame: the raw KISS frame as a `bytes()` object self.received = Signal() @property diff --git a/tests/test_kiss/test_base.py b/tests/test_kiss/test_base.py index 15ad7ff..6bca1e0 100644 --- a/tests/test_kiss/test_base.py +++ b/tests/test_kiss/test_base.py @@ -32,6 +32,31 @@ def _send_raw_data(self, data): self.transmitted += data +class DummyKISSDeviceError(IOError): + pass + + +class FailingKISSDevice(BaseKISSDevice): + def __init__(self, **kwargs): + super(FailingKISSDevice, self).__init__(**kwargs) + + self.transmitted = bytearray() + self.open_calls = 0 + self.close_calls = 0 + + def _open(self): + self.open_calls += 1 + raise DummyKISSDeviceError("Open fails") + + def _close(self): + self.close_calls += 1 + raise DummyKISSDeviceError("Close fails") + + def _send_raw_data(self, data): + self.transmitted += data + raise DummyKISSDeviceError("Send fails") + + def test_constructor_own_loop(): """ Test constructor uses its own IOLoop if not given one @@ -47,11 +72,53 @@ def test_open(): loop = DummyLoop() kissdev = DummyKISSDevice(loop=loop) + failures = [] + + def _on_fail(**kwargs): + failures.append(kwargs) + + kissdev.failed.connect(_on_fail) + assert kissdev.open_calls == 0 kissdev.open() assert kissdev.open_calls == 1 + assert failures == [] + + +def test_open_fail(): + """ + Test an open call that fails triggers the failed signal + """ + loop = DummyLoop() + kissdev = FailingKISSDevice(loop=loop) + + failures = [] + + def _on_fail(**kwargs): + failures.append(kwargs) + + kissdev.failed.connect(_on_fail) + + assert kissdev.open_calls == 0 + try: + kissdev.open() + open_ex = None + except DummyKISSDeviceError as e: + assert str(e) == "Open fails" + open_ex = e + + assert kissdev.open_calls == 1 + assert kissdev.state == KISSDeviceState.FAILED + assert len(failures) == 1 + failure = failures.pop(0) + + assert failure.pop("action") == "open" + (ex_c, ex_v, _) = failure.pop("exc_info") + assert ex_c is DummyKISSDeviceError + assert ex_v is open_ex + def test_close(): """ @@ -60,6 +127,13 @@ def test_close(): loop = DummyLoop() kissdev = DummyKISSDevice(loop=loop, reset_on_close=False) + failures = [] + + def _on_fail(**kwargs): + failures.append(kwargs) + + kissdev.failed.connect(_on_fail) + # Force the port open kissdev._state = KISSDeviceState.OPEN @@ -69,6 +143,46 @@ def test_close(): kissdev.close() assert kissdev.close_calls == 1 + assert failures == [] + + +def test_close_fail(): + """ + Test a close call that fails triggers the failed signal + """ + loop = DummyLoop() + kissdev = FailingKISSDevice(loop=loop, reset_on_close=False) + + failures = [] + + def _on_fail(**kwargs): + failures.append(kwargs) + + kissdev.failed.connect(_on_fail) + + # Force the port open + kissdev._state = KISSDeviceState.OPEN + + assert kissdev.close_calls == 0 + + # Now try closing the port + try: + kissdev.close() + close_ex = None + except DummyKISSDeviceError as e: + assert str(e) == "Close fails" + close_ex = e + + assert kissdev.close_calls == 1 + assert kissdev.state == KISSDeviceState.FAILED + assert len(failures) == 1 + failure = failures.pop(0) + + assert failure.pop("action") == "close" + (ex_c, ex_v, _) = failure.pop("exc_info") + assert ex_c is DummyKISSDeviceError + assert ex_v is close_ex + def test_close_reset(): """ @@ -94,6 +208,22 @@ def test_close_reset(): assert func == kissdev._send_data +def test_reset(): + """ + Test a reset call resets a failed device + """ + loop = DummyLoop() + kissdev = DummyKISSDevice(loop=loop, reset_on_close=False) + + # Force the port failed + kissdev._state = KISSDeviceState.FAILED + + # Reset the device + kissdev.reset() + + assert kissdev._state == KISSDeviceState.CLOSED + + def test_receive(): """ Test that a call to _receive stashes the data then schedules _receive_frame. @@ -288,6 +418,13 @@ def test_send_data(): kissdev = DummyKISSDevice(loop=loop) kissdev._tx_buffer += b"test output data" + failures = [] + + def _on_fail(**kwargs): + failures.append(kwargs) + + kissdev.failed.connect(_on_fail) + # Send the data out. kissdev._send_data() @@ -297,6 +434,49 @@ def test_send_data(): # That should be the lot assert len(loop.calls) == 0 + # There should be no failures + assert failures == [] + + +def test_send_data_fail(): + """ + Test that _send_data puts device in failed state if send fails. + """ + loop = DummyLoop() + kissdev = FailingKISSDevice(loop=loop) + kissdev._tx_buffer += b"test output data" + + failures = [] + + def _on_fail(**kwargs): + failures.append(kwargs) + + kissdev.failed.connect(_on_fail) + + # Send the data out. + try: + kissdev._send_data() + send_ex = None + except DummyKISSDeviceError as e: + assert str(e) == "Send fails" + send_ex = e + + # We should now see this was "sent" and now in 'transmitted' + assert bytes(kissdev.transmitted) == b"test output data" + + # That should be the lot + assert len(loop.calls) == 0 + + # We should be in the failed state + assert kissdev.state == KISSDeviceState.FAILED + assert len(failures) == 1 + failure = failures.pop(0) + + assert failure.pop("action") == "send" + (ex_c, ex_v, _) = failure.pop("exc_info") + assert ex_c is DummyKISSDeviceError + assert ex_v is send_ex + def test_send_data_block_size_exceed_reschedule(): """ diff --git a/tests/test_kiss/test_serial.py b/tests/test_kiss/test_serial.py index 952c5ee..0de3ed2 100644 --- a/tests/test_kiss/test_serial.py +++ b/tests/test_kiss/test_serial.py @@ -27,7 +27,8 @@ def __init__( dsrdtr, inter_byte_timeout, ): - assert port == "/dev/ttyS0" + + assert port in ("/dev/ttyS0", "/dev/ttyS1") assert baudrate == 9600 assert bytesize == EIGHTBITS assert parity == PARITY_NONE @@ -95,37 +96,45 @@ def write(self, *args, **kwargs): # Stub the serial port connection factory async def dummy_create_serial_connection( - loop, proto_factory, *args, **kwargs + loop, proto_factory, device, *args, **kwargs ): future = loop.create_future() create_serial_conn_log.debug( "Creating new serial connection: " - "loop=%r proto=%r args=%r kwargs=%r", + "loop=%r proto=%r device=%r args=%r kwargs=%r", loop, proto_factory, + device, args, kwargs, ) def _open(): - create_serial_conn_log.debug("Creating objects") - # Create the objects - protocol = proto_factory() - port = DummySerial(*args, **kwargs) - transport = DummyTransport(loop, port) - - # Record the created object references - connections.append( - PortConnection(port=port, protocol=protocol, transport=transport) - ) + if device == "/dev/ttyS0": + create_serial_conn_log.debug("Creating objects") + # Create the objects + protocol = proto_factory() + port = DummySerial(device, *args, **kwargs) + transport = DummyTransport(loop, port) + + # Record the created object references + connections.append( + PortConnection( + port=port, protocol=protocol, transport=transport + ) + ) - # Pass the protocol the transport object - create_serial_conn_log.debug("Passing transport to protocol") - protocol.connection_made(transport) + # Pass the protocol the transport object + create_serial_conn_log.debug("Passing transport to protocol") + protocol.connection_made(transport) - # Finish up the future - create_serial_conn_log.debug("Finishing up") - future.set_result((protocol, transport)) + # Finish up the future + create_serial_conn_log.debug("Finishing up") + future.set_result((protocol, transport)) + else: + # Abort with an error + create_serial_conn_log.debug("Failing") + future.set_exception(IOError("Open device failed")) create_serial_conn_log.debug("Scheduled in IOLoop") loop.call_soon(_open) @@ -169,6 +178,41 @@ async def test_open(): assert kissdev.init_called +@asynctest +async def test_open_fail(): + """ + Test open failures are handled. + """ + loop = get_event_loop() + kissdev = TestDevice(device="/dev/ttyS1", baudrate=9600, loop=loop) + assert kissdev._transport is None + + failures = [] + + def _on_fail(**kwargs): + failures.append(kwargs) + + kissdev.failed.connect(_on_fail) + + kissdev.open() + await sleep(0.01) + + # We should NOT have created a new port + assert len(connections) == 0 + + # Connection should be in the failed state + assert kissdev.state == kiss.KISSDeviceState.FAILED + + # Failure should have been reported + assert failures + failure = failures.pop(0) + + assert failure.pop("action") == "open" + (ex_c, ex_v, _) = failure.pop("exc_info") + assert ex_c is IOError + assert str(ex_v) == "Open device failed" + + @asynctest async def test_close(): """ @@ -299,3 +343,85 @@ async def test_send_raw_data(): kissdev._send_raw_data(b"a test frame") assert bytes(connection.port.tx_buffer) == b"a test frame" + + +def test_reset_no_transport(): + """ + Test reset handles the "no transport" case + """ + loop = get_event_loop() + kissdev = TestDevice(device="/dev/ttyS0", baudrate=9600, loop=loop) + assert kissdev._transport is None + + # Inject state + kissdev._state = kiss.KISSDeviceState.FAILED + + # Reset + kissdev.reset() + + assert kissdev.state == kiss.KISSDeviceState.CLOSED + + +def test_reset_with_transport(): + """ + Test reset closes the transport if it exists + """ + loop = get_event_loop() + kissdev = TestDevice(device="/dev/ttyS0", baudrate=9600, loop=loop) + + assert kissdev._transport is None + + class MyTransport(object): + def __init__(self): + self.closed = False + + def close(self): + assert not self.closed + self.closed = True + + # Inject transport + transport = MyTransport() + kissdev._transport = transport + + # Inject state + kissdev._state = kiss.KISSDeviceState.FAILED + + # Reset + kissdev.reset() + + assert kissdev.state == kiss.KISSDeviceState.CLOSED + assert kissdev._transport is None + assert transport.closed + + +def test_reset_with_transport_err(): + """ + Test reset swallows close errors from the transport + """ + loop = get_event_loop() + kissdev = TestDevice(device="/dev/ttyS0", baudrate=9600, loop=loop) + + assert kissdev._transport is None + + class MyTransport(object): + def __init__(self): + self.closed = False + + def close(self): + assert not self.closed + self.closed = True + raise IOError("Whoopsie!") + + # Inject transport + transport = MyTransport() + kissdev._transport = transport + + # Inject state + kissdev._state = kiss.KISSDeviceState.FAILED + + # Reset + kissdev.reset() + + assert kissdev.state == kiss.KISSDeviceState.CLOSED + assert kissdev._transport is None + assert transport.closed diff --git a/tests/test_kiss/test_subproc.py b/tests/test_kiss/test_subproc.py index 25c90a2..d586c6d 100644 --- a/tests/test_kiss/test_subproc.py +++ b/tests/test_kiss/test_subproc.py @@ -91,6 +91,65 @@ async def _subprocess_shell(proto_factory, *args): loop.subprocess_shell = orig_subprocess_shell +@asynctest +async def test_open_connection_failure(): + """ + Test subprocess failure is detected and handled. + """ + # This will receive the arguments passed to subprocess_shell + connection_args = [] + + loop = get_event_loop() + + # Stub the subprocess_shell method + orig_subprocess_shell = loop.subprocess_shell + + async def _subprocess_shell(proto_factory, *args): + # proto_factory should give us a KISSSubprocessProtocol object + protocol = proto_factory() + assert isinstance(protocol, kiss.KISSSubprocessProtocol) + + connection_args.extend(args) + raise IOError("Exec failed") + + loop.subprocess_shell = _subprocess_shell + + try: + device = kiss.SubprocKISSDevice( + command=["kisspipecmd", "arg1", "arg2"], + shell=True, + loop=loop, + log=logging.getLogger(__name__), + ) + + failures = [] + + def _on_fail(**kwargs): + failures.append(kwargs) + + device.failed.connect(_on_fail) + + await device._open_connection() + + # Expect a connection attempt to have been made + assert connection_args == ["kisspipecmd arg1 arg2"] + + # Connection should be in the failed state + assert device.state == kiss.KISSDeviceState.FAILED + + # Failure should have been reported + assert failures + failure = failures.pop(0) + + assert failure.pop("action") == "open" + (ex_c, ex_v, _) = failure.pop("exc_info") + assert ex_c is IOError + assert str(ex_v) == "Exec failed" + finally: + # Restore mock + loop.subprocess_shell = orig_subprocess_shell + + def test_send_raw_data(): """ Test data written to the device gets written to the subprocess ``stdin``. diff --git a/tests/test_kiss/test_tcp.py b/tests/test_kiss/test_tcp.py index fbd9074..507ceee 100644 --- a/tests/test_kiss/test_tcp.py +++ b/tests/test_kiss/test_tcp.py @@ -58,3 +58,69 @@ async def _create_connection(proto_factory, **kwargs): finally: # Restore mock loop.create_connection = orig_create_connection + + +@asynctest +async def test_open_connection_fail(): + # This will receive the arguments passed to create_connection + connection_args = {} + + loop = get_event_loop() + + # Stub the create_connection method + orig_create_connection = loop.create_connection + + async def _create_connection(proto_factory, **kwargs): + # proto_factory should give us a KISSProtocol object + protocol = proto_factory() + assert isinstance(protocol, kiss.KISSProtocol) + + connection_args.update(kwargs) + raise IOError("Connection failed") + + loop.create_connection = _create_connection + + try: + device = kiss.TCPKISSDevice( + host="localhost", + port=5432, + loop=loop, + log=logging.getLogger(__name__), + ) + + failures = [] + + def _on_fail(**kwargs): + failures.append(kwargs) + + device.failed.connect(_on_fail) + + await device._open_connection() + + # Expect a connection attempt to have been made + assert connection_args == dict( + host="localhost", + port=5432, + ssl=None, + family=0, + proto=0, + flags=0, + sock=None, + local_addr=None, + server_hostname=None, + ) + + # Connection should be in the failed state + assert device.state == kiss.KISSDeviceState.FAILED + + # Failure should have been reported + assert failures + failure = failures.pop(0) + + assert failure.pop("action") == "open" + (ex_c, ex_v, _) = failure.pop("exc_info") + assert ex_c is IOError + assert str(ex_v) == "Connection failed" + finally: + # Restore mock + loop.create_connection = orig_create_connection