diff --git a/tests/test_peer/test_connection.py b/tests/test_peer/test_connection.py index ba9e658..b36ca0c 100644 --- a/tests/test_peer/test_connection.py +++ b/tests/test_peer/test_connection.py @@ -12,6 +12,8 @@ AX25DisconnectModeFrame, AX25FrameRejectFrame, AX25UnnumberedAcknowledgeFrame, + AX25UnnumberedInformationFrame, + AX25RawFrame, AX25TestFrame, AX25SetAsyncBalancedModeFrame, AX25SetAsyncBalancedModeExtendedFrame, @@ -295,6 +297,304 @@ def _on_receive_ua(): assert count == dict(ua=1) +def test_recv_ui(): + """ + Test that UI is emitted by the received frame signal. + """ + station = DummyStation(AX25Address("VK4MSL", ssid=1)) + peer = TestingAX25Peer( + station=station, + address=AX25Address("VK4MSL"), + repeaters=AX25Path("VK4RZB"), + locked_path=True, + ) + + # Stub idle time-out handling + peer._reset_idle_timeout = lambda: None + + # Create a handler for receiving the UI + rx_frames = [] + + def _on_receive_frame(frame, **kwargs): + assert "peer" in kwargs + assert kwargs.pop("peer") is peer + assert kwargs == {} + rx_frames.append(frame) + + peer.received_frame.connect(_on_receive_frame) + + # Set the state + peer._state = AX25PeerState.CONNECTED + + # Inject a frame + frame = AX25UnnumberedInformationFrame( + destination=AX25Address("VK4MSL-1"), + source=AX25Address("VK4MSL"), + repeaters=AX25Path("VK4RZB"), + pid=0xF0, + payload=b"Testing 1 2 3 4", + ) + + peer._on_receive(frame) + + # Our handler should have been called + assert len(rx_frames) == 1 + assert rx_frames[0] is frame + + +def test_recv_raw_noconn(): + """ + Test that a raw frame without a connection triggers a DM frame. + """ + station = DummyStation(AX25Address("VK4MSL", ssid=1)) + peer = TestingAX25Peer( + station=station, + address=AX25Address("VK4MSL"), + repeaters=AX25Path("VK4RZB"), + locked_path=True, + ) + + # Stub idle time-out handling + peer._reset_idle_timeout = lambda: None + + # Stub _send_dm + count = dict(send_dm=0) + + def _send_dm(): + count["send_dm"] += 1 + + peer._send_dm = _send_dm + + # Set the state + peer._state = AX25PeerState.DISCONNECTED + + # Inject a frame + peer._on_receive( + AX25RawFrame( + destination=AX25Address("VK4MSL-1"), + source=AX25Address("VK4MSL"), + repeaters=AX25Path("VK4RZB"), + payload=b"\x00\x00Testing 1 2 3 4", + ) + ) + + +def test_recv_raw_mod8_iframe(): + """ + Test that a I-frame with Mod8 connection is handled. + """ + station = DummyStation(AX25Address("VK4MSL", ssid=1)) + peer = TestingAX25Peer( + station=station, + address=AX25Address("VK4MSL"), + repeaters=AX25Path("VK4RZB"), + locked_path=True, + ) + + # Stub idle time-out handling + peer._reset_idle_timeout = lambda: None + + # Stub _on_receive_iframe + iframes = [] + + def _on_receive_iframe(frame): + iframes.append(frame) + + peer._on_receive_iframe = _on_receive_iframe + + # Stub _on_receive_sframe + sframes = [] + + def _on_receive_sframe(frame): + sframes.append(frame) + + peer._on_receive_sframe = _on_receive_sframe + + # Set the state + peer._state = AX25PeerState.CONNECTED + peer._modulo = 8 + + # Inject a frame + peer._on_receive( + AX25RawFrame( + destination=AX25Address("VK4MSL-1"), + source=AX25Address("VK4MSL"), + repeaters=AX25Path("VK4RZB"), + payload=b"\xd4\xf0Testing 1 2 3 4", + ) + ) + + # Our I-frame handler should have been called + assert len(iframes) == 1 + assert isinstance(iframes[0], AX258BitInformationFrame) + assert iframes[0].pid == 0xf0 + assert iframes[0].payload == b"Testing 1 2 3 4" + + # Our S-frame handler should NOT have been called + assert sframes == [] + + +def test_recv_raw_mod128_iframe(): + """ + Test that a I-frame with Mod128 connection is handled. + """ + station = DummyStation(AX25Address("VK4MSL", ssid=1)) + peer = TestingAX25Peer( + station=station, + address=AX25Address("VK4MSL"), + repeaters=AX25Path("VK4RZB"), + locked_path=True, + ) + + # Stub idle time-out handling + peer._reset_idle_timeout = lambda: None + + # Stub _on_receive_iframe + iframes = [] + + def _on_receive_iframe(frame): + iframes.append(frame) + + peer._on_receive_iframe = _on_receive_iframe + + # Stub _on_receive_sframe + sframes = [] + + def _on_receive_sframe(frame): + sframes.append(frame) + + peer._on_receive_sframe = _on_receive_sframe + + # Set the state + peer._state = AX25PeerState.CONNECTED + peer._modulo = 128 + + # Inject a frame + peer._on_receive( + AX25RawFrame( + destination=AX25Address("VK4MSL-1"), + source=AX25Address("VK4MSL"), + repeaters=AX25Path("VK4RZB"), + payload=b"\x04\x0d\xf0Testing 1 2 3 4", + ) + ) + + # Our I-frame handler should have been called + assert len(iframes) == 1 + assert isinstance(iframes[0], AX2516BitInformationFrame) + assert iframes[0].pid == 0xf0 + assert iframes[0].payload == b"Testing 1 2 3 4" + + # Our S-frame handler should NOT have been called + assert sframes == [] + + +def test_recv_raw_mod8_sframe(): + """ + Test that a S-frame with Mod8 connection is handled. + """ + station = DummyStation(AX25Address("VK4MSL", ssid=1)) + peer = TestingAX25Peer( + station=station, + address=AX25Address("VK4MSL"), + repeaters=AX25Path("VK4RZB"), + locked_path=True, + ) + + # Stub idle time-out handling + peer._reset_idle_timeout = lambda: None + + # Stub _on_receive_iframe + iframes = [] + + def _on_receive_iframe(frame): + iframes.append(frame) + + peer._on_receive_iframe = _on_receive_iframe + + # Stub _on_receive_sframe + sframes = [] + + def _on_receive_sframe(frame): + sframes.append(frame) + + peer._on_receive_sframe = _on_receive_sframe + + # Set the state + peer._state = AX25PeerState.CONNECTED + peer._modulo = 8 + + # Inject a frame + peer._on_receive( + AX25RawFrame( + destination=AX25Address("VK4MSL-1"), + source=AX25Address("VK4MSL"), + repeaters=AX25Path("VK4RZB"), + payload=b"\x41", + ) + ) + + # Our S-frame handler should have been called + assert len(sframes) == 1 + assert isinstance(sframes[0], AX258BitReceiveReadyFrame) + + # Our I-frame handler should NOT have been called + assert iframes == [] + + +def test_recv_raw_mod128_sframe(): + """ + Test that a S-frame with Mod128 connection is handled. + """ + station = DummyStation(AX25Address("VK4MSL", ssid=1)) + peer = TestingAX25Peer( + station=station, + address=AX25Address("VK4MSL"), + repeaters=AX25Path("VK4RZB"), + locked_path=True, + ) + + # Stub idle time-out handling + peer._reset_idle_timeout = lambda: None + + # Stub _on_receive_iframe + iframes = [] + + def _on_receive_iframe(frame): + iframes.append(frame) + + peer._on_receive_iframe = _on_receive_iframe + + # Stub _on_receive_sframe + sframes = [] + + def _on_receive_sframe(frame): + sframes.append(frame) + + peer._on_receive_sframe = _on_receive_sframe + + # Set the state + peer._state = AX25PeerState.CONNECTED + peer._modulo = 128 + + # Inject a frame + peer._on_receive( + AX25RawFrame( + destination=AX25Address("VK4MSL-1"), + source=AX25Address("VK4MSL"), + repeaters=AX25Path("VK4RZB"), + payload=b"\x01\x5c", + ) + ) + + # Our S-frame handler should have been called + assert len(sframes) == 1 + assert isinstance(sframes[0], AX2516BitReceiveReadyFrame) + + # Our I-frame handler should NOT have been called + assert iframes == [] + + def test_recv_disc(): """ Test that DISC is handled.