Skip to content

Commit

Permalink
Fix ignore test
Browse files Browse the repository at this point in the history
  • Loading branch information
edenhaus committed Jan 16, 2024
1 parent 3895811 commit 641e4ef
Show file tree
Hide file tree
Showing 3 changed files with 133 additions and 103 deletions.
41 changes: 41 additions & 0 deletions tests/mqtt_util.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
"""Utilities for testing MQTT."""
import asyncio
from collections.abc import Callable
import datetime
import json
from unittest.mock import MagicMock, Mock

from aiomqtt import Client

from deebot_client.event_bus import EventBus
from deebot_client.models import DeviceInfo
from deebot_client.mqtt_client import MqttClient, SubscriberInfo


async def verify_subscribe(
test_client: Client, device_info: DeviceInfo, mock: Mock, *, expected_called: bool
) -> None:
command = "test"
data = json.dumps({"test": str(datetime.datetime.now())}).encode("utf-8")
topic = f"iot/atr/{command}/{device_info.did}/{device_info.get_class}/{device_info.resource}/j"
await test_client.publish(topic, data)

await asyncio.sleep(0.1)
if expected_called:
mock.assert_called_with(command, data)
else:
mock.assert_not_called()

mock.reset_mock()


async def subscribe(
mqtt_client: MqttClient, device_info: DeviceInfo
) -> tuple[Mock, Mock, Callable[[], None]]:
events = Mock(spec=EventBus)
callback = MagicMock()
unsubscribe = await mqtt_client.subscribe(
SubscriberInfo(device_info, events, callback)
)
await asyncio.sleep(0.1)
return (events, callback, unsubscribe)
116 changes: 13 additions & 103 deletions tests/test_mqtt_client.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import asyncio
from collections.abc import Callable
import datetime
import json
import logging
Expand All @@ -16,43 +15,11 @@
from deebot_client.commands.json.battery import GetBattery
from deebot_client.commands.json.volume import SetVolume
from deebot_client.const import DataType
from deebot_client.event_bus import EventBus
from deebot_client.exceptions import AuthenticationError
from deebot_client.models import Configuration, DeviceInfo
from deebot_client.mqtt_client import MqttClient, MqttConfiguration, SubscriberInfo
from deebot_client.mqtt_client import MqttClient, MqttConfiguration

from .fixtures.mqtt_server import MqttServer

_WAITING_AFTER_RESTART = 30


async def _verify_subscribe(
test_client: Client, device_info: DeviceInfo, mock: Mock, *, expected_called: bool
) -> None:
command = "test"
data = json.dumps({"test": str(datetime.datetime.now())}).encode("utf-8")
topic = f"iot/atr/{command}/{device_info.did}/{device_info.get_class}/{device_info.resource}/j"
await test_client.publish(topic, data)

await asyncio.sleep(0.1)
if expected_called:
mock.assert_called_with(command, data)
else:
mock.assert_not_called()

mock.reset_mock()


async def _subscribe(
mqtt_client: MqttClient, device_info: DeviceInfo
) -> tuple[Mock, Mock, Callable[[], None]]:
events = Mock(spec=EventBus)
callback = MagicMock()
unsubscribe = await mqtt_client.subscribe(
SubscriberInfo(device_info, events, callback)
)
await asyncio.sleep(0.1)
return (events, callback, unsubscribe)
from .mqtt_util import subscribe, verify_subscribe


async def test_last_message_received_at(
Expand All @@ -75,63 +42,6 @@ async def test_last_message_received_at(
assert mqtt_client.last_message_received_at == expected


@pytest.mark.skip(reason="Wait for sbtinstruments/aiomqtt#232 be merged")
@pytest.mark.timeout(_WAITING_AFTER_RESTART + 10)
async def test_client_reconnect_on_broker_error(
mqtt_client: MqttClient,
mqtt_server: MqttServer,
device_info: DeviceInfo,
mqtt_config: MqttConfiguration,
caplog: pytest.LogCaptureFixture,
) -> None:
(_, callback, _) = await _subscribe(mqtt_client, device_info)
async with Client(
hostname=mqtt_config.hostname,
port=mqtt_config.port,
identifier="Test-helper",
tls_context=mqtt_config.ssl_context,
) as client:
# test client cannot be used as we restart the broker in this test
await _verify_subscribe(client, device_info, callback, expected_called=True)

caplog.clear()
mqtt_server.stop()
await asyncio.sleep(0.1)

assert (
"deebot_client.mqtt_client",
logging.WARNING,
"Connection lost; Reconnecting in 5 seconds ...",
) in caplog.record_tuples
caplog.clear()

mqtt_server.run()

expected_log_tuple = (
"deebot_client.mqtt_client",
logging.DEBUG,
"All mqtt tasks created",
)
for i in range(_WAITING_AFTER_RESTART):
print(f"Wait for success reconnect... {i}/{_WAITING_AFTER_RESTART}")
if expected_log_tuple in caplog.record_tuples:
async with Client(
hostname=mqtt_config.hostname,
port=mqtt_config.port,
identifier="Test-helper",
tls_context=mqtt_config.ssl_context,
) as client:
# test client cannot be used as we restart the broker in this test
await _verify_subscribe(
client, device_info, callback, expected_called=True
)
return

await asyncio.sleep(1)

pytest.fail("Reconnect failed")


_test_MqttConfiguration_data = [
("cn", None, "mq.ecouser.net"),
("cn", "localhost", "localhost"),
Expand Down Expand Up @@ -182,38 +92,38 @@ def test_MqttConfiguration_hostname_none(config: Configuration) -> None:
async def test_client_bot_subscription(
mqtt_client: MqttClient, device_info: DeviceInfo, test_mqtt_client: Client
) -> None:
(_, callback, unsubscribe) = await _subscribe(mqtt_client, device_info)
(_, callback, unsubscribe) = await subscribe(mqtt_client, device_info)

await _verify_subscribe(
await verify_subscribe(
test_mqtt_client, device_info, callback, expected_called=True
)

unsubscribe()
await asyncio.sleep(0.1)

await _verify_subscribe(
await verify_subscribe(
test_mqtt_client, device_info, callback, expected_called=False
)


async def test_client_reconnect_manual(
mqtt_client: MqttClient, device_info: DeviceInfo, test_mqtt_client: Client
) -> None:
(_, callback, _) = await _subscribe(mqtt_client, device_info)
(_, callback, _) = await subscribe(mqtt_client, device_info)

await _verify_subscribe(
await verify_subscribe(
test_mqtt_client, device_info, callback, expected_called=True
)

await mqtt_client.disconnect()
await _verify_subscribe(
await verify_subscribe(
test_mqtt_client, device_info, callback, expected_called=False
)

await mqtt_client.connect()
await asyncio.sleep(0.1)

await _verify_subscribe(
await verify_subscribe(
test_mqtt_client, device_info, callback, expected_called=True
)

Expand Down Expand Up @@ -244,7 +154,7 @@ async def test_p2p_success(
test_mqtt_client: Client,
) -> None:
"""Test p2p workflow on SetVolume."""
(events, _, _) = await _subscribe(mqtt_client, device_info)
(events, _, _) = await subscribe(mqtt_client, device_info)
assert len(mqtt_client._received_p2p_commands) == 0

command_object = Mock(spec=SetVolume)
Expand Down Expand Up @@ -293,7 +203,7 @@ async def test_p2p_not_supported(
caplog: pytest.LogCaptureFixture,
) -> None:
"""Test that unsupported command will be logged."""
await _subscribe(mqtt_client, device_info)
await subscribe(mqtt_client, device_info)
command_name: str = GetBattery.name

await _publish_p2p(
Expand Down Expand Up @@ -344,7 +254,7 @@ async def test_p2p_to_late(
"""Test p2p when response comes in to late."""
# reduce ttl to 1 seconds
mqtt_client._received_p2p_commands = TTLCache(maxsize=60 * 60, ttl=1)
await _subscribe(mqtt_client, device_info)
await subscribe(mqtt_client, device_info)
assert len(mqtt_client._received_p2p_commands) == 0

command_object = Mock(spec=SetVolume)
Expand Down Expand Up @@ -393,7 +303,7 @@ async def test_p2p_parse_error(
caplog: pytest.LogCaptureFixture,
) -> None:
"""Test p2p parse error."""
await _subscribe(mqtt_client, device_info)
await subscribe(mqtt_client, device_info)

command_object = Mock(spec=SetVolume)
command_name = SetVolume.name
Expand Down
79 changes: 79 additions & 0 deletions tests/test_mqtt_client_reconnect.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
import asyncio
from collections.abc import Generator
import logging

from aiomqtt import Client
import pytest

from deebot_client.models import DeviceInfo
from deebot_client.mqtt_client import MqttClient, MqttConfiguration

from .fixtures.mqtt_server import MqttServer
from .mqtt_util import subscribe, verify_subscribe

_WAITING_AFTER_RESTART = 30


@pytest.fixture
def mqtt_server() -> Generator[MqttServer, None, None]:
server = MqttServer()
server.config.options["ports"] = {"1883/tcp": 54321}
server.run()
yield server
server.stop()


@pytest.mark.timeout(_WAITING_AFTER_RESTART + 10)
async def test_client_reconnect_on_broker_error(
mqtt_client: MqttClient,
mqtt_server: MqttServer,
device_info: DeviceInfo,
mqtt_config: MqttConfiguration,
caplog: pytest.LogCaptureFixture,
) -> None:
(_, callback, _) = await subscribe(mqtt_client, device_info)
async with Client(
hostname=mqtt_config.hostname,
port=mqtt_config.port,
identifier="Test-helper",
tls_context=mqtt_config.ssl_context,
) as client:
# test client cannot be used as we restart the broker in this test
await verify_subscribe(client, device_info, callback, expected_called=True)

caplog.clear()
mqtt_server.stop()
await asyncio.sleep(0.1)

assert (
"deebot_client.mqtt_client",
logging.WARNING,
"Connection lost; Reconnecting in 5 seconds ...",
) in caplog.record_tuples
caplog.clear()

mqtt_server.run()

expected_log_tuple = (
"deebot_client.mqtt_client",
logging.DEBUG,
"All mqtt tasks created",
)
for i in range(_WAITING_AFTER_RESTART):
print(f"Wait for success reconnect... {i}/{_WAITING_AFTER_RESTART}")
if expected_log_tuple in caplog.record_tuples:
async with Client(
hostname=mqtt_config.hostname,
port=mqtt_config.port,
identifier="Test-helper",
tls_context=mqtt_config.ssl_context,
) as client:
# test client cannot be used as we restart the broker in this test
await verify_subscribe(
client, device_info, callback, expected_called=True
)
return

await asyncio.sleep(1)

pytest.fail("Reconnect failed")

0 comments on commit 641e4ef

Please sign in to comment.