Skip to content

Commit

Permalink
station: Store and consider command bit state
Browse files Browse the repository at this point in the history
This appears to be indicative of which side sent the SABM: sender sends
SABM with its own SSID field clearing the C/H bit, and the destination
SSID setting the C/H bit.

So in the station context, we really have two separate peers:
- Peer with C/H cleared: is a peer that sent US a `SABM(E)`
- Peer with C/H set: is a peer that WE sent a `SABM(E)`

Default to C/H set to `True`, since most users will be making outbound
connections.
  • Loading branch information
sjlongland committed May 7, 2024
1 parent fca874c commit d2cf0ef
Show file tree
Hide file tree
Showing 3 changed files with 91 additions and 7 deletions.
14 changes: 11 additions & 3 deletions aioax25/station.py
Original file line number Diff line number Diff line change
Expand Up @@ -142,14 +142,20 @@ def detach(self):
)

def getpeer(
self, callsign, ssid=None, repeaters=None, create=True, **kwargs
self,
callsign,
ssid=None,
repeaters=None,
command=True,
create=True,
**kwargs
):
"""
Retrieve an instance of a peer context. This creates the peer
object if it doesn't already exist unless create is set to False
(in which case it will raise KeyError).
"""
address = AX25Address.decode(callsign, ssid).normalised
address = AX25Address.decode(callsign, ssid).normcopy(ch=command)
try:
return self._peers[address]
except KeyError:
Expand Down Expand Up @@ -199,7 +205,9 @@ def _on_receive(self, frame, **kwargs):
# If we're still here, then we don't handle unsolicited frames
# of this type, so pass it to a handler if we have one.
peer = self.getpeer(
frame.header.source, repeaters=frame.header.repeaters.reply
frame.header.source,
repeaters=frame.header.repeaters.reply,
command=frame.header.source.ch,
)
self._log.debug("Passing frame to peer %s: %s", peer.address, frame)
peer._on_receive(frame)
Expand Down
41 changes: 37 additions & 4 deletions tests/test_station/test_getpeer.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,29 +18,62 @@ def test_unknown_peer_nocreate_keyerror():
except KeyError as e:
assert str(e) == (
"AX25Address(callsign=VK4BWI, ssid=0, "
"ch=False, res0=True, res1=True, extension=False)"
"ch=True, res0=True, res1=True, extension=False)"
)


def test_unknown_peer_create_instance():
def test_unknown_peer_create_instance_ch():
"""
Test fetching an unknown peer with create=True generates peer
Test fetching an unknown peer with create=True generates peer with C/H set
"""
station = AX25Station(interface=DummyInterface(), callsign="VK4MSL-5")
peer = station.getpeer("VK4BWI", create=True)
assert isinstance(peer, AX25Peer)
assert peer.address.ch is True


def test_unknown_peer_create_instance_noch():
"""
Test fetching an unknown peer with create=True and command=False generates
peer with C/H clear
"""
station = AX25Station(interface=DummyInterface(), callsign="VK4MSL-5")
peer = station.getpeer("VK4BWI", create=True, command=False)
assert isinstance(peer, AX25Peer)
assert peer.address.ch is False


def test_known_peer_fetch_instance():
"""
Test fetching an known peer returns that known peer
"""
station = AX25Station(interface=DummyInterface(), callsign="VK4MSL-5")
mypeer = DummyPeer(station, AX25Address("VK4BWI"))
mypeer = DummyPeer(station, AX25Address("VK4BWI", ch=True))

# Inject the peer
station._peers[mypeer._address] = mypeer

# Retrieve the peer instance
peer = station.getpeer("VK4BWI")
assert peer is mypeer


def test_known_peer_fetch_instance_ch():
"""
Test fetching peers differentiates command bits
"""
station = AX25Station(interface=DummyInterface(), callsign="VK4MSL-5")
mypeer_in = DummyPeer(station, AX25Address("VK4BWI", ch=False))
mypeer_out = DummyPeer(station, AX25Address("VK4BWI", ch=True))

# Inject the peers
station._peers[mypeer_in._address] = mypeer_in
station._peers[mypeer_out._address] = mypeer_out

# Retrieve the peer instance
peer = station.getpeer("VK4BWI", command=True)
assert peer is mypeer_out

# Retrieve the other peer instance
peer = station.getpeer("VK4BWI", command=False)
assert peer is mypeer_in
43 changes: 43 additions & 0 deletions tests/test_station/test_receive.py
Original file line number Diff line number Diff line change
Expand Up @@ -132,3 +132,46 @@ def stub_on_test_frame(*args, **kwargs):
assert rx_call_kwargs == {}
assert len(rx_call_args) == 1
assert rx_call_args[0] is txframe


def test_route_incoming_msg_ch():
"""
Test passing a frame considers C/H bits.
"""
interface = DummyInterface()
station = AX25Station(interface=interface, callsign="VK4MSL-5")

# Stub out _on_test_frame
def stub_on_test_frame(*args, **kwargs):
assert False, "Should not have been called"

station._on_test_frame = stub_on_test_frame

# Inject a couple of peers
peer1 = DummyPeer(station, AX25Address("VK4BWI", ssid=7, ch=False))
peer2 = DummyPeer(station, AX25Address("VK4BWI", ssid=7, ch=True))
station._peers[peer1._address] = peer1
station._peers[peer2._address] = peer2

# Pass in the message
txframe = AX25UnnumberedInformationFrame(
destination="VK4MSL-5",
source="VK4BWI-7*",
cr=True,
pid=0xAB,
payload=b"This is a test frame",
)
station._on_receive(frame=txframe)

# There should be no replies queued
assert interface.bind_calls == []
assert interface.unbind_calls == []
assert interface.transmit_calls == []

# This should have gone to peer2, not peer1
assert peer1.on_receive_calls == []
assert len(peer2.on_receive_calls) == 1
(rx_call_args, rx_call_kwargs) = peer2.on_receive_calls.pop()
assert rx_call_kwargs == {}
assert len(rx_call_args) == 1
assert rx_call_args[0] is txframe

0 comments on commit d2cf0ef

Please sign in to comment.