Skip to content

Commit

Permalink
peer unit tests: Test ACK timer handling
Browse files Browse the repository at this point in the history
  • Loading branch information
sjlongland committed May 8, 2024
1 parent 2cc665a commit 76f1aae
Showing 1 changed file with 115 additions and 0 deletions.
115 changes: 115 additions & 0 deletions tests/test_peer/test_connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -3008,3 +3008,118 @@ def _start_disconnect_ack_timer():
assert peer._state == AX25PeerState.DISCONNECTING
assert actions == ["sent-disc", "start-ack-timer"]
assert peer._uaframe_handler == peer._on_disconnect


# ACK timer handling


def test_start_connect_ack_timer():
"""
Test _start_connect_ack_timer schedules _on_incoming_connect_timeout
to fire after _ack_timeout.
"""
station = DummyStation(AX25Address("VK4MSL", ssid=1))
peer = TestingAX25Peer(
station=station,
address=AX25Address("VK4MSL"),
repeaters=AX25Path("VK4RZB"),
locked_path=True,
)

count = dict(on_incoming_connect_timeout=0, on_disc_ua_timeout=0)

def _on_incoming_connect_timeout():
count["on_incoming_connect_timeout"] += 1

peer._on_incoming_connect_timeout = _on_incoming_connect_timeout

def _on_disc_ua_timeout():
count["on_disc_ua_timeout"] += 1

peer._on_disc_ua_timeout = _on_disc_ua_timeout

assert peer._ack_timeout_handle is None

peer._start_connect_ack_timer()

assert peer._ack_timeout_handle is not None
assert peer._ack_timeout_handle.delay == peer._ack_timeout

assert count == dict(on_incoming_connect_timeout=0, on_disc_ua_timeout=0)
peer._ack_timeout_handle.callback()
assert count == dict(on_incoming_connect_timeout=1, on_disc_ua_timeout=0)


def test_start_disconnect_ack_timer():
"""
Test _start_disconnect_ack_timer schedules _on_disc_ua_timeout
to fire after _ack_timeout.
"""
station = DummyStation(AX25Address("VK4MSL", ssid=1))
peer = TestingAX25Peer(
station=station,
address=AX25Address("VK4MSL"),
repeaters=AX25Path("VK4RZB"),
locked_path=True,
)

count = dict(on_incoming_connect_timeout=0, on_disc_ua_timeout=0)

def _on_incoming_connect_timeout():
count["on_incoming_connect_timeout"] += 1

peer._on_incoming_connect_timeout = _on_incoming_connect_timeout

def _on_disc_ua_timeout():
count["on_disc_ua_timeout"] += 1

peer._on_disc_ua_timeout = _on_disc_ua_timeout

assert peer._ack_timeout_handle is None

peer._start_disconnect_ack_timer()

assert peer._ack_timeout_handle is not None
assert peer._ack_timeout_handle.delay == peer._ack_timeout

assert count == dict(on_incoming_connect_timeout=0, on_disc_ua_timeout=0)
peer._ack_timeout_handle.callback()
assert count == dict(on_incoming_connect_timeout=0, on_disc_ua_timeout=1)


def test_stop_ack_timer_existing():
"""
Test _stop_ack_timer cancels the existing time-out.
"""
station = DummyStation(AX25Address("VK4MSL", ssid=1))
peer = TestingAX25Peer(
station=station,
address=AX25Address("VK4MSL"),
repeaters=AX25Path("VK4RZB"),
locked_path=True,
)

timeout = DummyTimeout(None, None)
peer._ack_timeout_handle = timeout

peer._stop_ack_timer()

assert peer._ack_timeout_handle is None
assert timeout.cancelled is True


def test_stop_ack_timer_notexisting():
"""
Test _stop_ack_timer does nothing if no time-out pending.
"""
station = DummyStation(AX25Address("VK4MSL", ssid=1))
peer = TestingAX25Peer(
station=station,
address=AX25Address("VK4MSL"),
repeaters=AX25Path("VK4RZB"),
locked_path=True,
)

peer._ack_timeout_handle = None

peer._stop_ack_timer()

0 comments on commit 76f1aae

Please sign in to comment.