diff --git a/aioax25/peer.py b/aioax25/peer.py index b155982..a084e7d 100644 --- a/aioax25/peer.py +++ b/aioax25/peer.py @@ -928,10 +928,15 @@ def _on_negotiate_result(self, response, **kwargs): self._process_xid_winszrx(AX25_20_DEFAULT_XID_WINDOWSZRX) self._process_xid_acktimer(AX25_20_DEFAULT_XID_ACKTIMER) self._process_xid_retrycounter(AX25_20_DEFAULT_XID_RETRIES) + + # Downgrade 2.2 to 2.0, do not unwittingly "upgrade" 1.0 to 2.0! if self._protocol in (AX25Version.UNKNOWN, AX25Version.AX25_22): self._log.info("Downgrading to AX.25 2.0 due to failed XID") self._protocol = AX25Version.AX25_20 - self._modulo128 = False + + # AX.25 2.2 is required for Mod128, so if we get FRMR here, + # disable this unconditionally. + self._modulo128 = False elif self._protocol != AX25Version.AX25_22: # Clearly this station understands AX.25 2.2 self._log.info("Upgrading to AX.25 2.2 due to successful XID") diff --git a/tests/test_peer/test_connection.py b/tests/test_peer/test_connection.py index 2dd9425..cde9769 100644 --- a/tests/test_peer/test_connection.py +++ b/tests/test_peer/test_connection.py @@ -31,6 +31,9 @@ from aioax25.peer import AX25PeerState from .peer import TestingAX25Peer from ..mocks import DummyStation, DummyTimeout +from functools import partial + +from pytest import mark # Connection establishment @@ -98,6 +101,100 @@ def _negotiate(*args, **kwargs): pass +def test_on_incoming_connect_timeout_incoming(): + """ + Test if the application does not accept within the time-out, we reject the + connection. + """ + station = DummyStation(AX25Address("VK4MSL", ssid=1)) + peer = TestingAX25Peer( + station=station, + address=AX25Address("VK4MSL"), + repeaters=AX25Path("VK4RZB"), + ) + + count = dict(reject=0) + + def _reject(): + count["reject"] += 1 + + peer.reject = _reject + + peer._state = AX25PeerState.INCOMING_CONNECTION + peer._ack_timeout_handle = DummyTimeout(None, None) + + peer._on_incoming_connect_timeout() + + assert peer._ack_timeout_handle is None + assert count == dict(reject=1) + + +def test_on_incoming_connect_timeout_otherstate(): + """ + Test if the incoming connection time-out fires whilst in another state, it + is ignored + """ + station = DummyStation(AX25Address("VK4MSL", ssid=1)) + peer = TestingAX25Peer( + station=station, + address=AX25Address("VK4MSL"), + repeaters=AX25Path("VK4RZB"), + ) + + count = dict(reject=0) + + def _reject(): + count["reject"] += 1 + + peer.reject = _reject + + peer._state = AX25PeerState.CONNECTED + peer._ack_timeout_handle = DummyTimeout(None, None) + + peer._on_incoming_connect_timeout() + + assert peer._ack_timeout_handle is not None + assert count == dict(reject=0) + + +def test_on_connect_response_ack(): + """ + Test if _on_connect_response receives an ACK, we enter the connected + state. + """ + station = DummyStation(AX25Address("VK4MSL", ssid=1)) + peer = TestingAX25Peer( + station=station, + address=AX25Address("VK4MSL"), + repeaters=AX25Path("VK4RZB"), + ) + + peer._state = AX25PeerState.CONNECTING + + peer._on_connect_response(response="ack") + + assert peer._state == AX25PeerState.CONNECTED + + +def test_on_connect_response_other(): + """ + Test if _on_connect_response receives another response, we enter the + disconnected state. + """ + station = DummyStation(AX25Address("VK4MSL", ssid=1)) + peer = TestingAX25Peer( + station=station, + address=AX25Address("VK4MSL"), + repeaters=AX25Path("VK4RZB"), + ) + + peer._state = AX25PeerState.CONNECTING + + peer._on_connect_response(response="nope") + + assert peer._state == AX25PeerState.DISCONNECTED + + # SABM(E) transmission @@ -3123,3 +3220,245 @@ def test_stop_ack_timer_notexisting(): peer._ack_timeout_handle = None peer._stop_ack_timer() + + +# AX.25 2.2 XID negotiation + + +@mark.parametrize("version", [AX25Version.AX25_10, AX25Version.AX25_20]) +def test_negotiate_notsupported(version): + """ + Test the peer refuses to perform XID if the protocol does not support it. + """ + station = DummyStation(AX25Address("VK4MSL", ssid=1)) + peer = TestingAX25Peer( + station=station, + address=AX25Address("VK4MSL"), + repeaters=AX25Path("VK4RZB"), + locked_path=True, + ) + + peer._state = AX25PeerState.CONNECTING + peer._protocol = version + + try: + peer._negotiate(lambda **kwa: None) + assert False, "Should not have worked" + except RuntimeError as e: + assert str(e) == "%s does not support negotiation" % (version.value) + + +@mark.parametrize("version", [AX25Version.AX25_22, AX25Version.UNKNOWN]) +def test_negotiate_supported(version): + """ + Test the peer refuses to perform XID if the protocol does not support it. + """ + station = DummyStation(AX25Address("VK4MSL", ssid=1)) + peer = TestingAX25Peer( + station=station, + address=AX25Address("VK4MSL"), + repeaters=AX25Path("VK4RZB"), + locked_path=True, + ) + + # Stub XID transmission + count = dict(send_xid=0) + + def _send_xid(cr): + count["send_xid"] += 1 + + peer._send_xid = _send_xid + + peer._state = AX25PeerState.CONNECTING + peer._protocol = version + + peer._negotiate(lambda **kwa: None) + + # Check we actually did request a XID transmission + assert count == dict(send_xid=1) + + # Trigger the DM callback to abort time-outs + assert peer._dmframe_handler is not None + peer._dmframe_handler() + + +@mark.parametrize( + "version, response", + [ + (axver, res) + for axver in (AX25Version.AX25_22, AX25Version.UNKNOWN) + for res in ("frmr", "dm") + ], +) +def test_on_negotiate_result_unsupported(version, response): + """ + Test we handle a response that indicates an AX.25 2.0 or earlier station. + """ + station = DummyStation(AX25Address("VK4MSL", ssid=1)) + peer = TestingAX25Peer( + station=station, + address=AX25Address("VK4MSL"), + repeaters=AX25Path("VK4RZB"), + locked_path=True, + ) + + # Stub XID functions + xid_params = set() + + def _set_xid_param(param, value): + xid_params.add(param) + + for param in ( + "cop", + "hdlcoptfunc", + "ifieldlenrx", + "winszrx", + "acktimer", + "retrycounter", + ): + setattr( + peer, "_process_xid_%s" % param, partial(_set_xid_param, param) + ) + + assert peer._negotiated == False + peer._modulo128 = True + peer._protocol = version + + peer._on_negotiate_result(response=response) + + # Should downgrade to AX.25 2.0 + assert peer._negotiated is True + assert peer._modulo128 is False + assert peer._protocol is AX25Version.AX25_20 + + # Should have inserted defaults for AX.25 2.0 + assert xid_params == set( + [ + "acktimer", + "cop", + "hdlcoptfunc", + "ifieldlenrx", + "retrycounter", + "winszrx", + ] + ) + + +@mark.parametrize( + "version, response", + [ + (axver, res) + for axver in (AX25Version.AX25_20, AX25Version.AX25_10) + for res in ("frmr", "dm") + ], +) +def test_on_negotiate_result_unsupported_old(version, response): + """ + Test we do not accidentally "upgrade" on FRMR/DM in response to XID. + """ + station = DummyStation(AX25Address("VK4MSL", ssid=1)) + peer = TestingAX25Peer( + station=station, + address=AX25Address("VK4MSL"), + repeaters=AX25Path("VK4RZB"), + locked_path=True, + ) + + # Stub XID functions + xid_params = set() + + def _set_xid_param(param, value): + xid_params.add(param) + + for param in ( + "cop", + "hdlcoptfunc", + "ifieldlenrx", + "winszrx", + "acktimer", + "retrycounter", + ): + setattr( + peer, "_process_xid_%s" % param, partial(_set_xid_param, param) + ) + + assert peer._negotiated == False + peer._modulo128 = True + peer._protocol = version + + peer._on_negotiate_result(response=response) + + # Should leave this as is! + assert peer._protocol is version + + # Should disable AX.25 2.2 features + assert peer._negotiated is True + assert peer._modulo128 is False + + # Should have inserted defaults for AX.25 2.0 + assert xid_params == set( + [ + "acktimer", + "cop", + "hdlcoptfunc", + "ifieldlenrx", + "retrycounter", + "winszrx", + ] + ) + + +@mark.parametrize( + "version", + [ + AX25Version.UNKNOWN, + AX25Version.AX25_22, + AX25Version.AX25_20, + AX25Version.AX25_10, + ], +) +def test_on_negotiate_result_success(version): + """ + Test we upgrade to AX.25 2.2 if XID successful. + """ + station = DummyStation(AX25Address("VK4MSL", ssid=1)) + peer = TestingAX25Peer( + station=station, + address=AX25Address("VK4MSL"), + repeaters=AX25Path("VK4RZB"), + locked_path=True, + ) + + # Stub XID functions + xid_params = set() + + def _set_xid_param(param, value): + xid_params.add(param) + + for param in ( + "cop", + "hdlcoptfunc", + "ifieldlenrx", + "winszrx", + "acktimer", + "retrycounter", + ): + setattr( + peer, "_process_xid_%s" % param, partial(_set_xid_param, param) + ) + + assert peer._negotiated == False + peer._modulo128 = True + peer._protocol = version + + peer._on_negotiate_result(response="success") + + # Should bump to AX.25 2.2 + assert peer._protocol is AX25Version.AX25_22 + + # Should leave AX.25 2.2 features enabled + assert peer._negotiated is True + assert peer._modulo128 is True + + # Should not override XID parameters set by handler + assert xid_params == set([])