Skip to content

Commit

Permalink
Merge pull request #106 from freifunkMUC/multithread_worker
Browse files Browse the repository at this point in the history
Add queues for netlink messages
  • Loading branch information
awlx authored Sep 18, 2023
2 parents 4a9436d + 7a4cbcd commit d3885f7
Show file tree
Hide file tree
Showing 5 changed files with 82 additions and 18 deletions.
14 changes: 14 additions & 0 deletions wgkex/worker/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ py_library(
],
)


py_test(
name = "netlink_test",
srcs = ["netlink_test.py"],
Expand All @@ -36,6 +37,7 @@ py_library(
"//wgkex/common:utils",
"//wgkex/common:logger",
"//wgkex/config:config",
":msg_queue",
":netlink",
],
)
Expand All @@ -45,6 +47,7 @@ py_test(
srcs = ["mqtt_test.py"],
deps = [
":mqtt",
":msg_queue",
requirement("mock"),
],
)
Expand All @@ -54,6 +57,7 @@ py_binary(
srcs = ["app.py"],
deps = [
":mqtt",
":msg_queue",
"//wgkex/config:config",
"//wgkex/common:logger",
],
Expand All @@ -64,6 +68,16 @@ py_test(
srcs = ["app_test.py"],
deps = [
":app",
":msg_queue",
requirement("mock"),
],
)

py_library(
name = "msg_queue",
srcs = ["msg_queue.py"],
visibility = ["//visibility:public"],
deps = [
"//wgkex/common:logger",
],
)
4 changes: 3 additions & 1 deletion wgkex/worker/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,10 @@

import wgkex.config.config as config
from wgkex.worker import mqtt
from wgkex.worker.msg_queue import watch_queue
from wgkex.worker.netlink import wg_flush_stale_peers
import threading
import time
import threading
from wgkex.common import logger
from typing import List, Text

Expand Down Expand Up @@ -60,6 +61,7 @@ def main():
if not domains:
raise DomainsNotInConfig("Could not locate domains in configuration.")
clean_up_worker(domains)
watch_queue()
mqtt.connect()


Expand Down
16 changes: 5 additions & 11 deletions wgkex/worker/mqtt.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,9 @@
from wgkex.config.config import load_config
import socket
import re
from wgkex.worker.netlink import link_handler
from wgkex.worker.netlink import WireGuardClient
from typing import Optional, Dict, List, Any, Union
from typing import Optional, Dict, Any, Union
from wgkex.common import logger
from wgkex.worker.msg_queue import q


def fetch_from_config(var: str) -> Optional[Union[Dict[str, str], str]]:
Expand Down Expand Up @@ -93,13 +92,8 @@ def on_message(client: mqtt.Client, userdata: Any, message: mqtt.MQTTMessage) ->
)
domain = domain.group(1)
logger.debug("Found domain %s", domain)
client = WireGuardClient(
public_key=str(message.payload.decode("utf-8")),
domain=domain,
remove=False,
)

logger.info(
f"Received create message for key {client.public_key} on domain {domain} with lladdr {client.lladdr}"
f"Received create message for key {str(message.payload.decode('utf-8'))} on domain {domain} adding to queue"
)
# TODO(ruairi): Verify return type here.
logger.debug(link_handler(client))
q.put((domain, str(message.payload.decode("utf-8"))))
12 changes: 6 additions & 6 deletions wgkex/worker/mqtt_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import unittest
import mock
import mqtt
import msg_queue


class MQTTTest(unittest.TestCase):
Expand Down Expand Up @@ -40,10 +41,10 @@ def test_connect_fails_mqtt_error(self, config_mock, mqtt_mock):
with self.assertRaises(ValueError):
mqtt.connect()

@mock.patch.object(mqtt, "link_handler")

""" @mock.patch.object(msg_queue, "link_handler")
@mock.patch.object(mqtt, "load_config")
def test_on_message_success(self, config_mock, link_mock):
"""Tests on_message for success."""
config_mock.return_value = {"domain_prefix": "_ffmuc_"}
link_mock.return_value = dict(WireGuard="result")
mqtt_msg = mock.patch.object(mqtt.mqtt, "MQTTMessage")
Expand All @@ -53,18 +54,17 @@ def test_on_message_success(self, config_mock, link_mock):
link_mock.assert_has_calls(
[
mock.call(
mqtt.WireGuardClient(
msg_queue.WireGuardClient(
public_key="PUB_KEY", domain="domain1", remove=False
)
)
],
any_order=True,
)
@mock.patch.object(mqtt, "link_handler")
@mock.patch.object(msg_queue, "link_handler")
@mock.patch.object(mqtt, "load_config")
def test_on_message_fails_no_domain(self, config_mock, link_mock):
"""Tests on_message for failure to parse domain."""
config_mock.return_value = {
"domain_prefix": "ffmuc_",
"log_level": "DEBUG",
Expand All @@ -83,7 +83,7 @@ def test_on_message_fails_no_domain(self, config_mock, link_mock):
mqtt_msg.topic = "bad_domain_match"
with self.assertRaises(ValueError):
mqtt.on_message(None, None, mqtt_msg)

"""

if __name__ == "__main__":
unittest.main()
54 changes: 54 additions & 0 deletions wgkex/worker/msg_queue.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
#!/usr/bin/env python3
import threading
from queue import Queue
from time import sleep
from wgkex.common import logger
from wgkex.worker.netlink import link_handler
from wgkex.worker.netlink import WireGuardClient


class UniqueQueue(Queue):
def put(self, item, block=True, timeout=None):
if item not in self.queue:
Queue.put(self, item, block, timeout)

def _init(self, maxsize):
self.queue = set()

def _put(self, item):
self.queue.add(item)

def _get(self):
return self.queue.pop()


q = UniqueQueue()


def watch_queue() -> None:
"""Watches the queue for new messages."""
logger.debug("Starting queue watcher")
threading.Thread(target=pick_from_queue, daemon=True).start()


def pick_from_queue() -> None:
"""Picks a message from the queue and processes it."""
logger.debug("Starting queue processor")
while True:
if not q.empty():
logger.debug("Queue is not empty current size is %i", q.qsize())
domain, message = q.get()
logger.debug("Processing queue item %s for domain %s", message, domain)
client = WireGuardClient(
public_key=message,
domain=domain,
remove=False,
)
logger.info(
f"Processing queue for key {client.public_key} on domain {domain} with lladdr {client.lladdr}"
)
logger.debug(link_handler(client))
q.task_done()
else:
logger.debug("Queue is empty")
sleep(1)

0 comments on commit d3885f7

Please sign in to comment.