From 76f1aaea0fe21122b52b5cb2b12e1c6e8cf0c380 Mon Sep 17 00:00:00 2001 From: Stuart Longland Date: Wed, 8 May 2024 14:26:26 +1000 Subject: [PATCH] peer unit tests: Test ACK timer handling --- tests/test_peer/test_connection.py | 115 +++++++++++++++++++++++++++++ 1 file changed, 115 insertions(+) diff --git a/tests/test_peer/test_connection.py b/tests/test_peer/test_connection.py index 6713209..2dd9425 100644 --- a/tests/test_peer/test_connection.py +++ b/tests/test_peer/test_connection.py @@ -3008,3 +3008,118 @@ def _start_disconnect_ack_timer(): assert peer._state == AX25PeerState.DISCONNECTING assert actions == ["sent-disc", "start-ack-timer"] assert peer._uaframe_handler == peer._on_disconnect + + +# ACK timer handling + + +def test_start_connect_ack_timer(): + """ + Test _start_connect_ack_timer schedules _on_incoming_connect_timeout + to fire after _ack_timeout. + """ + station = DummyStation(AX25Address("VK4MSL", ssid=1)) + peer = TestingAX25Peer( + station=station, + address=AX25Address("VK4MSL"), + repeaters=AX25Path("VK4RZB"), + locked_path=True, + ) + + count = dict(on_incoming_connect_timeout=0, on_disc_ua_timeout=0) + + def _on_incoming_connect_timeout(): + count["on_incoming_connect_timeout"] += 1 + + peer._on_incoming_connect_timeout = _on_incoming_connect_timeout + + def _on_disc_ua_timeout(): + count["on_disc_ua_timeout"] += 1 + + peer._on_disc_ua_timeout = _on_disc_ua_timeout + + assert peer._ack_timeout_handle is None + + peer._start_connect_ack_timer() + + assert peer._ack_timeout_handle is not None + assert peer._ack_timeout_handle.delay == peer._ack_timeout + + assert count == dict(on_incoming_connect_timeout=0, on_disc_ua_timeout=0) + peer._ack_timeout_handle.callback() + assert count == dict(on_incoming_connect_timeout=1, on_disc_ua_timeout=0) + + +def test_start_disconnect_ack_timer(): + """ + Test _start_disconnect_ack_timer schedules _on_disc_ua_timeout + to fire after _ack_timeout. + """ + station = DummyStation(AX25Address("VK4MSL", ssid=1)) + peer = TestingAX25Peer( + station=station, + address=AX25Address("VK4MSL"), + repeaters=AX25Path("VK4RZB"), + locked_path=True, + ) + + count = dict(on_incoming_connect_timeout=0, on_disc_ua_timeout=0) + + def _on_incoming_connect_timeout(): + count["on_incoming_connect_timeout"] += 1 + + peer._on_incoming_connect_timeout = _on_incoming_connect_timeout + + def _on_disc_ua_timeout(): + count["on_disc_ua_timeout"] += 1 + + peer._on_disc_ua_timeout = _on_disc_ua_timeout + + assert peer._ack_timeout_handle is None + + peer._start_disconnect_ack_timer() + + assert peer._ack_timeout_handle is not None + assert peer._ack_timeout_handle.delay == peer._ack_timeout + + assert count == dict(on_incoming_connect_timeout=0, on_disc_ua_timeout=0) + peer._ack_timeout_handle.callback() + assert count == dict(on_incoming_connect_timeout=0, on_disc_ua_timeout=1) + + +def test_stop_ack_timer_existing(): + """ + Test _stop_ack_timer cancels the existing time-out. + """ + station = DummyStation(AX25Address("VK4MSL", ssid=1)) + peer = TestingAX25Peer( + station=station, + address=AX25Address("VK4MSL"), + repeaters=AX25Path("VK4RZB"), + locked_path=True, + ) + + timeout = DummyTimeout(None, None) + peer._ack_timeout_handle = timeout + + peer._stop_ack_timer() + + assert peer._ack_timeout_handle is None + assert timeout.cancelled is True + + +def test_stop_ack_timer_notexisting(): + """ + Test _stop_ack_timer does nothing if no time-out pending. + """ + station = DummyStation(AX25Address("VK4MSL", ssid=1)) + peer = TestingAX25Peer( + station=station, + address=AX25Address("VK4MSL"), + repeaters=AX25Path("VK4RZB"), + locked_path=True, + ) + + peer._ack_timeout_handle = None + + peer._stop_ack_timer()