diff --git a/.gitignore b/.gitignore index 68bc17f..97a2b06 100644 --- a/.gitignore +++ b/.gitignore @@ -14,7 +14,6 @@ dist/ downloads/ eggs/ .eggs/ -lib/ lib64/ parts/ sdist/ diff --git a/lib/__init__.py b/lib/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/lib/moonraker_web_client.py b/lib/moonraker_web_client.py new file mode 100644 index 0000000..7f8b407 --- /dev/null +++ b/lib/moonraker_web_client.py @@ -0,0 +1,30 @@ +# SPDX-FileCopyrightText: 2024 Sebastian Andersson +# SPDX-License-Identifier: GPL-3.0-or-later + +"""Moonraker Web Client""" + +import requests + + +# pylint: disable=R0903 +class MoonrakerWebClient: + """Moonraker Web Client""" + + def __init__(self, url: str): + self.url = url + + def set_spool_and_filament(self, spool: int, filament: int): + """Calls moonraker with the current spool & filament""" + + commands = { + "commands": [ + f"SET_ACTIVE_SPOOL ID={spool}", + f"SET_ACTIVE_FILAMENT ID={filament}", + ] + } + + response = requests.post( + self.url + "/api/printer/command", timeout=10, json=commands + ) + if response.status_code != 200: + raise ValueError(f"Request to moonraker failed: {response}") diff --git a/lib/nfc_handler.py b/lib/nfc_handler.py new file mode 100644 index 0000000..84e5954 --- /dev/null +++ b/lib/nfc_handler.py @@ -0,0 +1,167 @@ +# SPDX-FileCopyrightText: 2024 Sebastian Andersson +# SPDX-License-Identifier: GPL-3.0-or-later + +""" NFC tag handling """ + +import time +from threading import Lock, Event + +import ndef +import nfc + + +SPOOL = "SPOOL" +FILAMENT = "FILAMENT" +NDEF_TEXT_TYPE = "urn:nfc:wkt:T" + + +# pylint: disable=R0902 +class NfcHandler: + """NFC Tag handling""" + + def __init__(self, nfc_device: str): + self.status = "" + self.nfc_device = nfc_device + self.on_nfc_no_tag_present = None + self.on_nfc_tag_present = None + self.should_stop_event = Event() + self.write_lock = Lock() + self.write_event = Event() + self.write_spool = None + self.write_filament = None + + def set_no_tag_present_callback(self, on_nfc_no_tag_present): + """Sets a callback that will be called when no tag is present""" + self.on_nfc_no_tag_present = on_nfc_no_tag_present + + def set_tag_present_callback(self, on_nfc_tag_present): + """Sets a callback that will be called when a tag has been read""" + self.on_nfc_tag_present = on_nfc_tag_present + + def _write_to_nfc_tag(self, tag, spool: int, filament: int) -> bool: + """Write given spool/filament ids to the tag""" + try: + if tag.ndef and tag.ndef.is_writeable: + tag.ndef.records = [ + ndef.TextRecord(f"{SPOOL}:{spool}\n{FILAMENT}:{filament}\n") + ] + return True + self.status = "Tag is write protected" + except Exception as ex: # pylint: disable=W0718 + print(ex) + self.status = "Got error while writing" + return False + + @classmethod + def get_data_from_ndef_records(cls, records: ndef.TextRecord): + """Find wanted data from the NDEF records. + + >>> import ndef + >>> record0 = ndef.TextRecord("") + >>> record1 = ndef.TextRecord("SPOOL:23\\n") + >>> record2 = ndef.TextRecord("FILAMENT:14\\n") + >>> record3 = ndef.TextRecord("SPOOL:23\\nFILAMENT:14\\n") + >>> NfcHandler.get_data_from_ndef_records([record0]) + (None, None) + >>> NfcHandler.get_data_from_ndef_records([record3]) + ('23', '14') + >>> NfcHandler.get_data_from_ndef_records([record1]) + ('23', None) + >>> NfcHandler.get_data_from_ndef_records([record2]) + (None, '14') + >>> NfcHandler.get_data_from_ndef_records([record0, record3]) + ('23', '14') + >>> NfcHandler.get_data_from_ndef_records([record3, record0]) + ('23', '14') + >>> NfcHandler.get_data_from_ndef_records([record1, record2]) + ('23', '14') + >>> NfcHandler.get_data_from_ndef_records([record2, record1]) + ('23', '14') + """ + + spool = None + filament = None + + for record in records: + if record.type == NDEF_TEXT_TYPE: + for line in record.text.splitlines(): + line = line.split(":") + if len(line) == 2: + if line[0] == SPOOL: + spool = line[1] + if line[0] == FILAMENT: + filament = line[1] + else: + print(f"Read other record: {record}", flush=True) + + return spool, filament + + def on_nfc_connect(self, tag) -> bool: + """Handles a read tag""" + + if tag.ndef is None: + print("The tag doesn't have NDEF records", flush=True) + if self.on_nfc_no_tag_present: + self.on_nfc_no_tag_present() + return True + + spool, filament = NfcHandler.get_data_from_ndef_records(tag.ndef.records) + + if self.on_nfc_tag_present: + self.on_nfc_tag_present(spool, filament) + + # Don't let connect return until the tag is removed: + return True + + def _set_write_info(self, spool, filament): + if self.write_lock.acquire(): # pylint: disable=R1732 + self.write_spool = spool + self.write_filament = filament + self.write_event.clear() + self.write_lock.release() + + def write_to_tag(self, spool: int, filament: int) -> bool: + """Writes spool & filament info to tag. Returns true if worked.""" + + self._set_write_info(spool, filament) + + if self.write_event.wait(timeout=30): + return True + + self._set_write_info(None, None) + + return False + + def stop(self): + """Call to stop the handler""" + self.should_stop_event.set() + + def run(self): + """Run the NFC handler, won't return""" + # Open NFC reader. Will throw an exception if it fails. + with nfc.ContactlessFrontend(self.nfc_device) as clf: + while not self.should_stop_event.is_set(): + tag = clf.connect(rdwr={"on-connect": lambda tag: False}) + if tag: + if self.write_lock.acquire(): # pylint: disable=R1732 + if self.write_spool: + if self._write_to_nfc_tag( + tag, self.write_spool, self.write_filament + ): + self.write_event.set() + self.write_lock.write_spool = None + self.write_lock.write_filament = None + self.write_lock.release() + if tag.ndef is None: + if self.on_nfc_no_tag_present: + self.on_nfc_no_tag_present() + else: + if self.on_nfc_tag_present: + spool, filament = NfcHandler.get_data_from_ndef_records( + tag.ndef.records + ) + self.on_nfc_tag_present(spool, filament) + while clf.connect(rdwr={"on-connect": lambda tag: False}): + time.sleep(0.1) + else: + time.sleep(0.1) diff --git a/lib/spoolman_client.py b/lib/spoolman_client.py new file mode 100644 index 0000000..7154e23 --- /dev/null +++ b/lib/spoolman_client.py @@ -0,0 +1,26 @@ +# SPDX-FileCopyrightText: 2024 Sebastian Andersson +# SPDX-License-Identifier: GPL-3.0-or-later + +"""Spoolman client""" + +import json +import requests + + +# pylint: disable=R0903 +class SpoolmanClient: + """Spoolman Web Client""" + + def __init__(self, url: str): + if url.endswith("/"): + url = url[:-1] + self.url = url + + def get_spools(self): + """Get the spools from spoolman""" + url = self.url + "/api/v1/spool" + response = requests.get(url, timeout=10) + if response.status_code != 200: + raise ValueError(f"Request to spoolman failed: {response}") + records = json.loads(response.text) + return records diff --git a/nfc2klipper_with_webservice-config.json5 b/nfc2klipper_with_webservice-config.json5 new file mode 100755 index 0000000..b2f0eba --- /dev/null +++ b/nfc2klipper_with_webservice-config.json5 @@ -0,0 +1,21 @@ +{ + /// The address the web server listens to, + /// use 0.0.0.0 for all IPv4 + "web_address": "0.0.0.0", + + /// The port the web server listens to + "web_port": 5001, + + /// Clear the spool & filament info if no tag can be read + "clear-spool": false, + + /// Which NFC reader to use, see + /// https://nfcpy.readthedocs.io/en/latest/topics/get-started.html#open-a-local-device + "nfc-device": "ttyAMA0", + + /// URL for the moonraker installation + "moonraker-url": "http://mainsailos.local", + + /// URL for the moonraker installation + "spoolman-url": "http://mainsailos.local:7912", +} diff --git a/nfc2klipper_with_webservice.py b/nfc2klipper_with_webservice.py new file mode 100755 index 0000000..6b33ac1 --- /dev/null +++ b/nfc2klipper_with_webservice.py @@ -0,0 +1,124 @@ +#!/usr/bin/env python3 + +# SPDX-FileCopyrightText: 2024 Sebastian Andersson +# SPDX-License-Identifier: GPL-3.0-or-later + +"""Program to set current filament & spool in klipper, and write to tags. """ + +import threading + +from flask import Flask, render_template +import json5 + +from lib.moonraker_web_client import MoonrakerWebClient +from lib.nfc_handler import NfcHandler +from lib.spoolman_client import SpoolmanClient + +SPOOL = "SPOOL" +FILAMENT = "FILAMENT" +NDEF_TEXT_TYPE = "urn:nfc:wkt:T" + +with open("nfc2klipper_with_webservice-config.json5", "r", encoding="utf-8") as fp: + args = json5.load(fp) + +spoolman = SpoolmanClient(args["spoolman-url"]) +moonraker = MoonrakerWebClient(args["moonraker-url"]) +nfc_handler = NfcHandler(args["nfc-device"]) + + +app = Flask(__name__) + + +def set_spool_and_filament(spool: int, filament: int): + """Calls moonraker with the current spool & filament""" + + if "old_spool" not in set_spool_and_filament.__dict__: + set_spool_and_filament.old_spool = None + set_spool_and_filament.old_filament = None + + if ( + set_spool_and_filament.old_spool == spool + and set_spool_and_filament.old_filament == filament + ): + print("Read same spool & filament", flush=True) + return + + print(f"Sending spool #{spool}, filament #{filament} to klipper", flush=True) + + # In case the post fails, we might not know if the server has received + # it or not, so set them to None: + set_spool_and_filament.old_spool = None + set_spool_and_filament.old_filament = None + + try: + moonraker.set_spool_and_filament(spool, filament) + except Exception as ex: # pylint: disable=W0718 + print(ex) + return + + set_spool_and_filament.old_spool = spool + set_spool_and_filament.old_filament = filament + + +@app.route("/w//") +def write_tag(spool, filament): + """ + The web-api to write the spool & filament data to NFC/RFID tag + """ + print(f" write spool={spool}, filament={filament}") + if nfc_handler.write_to_tag(spool, filament): + return "OK" + return ("Failed to write to tag", 502) + + +@app.route("/") +def index(): + """ + Returns the main index page. + """ + spools = spoolman.get_spools() + + return render_template("index.html", spools=spools) + + +def on_nfc_tag_present(spool, filament): + """Handles a read tag""" + + if not args.get("clear_spool"): + if not (spool and filament): + print("Did not find spool and filament records in tag", flush=True) + if args.get("clear_spool") or (spool and filament): + if not spool: + spool = 0 + if not filament: + filament = 0 + set_spool_and_filament(spool, filament) + + +def on_nfc_no_tag_present(): + """Called when no tag is present (or tag without data)""" + if args.get("clear_spool"): + set_spool_and_filament(0, 0) + + +if __name__ == "__main__": + + if args.get("clear_spool"): + # Start by unsetting current spool & filament: + set_spool_and_filament(0, 0) + + nfc_handler.set_no_tag_present_callback(on_nfc_no_tag_present) + nfc_handler.set_tag_present_callback(on_nfc_tag_present) + + thread = threading.Thread(target=nfc_handler.run) + thread.daemon = True + print("Starting nfc-handler") + thread.start() + + print("Starting web server") + try: + app.run(args["web_address"], port=args["web_port"]) + except Exception: + nfc_handler.stop() + thread.join() + raise diff --git a/requirements.txt b/requirements.txt index 72037e9..7472e46 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,3 +1,5 @@ +flask==3.0.3 +json5==0.9.25 nfcpy==1.0.4 -requests==2.31.0 npyscreen==4.10.5 +requests==2.31.0 diff --git a/templates/index.html b/templates/index.html new file mode 100755 index 0000000..b179940 --- /dev/null +++ b/templates/index.html @@ -0,0 +1,56 @@ + + + + + + + + NFC2Klipper + + + + +
+

NFC22Klipper

+ +
    + {% for spool in spools|reverse %} +
  • + + {{spool['id']}}: {{spool['filament']['vendor']['name']}} - {{spool['filament']['name']}} +
  • + {% endfor %} +
+
+ + diff --git a/write_tags.py b/write_tags.py index 3545e01..2953bd3 100755 --- a/write_tags.py +++ b/write_tags.py @@ -113,9 +113,7 @@ def on_nfc_connect(self, tag, spool: int, filament: int) -> bool: def write_tag(self, record): """Write the choosen records's data to the tag""" - npyscreen.notify( - "Writing " + record_to_text(record), title="Writing to tag" - ) + npyscreen.notify("Writing " + record_to_text(record), title="Writing to tag") spool = record["id"] filament = record["filament"]["id"] @@ -124,11 +122,7 @@ def write_tag(self, record): clf = nfc.ContactlessFrontend(args.nfc_device) clf.connect( - rdwr={ - "on-connect": lambda tag: self.on_nfc_connect( - tag, spool, filament - ) - } + rdwr={"on-connect": lambda tag: self.on_nfc_connect(tag, spool, filament)} ) clf.close() npyscreen.notify(self.status, title="Writing to tag")