-
Notifications
You must be signed in to change notification settings - Fork 2
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
WIP: Add web service to nfc2klipper for writing tags
- Loading branch information
Showing
10 changed files
with
403 additions
and
10 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -14,7 +14,6 @@ dist/ | |
downloads/ | ||
eggs/ | ||
.eggs/ | ||
lib/ | ||
lib64/ | ||
parts/ | ||
sdist/ | ||
|
Empty file.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,30 @@ | ||
# SPDX-FileCopyrightText: 2024 Sebastian Andersson <[email protected]> | ||
# SPDX-License-Identifier: GPL-3.0-or-later | ||
|
||
"""Moonraker Web Client""" | ||
|
||
import requests | ||
|
||
|
||
# pylint: disable=R0903 | ||
class MoonrakerWebClient: | ||
"""Moonraker Web Client""" | ||
|
||
def __init__(self, url: str): | ||
self.url = url | ||
|
||
def set_spool_and_filament(self, spool: int, filament: int): | ||
"""Calls moonraker with the current spool & filament""" | ||
|
||
commands = { | ||
"commands": [ | ||
f"SET_ACTIVE_SPOOL ID={spool}", | ||
f"SET_ACTIVE_FILAMENT ID={filament}", | ||
] | ||
} | ||
|
||
response = requests.post( | ||
self.url + "/api/printer/command", timeout=10, json=commands | ||
) | ||
if response.status_code != 200: | ||
raise ValueError(f"Request to moonraker failed: {response}") |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,147 @@ | ||
# SPDX-FileCopyrightText: 2024 Sebastian Andersson <[email protected]> | ||
# SPDX-License-Identifier: GPL-3.0-or-later | ||
|
||
""" NFC tag handling """ | ||
|
||
from threading import Lock | ||
|
||
import ndef | ||
import nfc | ||
|
||
|
||
SPOOL = "SPOOL" | ||
FILAMENT = "FILAMENT" | ||
NDEF_TEXT_TYPE = "urn:nfc:wkt:T" | ||
|
||
|
||
class NfcHandler: | ||
"""NFC Tag handling""" | ||
|
||
def __init__(self, nfc_device: str): | ||
self.status = "" | ||
self.nfc_device = nfc_device | ||
self.spool = None | ||
self.filament = None | ||
self.write_lock = Lock() | ||
self.write_lock.acquire() # pylint: disable=R1732 | ||
self.on_nfc_no_tag_present = None | ||
self.on_nfc_tag_present = None | ||
|
||
def set_no_tag_present_callback(self, on_nfc_no_tag_present): | ||
"""Sets a callback that will be called when no tag is present""" | ||
self.on_nfc_no_tag_present = on_nfc_no_tag_present | ||
|
||
def set_tag_present_callback(self, on_nfc_tag_present): | ||
"""Sets a callback that will be called when a tag has been read""" | ||
self.on_nfc_tag_present = on_nfc_tag_present | ||
|
||
def _on_nfc_connect(self, tag, spool: int, filament: int) -> bool: | ||
"""Write given spool/filament ids to the tag""" | ||
try: | ||
if tag.ndef and tag.ndef.is_writeable: | ||
tag.ndef.records = [ | ||
ndef.TextRecord(f"{SPOOL}:{spool}\n{FILAMENT}:{filament}\n") | ||
] | ||
return True | ||
self.status = "Tag is write protected" | ||
except Exception as ex: # pylint: disable=W0718 | ||
print(ex) | ||
self.status = "Got error while writing" | ||
return False | ||
|
||
def write_tag(self, record): | ||
"""Write the choosen records's data to the tag""" | ||
|
||
spool = record["id"] | ||
filament = record["filament"]["id"] | ||
|
||
self.status = "Written" | ||
|
||
clf = nfc.ContactlessFrontend(self.nfc_device) | ||
clf.connect( | ||
rdwr={"on-connect": lambda tag: self._on_nfc_connect(tag, spool, filament)} | ||
) | ||
clf.close() | ||
|
||
@classmethod | ||
def get_data_from_ndef_records(cls, records: ndef.TextRecord): | ||
"""Find wanted data from the NDEF records. | ||
>>> import ndef | ||
>>> record0 = ndef.TextRecord("") | ||
>>> record1 = ndef.TextRecord("SPOOL:23\\n") | ||
>>> record2 = ndef.TextRecord("FILAMENT:14\\n") | ||
>>> record3 = ndef.TextRecord("SPOOL:23\\nFILAMENT:14\\n") | ||
>>> NfcHandler.get_data_from_ndef_records([record0]) | ||
(None, None) | ||
>>> NfcHandler.get_data_from_ndef_records([record3]) | ||
('23', '14') | ||
>>> NfcHandler.get_data_from_ndef_records([record1]) | ||
('23', None) | ||
>>> NfcHandler.get_data_from_ndef_records([record2]) | ||
(None, '14') | ||
>>> NfcHandler.get_data_from_ndef_records([record0, record3]) | ||
('23', '14') | ||
>>> NfcHandler.get_data_from_ndef_records([record3, record0]) | ||
('23', '14') | ||
>>> NfcHandler.get_data_from_ndef_records([record1, record2]) | ||
('23', '14') | ||
>>> NfcHandler.get_data_from_ndef_records([record2, record1]) | ||
('23', '14') | ||
""" | ||
|
||
spool = None | ||
filament = None | ||
|
||
for record in records: | ||
if record.type == NDEF_TEXT_TYPE: | ||
for line in record.text.splitlines(): | ||
line = line.split(":") | ||
if len(line) == 2: | ||
if line[0] == SPOOL: | ||
spool = line[1] | ||
if line[0] == FILAMENT: | ||
filament = line[1] | ||
else: | ||
print(f"Read other record: {record}", flush=True) | ||
|
||
return spool, filament | ||
|
||
def on_nfc_connect(self, tag) -> bool: | ||
"""Handles a read tag""" | ||
|
||
if tag.ndef is None: | ||
print("The tag doesn't have NDEF records", flush=True) | ||
if self.on_nfc_no_tag_present: | ||
self.on_nfc_no_tag_present() | ||
return True | ||
|
||
spool, filament = NfcHandler.get_data_from_ndef_records(tag.ndef.records) | ||
|
||
if self.on_nfc_tag_present: | ||
self.on_nfc_tag_present(spool, filament) | ||
|
||
# Don't let connect return until the tag is removed: | ||
return True | ||
|
||
def write_to_tag(self, spool: int, filament: int) -> bool: | ||
"""Writes spool & filament info to tag. Returns true if worked.""" | ||
self.spool = spool | ||
self.filament = filament | ||
if self.write_lock.acquire(timeout=30): # pylint: disable=R1732 | ||
return True | ||
self.spool = None | ||
self.filament = None | ||
return False | ||
|
||
def run(self): | ||
"""Run the NFC handler, won't return""" | ||
# Open NFC reader. Will throw an exception if it fails. | ||
print(self.nfc_device) | ||
clf = nfc.ContactlessFrontend(self.nfc_device) | ||
|
||
# TODO: Implement proper handling | ||
|
||
while True: | ||
clf.connect(rdwr={"on-connect": self.on_nfc_connect}) | ||
# No tag connected anymore. |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,26 @@ | ||
# SPDX-FileCopyrightText: 2024 Sebastian Andersson <[email protected]> | ||
# SPDX-License-Identifier: GPL-3.0-or-later | ||
|
||
"""Spoolman client""" | ||
|
||
import json | ||
import requests | ||
|
||
|
||
# pylint: disable=R0903 | ||
class SpoolmanClient: | ||
"""Spoolman Web Client""" | ||
|
||
def __init__(self, url: str): | ||
if url.endswith("/"): | ||
url = url[:-1] | ||
self.url = url | ||
|
||
def get_spools(self): | ||
"""Get the spools from spoolman""" | ||
url = self.url + "/api/v1/spool" | ||
response = requests.get(url, timeout=10) | ||
if response.status_code != 200: | ||
raise ValueError(f"Request to spoolman failed: {response}") | ||
records = json.loads(response.text) | ||
return records |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,21 @@ | ||
{ | ||
/// The address the web server listens to, | ||
/// use 0.0.0.0 for all IPv4 | ||
"web_address": "0.0.0.0", | ||
|
||
/// The port the web server listens to | ||
"web_port": 5001, | ||
|
||
/// Clear the spool & filament info if no tag can be read | ||
"clear-spool": false, | ||
|
||
/// Which NFC reader to use, see | ||
/// https://nfcpy.readthedocs.io/en/latest/topics/get-started.html#open-a-local-device | ||
"nfc-device": "ttyAMA0", | ||
|
||
/// URL for the moonraker installation | ||
"moonraker-url": "http://mainsailos.local", | ||
|
||
/// URL for the moonraker installation | ||
"spoolman-url": "http://mainsailos.local:7912", | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,118 @@ | ||
#!/usr/bin/env python3 | ||
|
||
# SPDX-FileCopyrightText: 2024 Sebastian Andersson <[email protected]> | ||
# SPDX-License-Identifier: GPL-3.0-or-later | ||
|
||
"""Program to set current filament & spool in klipper, and write to tags. """ | ||
|
||
import threading | ||
|
||
from flask import Flask, render_template | ||
import json5 | ||
|
||
from lib.moonraker_web_client import MoonrakerWebClient | ||
from lib.nfc_handler import NfcHandler | ||
from lib.spoolman_client import SpoolmanClient | ||
|
||
SPOOL = "SPOOL" | ||
FILAMENT = "FILAMENT" | ||
NDEF_TEXT_TYPE = "urn:nfc:wkt:T" | ||
|
||
with open("nfc2klipper_with_webservice-config.json5", "r", encoding="utf-8") as fp: | ||
args = json5.load(fp) | ||
|
||
spoolman = SpoolmanClient(args["spoolman-url"]) | ||
moonraker = MoonrakerWebClient(args["moonraker-url"]) | ||
nfc_handler = NfcHandler(args["nfc-device"]) | ||
|
||
|
||
app = Flask(__name__) | ||
|
||
|
||
def set_spool_and_filament(spool: int, filament: int): | ||
"""Calls moonraker with the current spool & filament""" | ||
|
||
if "old_spool" not in set_spool_and_filament.__dict__: | ||
set_spool_and_filament.old_spool = None | ||
set_spool_and_filament.old_filament = None | ||
|
||
if ( | ||
set_spool_and_filament.old_spool == spool | ||
and set_spool_and_filament.old_filament == filament | ||
): | ||
print("Read same spool & filament", flush=True) | ||
return | ||
|
||
print(f"Sending spool #{spool}, filament #{filament} to klipper", flush=True) | ||
|
||
# In case the post fails, we might not know if the server has received | ||
# it or not, so set them to None: | ||
set_spool_and_filament.old_spool = None | ||
set_spool_and_filament.old_filament = None | ||
|
||
try: | ||
moonraker.set_spool_and_filament(spool, filament) | ||
except Exception as ex: # pylint: disable=W0718 | ||
print(ex) | ||
return | ||
|
||
set_spool_and_filament.old_spool = spool | ||
set_spool_and_filament.old_filament = filament | ||
|
||
|
||
@app.route("/w/<int:spool>/<int:filament>") | ||
def write_tag(spool, filament): | ||
""" | ||
The web-api to write the spool & filament data to NFC/RFID tag | ||
""" | ||
print(f" write spool={spool}, filament={filament}") | ||
if nfc_handler.write_to_tag(spool, filament): | ||
return "OK" | ||
return ("Failed to write to tag", 502) | ||
|
||
|
||
@app.route("/") | ||
def index(): | ||
""" | ||
Returns the main index page. | ||
""" | ||
spools = spoolman.get_spools() | ||
|
||
return render_template("index.html", spools=spools) | ||
|
||
|
||
def on_nfc_tag_present(spool, filament): | ||
"""Handles a read tag""" | ||
|
||
if not args.get("clear_spool"): | ||
if not (spool and filament): | ||
print("Did not find spool and filament records in tag", flush=True) | ||
if args.get("clear_spool") or (spool and filament): | ||
if not spool: | ||
spool = 0 | ||
if not filament: | ||
filament = 0 | ||
set_spool_and_filament(spool, filament) | ||
|
||
|
||
def on_nfc_no_tag_present(): | ||
""" Called when no tag is present (or tag without data) """ | ||
if args.get("clear_spool"): | ||
set_spool_and_filament(0, 0) | ||
|
||
|
||
if __name__ == "__main__": | ||
|
||
if args.get("clear_spool"): | ||
# Start by unsetting current spool & filament: | ||
set_spool_and_filament(0, 0) | ||
|
||
nfc_handler.set_no_tag_present_callback(on_nfc_no_tag_present) | ||
nfc_handler.set_tag_present_callback(on_nfc_tag_present) | ||
|
||
thread = threading.Thread(target=nfc_handler.run) | ||
print("Starting nfc-handler") | ||
thread.run() | ||
|
||
print("Starting web server") | ||
app.run(args["web_address"], port=args["web_port"]) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,3 +1,5 @@ | ||
flask==3.0.3 | ||
json5==0.9.25 | ||
nfcpy==1.0.4 | ||
requests==2.31.0 | ||
npyscreen==4.10.5 | ||
requests==2.31.0 |
Oops, something went wrong.