diff --git a/aioax25/station.py b/aioax25/station.py index 9b7d6b9..1b42261 100644 --- a/aioax25/station.py +++ b/aioax25/station.py @@ -142,14 +142,20 @@ def detach(self): ) def getpeer( - self, callsign, ssid=None, repeaters=None, create=True, **kwargs + self, + callsign, + ssid=None, + repeaters=None, + command=True, + create=True, + **kwargs ): """ Retrieve an instance of a peer context. This creates the peer object if it doesn't already exist unless create is set to False (in which case it will raise KeyError). """ - address = AX25Address.decode(callsign, ssid).normalised + address = AX25Address.decode(callsign, ssid).normcopy(ch=command) try: return self._peers[address] except KeyError: @@ -199,7 +205,9 @@ def _on_receive(self, frame, **kwargs): # If we're still here, then we don't handle unsolicited frames # of this type, so pass it to a handler if we have one. peer = self.getpeer( - frame.header.source, repeaters=frame.header.repeaters.reply + frame.header.source, + repeaters=frame.header.repeaters.reply, + command=frame.header.source.ch, ) self._log.debug("Passing frame to peer %s: %s", peer.address, frame) peer._on_receive(frame) diff --git a/tests/test_station/test_getpeer.py b/tests/test_station/test_getpeer.py index db572e5..2b36025 100644 --- a/tests/test_station/test_getpeer.py +++ b/tests/test_station/test_getpeer.py @@ -18,17 +18,29 @@ def test_unknown_peer_nocreate_keyerror(): except KeyError as e: assert str(e) == ( "AX25Address(callsign=VK4BWI, ssid=0, " - "ch=False, res0=True, res1=True, extension=False)" + "ch=True, res0=True, res1=True, extension=False)" ) -def test_unknown_peer_create_instance(): +def test_unknown_peer_create_instance_ch(): """ - Test fetching an unknown peer with create=True generates peer + Test fetching an unknown peer with create=True generates peer with C/H set """ station = AX25Station(interface=DummyInterface(), callsign="VK4MSL-5") peer = station.getpeer("VK4BWI", create=True) assert isinstance(peer, AX25Peer) + assert peer.address.ch is True + + +def test_unknown_peer_create_instance_noch(): + """ + Test fetching an unknown peer with create=True and command=False generates + peer with C/H clear + """ + station = AX25Station(interface=DummyInterface(), callsign="VK4MSL-5") + peer = station.getpeer("VK4BWI", create=True, command=False) + assert isinstance(peer, AX25Peer) + assert peer.address.ch is False def test_known_peer_fetch_instance(): @@ -36,7 +48,7 @@ def test_known_peer_fetch_instance(): Test fetching an known peer returns that known peer """ station = AX25Station(interface=DummyInterface(), callsign="VK4MSL-5") - mypeer = DummyPeer(station, AX25Address("VK4BWI")) + mypeer = DummyPeer(station, AX25Address("VK4BWI", ch=True)) # Inject the peer station._peers[mypeer._address] = mypeer @@ -44,3 +56,24 @@ def test_known_peer_fetch_instance(): # Retrieve the peer instance peer = station.getpeer("VK4BWI") assert peer is mypeer + + +def test_known_peer_fetch_instance_ch(): + """ + Test fetching peers differentiates command bits + """ + station = AX25Station(interface=DummyInterface(), callsign="VK4MSL-5") + mypeer_in = DummyPeer(station, AX25Address("VK4BWI", ch=False)) + mypeer_out = DummyPeer(station, AX25Address("VK4BWI", ch=True)) + + # Inject the peers + station._peers[mypeer_in._address] = mypeer_in + station._peers[mypeer_out._address] = mypeer_out + + # Retrieve the peer instance + peer = station.getpeer("VK4BWI", command=True) + assert peer is mypeer_out + + # Retrieve the other peer instance + peer = station.getpeer("VK4BWI", command=False) + assert peer is mypeer_in diff --git a/tests/test_station/test_receive.py b/tests/test_station/test_receive.py index 4bcaac6..c8ef135 100644 --- a/tests/test_station/test_receive.py +++ b/tests/test_station/test_receive.py @@ -132,3 +132,46 @@ def stub_on_test_frame(*args, **kwargs): assert rx_call_kwargs == {} assert len(rx_call_args) == 1 assert rx_call_args[0] is txframe + + +def test_route_incoming_msg_ch(): + """ + Test passing a frame considers C/H bits. + """ + interface = DummyInterface() + station = AX25Station(interface=interface, callsign="VK4MSL-5") + + # Stub out _on_test_frame + def stub_on_test_frame(*args, **kwargs): + assert False, "Should not have been called" + + station._on_test_frame = stub_on_test_frame + + # Inject a couple of peers + peer1 = DummyPeer(station, AX25Address("VK4BWI", ssid=7, ch=False)) + peer2 = DummyPeer(station, AX25Address("VK4BWI", ssid=7, ch=True)) + station._peers[peer1._address] = peer1 + station._peers[peer2._address] = peer2 + + # Pass in the message + txframe = AX25UnnumberedInformationFrame( + destination="VK4MSL-5", + source="VK4BWI-7*", + cr=True, + pid=0xAB, + payload=b"This is a test frame", + ) + station._on_receive(frame=txframe) + + # There should be no replies queued + assert interface.bind_calls == [] + assert interface.unbind_calls == [] + assert interface.transmit_calls == [] + + # This should have gone to peer2, not peer1 + assert peer1.on_receive_calls == [] + assert len(peer2.on_receive_calls) == 1 + (rx_call_args, rx_call_kwargs) = peer2.on_receive_calls.pop() + assert rx_call_kwargs == {} + assert len(rx_call_args) == 1 + assert rx_call_args[0] is txframe