Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Apply ACL controls to PUBLISH operations #66

Merged
merged 33 commits into from
Aug 17, 2021
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
980ce84
broker: Pass current action to topic filter.
sjlongland Jun 6, 2021
49e39d5
broker: Pass action=subscribe when checking topics
sjlongland Jun 6, 2021
88886db
broker: Check ACL on publish for permissions
sjlongland Jun 6, 2021
35dcccf
plugins.topic_checking: Check the `action` in `topic_acl`.
sjlongland Jun 6, 2021
6ab32a1
plugins.topic_checking: Initialise topic_config to None if not present.
sjlongland Jun 6, 2021
60f3bee
plugins.topic_checking tests: Test BaseTopicPlugin.
sjlongland Jun 6, 2021
ae27801
plugins.test_topic tests: Add tests for TopicTabooPlugin.
sjlongland Jun 6, 2021
be8f454
plugins.topic_checking tests: Throw test suite at `black`.
sjlongland Jun 6, 2021
c52a104
plugins.topic_checking: Test topic_ac static method
sjlongland Jun 6, 2021
c79ee4c
plugins.topic_checking: Return False if superclass refuses.
sjlongland Jun 6, 2021
1bc361b
plugins.topic_checking: Add remaining tests.
sjlongland Jun 6, 2021
3951ce0
plugin.topic_checking tests: Fix code formatting.
sjlongland Jun 6, 2021
3740761
test password file: Add some more users
sjlongland Jun 6, 2021
ea42422
test cases: Add fixture for ACL-enabled test server
sjlongland Jun 6, 2021
d953893
plugins.topic_checking: Clean up brackets.
sjlongland Jun 6, 2021
6256048
plugins.topic_checking tests: Fix comment statement
sjlongland Jun 6, 2021
47c407a
plugins.topic_checking tests: Drop copyright header
sjlongland Jun 6, 2021
f351627
broker tests: Add system test for publish authorisation
sjlongland Jun 6, 2021
0e03134
plugins.topic_checking tests: Add tests for enabled=False
sjlongland Jun 6, 2021
2536da8
plugins.topic_checking tests: Clean up `assert`/`await` calls
sjlongland Jun 6, 2021
2077ecf
amqtt.broker: Use Enum for topic checking action
sjlongland Jun 19, 2021
4789361
plugins.topic_checking tests: Fix tests broken by Enum change.
sjlongland Jul 4, 2021
c694765
poetry/pyproject: Pull in pytest-logdog
sjlongland Jul 4, 2021
ef1a109
test config: Enable `pytest_logdog` plug-in
sjlongland Jul 4, 2021
de8575f
plugins.topic_checking tests: Replace DummyLogger with `logdog` plug-in.
sjlongland Jul 4, 2021
cfe7ff5
plugins.topic_checking tests: Run code formatting tool.
sjlongland Jul 4, 2021
3c3ab50
broker: Pass through enum values not strings.
sjlongland Jul 4, 2021
f377605
plugins.topic_checking: De-indent logging checks
sjlongland Jul 7, 2021
a1dee1f
broker tests: Change passwords for user[1…3]
sjlongland Jul 7, 2021
245702f
Pass code through `black` formatter
sjlongland Jul 8, 2021
e4b1fc3
plugins.topic_checking tests: Fix issues identified by flake8
sjlongland Jul 8, 2021
57e88a8
broker tests: Clean up flake8 warnings
sjlongland Jul 8, 2021
a74abf7
broker tests: Fix reference to TimeoutError.
sjlongland Jul 8, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 29 additions & 16 deletions amqtt/broker.py
Original file line number Diff line number Diff line change
Expand Up @@ -620,21 +620,32 @@ async def client_connected(
% client_session.client_id
)
break
await self.plugins_manager.fire_event(
EVENT_BROKER_MESSAGE_RECEIVED,
client_id=client_session.client_id,
message=app_message,
)
await self._broadcast_message(
client_session, app_message.topic, app_message.data

# See if the user is allowed to publish to this topic.
permitted = await self.topic_filtering(
client_session, topic=app_message.topic, action='publish'
)
if app_message.publish_packet.retain_flag:
self.retain_message(
client_session,
app_message.topic,
app_message.data,
app_message.qos,
if not permitted:
self.logger.info(
"%s forbidden TOPIC %s sent in PUBLISH message.",
client_session.client_id, app_message.topic
)
else:
await self.plugins_manager.fire_event(
EVENT_BROKER_MESSAGE_RECEIVED,
client_id=client_session.client_id,
message=app_message,
)
await self._broadcast_message(
client_session, app_message.topic, app_message.data
)
if app_message.publish_packet.retain_flag:
self.retain_message(
client_session,
app_message.topic,
app_message.data,
app_message.qos,
)
wait_deliver = asyncio.Task(
handler.mqtt_deliver_next_message(), loop=self._loop
)
Expand Down Expand Up @@ -703,7 +714,7 @@ async def authenticate(self, session: Session, listener):
# If all plugins returned True, authentication is success
return auth_result

async def topic_filtering(self, session: Session, topic):
async def topic_filtering(self, session: Session, topic, action: str):
"""
This method call the topic_filtering method on registered plugins to check that the subscription is allowed.
User is considered allowed if all plugins called return True.
Expand All @@ -713,7 +724,8 @@ async def topic_filtering(self, session: Session, topic):
- None if topic filtering can't be achieved (then plugin result is then ignored)
:param session:
:param listener:
:param topic: Topic in which the client wants to subscribe
:param topic: Topic in which the client wants to subscribe / publish
:param action: What is being done with the topic? subscribe or publish
:return:
"""
topic_plugins = None
Expand All @@ -724,6 +736,7 @@ async def topic_filtering(self, session: Session, topic):
"topic_filtering",
session=session,
topic=topic,
action=action,
filter_plugins=topic_plugins,
)
topic_result = True
Expand Down Expand Up @@ -773,7 +786,7 @@ async def add_subscription(self, subscription, session):
# [MQTT-4.7.1-3] + wildcard character must occupy entire level
return 0x80
# Check if the client is authorised to connect to the topic
permitted = await self.topic_filtering(session, topic=a_filter)
permitted = await self.topic_filtering(session, topic=a_filter, action='subscribe')
if not permitted:
return 0x80
qos = subscription[1]
Expand Down
18 changes: 17 additions & 1 deletion amqtt/plugins/topic_checking.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ def __init__(self, context):
self.context.logger.warning(
"'topic-check' section not found in context configuration"
)
self.topic_config = None

def topic_filtering(self, *args, **kwargs):
if not self.topic_config:
Expand Down Expand Up @@ -66,11 +67,24 @@ async def topic_filtering(self, *args, **kwargs):
if filter_result:
session = kwargs.get("session", None)
req_topic = kwargs.get("topic", None)
action = kwargs.get("action", None)

# hbmqtt and older amqtt do not support publish filtering
if (action == "publish") and ("publish-acl" not in self.topic_config):
# maintain backward compatibility, assume permitted
return True

if req_topic:
username = session.username
if username is None:
username = "anonymous"
allowed_topics = self.topic_config["acl"].get(username, None)

if action == "publish":
acl = self.topic_config["publish-acl"]
elif action == "subscribe":
acl = self.topic_config["acl"]

allowed_topics = acl.get(username, None)
if allowed_topics:
for allowed_topic in allowed_topics:
if self.topic_ac(req_topic, allowed_topic):
Expand All @@ -80,3 +94,5 @@ async def topic_filtering(self, *args, **kwargs):
return False
else:
return False
else:
return False
Loading