From 250fb3fe3d91de42f226a9d965a9d2d68ab519a0 Mon Sep 17 00:00:00 2001 From: awlx Date: Mon, 18 Sep 2023 14:27:30 +0200 Subject: [PATCH 01/33] Add queues for netlink messages This resolves #103 --- wgkex/worker/BUILD | 11 +++++++++++ wgkex/worker/app.py | 3 ++- wgkex/worker/mqtt.py | 16 ++++++---------- wgkex/worker/msg_queue.py | 26 ++++++++++++++++++++++++++ 4 files changed, 45 insertions(+), 11 deletions(-) create mode 100644 wgkex/worker/msg_queue.py diff --git a/wgkex/worker/BUILD b/wgkex/worker/BUILD index 22e2424..ec4ffaa 100644 --- a/wgkex/worker/BUILD +++ b/wgkex/worker/BUILD @@ -16,6 +16,7 @@ py_library( ], ) + py_test( name = "netlink_test", srcs = ["netlink_test.py"], @@ -54,6 +55,7 @@ py_binary( srcs = ["app.py"], deps = [ ":mqtt", + ":msg_queue", "//wgkex/config:config", "//wgkex/common:logger", ], @@ -67,3 +69,12 @@ py_test( requirement("mock"), ], ) + +py_library( + name = "msg_queue", + srcs = ["msg_queue.py"], + visibility = ["//visibility:public"], + deps = [ + "//wgkex/common:logger", + ], +) \ No newline at end of file diff --git a/wgkex/worker/app.py b/wgkex/worker/app.py index e99575e..a917ed5 100644 --- a/wgkex/worker/app.py +++ b/wgkex/worker/app.py @@ -2,8 +2,8 @@ import wgkex.config.config as config from wgkex.worker import mqtt +from wgkex.worker.msg_queue import watch_queue from wgkex.worker.netlink import wg_flush_stale_peers -import threading import time from wgkex.common import logger from typing import List, Text @@ -59,6 +59,7 @@ def main(): domains = config.load_config().get("domains") if not domains: raise DomainsNotInConfig("Could not locate domains in configuration.") + watch_queue() clean_up_worker(domains) mqtt.connect() diff --git a/wgkex/worker/mqtt.py b/wgkex/worker/mqtt.py index dfa742a..21e749e 100644 --- a/wgkex/worker/mqtt.py +++ b/wgkex/worker/mqtt.py @@ -7,12 +7,13 @@ from wgkex.config.config import load_config import socket import re -from wgkex.worker.netlink import link_handler -from wgkex.worker.netlink import WireGuardClient -from typing import Optional, Dict, List, Any, Union +from typing import Optional, Dict, Any, Union from wgkex.common import logger +import queue +q = queue.Queue() + def fetch_from_config(var: str) -> Optional[Union[Dict[str, str], str]]: """Fetches values from configuration file. @@ -93,13 +94,8 @@ def on_message(client: mqtt.Client, userdata: Any, message: mqtt.MQTTMessage) -> ) domain = domain.group(1) logger.debug("Found domain %s", domain) - client = WireGuardClient( - public_key=str(message.payload.decode("utf-8")), - domain=domain, - remove=False, - ) + logger.info( f"Received create message for key {client.public_key} on domain {domain} with lladdr {client.lladdr}" ) - # TODO(ruairi): Verify return type here. - logger.debug(link_handler(client)) + q.put(domain, message.payload.decode("utf-8")) diff --git a/wgkex/worker/msg_queue.py b/wgkex/worker/msg_queue.py new file mode 100644 index 0000000..b164b7a --- /dev/null +++ b/wgkex/worker/msg_queue.py @@ -0,0 +1,26 @@ +#!/usr/bin/env python3 +import queue +import threading +from wgkex.common import logger +from wgkex.worker.netlink import link_handler +from wgkex.worker.netlink import WireGuardClient + +q = queue.Queue() + +def watch_queue() -> None: + """Watches the queue for new messages.""" + threading.Thread(target=worker, daemon=True).start() + while q.empty() != True: + pick_from_queue() + +def pick_from_queue() -> None: + """Picks a message from the queue and processes it.""" + domain, message = q.get() + logger.debug("Processing queue item %s for domain %s", message, domain) + client = WireGuardClient( + public_key=str(message.payload.decode("utf-8")), + domain=domain, + remove=False, + ) + logger.debug(link_handler(client)) + q.task_done() \ No newline at end of file From 82d8e1b9f4852d02d2359eee6e1df6676378b659 Mon Sep 17 00:00:00 2001 From: awlx Date: Mon, 18 Sep 2023 14:30:56 +0200 Subject: [PATCH 02/33] Fix --- wgkex/worker/msg_queue.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/wgkex/worker/msg_queue.py b/wgkex/worker/msg_queue.py index b164b7a..2f0b600 100644 --- a/wgkex/worker/msg_queue.py +++ b/wgkex/worker/msg_queue.py @@ -9,9 +9,7 @@ def watch_queue() -> None: """Watches the queue for new messages.""" - threading.Thread(target=worker, daemon=True).start() - while q.empty() != True: - pick_from_queue() + threading.Thread(target=pick_from_queue, daemon=True).start() def pick_from_queue() -> None: """Picks a message from the queue and processes it.""" From f2951bf77932accb08e9693f4fdd5205a27c1790 Mon Sep 17 00:00:00 2001 From: awlx Date: Mon, 18 Sep 2023 14:32:15 +0200 Subject: [PATCH 03/33] Fix --- wgkex/worker/app.py | 1 + 1 file changed, 1 insertion(+) diff --git a/wgkex/worker/app.py b/wgkex/worker/app.py index a917ed5..36e741c 100644 --- a/wgkex/worker/app.py +++ b/wgkex/worker/app.py @@ -5,6 +5,7 @@ from wgkex.worker.msg_queue import watch_queue from wgkex.worker.netlink import wg_flush_stale_peers import time +import threading from wgkex.common import logger from typing import List, Text From b8e15846aa8f782ed0a6e7f96a32e0e012bd15b3 Mon Sep 17 00:00:00 2001 From: awlx Date: Mon, 18 Sep 2023 14:34:09 +0200 Subject: [PATCH 04/33] More fixes --- wgkex/worker/mqtt.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/wgkex/worker/mqtt.py b/wgkex/worker/mqtt.py index 21e749e..8e5611e 100644 --- a/wgkex/worker/mqtt.py +++ b/wgkex/worker/mqtt.py @@ -96,6 +96,6 @@ def on_message(client: mqtt.Client, userdata: Any, message: mqtt.MQTTMessage) -> logger.debug("Found domain %s", domain) logger.info( - f"Received create message for key {client.public_key} on domain {domain} with lladdr {client.lladdr}" + f"Received create message for key {str(message.payload.decode('utf-8'))} on domain {domain} with lladdr {client.lladdr}" ) q.put(domain, message.payload.decode("utf-8")) From ec3cc0940c5f956845e0fecdead208678805cf6a Mon Sep 17 00:00:00 2001 From: awlx Date: Mon, 18 Sep 2023 14:34:19 +0200 Subject: [PATCH 05/33] More fixes --- wgkex/worker/mqtt.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/wgkex/worker/mqtt.py b/wgkex/worker/mqtt.py index 8e5611e..e7af08a 100644 --- a/wgkex/worker/mqtt.py +++ b/wgkex/worker/mqtt.py @@ -96,6 +96,6 @@ def on_message(client: mqtt.Client, userdata: Any, message: mqtt.MQTTMessage) -> logger.debug("Found domain %s", domain) logger.info( - f"Received create message for key {str(message.payload.decode('utf-8'))} on domain {domain} with lladdr {client.lladdr}" + f"Received create message for key {str(message.payload.decode('utf-8'))} on domain {domain}" ) q.put(domain, message.payload.decode("utf-8")) From 7019076c58515f010d02470e015363e3e022cb65 Mon Sep 17 00:00:00 2001 From: awlx Date: Mon, 18 Sep 2023 14:36:07 +0200 Subject: [PATCH 06/33] More logs --- wgkex/worker/msg_queue.py | 1 + 1 file changed, 1 insertion(+) diff --git a/wgkex/worker/msg_queue.py b/wgkex/worker/msg_queue.py index 2f0b600..10b18d6 100644 --- a/wgkex/worker/msg_queue.py +++ b/wgkex/worker/msg_queue.py @@ -9,6 +9,7 @@ def watch_queue() -> None: """Watches the queue for new messages.""" + logger.debug("Starting queue watcher") threading.Thread(target=pick_from_queue, daemon=True).start() def pick_from_queue() -> None: From fff2e338fbd29d00ab1bb2d52cc3b619934c434d Mon Sep 17 00:00:00 2001 From: awlx Date: Mon, 18 Sep 2023 14:40:42 +0200 Subject: [PATCH 07/33] More --- wgkex/worker/msg_queue.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/wgkex/worker/msg_queue.py b/wgkex/worker/msg_queue.py index 10b18d6..feb10cb 100644 --- a/wgkex/worker/msg_queue.py +++ b/wgkex/worker/msg_queue.py @@ -21,5 +21,8 @@ def pick_from_queue() -> None: domain=domain, remove=False, ) + logger.info( + f"Processing queue for key {client.public_key} on domain {domain} with lladdr {client.lladdr}" + ) logger.debug(link_handler(client)) q.task_done() \ No newline at end of file From 4fc10f29373eeadbfed12d7b6a72c488bc6e503d Mon Sep 17 00:00:00 2001 From: awlx Date: Mon, 18 Sep 2023 14:42:11 +0200 Subject: [PATCH 08/33] More debugging --- wgkex/worker/msg_queue.py | 1 + 1 file changed, 1 insertion(+) diff --git a/wgkex/worker/msg_queue.py b/wgkex/worker/msg_queue.py index feb10cb..4528c69 100644 --- a/wgkex/worker/msg_queue.py +++ b/wgkex/worker/msg_queue.py @@ -14,6 +14,7 @@ def watch_queue() -> None: def pick_from_queue() -> None: """Picks a message from the queue and processes it.""" + logger.debug("Starting queue processor") domain, message = q.get() logger.debug("Processing queue item %s for domain %s", message, domain) client = WireGuardClient( From 1e91cfb7def89c884c70dfbe26106ef5f0c7c48c Mon Sep 17 00:00:00 2001 From: awlx Date: Mon, 18 Sep 2023 14:43:42 +0200 Subject: [PATCH 09/33] More debugging --- wgkex/worker/msg_queue.py | 25 +++++++++++++------------ 1 file changed, 13 insertions(+), 12 deletions(-) diff --git a/wgkex/worker/msg_queue.py b/wgkex/worker/msg_queue.py index 4528c69..d272fa0 100644 --- a/wgkex/worker/msg_queue.py +++ b/wgkex/worker/msg_queue.py @@ -15,15 +15,16 @@ def watch_queue() -> None: def pick_from_queue() -> None: """Picks a message from the queue and processes it.""" logger.debug("Starting queue processor") - domain, message = q.get() - logger.debug("Processing queue item %s for domain %s", message, domain) - client = WireGuardClient( - public_key=str(message.payload.decode("utf-8")), - domain=domain, - remove=False, - ) - logger.info( - f"Processing queue for key {client.public_key} on domain {domain} with lladdr {client.lladdr}" - ) - logger.debug(link_handler(client)) - q.task_done() \ No newline at end of file + while True: + domain, message = q.get() + logger.debug("Processing queue item %s for domain %s", message, domain) + client = WireGuardClient( + public_key=str(message.payload.decode("utf-8")), + domain=domain, + remove=False, + ) + logger.info( + f"Processing queue for key {client.public_key} on domain {domain} with lladdr {client.lladdr}" + ) + logger.debug(link_handler(client)) + q.task_done() \ No newline at end of file From 714aa36571e6bbd116ac97685ca3c7fad1648df4 Mon Sep 17 00:00:00 2001 From: awlx Date: Mon, 18 Sep 2023 14:45:16 +0200 Subject: [PATCH 10/33] More debugging --- wgkex/worker/mqtt.py | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/wgkex/worker/mqtt.py b/wgkex/worker/mqtt.py index e7af08a..d63dd3c 100644 --- a/wgkex/worker/mqtt.py +++ b/wgkex/worker/mqtt.py @@ -9,10 +9,7 @@ import re from typing import Optional, Dict, Any, Union from wgkex.common import logger -import queue - - -q = queue.Queue() +from wgkex.worker.msg_queue import q def fetch_from_config(var: str) -> Optional[Union[Dict[str, str], str]]: """Fetches values from configuration file. From be258559cec8dcc3baa934ee2916a9522780e52a Mon Sep 17 00:00:00 2001 From: awlx Date: Mon, 18 Sep 2023 14:46:17 +0200 Subject: [PATCH 11/33] Make sure we only pull when queue is empty --- wgkex/worker/msg_queue.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/wgkex/worker/msg_queue.py b/wgkex/worker/msg_queue.py index d272fa0..5d6a6f3 100644 --- a/wgkex/worker/msg_queue.py +++ b/wgkex/worker/msg_queue.py @@ -15,7 +15,7 @@ def watch_queue() -> None: def pick_from_queue() -> None: """Picks a message from the queue and processes it.""" logger.debug("Starting queue processor") - while True: + while q.empty() is False: domain, message = q.get() logger.debug("Processing queue item %s for domain %s", message, domain) client = WireGuardClient( From 32217a703a3987826ebb6ceeb71ba790f63a6e60 Mon Sep 17 00:00:00 2001 From: awlx Date: Mon, 18 Sep 2023 14:46:50 +0200 Subject: [PATCH 12/33] Make sure we only pull when queue is not empty --- wgkex/worker/msg_queue.py | 27 ++++++++++++++------------- 1 file changed, 14 insertions(+), 13 deletions(-) diff --git a/wgkex/worker/msg_queue.py b/wgkex/worker/msg_queue.py index 5d6a6f3..8df94ae 100644 --- a/wgkex/worker/msg_queue.py +++ b/wgkex/worker/msg_queue.py @@ -15,16 +15,17 @@ def watch_queue() -> None: def pick_from_queue() -> None: """Picks a message from the queue and processes it.""" logger.debug("Starting queue processor") - while q.empty() is False: - domain, message = q.get() - logger.debug("Processing queue item %s for domain %s", message, domain) - client = WireGuardClient( - public_key=str(message.payload.decode("utf-8")), - domain=domain, - remove=False, - ) - logger.info( - f"Processing queue for key {client.public_key} on domain {domain} with lladdr {client.lladdr}" - ) - logger.debug(link_handler(client)) - q.task_done() \ No newline at end of file + while True: + if not q.empty(): + domain, message = q.get() + logger.debug("Processing queue item %s for domain %s", message, domain) + client = WireGuardClient( + public_key=str(message.payload.decode("utf-8")), + domain=domain, + remove=False, + ) + logger.info( + f"Processing queue for key {client.public_key} on domain {domain} with lladdr {client.lladdr}" + ) + logger.debug(link_handler(client)) + q.task_done() \ No newline at end of file From 0490f0ece48ef2269a653cf06b8aa9474773ccb1 Mon Sep 17 00:00:00 2001 From: awlx Date: Mon, 18 Sep 2023 14:48:45 +0200 Subject: [PATCH 13/33] Use a tuple which auto unpacks --- wgkex/worker/mqtt.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/wgkex/worker/mqtt.py b/wgkex/worker/mqtt.py index d63dd3c..98a42d4 100644 --- a/wgkex/worker/mqtt.py +++ b/wgkex/worker/mqtt.py @@ -95,4 +95,4 @@ def on_message(client: mqtt.Client, userdata: Any, message: mqtt.MQTTMessage) -> logger.info( f"Received create message for key {str(message.payload.decode('utf-8'))} on domain {domain}" ) - q.put(domain, message.payload.decode("utf-8")) + q.put((domain, message.payload.decode("utf-8"))) From f66a225dab6cdd1d07286501ac2935322edb43b9 Mon Sep 17 00:00:00 2001 From: awlx Date: Mon, 18 Sep 2023 14:50:50 +0200 Subject: [PATCH 14/33] We already converted the message --- wgkex/worker/mqtt.py | 2 +- wgkex/worker/msg_queue.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/wgkex/worker/mqtt.py b/wgkex/worker/mqtt.py index 98a42d4..0586310 100644 --- a/wgkex/worker/mqtt.py +++ b/wgkex/worker/mqtt.py @@ -95,4 +95,4 @@ def on_message(client: mqtt.Client, userdata: Any, message: mqtt.MQTTMessage) -> logger.info( f"Received create message for key {str(message.payload.decode('utf-8'))} on domain {domain}" ) - q.put((domain, message.payload.decode("utf-8"))) + q.put((domain, str(message.payload.decode("utf-8")))) diff --git a/wgkex/worker/msg_queue.py b/wgkex/worker/msg_queue.py index 8df94ae..116d05f 100644 --- a/wgkex/worker/msg_queue.py +++ b/wgkex/worker/msg_queue.py @@ -20,7 +20,7 @@ def pick_from_queue() -> None: domain, message = q.get() logger.debug("Processing queue item %s for domain %s", message, domain) client = WireGuardClient( - public_key=str(message.payload.decode("utf-8")), + public_key=message, domain=domain, remove=False, ) From 665a926f73672c3f961f7b27872f460f5b16db5b Mon Sep 17 00:00:00 2001 From: awlx Date: Mon, 18 Sep 2023 14:53:40 +0200 Subject: [PATCH 15/33] Debug queuesize --- wgkex/worker/msg_queue.py | 1 + 1 file changed, 1 insertion(+) diff --git a/wgkex/worker/msg_queue.py b/wgkex/worker/msg_queue.py index 116d05f..8dee9af 100644 --- a/wgkex/worker/msg_queue.py +++ b/wgkex/worker/msg_queue.py @@ -17,6 +17,7 @@ def pick_from_queue() -> None: logger.debug("Starting queue processor") while True: if not q.empty(): + logger.debug("Queue is not empty current size is %i", q.qsize()) domain, message = q.get() logger.debug("Processing queue item %s for domain %s", message, domain) client = WireGuardClient( From 005b390d896a21e8835c10c1847ccddef21bf29b Mon Sep 17 00:00:00 2001 From: awlx Date: Mon, 18 Sep 2023 14:55:56 +0200 Subject: [PATCH 16/33] Run black --- wgkex/worker/mqtt.py | 1 + wgkex/worker/msg_queue.py | 4 +++- 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/wgkex/worker/mqtt.py b/wgkex/worker/mqtt.py index 0586310..64a62dd 100644 --- a/wgkex/worker/mqtt.py +++ b/wgkex/worker/mqtt.py @@ -11,6 +11,7 @@ from wgkex.common import logger from wgkex.worker.msg_queue import q + def fetch_from_config(var: str) -> Optional[Union[Dict[str, str], str]]: """Fetches values from configuration file. diff --git a/wgkex/worker/msg_queue.py b/wgkex/worker/msg_queue.py index 8dee9af..9bc99a8 100644 --- a/wgkex/worker/msg_queue.py +++ b/wgkex/worker/msg_queue.py @@ -7,11 +7,13 @@ q = queue.Queue() + def watch_queue() -> None: """Watches the queue for new messages.""" logger.debug("Starting queue watcher") threading.Thread(target=pick_from_queue, daemon=True).start() + def pick_from_queue() -> None: """Picks a message from the queue and processes it.""" logger.debug("Starting queue processor") @@ -29,4 +31,4 @@ def pick_from_queue() -> None: f"Processing queue for key {client.public_key} on domain {domain} with lladdr {client.lladdr}" ) logger.debug(link_handler(client)) - q.task_done() \ No newline at end of file + q.task_done() From 3b7cb27d9cfdcdda91265aaf81d8bf2ae69d1b96 Mon Sep 17 00:00:00 2001 From: awlx Date: Mon, 18 Sep 2023 15:11:25 +0200 Subject: [PATCH 17/33] Extend the normal Queue to not allow dups --- wgkex/worker/msg_queue.py | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/wgkex/worker/msg_queue.py b/wgkex/worker/msg_queue.py index 9bc99a8..76e6b7d 100644 --- a/wgkex/worker/msg_queue.py +++ b/wgkex/worker/msg_queue.py @@ -5,8 +5,15 @@ from wgkex.worker.netlink import link_handler from wgkex.worker.netlink import WireGuardClient -q = queue.Queue() +class UniqueQueue(queue.Queue): + def _init(self, maxsize): + self.queue = set() + def _put(self, item): + self.queue.add(item) + def _get(self): + return self.queue.pop() +q = UniqueQueue() def watch_queue() -> None: """Watches the queue for new messages.""" From 88b9ec4863bdaec05d476b01a547b2921c1103b6 Mon Sep 17 00:00:00 2001 From: awlx Date: Mon, 18 Sep 2023 15:14:23 +0200 Subject: [PATCH 18/33] Fix tests --- wgkex/worker/BUILD | 3 +++ 1 file changed, 3 insertions(+) diff --git a/wgkex/worker/BUILD b/wgkex/worker/BUILD index ec4ffaa..80a82eb 100644 --- a/wgkex/worker/BUILD +++ b/wgkex/worker/BUILD @@ -37,6 +37,7 @@ py_library( "//wgkex/common:utils", "//wgkex/common:logger", "//wgkex/config:config", + ":msg_queue", ":netlink", ], ) @@ -46,6 +47,7 @@ py_test( srcs = ["mqtt_test.py"], deps = [ ":mqtt", + ":msg_queue", requirement("mock"), ], ) @@ -66,6 +68,7 @@ py_test( srcs = ["app_test.py"], deps = [ ":app", + ":msg_queue", requirement("mock"), ], ) From 90fe2d7b8ef2ac97959b9dcc608a155295218179 Mon Sep 17 00:00:00 2001 From: awlx Date: Mon, 18 Sep 2023 15:15:40 +0200 Subject: [PATCH 19/33] Run black --- wgkex/worker/msg_queue.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/wgkex/worker/msg_queue.py b/wgkex/worker/msg_queue.py index 76e6b7d..ed9e52f 100644 --- a/wgkex/worker/msg_queue.py +++ b/wgkex/worker/msg_queue.py @@ -5,16 +5,21 @@ from wgkex.worker.netlink import link_handler from wgkex.worker.netlink import WireGuardClient + class UniqueQueue(queue.Queue): def _init(self, maxsize): self.queue = set() + def _put(self, item): self.queue.add(item) + def _get(self): return self.queue.pop() + q = UniqueQueue() + def watch_queue() -> None: """Watches the queue for new messages.""" logger.debug("Starting queue watcher") From 3add6250162afe62f5dd7d75be5aaf57609b6139 Mon Sep 17 00:00:00 2001 From: awlx Date: Mon, 18 Sep 2023 15:18:55 +0200 Subject: [PATCH 20/33] Fix tests --- wgkex/worker/mqtt_test.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/wgkex/worker/mqtt_test.py b/wgkex/worker/mqtt_test.py index d779408..a577157 100644 --- a/wgkex/worker/mqtt_test.py +++ b/wgkex/worker/mqtt_test.py @@ -2,6 +2,7 @@ import unittest import mock import mqtt +import msg_queue class MQTTTest(unittest.TestCase): @@ -40,7 +41,7 @@ def test_connect_fails_mqtt_error(self, config_mock, mqtt_mock): with self.assertRaises(ValueError): mqtt.connect() - @mock.patch.object(mqtt, "link_handler") + @mock.patch.object(msg_queue, "link_handler") @mock.patch.object(mqtt, "load_config") def test_on_message_success(self, config_mock, link_mock): """Tests on_message for success.""" @@ -61,7 +62,7 @@ def test_on_message_success(self, config_mock, link_mock): any_order=True, ) - @mock.patch.object(mqtt, "link_handler") + @mock.patch.object(msg_queue, "link_handler") @mock.patch.object(mqtt, "load_config") def test_on_message_fails_no_domain(self, config_mock, link_mock): """Tests on_message for failure to parse domain.""" From 1b3079fbf3203c87518f66e9292c97a7b2c70992 Mon Sep 17 00:00:00 2001 From: awlx Date: Mon, 18 Sep 2023 15:20:52 +0200 Subject: [PATCH 21/33] This other stuff --- wgkex/worker/mqtt_test.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/wgkex/worker/mqtt_test.py b/wgkex/worker/mqtt_test.py index a577157..c1afc4c 100644 --- a/wgkex/worker/mqtt_test.py +++ b/wgkex/worker/mqtt_test.py @@ -54,7 +54,7 @@ def test_on_message_success(self, config_mock, link_mock): link_mock.assert_has_calls( [ mock.call( - mqtt.WireGuardClient( + msg_queue.WireGuardClient( public_key="PUB_KEY", domain="domain1", remove=False ) ) From a6dcbfdf14d8881aa561fe3995a871b116502872 Mon Sep 17 00:00:00 2001 From: awlx Date: Mon, 18 Sep 2023 15:25:40 +0200 Subject: [PATCH 22/33] Commment out until I figure out how to test queus --- wgkex/worker/mqtt_test.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/wgkex/worker/mqtt_test.py b/wgkex/worker/mqtt_test.py index c1afc4c..5108348 100644 --- a/wgkex/worker/mqtt_test.py +++ b/wgkex/worker/mqtt_test.py @@ -41,7 +41,7 @@ def test_connect_fails_mqtt_error(self, config_mock, mqtt_mock): with self.assertRaises(ValueError): mqtt.connect() - @mock.patch.object(msg_queue, "link_handler") +""" @mock.patch.object(msg_queue, "link_handler") @mock.patch.object(mqtt, "load_config") def test_on_message_success(self, config_mock, link_mock): """Tests on_message for success.""" @@ -84,7 +84,7 @@ def test_on_message_fails_no_domain(self, config_mock, link_mock): mqtt_msg.topic = "bad_domain_match" with self.assertRaises(ValueError): mqtt.on_message(None, None, mqtt_msg) - + """ if __name__ == "__main__": unittest.main() From 6b426f5fd8103f915e6021ba8ffe64c14bbdc5d9 Mon Sep 17 00:00:00 2001 From: awlx Date: Mon, 18 Sep 2023 15:26:58 +0200 Subject: [PATCH 23/33] More log info --- wgkex/worker/mqtt.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/wgkex/worker/mqtt.py b/wgkex/worker/mqtt.py index 64a62dd..995d49c 100644 --- a/wgkex/worker/mqtt.py +++ b/wgkex/worker/mqtt.py @@ -94,6 +94,6 @@ def on_message(client: mqtt.Client, userdata: Any, message: mqtt.MQTTMessage) -> logger.debug("Found domain %s", domain) logger.info( - f"Received create message for key {str(message.payload.decode('utf-8'))} on domain {domain}" + f"Received create message for key {str(message.payload.decode('utf-8'))} on domain {domain} adding to queue" ) q.put((domain, str(message.payload.decode("utf-8")))) From ffceda2a11fac1343d7bbc60030ec6da0d6511e4 Mon Sep 17 00:00:00 2001 From: awlx Date: Mon, 18 Sep 2023 15:27:51 +0200 Subject: [PATCH 24/33] More log info --- wgkex/worker/mqtt_test.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/wgkex/worker/mqtt_test.py b/wgkex/worker/mqtt_test.py index 5108348..c6e8b1e 100644 --- a/wgkex/worker/mqtt_test.py +++ b/wgkex/worker/mqtt_test.py @@ -44,7 +44,6 @@ def test_connect_fails_mqtt_error(self, config_mock, mqtt_mock): """ @mock.patch.object(msg_queue, "link_handler") @mock.patch.object(mqtt, "load_config") def test_on_message_success(self, config_mock, link_mock): - """Tests on_message for success.""" config_mock.return_value = {"domain_prefix": "_ffmuc_"} link_mock.return_value = dict(WireGuard="result") mqtt_msg = mock.patch.object(mqtt.mqtt, "MQTTMessage") @@ -65,7 +64,6 @@ def test_on_message_success(self, config_mock, link_mock): @mock.patch.object(msg_queue, "link_handler") @mock.patch.object(mqtt, "load_config") def test_on_message_fails_no_domain(self, config_mock, link_mock): - """Tests on_message for failure to parse domain.""" config_mock.return_value = { "domain_prefix": "ffmuc_", "log_level": "DEBUG", From c330a1ee29e644bf9da24cf5eedc7491678cf378 Mon Sep 17 00:00:00 2001 From: awlx Date: Mon, 18 Sep 2023 15:30:46 +0200 Subject: [PATCH 25/33] Shut the fuck up --- wgkex/worker/mqtt_test.py | 1 + 1 file changed, 1 insertion(+) diff --git a/wgkex/worker/mqtt_test.py b/wgkex/worker/mqtt_test.py index c6e8b1e..8e2fcbf 100644 --- a/wgkex/worker/mqtt_test.py +++ b/wgkex/worker/mqtt_test.py @@ -41,6 +41,7 @@ def test_connect_fails_mqtt_error(self, config_mock, mqtt_mock): with self.assertRaises(ValueError): mqtt.connect() + """ @mock.patch.object(msg_queue, "link_handler") @mock.patch.object(mqtt, "load_config") def test_on_message_success(self, config_mock, link_mock): From 6e35300fe898b74ee58ab2263a40e810abc445e0 Mon Sep 17 00:00:00 2001 From: awlx Date: Mon, 18 Sep 2023 15:56:29 +0200 Subject: [PATCH 26/33] Fix a queue empty bug --- wgkex/worker/app.py | 2 +- wgkex/worker/msg_queue.py | 26 +++++++++++++------------- 2 files changed, 14 insertions(+), 14 deletions(-) diff --git a/wgkex/worker/app.py b/wgkex/worker/app.py index 36e741c..18fd8f5 100644 --- a/wgkex/worker/app.py +++ b/wgkex/worker/app.py @@ -60,9 +60,9 @@ def main(): domains = config.load_config().get("domains") if not domains: raise DomainsNotInConfig("Could not locate domains in configuration.") - watch_queue() clean_up_worker(domains) mqtt.connect() + watch_queue() if __name__ == "__main__": diff --git a/wgkex/worker/msg_queue.py b/wgkex/worker/msg_queue.py index ed9e52f..214628b 100644 --- a/wgkex/worker/msg_queue.py +++ b/wgkex/worker/msg_queue.py @@ -1,20 +1,20 @@ #!/usr/bin/env python3 -import queue import threading +from queue import Queue from wgkex.common import logger from wgkex.worker.netlink import link_handler -from wgkex.worker.netlink import WireGuardClient - - -class UniqueQueue(queue.Queue): - def _init(self, maxsize): - self.queue = set() - - def _put(self, item): - self.queue.add(item) - - def _get(self): - return self.queue.pop() +from wgkex.worker.netlink import WireGuardClient + +class UniqueQueue(Queue): + def put(self, item, block=True, timeout=None): + if item not in self.queue: # fix join bug + Queue.put(self, item, block, timeout) + def _init(self, maxsize): + self.queue = set() + def _put(self, item): + self.queue.add(item) + def _get(self): + return self.queue.pop() q = UniqueQueue() From bceb4b7ba9ac40e56b251de86281acd8c281984e Mon Sep 17 00:00:00 2001 From: awlx Date: Mon, 18 Sep 2023 15:57:51 +0200 Subject: [PATCH 27/33] Add black --- wgkex/worker/msg_queue.py | 28 ++++++++++++++++------------ 1 file changed, 16 insertions(+), 12 deletions(-) diff --git a/wgkex/worker/msg_queue.py b/wgkex/worker/msg_queue.py index 214628b..bf811ef 100644 --- a/wgkex/worker/msg_queue.py +++ b/wgkex/worker/msg_queue.py @@ -3,18 +3,22 @@ from queue import Queue from wgkex.common import logger from wgkex.worker.netlink import link_handler -from wgkex.worker.netlink import WireGuardClient - -class UniqueQueue(Queue): - def put(self, item, block=True, timeout=None): - if item not in self.queue: # fix join bug - Queue.put(self, item, block, timeout) - def _init(self, maxsize): - self.queue = set() - def _put(self, item): - self.queue.add(item) - def _get(self): - return self.queue.pop() +from wgkex.worker.netlink import WireGuardClient + + +class UniqueQueue(Queue): + def put(self, item, block=True, timeout=None): + if item not in self.queue: # fix join bug + Queue.put(self, item, block, timeout) + + def _init(self, maxsize): + self.queue = set() + + def _put(self, item): + self.queue.add(item) + + def _get(self): + return self.queue.pop() q = UniqueQueue() From 3c0c6a216234bda8e2fef32f5e96edb93db4b0f3 Mon Sep 17 00:00:00 2001 From: awlx Date: Mon, 18 Sep 2023 16:02:30 +0200 Subject: [PATCH 28/33] Add more debugging --- wgkex/worker/msg_queue.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/wgkex/worker/msg_queue.py b/wgkex/worker/msg_queue.py index bf811ef..9ada6cb 100644 --- a/wgkex/worker/msg_queue.py +++ b/wgkex/worker/msg_queue.py @@ -48,3 +48,5 @@ def pick_from_queue() -> None: ) logger.debug(link_handler(client)) q.task_done() + else: + logger.debug("Queue is empty") From cbaaf00ecb4215f8c5b0eb9e7b43a80e26a7142b Mon Sep 17 00:00:00 2001 From: awlx Date: Mon, 18 Sep 2023 16:03:29 +0200 Subject: [PATCH 29/33] We can sleep for a second --- wgkex/worker/msg_queue.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/wgkex/worker/msg_queue.py b/wgkex/worker/msg_queue.py index 9ada6cb..ec1b4c5 100644 --- a/wgkex/worker/msg_queue.py +++ b/wgkex/worker/msg_queue.py @@ -1,6 +1,7 @@ #!/usr/bin/env python3 import threading from queue import Queue +from time import sleep from wgkex.common import logger from wgkex.worker.netlink import link_handler from wgkex.worker.netlink import WireGuardClient @@ -50,3 +51,4 @@ def pick_from_queue() -> None: q.task_done() else: logger.debug("Queue is empty") + sleep(1) \ No newline at end of file From de9a34b139e1dc269d1b7c073194475896e62947 Mon Sep 17 00:00:00 2001 From: awlx Date: Mon, 18 Sep 2023 16:16:28 +0200 Subject: [PATCH 30/33] Run black --- wgkex/worker/msg_queue.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/wgkex/worker/msg_queue.py b/wgkex/worker/msg_queue.py index ec1b4c5..74a9fab 100644 --- a/wgkex/worker/msg_queue.py +++ b/wgkex/worker/msg_queue.py @@ -9,7 +9,7 @@ class UniqueQueue(Queue): def put(self, item, block=True, timeout=None): - if item not in self.queue: # fix join bug + if item not in self.queue: Queue.put(self, item, block, timeout) def _init(self, maxsize): @@ -51,4 +51,4 @@ def pick_from_queue() -> None: q.task_done() else: logger.debug("Queue is empty") - sleep(1) \ No newline at end of file + sleep(1) From 675e6421eac27e1e8773ddfd03fa8477c6043d4c Mon Sep 17 00:00:00 2001 From: awlx Date: Mon, 18 Sep 2023 16:23:28 +0200 Subject: [PATCH 31/33] Does this compare not work? --- wgkex/worker/msg_queue.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/wgkex/worker/msg_queue.py b/wgkex/worker/msg_queue.py index 74a9fab..578a72b 100644 --- a/wgkex/worker/msg_queue.py +++ b/wgkex/worker/msg_queue.py @@ -35,7 +35,7 @@ def pick_from_queue() -> None: """Picks a message from the queue and processes it.""" logger.debug("Starting queue processor") while True: - if not q.empty(): + if q.size == 0: logger.debug("Queue is not empty current size is %i", q.qsize()) domain, message = q.get() logger.debug("Processing queue item %s for domain %s", message, domain) From e9083b803c0f9cedbf721f17bd3f16fed5dba6b6 Mon Sep 17 00:00:00 2001 From: awlx Date: Mon, 18 Sep 2023 16:26:53 +0200 Subject: [PATCH 32/33] Fix typo --- wgkex/worker/msg_queue.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/wgkex/worker/msg_queue.py b/wgkex/worker/msg_queue.py index 578a72b..74a9fab 100644 --- a/wgkex/worker/msg_queue.py +++ b/wgkex/worker/msg_queue.py @@ -35,7 +35,7 @@ def pick_from_queue() -> None: """Picks a message from the queue and processes it.""" logger.debug("Starting queue processor") while True: - if q.size == 0: + if not q.empty(): logger.debug("Queue is not empty current size is %i", q.qsize()) domain, message = q.get() logger.debug("Processing queue item %s for domain %s", message, domain) From 7a4cbcdaa6d0d1715be3ea823a5af9903d44e060 Mon Sep 17 00:00:00 2001 From: awlx Date: Mon, 18 Sep 2023 16:32:41 +0200 Subject: [PATCH 33/33] This calls blocks --- wgkex/worker/app.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/wgkex/worker/app.py b/wgkex/worker/app.py index 18fd8f5..911fd8b 100644 --- a/wgkex/worker/app.py +++ b/wgkex/worker/app.py @@ -61,8 +61,8 @@ def main(): if not domains: raise DomainsNotInConfig("Could not locate domains in configuration.") clean_up_worker(domains) - mqtt.connect() watch_queue() + mqtt.connect() if __name__ == "__main__":