diff --git a/tests/test_peer/test_connection.py b/tests/test_peer/test_connection.py index 16a9fa4..6713209 100644 --- a/tests/test_peer/test_connection.py +++ b/tests/test_peer/test_connection.py @@ -1858,6 +1858,568 @@ def _on_receive_sabm(frame): assert frames == [frame] +# RR Notification transmission, scheduling and cancellation + + +def test_cancel_rr_notification_notpending(): + """ + Test _cancel_rr_notification does nothing if not pending. + """ + station = DummyStation(AX25Address("VK4MSL", ssid=1)) + peer = TestingAX25Peer( + station=station, + address=AX25Address("VK4MSL"), + repeaters=AX25Path(), + ) + + assert peer._rr_notification_timeout_handle is None + + peer._cancel_rr_notification() + + assert peer._rr_notification_timeout_handle is None + + +def test_cancel_rr_notification_ispending(): + """ + Test _cancel_rr_notification cancels a pending notification. + """ + station = DummyStation(AX25Address("VK4MSL", ssid=1)) + peer = TestingAX25Peer( + station=station, + address=AX25Address("VK4MSL"), + repeaters=AX25Path(), + ) + + timeout = DummyTimeout(0, lambda: None) + peer._rr_notification_timeout_handle = timeout + + peer._cancel_rr_notification() + + assert peer._rr_notification_timeout_handle is None + assert timeout.cancelled is True + + +def test_schedule_rr_notification(): + """ + Test _schedule_rr_notification schedules a notification. + """ + station = DummyStation(AX25Address("VK4MSL", ssid=1)) + peer = TestingAX25Peer( + station=station, + address=AX25Address("VK4MSL"), + repeaters=AX25Path(), + ) + + peer._schedule_rr_notification() + + assert peer._rr_notification_timeout_handle is not None + + +def test_send_rr_notification_connected(): + """ + Test _send_rr_notification sends a notification if connected. + """ + station = DummyStation(AX25Address("VK4MSL", ssid=1)) + peer = TestingAX25Peer( + station=station, + address=AX25Address("VK4MSL"), + repeaters=AX25Path(), + ) + + peer._init_connection(False) + + count = dict(update_recv_seq=0) + + def _update_recv_seq(): + count["update_recv_seq"] += 1 + + peer._update_recv_seq = _update_recv_seq + + transmitted = [] + + def _transmit_frame(frame): + transmitted.append(frame) + + peer._transmit_frame = _transmit_frame + + peer._state = AX25PeerState.CONNECTED + + peer._send_rr_notification() + + assert count == dict(update_recv_seq=1) + assert len(transmitted) == 1 + assert isinstance(transmitted[0], AX258BitReceiveReadyFrame) + + +def test_send_rr_notification_disconnected(): + """ + Test _send_rr_notification sends a notification if connected. + """ + station = DummyStation(AX25Address("VK4MSL", ssid=1)) + peer = TestingAX25Peer( + station=station, + address=AX25Address("VK4MSL"), + repeaters=AX25Path(), + ) + + peer._init_connection(False) + + count = dict(update_recv_seq=0) + + def _update_recv_seq(): + count["update_recv_seq"] += 1 + + peer._update_recv_seq = _update_recv_seq + + transmitted = [] + + def _transmit_frame(frame): + transmitted.append(frame) + + peer._transmit_frame = _transmit_frame + + peer._state = AX25PeerState.DISCONNECTED + + peer._send_rr_notification() + + assert count == dict(update_recv_seq=0) + assert len(transmitted) == 0 + + +# RNR transmission + + +def test_send_rnr_notification_connected(): + """ + Test _send_rnr_notification sends a notification if connected. + """ + station = DummyStation(AX25Address("VK4MSL", ssid=1)) + peer = TestingAX25Peer( + station=station, + address=AX25Address("VK4MSL"), + repeaters=AX25Path(), + ) + + peer._init_connection(False) + + count = dict(update_recv_seq=0) + + def _update_recv_seq(): + count["update_recv_seq"] += 1 + + peer._update_recv_seq = _update_recv_seq + + transmitted = [] + + def _transmit_frame(frame): + transmitted.append(frame) + + peer._transmit_frame = _transmit_frame + + peer._state = AX25PeerState.CONNECTED + + peer._send_rnr_notification() + + assert count == dict(update_recv_seq=1) + assert len(transmitted) == 1 + assert isinstance(transmitted[0], AX258BitReceiveNotReadyFrame) + + +def test_send_rnr_notification_connected_recent(): + """ + Test _send_rnr_notification skips notification if the last was recent. + """ + station = DummyStation(AX25Address("VK4MSL", ssid=1)) + peer = TestingAX25Peer( + station=station, + address=AX25Address("VK4MSL"), + repeaters=AX25Path(), + ) + + peer._init_connection(False) + + count = dict(update_recv_seq=0) + + def _update_recv_seq(): + count["update_recv_seq"] += 1 + + peer._update_recv_seq = _update_recv_seq + + transmitted = [] + + def _transmit_frame(frame): + transmitted.append(frame) + + peer._transmit_frame = _transmit_frame + + peer._state = AX25PeerState.CONNECTED + peer._last_rnr_sent = peer._loop.time() - (peer._rnr_interval / 2) + + peer._send_rnr_notification() + + assert count == dict(update_recv_seq=0) + assert len(transmitted) == 0 + + +def test_send_rnr_notification_disconnected(): + """ + Test _send_rnr_notification sends a notification if connected. + """ + station = DummyStation(AX25Address("VK4MSL", ssid=1)) + peer = TestingAX25Peer( + station=station, + address=AX25Address("VK4MSL"), + repeaters=AX25Path(), + ) + + peer._init_connection(False) + + count = dict(update_recv_seq=0) + + def _update_recv_seq(): + count["update_recv_seq"] += 1 + + peer._update_recv_seq = _update_recv_seq + + transmitted = [] + + def _transmit_frame(frame): + transmitted.append(frame) + + peer._transmit_frame = _transmit_frame + + peer._state = AX25PeerState.DISCONNECTED + + peer._send_rnr_notification() + + assert count == dict(update_recv_seq=0) + assert len(transmitted) == 0 + + +# I-Frame transmission + + +def test_send_next_iframe_max_outstanding(): + """ + Test I-frame transmission is suppressed if too many frames are pending. + """ + station = DummyStation(AX25Address("VK4MSL", ssid=1)) + peer = TestingAX25Peer( + station=station, + address=AX25Address("VK4MSL"), + repeaters=AX25Path(), + ) + + peer._init_connection(False) + + count = dict(update_send_seq=0, update_recv_seq=0) + + def _update_recv_seq(): + count["update_recv_seq"] += 1 + + peer._update_recv_seq = _update_recv_seq + + def _update_send_seq(): + count["update_send_seq"] += 1 + + peer._update_send_seq = _update_send_seq + + transmitted = [] + + def _transmit_frame(frame): + transmitted.append(frame) + + peer._transmit_frame = _transmit_frame + + state_updates = [] + + def _update_state(**kwargs): + state_updates.append(kwargs) + + peer._update_state = _update_state + + peer._state = AX25PeerState.CONNECTED + peer._pending_iframes = { + 0: (0xF0, b"Frame 1"), + 1: (0xF0, b"Frame 2"), + 2: (0xF0, b"Frame 3"), + 3: (0xF0, b"Frame 4"), + 4: (0xF0, b"Frame 5"), + 5: (0xF0, b"Frame 6"), + 6: (0xF0, b"Frame 7"), + 7: (0xF0, b"Frame 8"), + } + peer._max_outstanding = 8 + + peer._send_next_iframe() + + assert count == dict(update_send_seq=0, update_recv_seq=0) + assert state_updates == [] + assert transmitted == [] + + +def test_send_next_iframe_nothing_pending(): + """ + Test I-frame transmission is suppressed no data is pending. + """ + station = DummyStation(AX25Address("VK4MSL", ssid=1)) + peer = TestingAX25Peer( + station=station, + address=AX25Address("VK4MSL"), + repeaters=AX25Path(), + ) + + peer._init_connection(False) + + count = dict(update_send_seq=0, update_recv_seq=0) + + def _update_recv_seq(): + count["update_recv_seq"] += 1 + + peer._update_recv_seq = _update_recv_seq + + def _update_send_seq(): + count["update_send_seq"] += 1 + + peer._update_send_seq = _update_send_seq + + transmitted = [] + + def _transmit_frame(frame): + transmitted.append(frame) + + peer._transmit_frame = _transmit_frame + + state_updates = [] + + def _update_state(**kwargs): + state_updates.append(kwargs) + + peer._update_state = _update_state + + peer._state = AX25PeerState.CONNECTED + peer._pending_iframes = { + 0: (0xF0, b"Frame 1"), + 1: (0xF0, b"Frame 2"), + 2: (0xF0, b"Frame 3"), + 3: (0xF0, b"Frame 4"), + } + peer._max_outstanding = 8 + peer._send_state = 4 + + peer._send_next_iframe() + + assert count == dict(update_send_seq=0, update_recv_seq=0) + assert state_updates == [] + assert transmitted == [] + + +def test_send_next_iframe_create_next(): + """ + Test I-frame transmission creates a new I-frame if there's data to send. + """ + station = DummyStation(AX25Address("VK4MSL", ssid=1)) + peer = TestingAX25Peer( + station=station, + address=AX25Address("VK4MSL"), + repeaters=AX25Path(), + ) + + peer._init_connection(False) + + count = dict(update_send_seq=0, update_recv_seq=0) + + def _update_recv_seq(): + count["update_recv_seq"] += 1 + + peer._update_recv_seq = _update_recv_seq + + def _update_send_seq(): + count["update_send_seq"] += 1 + + peer._update_send_seq = _update_send_seq + + transmitted = [] + + def _transmit_frame(frame): + transmitted.append(frame) + + peer._transmit_frame = _transmit_frame + + state_updates = [] + + def _update_state(prop, **kwargs): + kwargs["prop"] = prop + state_updates.append(kwargs) + + peer._update_state = _update_state + + peer._state = AX25PeerState.CONNECTED + peer._pending_iframes = { + 0: (0xF0, b"Frame 1"), + 1: (0xF0, b"Frame 2"), + 2: (0xF0, b"Frame 3"), + 3: (0xF0, b"Frame 4"), + } + peer._pending_data = [ + (0xF0, b"Frame 5"), + ] + peer._max_outstanding = 8 + peer._send_state = 4 + + peer._send_next_iframe() + + assert peer._pending_iframes == { + 0: (0xF0, b"Frame 1"), + 1: (0xF0, b"Frame 2"), + 2: (0xF0, b"Frame 3"), + 3: (0xF0, b"Frame 4"), + 4: (0xF0, b"Frame 5"), + } + assert peer._pending_data == [] + assert count == dict(update_send_seq=1, update_recv_seq=1) + assert state_updates == [ + dict(prop="_send_state", delta=1, comment="send next I-frame") + ] + assert transmitted[1:] == [] + frame = transmitted.pop(0) + assert isinstance(frame, AX258BitInformationFrame) + assert frame.payload == b"Frame 5" + + +def test_send_next_iframe_existing_next(): + """ + Test I-frame transmission sends existing next frame. + """ + station = DummyStation(AX25Address("VK4MSL", ssid=1)) + peer = TestingAX25Peer( + station=station, + address=AX25Address("VK4MSL"), + repeaters=AX25Path(), + ) + + peer._init_connection(False) + + count = dict(update_send_seq=0, update_recv_seq=0) + + def _update_recv_seq(): + count["update_recv_seq"] += 1 + + peer._update_recv_seq = _update_recv_seq + + def _update_send_seq(): + count["update_send_seq"] += 1 + + peer._update_send_seq = _update_send_seq + + transmitted = [] + + def _transmit_frame(frame): + transmitted.append(frame) + + peer._transmit_frame = _transmit_frame + + state_updates = [] + + def _update_state(prop, **kwargs): + kwargs["prop"] = prop + state_updates.append(kwargs) + + peer._update_state = _update_state + + peer._state = AX25PeerState.CONNECTED + peer._pending_iframes = { + 0: (0xF0, b"Frame 1"), + 1: (0xF0, b"Frame 2"), + 2: (0xF0, b"Frame 3"), + 3: (0xF0, b"Frame 4"), + } + peer._pending_data = [ + (0xF0, b"Frame 5"), + ] + peer._max_outstanding = 8 + peer._send_state = 3 + + peer._send_next_iframe() + + assert peer._pending_iframes == { + 0: (0xF0, b"Frame 1"), + 1: (0xF0, b"Frame 2"), + 2: (0xF0, b"Frame 3"), + 3: (0xF0, b"Frame 4"), + } + assert peer._pending_data == [ + (0xF0, b"Frame 5"), + ] + assert count == dict(update_send_seq=1, update_recv_seq=1) + assert state_updates == [ + dict(prop="_send_state", delta=1, comment="send next I-frame") + ] + assert transmitted[1:] == [] + frame = transmitted.pop(0) + assert isinstance(frame, AX258BitInformationFrame) + assert frame.payload == b"Frame 4" + + +# Sequence number state updates + + +def test_update_send_seq(): + """ + Test _update_send_seq copies V(S) to N(S). + """ + station = DummyStation(AX25Address("VK4MSL", ssid=1)) + peer = TestingAX25Peer( + station=station, + address=AX25Address("VK4MSL"), + repeaters=AX25Path(), + ) + + state_updates = [] + + def _update_state(prop, **kwargs): + kwargs["prop"] = prop + state_updates.append(kwargs) + + peer._update_state = _update_state + + peer._send_seq = 2 + peer._send_state = 6 + + peer._update_send_seq() + assert state_updates == [ + dict(prop="_send_seq", value=6, comment="from V(S)") + ] + + +def test_update_recv_seq(): + """ + Test _update_recv_seq copies V(R) to N(R). + """ + station = DummyStation(AX25Address("VK4MSL", ssid=1)) + peer = TestingAX25Peer( + station=station, + address=AX25Address("VK4MSL"), + repeaters=AX25Path(), + ) + + state_updates = [] + + def _update_state(prop, **kwargs): + kwargs["prop"] = prop + state_updates.append(kwargs) + + peer._update_state = _update_state + + peer._recv_state = 6 + peer._recv_seq = 2 + + peer._update_recv_seq() + assert state_updates == [ + dict(prop="_recv_seq", value=6, comment="from V(R)") + ] + + # SABM(E) handling