Skip to content

Commit

Permalink
peer unit tests: Test decoding of I/S-frames.
Browse files Browse the repository at this point in the history
  • Loading branch information
sjlongland committed May 7, 2024
1 parent 7bf880d commit e926d6b
Showing 1 changed file with 300 additions and 0 deletions.
300 changes: 300 additions & 0 deletions tests/test_peer/test_connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
AX25DisconnectModeFrame,
AX25FrameRejectFrame,
AX25UnnumberedAcknowledgeFrame,
AX25UnnumberedInformationFrame,
AX25RawFrame,
AX25TestFrame,
AX25SetAsyncBalancedModeFrame,
AX25SetAsyncBalancedModeExtendedFrame,
Expand Down Expand Up @@ -295,6 +297,304 @@ def _on_receive_ua():
assert count == dict(ua=1)


def test_recv_ui():
"""
Test that UI is emitted by the received frame signal.
"""
station = DummyStation(AX25Address("VK4MSL", ssid=1))
peer = TestingAX25Peer(
station=station,
address=AX25Address("VK4MSL"),
repeaters=AX25Path("VK4RZB"),
locked_path=True,
)

# Stub idle time-out handling
peer._reset_idle_timeout = lambda: None

# Create a handler for receiving the UI
rx_frames = []

def _on_receive_frame(frame, **kwargs):
assert "peer" in kwargs
assert kwargs.pop("peer") is peer
assert kwargs == {}
rx_frames.append(frame)

peer.received_frame.connect(_on_receive_frame)

# Set the state
peer._state = AX25PeerState.CONNECTED

# Inject a frame
frame = AX25UnnumberedInformationFrame(
destination=AX25Address("VK4MSL-1"),
source=AX25Address("VK4MSL"),
repeaters=AX25Path("VK4RZB"),
pid=0xF0,
payload=b"Testing 1 2 3 4",
)

peer._on_receive(frame)

# Our handler should have been called
assert len(rx_frames) == 1
assert rx_frames[0] is frame


def test_recv_raw_noconn():
"""
Test that a raw frame without a connection triggers a DM frame.
"""
station = DummyStation(AX25Address("VK4MSL", ssid=1))
peer = TestingAX25Peer(
station=station,
address=AX25Address("VK4MSL"),
repeaters=AX25Path("VK4RZB"),
locked_path=True,
)

# Stub idle time-out handling
peer._reset_idle_timeout = lambda: None

# Stub _send_dm
count = dict(send_dm=0)

def _send_dm():
count["send_dm"] += 1

peer._send_dm = _send_dm

# Set the state
peer._state = AX25PeerState.DISCONNECTED

# Inject a frame
peer._on_receive(
AX25RawFrame(
destination=AX25Address("VK4MSL-1"),
source=AX25Address("VK4MSL"),
repeaters=AX25Path("VK4RZB"),
payload=b"\x00\x00Testing 1 2 3 4",
)
)


def test_recv_raw_mod8_iframe():
"""
Test that a I-frame with Mod8 connection is handled.
"""
station = DummyStation(AX25Address("VK4MSL", ssid=1))
peer = TestingAX25Peer(
station=station,
address=AX25Address("VK4MSL"),
repeaters=AX25Path("VK4RZB"),
locked_path=True,
)

# Stub idle time-out handling
peer._reset_idle_timeout = lambda: None

# Stub _on_receive_iframe
iframes = []

def _on_receive_iframe(frame):
iframes.append(frame)

peer._on_receive_iframe = _on_receive_iframe

# Stub _on_receive_sframe
sframes = []

def _on_receive_sframe(frame):
sframes.append(frame)

peer._on_receive_sframe = _on_receive_sframe

# Set the state
peer._state = AX25PeerState.CONNECTED
peer._modulo = 8

# Inject a frame
peer._on_receive(
AX25RawFrame(
destination=AX25Address("VK4MSL-1"),
source=AX25Address("VK4MSL"),
repeaters=AX25Path("VK4RZB"),
payload=b"\xd4\xf0Testing 1 2 3 4",
)
)

# Our I-frame handler should have been called
assert len(iframes) == 1
assert isinstance(iframes[0], AX258BitInformationFrame)
assert iframes[0].pid == 0xf0
assert iframes[0].payload == b"Testing 1 2 3 4"

# Our S-frame handler should NOT have been called
assert sframes == []


def test_recv_raw_mod128_iframe():
"""
Test that a I-frame with Mod128 connection is handled.
"""
station = DummyStation(AX25Address("VK4MSL", ssid=1))
peer = TestingAX25Peer(
station=station,
address=AX25Address("VK4MSL"),
repeaters=AX25Path("VK4RZB"),
locked_path=True,
)

# Stub idle time-out handling
peer._reset_idle_timeout = lambda: None

# Stub _on_receive_iframe
iframes = []

def _on_receive_iframe(frame):
iframes.append(frame)

peer._on_receive_iframe = _on_receive_iframe

# Stub _on_receive_sframe
sframes = []

def _on_receive_sframe(frame):
sframes.append(frame)

peer._on_receive_sframe = _on_receive_sframe

# Set the state
peer._state = AX25PeerState.CONNECTED
peer._modulo = 128

# Inject a frame
peer._on_receive(
AX25RawFrame(
destination=AX25Address("VK4MSL-1"),
source=AX25Address("VK4MSL"),
repeaters=AX25Path("VK4RZB"),
payload=b"\x04\x0d\xf0Testing 1 2 3 4",
)
)

# Our I-frame handler should have been called
assert len(iframes) == 1
assert isinstance(iframes[0], AX2516BitInformationFrame)
assert iframes[0].pid == 0xf0
assert iframes[0].payload == b"Testing 1 2 3 4"

# Our S-frame handler should NOT have been called
assert sframes == []


def test_recv_raw_mod8_sframe():
"""
Test that a S-frame with Mod8 connection is handled.
"""
station = DummyStation(AX25Address("VK4MSL", ssid=1))
peer = TestingAX25Peer(
station=station,
address=AX25Address("VK4MSL"),
repeaters=AX25Path("VK4RZB"),
locked_path=True,
)

# Stub idle time-out handling
peer._reset_idle_timeout = lambda: None

# Stub _on_receive_iframe
iframes = []

def _on_receive_iframe(frame):
iframes.append(frame)

peer._on_receive_iframe = _on_receive_iframe

# Stub _on_receive_sframe
sframes = []

def _on_receive_sframe(frame):
sframes.append(frame)

peer._on_receive_sframe = _on_receive_sframe

# Set the state
peer._state = AX25PeerState.CONNECTED
peer._modulo = 8

# Inject a frame
peer._on_receive(
AX25RawFrame(
destination=AX25Address("VK4MSL-1"),
source=AX25Address("VK4MSL"),
repeaters=AX25Path("VK4RZB"),
payload=b"\x41",
)
)

# Our S-frame handler should have been called
assert len(sframes) == 1
assert isinstance(sframes[0], AX258BitReceiveReadyFrame)

# Our I-frame handler should NOT have been called
assert iframes == []


def test_recv_raw_mod128_sframe():
"""
Test that a S-frame with Mod128 connection is handled.
"""
station = DummyStation(AX25Address("VK4MSL", ssid=1))
peer = TestingAX25Peer(
station=station,
address=AX25Address("VK4MSL"),
repeaters=AX25Path("VK4RZB"),
locked_path=True,
)

# Stub idle time-out handling
peer._reset_idle_timeout = lambda: None

# Stub _on_receive_iframe
iframes = []

def _on_receive_iframe(frame):
iframes.append(frame)

peer._on_receive_iframe = _on_receive_iframe

# Stub _on_receive_sframe
sframes = []

def _on_receive_sframe(frame):
sframes.append(frame)

peer._on_receive_sframe = _on_receive_sframe

# Set the state
peer._state = AX25PeerState.CONNECTED
peer._modulo = 128

# Inject a frame
peer._on_receive(
AX25RawFrame(
destination=AX25Address("VK4MSL-1"),
source=AX25Address("VK4MSL"),
repeaters=AX25Path("VK4RZB"),
payload=b"\x01\x5c",
)
)

# Our S-frame handler should have been called
assert len(sframes) == 1
assert isinstance(sframes[0], AX2516BitReceiveReadyFrame)

# Our I-frame handler should NOT have been called
assert iframes == []


def test_recv_disc():
"""
Test that DISC is handled.
Expand Down

0 comments on commit e926d6b

Please sign in to comment.