diff --git a/amqtt/broker.py b/amqtt/broker.py index fb25bf76..0ea461e2 100644 --- a/amqtt/broker.py +++ b/amqtt/broker.py @@ -9,6 +9,7 @@ import re from asyncio import CancelledError from collections import deque +from enum import Enum from functools import partial from transitions import Machine, MachineError @@ -43,6 +44,11 @@ EVENT_BROKER_MESSAGE_RECEIVED = "broker_message_received" +class Action(Enum): + subscribe = "subscribe" + publish = "publish" + + class BrokerException(Exception): pass @@ -620,21 +626,33 @@ async def client_connected( % client_session.client_id ) break - await self.plugins_manager.fire_event( - EVENT_BROKER_MESSAGE_RECEIVED, - client_id=client_session.client_id, - message=app_message, - ) - await self._broadcast_message( - client_session, app_message.topic, app_message.data + + # See if the user is allowed to publish to this topic. + permitted = await self.topic_filtering( + client_session, topic=app_message.topic, action=Action.publish ) - if app_message.publish_packet.retain_flag: - self.retain_message( - client_session, + if not permitted: + self.logger.info( + "%s forbidden TOPIC %s sent in PUBLISH message.", + client_session.client_id, app_message.topic, - app_message.data, - app_message.qos, ) + else: + await self.plugins_manager.fire_event( + EVENT_BROKER_MESSAGE_RECEIVED, + client_id=client_session.client_id, + message=app_message, + ) + await self._broadcast_message( + client_session, app_message.topic, app_message.data + ) + if app_message.publish_packet.retain_flag: + self.retain_message( + client_session, + app_message.topic, + app_message.data, + app_message.qos, + ) wait_deliver = asyncio.Task( handler.mqtt_deliver_next_message(), loop=self._loop ) @@ -703,7 +721,7 @@ async def authenticate(self, session: Session, listener): # If all plugins returned True, authentication is success return auth_result - async def topic_filtering(self, session: Session, topic): + async def topic_filtering(self, session: Session, topic, action: Action): """ This method call the topic_filtering method on registered plugins to check that the subscription is allowed. User is considered allowed if all plugins called return True. @@ -713,7 +731,8 @@ async def topic_filtering(self, session: Session, topic): - None if topic filtering can't be achieved (then plugin result is then ignored) :param session: :param listener: - :param topic: Topic in which the client wants to subscribe + :param topic: Topic in which the client wants to subscribe / publish + :param action: What is being done with the topic? subscribe or publish :return: """ topic_plugins = None @@ -724,6 +743,7 @@ async def topic_filtering(self, session: Session, topic): "topic_filtering", session=session, topic=topic, + action=action, filter_plugins=topic_plugins, ) topic_result = True @@ -773,7 +793,9 @@ async def add_subscription(self, subscription, session): # [MQTT-4.7.1-3] + wildcard character must occupy entire level return 0x80 # Check if the client is authorised to connect to the topic - permitted = await self.topic_filtering(session, topic=a_filter) + permitted = await self.topic_filtering( + session, topic=a_filter, action=Action.subscribe + ) if not permitted: return 0x80 qos = subscription[1] diff --git a/amqtt/plugins/topic_checking.py b/amqtt/plugins/topic_checking.py index b52a156f..aa2179f1 100644 --- a/amqtt/plugins/topic_checking.py +++ b/amqtt/plugins/topic_checking.py @@ -1,3 +1,6 @@ +from ..broker import Action + + class BaseTopicPlugin: def __init__(self, context): self.context = context @@ -7,6 +10,7 @@ def __init__(self, context): self.context.logger.warning( "'topic-check' section not found in context configuration" ) + self.topic_config = None def topic_filtering(self, *args, **kwargs): if not self.topic_config: @@ -66,11 +70,24 @@ async def topic_filtering(self, *args, **kwargs): if filter_result: session = kwargs.get("session", None) req_topic = kwargs.get("topic", None) + action = kwargs.get("action", None) + + # hbmqtt and older amqtt do not support publish filtering + if action == Action.publish and "publish-acl" not in self.topic_config: + # maintain backward compatibility, assume permitted + return True + if req_topic: username = session.username if username is None: username = "anonymous" - allowed_topics = self.topic_config["acl"].get(username, None) + + if action == Action.publish: + acl = self.topic_config["publish-acl"] + elif action == Action.subscribe: + acl = self.topic_config["acl"] + + allowed_topics = acl.get(username, None) if allowed_topics: for allowed_topic in allowed_topics: if self.topic_ac(req_topic, allowed_topic): @@ -80,3 +97,5 @@ async def topic_filtering(self, *args, **kwargs): return False else: return False + else: + return False diff --git a/poetry.lock b/poetry.lock index 49dcd837..1a789525 100644 --- a/poetry.lock +++ b/poetry.lock @@ -429,6 +429,17 @@ pytest = ">=4.6" [package.extras] testing = ["fields", "hunter", "process-tests (==2.0.2)", "six", "pytest-xdist", "virtualenv"] +[[package]] +name = "pytest-logdog" +version = "0.1.0" +description = "Pytest plugin to test logging" +category = "dev" +optional = false +python-versions = ">=3.7" + +[package.dependencies] +pytest = ">=6.2.0" + [[package]] name = "pyyaml" version = "5.4.1" @@ -565,7 +576,7 @@ ci = ["coveralls"] [metadata] lock-version = "1.1" python-versions = "^3.7" -content-hash = "8eea41d1ef5f01f58a973a749a766740aa529fc0b94a8fcf71c6e91babdd6613" +content-hash = "2030ca9b6173701a0b94bd85c8c7b26f30ea9f7ad5be320a2b97c51c847afb62" [metadata.files] appdirs = [ @@ -800,6 +811,10 @@ pytest-cov = [ {file = "pytest-cov-2.11.1.tar.gz", hash = "sha256:359952d9d39b9f822d9d29324483e7ba04a3a17dd7d05aa6beb7ea01e359e5f7"}, {file = "pytest_cov-2.11.1-py2.py3-none-any.whl", hash = "sha256:bdb9fdb0b85a7cc825269a4c56b48ccaa5c7e365054b6038772c32ddcdc969da"}, ] +pytest-logdog = [ + {file = "pytest-logdog-0.1.0.tar.gz", hash = "sha256:b84aca02b6b609bda8bfcd6d0207a428b146cd706d14c7095a3ba79429ab534b"}, + {file = "pytest_logdog-0.1.0-py3-none-any.whl", hash = "sha256:4d5a4c46442ca7da73b1cf6c9ebea144958a0a6258ba19ad7bf877dec22400e8"}, +] pyyaml = [ {file = "PyYAML-5.4.1-cp27-cp27m-macosx_10_9_x86_64.whl", hash = "sha256:3b2b1824fe7112845700f815ff6a489360226a5609b96ec2190a45e62a9fc922"}, {file = "PyYAML-5.4.1-cp27-cp27m-win32.whl", hash = "sha256:129def1b7c1bf22faffd67b8f3724645203b79d8f4cc81f674654d9902cb4393"}, diff --git a/pyproject.toml b/pyproject.toml index 244b6612..3564b571 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -42,6 +42,7 @@ pylint = "^2.7.2" black = "^20.8b1" flake8 = "^3.9.0" hypothesis = "^6.10.0" +pytest-logdog = "^0.1.0" [tool.poetry.scripts] diff --git a/tests/conftest.py b/tests/conftest.py index 5b58d47e..9aab33d9 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -1,9 +1,12 @@ import unittest.mock +import os.path import pytest import amqtt.broker +pytest_plugins = ["pytest_logdog"] + test_config = { "listeners": { "default": {"type": "tcp", "bind": "127.0.0.1:1883", "max_connections": 10}, @@ -15,6 +18,29 @@ } +test_config_acl = { + "listeners": { + "default": {"type": "tcp", "bind": "127.0.0.1:1884", "max_connections": 10}, + }, + "sys_interval": 0, + "auth": { + "plugins": ["auth_file"], + "password-file": os.path.join( + os.path.dirname(os.path.realpath(__file__)), "plugins", "passwd" + ), + }, + "topic-check": { + "enabled": True, + "plugins": ["topic_acl"], + "acl": { + "user1": ["public/#"], + "user2": ["#"], + }, + "publish-acl": {"user1": ["public/subtopic/#"]}, + }, +} + + @pytest.fixture(scope="function") def mock_plugin_manager(): with unittest.mock.patch("amqtt.broker.PluginManager") as plugin_manager: @@ -36,3 +62,19 @@ async def broker(mock_plugin_manager): if not broker.transitions.is_stopped(): await broker.shutdown() + + +@pytest.fixture(scope="function") +async def acl_broker(): + broker = amqtt.broker.Broker( + test_config_acl, plugin_namespace="amqtt.broker.plugins" + ) + await broker.start() + assert broker.transitions.is_started() + assert broker._sessions == {} + assert "default" in broker._servers + + yield broker + + if not broker.transitions.is_stopped(): + await broker.shutdown() diff --git a/tests/plugins/passwd b/tests/plugins/passwd index 07fda795..c2dfe648 100644 --- a/tests/plugins/passwd +++ b/tests/plugins/passwd @@ -1,3 +1,7 @@ # Test password file user:$6$1sSXVMBVKMF$uDStq59YfiuFxVF1Gi/.i/Li7Vwf5iTwg8LovLKmCvM5FsRNJM.OPWHhXwI2.4AscLZXSPQVxpIlX6laUl9570 -test_user:$6$.c9f9sAzs5YXX2de$GSdOi3iFwHJRCIJn1W63muDFQAL29yoFmU/TXcwDB42F2BZg3zcN5uKBM0.1PjwdMpWHRydbhXWSc3uWKSmKr. \ No newline at end of file +test_user:$6$.c9f9sAzs5YXX2de$GSdOi3iFwHJRCIJn1W63muDFQAL29yoFmU/TXcwDB42F2BZg3zcN5uKBM0.1PjwdMpWHRydbhXWSc3uWKSmKr. +# Password for these is "${USER}password" +user1:$6$h.fV0zYsXI$8wKblqETpztRKcPD6OLWZc1mU4nW5yQ713R5ECs7EwJa7oas/yrhI2itUdhETI8BvmtfGy65ltAMap9gHkzdc1 +user2:$6$bUyF8v0mTo94$IJMa2BlCd6/mAM5N5s6heFB2ewC4j3CDkFb9mYIoarXg4OxiZVnLyh20IWHf6GY8i4sc5m2D9nWLrtbnIO5o8. +user3:$6$YOjKg1kYEeGibb$HlVa2EvQbF1ssSQs.lFnS1NhoTBQLG5YF7h0z4komAEvNJw6m4gay81MRp.lt4PSbcVrimcuRbidR9cRZkfBb/ diff --git a/tests/plugins/test_topic_checking.py b/tests/plugins/test_topic_checking.py new file mode 100644 index 00000000..ae4be1ed --- /dev/null +++ b/tests/plugins/test_topic_checking.py @@ -0,0 +1,508 @@ +import pytest +import logging + +from amqtt.plugins.manager import BaseContext +from amqtt.plugins.topic_checking import ( + BaseTopicPlugin, + TopicTabooPlugin, + TopicAccessControlListPlugin, + Action, +) +from amqtt.session import Session + + +# Base plug-in object + + +@pytest.mark.asyncio +async def test_base_no_config(logdog): + """ + Check BaseTopicPlugin returns false if no topic-check is present. + """ + with logdog() as pile: + context = BaseContext() + context.logger = logging.getLogger("testlog") + context.config = {} + + plugin = BaseTopicPlugin(context) + authorised = plugin.topic_filtering() + assert authorised is False + + # Should have printed a couple of warnings + log_records = list(pile.drain(name="testlog")) + assert len(log_records) == 2 + assert log_records[0].levelno == logging.WARN + assert ( + log_records[0].message + == "'topic-check' section not found in context configuration" + ) + + assert log_records[1].levelno == logging.WARN + assert log_records[1].message == "'auth' section not found in context configuration" + assert pile.is_empty() + + +@pytest.mark.asyncio +async def test_base_empty_config(logdog): + """ + Check BaseTopicPlugin returns false if topic-check is empty. + """ + with logdog() as pile: + context = BaseContext() + context.logger = logging.getLogger("testlog") + context.config = {"topic-check": {}} + + plugin = BaseTopicPlugin(context) + authorised = plugin.topic_filtering() + assert authorised is False + + # Should have printed just one warning + log_records = list(pile.drain(name="testlog")) + assert len(log_records) == 1 + assert log_records[0].levelno == logging.WARN + assert log_records[0].message == "'auth' section not found in context configuration" + + +@pytest.mark.asyncio +async def test_base_disabled_config(logdog): + """ + Check BaseTopicPlugin returns true if disabled. (it doesn't actually check) + """ + with logdog() as pile: + context = BaseContext() + context.logger = logging.getLogger("testlog") + context.config = {"topic-check": {"enabled": False}} + + plugin = BaseTopicPlugin(context) + authorised = plugin.topic_filtering() + assert authorised is True + + # Should NOT have printed warnings + log_records = list(pile.drain(name="testlog")) + assert len(log_records) == 0 + + +@pytest.mark.asyncio +async def test_base_enabled_config(logdog): + """ + Check BaseTopicPlugin returns true if enabled. + """ + with logdog() as pile: + context = BaseContext() + context.logger = logging.getLogger("testlog") + context.config = {"topic-check": {"enabled": True}} + + plugin = BaseTopicPlugin(context) + authorised = plugin.topic_filtering() + assert authorised is True + + # Should NOT have printed warnings + log_records = list(pile.drain(name="testlog")) + assert len(log_records) == 0 + + +# Taboo plug-in + + +@pytest.mark.asyncio +async def test_taboo_empty_config(logdog): + """ + Check TopicTabooPlugin returns false if topic-check absent. + """ + with logdog() as pile: + context = BaseContext() + context.logger = logging.getLogger("testlog") + context.config = {} + + plugin = TopicTabooPlugin(context) + assert (await plugin.topic_filtering()) is False + + # Should have printed a couple of warnings + log_records = list(pile.drain(name="testlog")) + assert len(log_records) == 2 + assert log_records[0].levelno == logging.WARN + assert ( + log_records[0].message + == "'topic-check' section not found in context configuration" + ) + assert log_records[1].levelno == logging.WARN + assert log_records[1].message == "'auth' section not found in context configuration" + + +@pytest.mark.asyncio +async def test_taboo_disabled(logdog): + """ + Check TopicTabooPlugin returns true if checking disabled. + """ + with logdog() as pile: + context = BaseContext() + context.logger = logging.getLogger("testlog") + context.config = {"topic-check": {"enabled": False}} + + session = Session() + session.username = "anybody" + + plugin = TopicTabooPlugin(context) + assert ( + await plugin.topic_filtering(session=session, topic="not/prohibited") + ) is True + + # Should NOT have printed warnings + log_records = list(pile.drain(name="testlog")) + assert len(log_records) == 0 + + +@pytest.mark.asyncio +async def test_taboo_not_taboo_topic(logdog): + """ + Check TopicTabooPlugin returns true if topic not taboo + """ + with logdog() as pile: + context = BaseContext() + context.logger = logging.getLogger("testlog") + context.config = {"topic-check": {"enabled": True}} + + session = Session() + session.username = "anybody" + + plugin = TopicTabooPlugin(context) + assert ( + await plugin.topic_filtering(session=session, topic="not/prohibited") + ) is True + + # Should NOT have printed warnings + log_records = list(pile.drain(name="testlog")) + assert len(log_records) == 0 + + +@pytest.mark.asyncio +async def test_taboo_anon_taboo_topic(logdog): + """ + Check TopicTabooPlugin returns false if topic is taboo and session is anonymous. + """ + with logdog() as pile: + context = BaseContext() + context.logger = logging.getLogger("testlog") + context.config = {"topic-check": {"enabled": True}} + + session = Session() + session.username = "" + + plugin = TopicTabooPlugin(context) + assert ( + await plugin.topic_filtering(session=session, topic="prohibited") + ) is False + + # Should NOT have printed warnings + log_records = list(pile.drain(name="testlog")) + assert len(log_records) == 0 + + +@pytest.mark.asyncio +async def test_taboo_notadmin_taboo_topic(logdog): + """ + Check TopicTabooPlugin returns false if topic is taboo and user is not "admin". + """ + with logdog() as pile: + context = BaseContext() + context.logger = logging.getLogger("testlog") + context.config = {"topic-check": {"enabled": True}} + + session = Session() + session.username = "notadmin" + + plugin = TopicTabooPlugin(context) + assert ( + await plugin.topic_filtering(session=session, topic="prohibited") + ) is False + + # Should NOT have printed warnings + log_records = list(pile.drain(name="testlog")) + assert len(log_records) == 0 + + +@pytest.mark.asyncio +async def test_taboo_admin_taboo_topic(logdog): + """ + Check TopicTabooPlugin returns true if topic is taboo and user is "admin". + """ + with logdog() as pile: + context = BaseContext() + context.logger = logging.getLogger("testlog") + context.config = {"topic-check": {"enabled": True}} + + session = Session() + session.username = "admin" + + plugin = TopicTabooPlugin(context) + assert ( + await plugin.topic_filtering(session=session, topic="prohibited") + ) is True + + # Should NOT have printed warnings + log_records = list(pile.drain(name="testlog")) + assert len(log_records) == 0 + + +# TopicAccessControlListPlugin tests + + +def test_topic_ac_not_match(): + """ + Test TopicAccessControlListPlugin.topic_ac returns false if topics do not match. + """ + assert ( + TopicAccessControlListPlugin.topic_ac("a/topic/to/match", "a/topic/to/notmatch") + is False + ) + + +def test_topic_ac_not_match_longer_acl(): + """ + Test TopicAccessControlListPlugin.topic_ac returns false if topics do not match and ACL topic is longer. + """ + assert TopicAccessControlListPlugin.topic_ac("topic", "topic/is/longer") is False + + +def test_topic_ac_not_match_longer_rq(): + """ + Test TopicAccessControlListPlugin.topic_ac returns false if topics do not match and RQ topic is longer. + """ + assert TopicAccessControlListPlugin.topic_ac("topic/is/longer", "topic") is False + + +def test_topic_ac_match_exact(): + """ + Test TopicAccessControlListPlugin.topic_ac returns true if topics match exactly. + """ + assert TopicAccessControlListPlugin.topic_ac("exact/topic", "exact/topic") is True + + +def test_topic_ac_match_plus(): + """ + Test TopicAccessControlListPlugin.topic_ac correctly handles '+' wildcard. + """ + assert ( + TopicAccessControlListPlugin.topic_ac( + "a/topic/anything/value", "a/topic/+/value" + ) + is True + ) + + +def test_topic_ac_match_hash(): + """ + Test TopicAccessControlListPlugin.topic_ac correctly handles '#' wildcard. + """ + assert ( + TopicAccessControlListPlugin.topic_ac( + "topic/prefix/and/suffix", "topic/prefix/#" + ) + is True + ) + + +@pytest.mark.asyncio +async def test_taclp_empty_config(logdog): + """ + Check TopicAccessControlListPlugin returns false if topic-check absent. + """ + with logdog() as pile: + context = BaseContext() + context.logger = logging.getLogger("testlog") + context.config = {} + + plugin = TopicAccessControlListPlugin(context) + assert (await plugin.topic_filtering()) is False + + # Should have printed a couple of warnings + log_records = list(pile.drain(name="testlog")) + assert len(log_records) == 2 + assert ( + log_records[0].message + == "'topic-check' section not found in context configuration" + ) + assert log_records[1].message == "'auth' section not found in context configuration" + + +@pytest.mark.asyncio +async def test_taclp_true_disabled(logdog): + """ + Check TopicAccessControlListPlugin returns true if topic checking is disabled. + """ + context = BaseContext() + context.logger = logging.getLogger("testlog") + context.config = {"topic-check": {"enabled": False}} + + session = Session() + session.username = "user" + + plugin = TopicAccessControlListPlugin(context) + authorised = await plugin.topic_filtering( + action=Action.publish, session=session, topic="a/topic" + ) + assert authorised is True + + +@pytest.mark.asyncio +async def test_taclp_true_no_pub_acl(logdog): + """ + Check TopicAccessControlListPlugin returns true if action=publish and no publish-acl given. + (This is for backward-compatibility with existing installations.) + """ + context = BaseContext() + context.logger = logging.getLogger("testlog") + context.config = {"topic-check": {"enabled": True}} + + session = Session() + session.username = "user" + + plugin = TopicAccessControlListPlugin(context) + authorised = await plugin.topic_filtering( + action=Action.publish, session=session, topic="a/topic" + ) + assert authorised is True + + +@pytest.mark.asyncio +async def test_taclp_false_sub_no_topic(logdog): + """ + Check TopicAccessControlListPlugin returns false user there is no topic. + """ + context = BaseContext() + context.logger = logging.getLogger("testlog") + context.config = { + "topic-check": { + "enabled": True, + "acl": {"anotheruser": ["allowed/topic", "another/allowed/topic/#"]}, + } + } + + session = Session() + session.username = "user" + + plugin = TopicAccessControlListPlugin(context) + authorised = await plugin.topic_filtering( + action=Action.subscribe, session=session, topic="" + ) + assert authorised is False + + +@pytest.mark.asyncio +async def test_taclp_false_sub_unknown_user(logdog): + """ + Check TopicAccessControlListPlugin returns false user is not listed in ACL. + """ + context = BaseContext() + context.logger = logging.getLogger("testlog") + context.config = { + "topic-check": { + "enabled": True, + "acl": {"anotheruser": ["allowed/topic", "another/allowed/topic/#"]}, + } + } + + session = Session() + session.username = "user" + + plugin = TopicAccessControlListPlugin(context) + authorised = await plugin.topic_filtering( + action=Action.subscribe, session=session, topic="allowed/topic" + ) + assert authorised is False + + +@pytest.mark.asyncio +async def test_taclp_false_sub_no_permission(logdog): + """ + Check TopicAccessControlListPlugin returns false if "acl" does not list allowed topic. + """ + context = BaseContext() + context.logger = logging.getLogger("testlog") + context.config = { + "topic-check": { + "enabled": True, + "acl": {"user": ["allowed/topic", "another/allowed/topic/#"]}, + } + } + + session = Session() + session.username = "user" + + plugin = TopicAccessControlListPlugin(context) + authorised = await plugin.topic_filtering( + action=Action.subscribe, session=session, topic="forbidden/topic" + ) + assert authorised is False + + +@pytest.mark.asyncio +async def test_taclp_true_sub_permission(logdog): + """ + Check TopicAccessControlListPlugin returns true if "acl" lists allowed topic. + """ + context = BaseContext() + context.logger = logging.getLogger("testlog") + context.config = { + "topic-check": { + "enabled": True, + "acl": {"user": ["allowed/topic", "another/allowed/topic/#"]}, + } + } + + session = Session() + session.username = "user" + + plugin = TopicAccessControlListPlugin(context) + authorised = await plugin.topic_filtering( + action=Action.subscribe, session=session, topic="allowed/topic" + ) + assert authorised is True + + +@pytest.mark.asyncio +async def test_taclp_true_pub_permission(logdog): + """ + Check TopicAccessControlListPlugin returns true if "publish-acl" lists allowed topic for publish action. + """ + context = BaseContext() + context.logger = logging.getLogger("testlog") + context.config = { + "topic-check": { + "enabled": True, + "publish-acl": {"user": ["allowed/topic", "another/allowed/topic/#"]}, + } + } + + session = Session() + session.username = "user" + + plugin = TopicAccessControlListPlugin(context) + authorised = await plugin.topic_filtering( + action=Action.publish, session=session, topic="allowed/topic" + ) + assert authorised is True + + +@pytest.mark.asyncio +async def test_taclp_true_anon_sub_permission(logdog): + """ + Check TopicAccessControlListPlugin handles anonymous users. + """ + context = BaseContext() + context.logger = logging.getLogger("testlog") + context.config = { + "topic-check": { + "enabled": True, + "acl": {"anonymous": ["allowed/topic", "another/allowed/topic/#"]}, + } + } + + session = Session() + session.username = None + + plugin = TopicAccessControlListPlugin(context) + authorised = await plugin.topic_filtering( + action=Action.subscribe, session=session, topic="allowed/topic" + ) + assert authorised is True diff --git a/tests/test_broker.py b/tests/test_broker.py index f9689e5d..0d9d5b54 100644 --- a/tests/test_broker.py +++ b/tests/test_broker.py @@ -269,6 +269,96 @@ async def test_client_publish(broker, mock_plugin_manager): ) +@pytest.mark.asyncio +async def test_client_publish_acl_permitted(acl_broker): + sub_client = MQTTClient() + ret = await sub_client.connect("mqtt://user2:user2password@127.0.0.1:1884/") + assert ret == 0 + + ret = await sub_client.subscribe([("public/subtopic/test", QOS_0)]) + assert ret == [QOS_0] + + pub_client = MQTTClient() + ret = await pub_client.connect("mqtt://user1:user1password@127.0.0.1:1884/") + assert ret == 0 + + await pub_client.publish("public/subtopic/test", b"data", QOS_0) + + message = await sub_client.deliver_message(timeout=1) + await pub_client.disconnect() + await sub_client.disconnect() + + assert message is not None + assert message.topic == "public/subtopic/test" + assert message.data == b"data" + assert message.qos == QOS_0 + + +@pytest.mark.asyncio +async def test_client_publish_acl_forbidden(acl_broker): + sub_client = MQTTClient() + ret = await sub_client.connect("mqtt://user2:user2password@127.0.0.1:1884/") + assert ret == 0 + + ret = await sub_client.subscribe([("public/forbidden/test", QOS_0)]) + assert ret == [QOS_0] + + pub_client = MQTTClient() + ret = await pub_client.connect("mqtt://user1:user1password@127.0.0.1:1884/") + assert ret == 0 + + await pub_client.publish("public/forbidden/test", b"data", QOS_0) + + try: + await sub_client.deliver_message(timeout=1) + assert False, "Should not have worked" + except asyncio.TimeoutError: + pass + + await pub_client.disconnect() + await sub_client.disconnect() + + +@pytest.mark.asyncio +async def test_client_publish_acl_permitted_sub_forbidden(acl_broker): + sub_client1 = MQTTClient() + ret = await sub_client1.connect("mqtt://user2:user2password@127.0.0.1:1884/") + assert ret == 0 + + sub_client2 = MQTTClient() + ret = await sub_client2.connect("mqtt://user3:user3password@127.0.0.1:1884/") + assert ret == 0 + + ret = await sub_client1.subscribe([("public/subtopic/test", QOS_0)]) + assert ret == [QOS_0] + + ret = await sub_client2.subscribe([("public/subtopic/test", QOS_0)]) + assert ret == [0x80] + + pub_client = MQTTClient() + ret = await pub_client.connect("mqtt://user1:user1password@127.0.0.1:1884/") + assert ret == 0 + + await pub_client.publish("public/subtopic/test", b"data", QOS_0) + + message = await sub_client1.deliver_message(timeout=1) + + try: + await sub_client2.deliver_message(timeout=1) + assert False, "Should not have worked" + except asyncio.TimeoutError: + pass + + await pub_client.disconnect() + await sub_client1.disconnect() + await sub_client2.disconnect() + + assert message is not None + assert message.topic == "public/subtopic/test" + assert message.data == b"data" + assert message.qos == QOS_0 + + @pytest.mark.asyncio async def test_client_publish_dup(broker, event_loop): conn_reader, conn_writer = await asyncio.open_connection(