-
Notifications
You must be signed in to change notification settings - Fork 2
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
WIP: Add web service to nfc2klipper for writing tags
- Loading branch information
Showing
9 changed files
with
433 additions
and
2 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -14,7 +14,6 @@ dist/ | |
downloads/ | ||
eggs/ | ||
.eggs/ | ||
lib/ | ||
lib64/ | ||
parts/ | ||
sdist/ | ||
|
Empty file.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,30 @@ | ||
# SPDX-FileCopyrightText: 2024 Sebastian Andersson <[email protected]> | ||
# SPDX-License-Identifier: GPL-3.0-or-later | ||
|
||
"""Moonraker Web Client""" | ||
|
||
import requests | ||
|
||
|
||
# pylint: disable=R0903 | ||
class MoonrakerWebClient: | ||
"""Moonraker Web Client""" | ||
|
||
def __init__(self, url: str): | ||
self.url = url | ||
|
||
def set_spool_and_filament(self, spool: int, filament: int): | ||
"""Calls moonraker with the current spool & filament""" | ||
|
||
commands = { | ||
"commands": [ | ||
f"SET_ACTIVE_SPOOL ID={spool}", | ||
f"SET_ACTIVE_FILAMENT ID={filament}", | ||
] | ||
} | ||
|
||
response = requests.post( | ||
self.url + "/api/printer/command", timeout=10, json=commands | ||
) | ||
if response.status_code != 200: | ||
raise ValueError(f"Request to moonraker failed: {response}") |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,125 @@ | ||
# SPDX-FileCopyrightText: 2024 Sebastian Andersson <[email protected]> | ||
# SPDX-License-Identifier: GPL-3.0-or-later | ||
|
||
""" NFC tag handling """ | ||
|
||
import ndef | ||
import nfc | ||
|
||
|
||
SPOOL = "SPOOL" | ||
FILAMENT = "FILAMENT" | ||
NDEF_TEXT_TYPE = "urn:nfc:wkt:T" | ||
|
||
|
||
class NfcHandler: | ||
"""NFC Tag handling""" | ||
|
||
def __init__(self, nfc_device: str): | ||
self.status = "" | ||
self.nfc_device = nfc_device | ||
|
||
def _on_nfc_connect(self, tag, spool: int, filament: int) -> bool: | ||
"""Write given spool/filament ids to the tag""" | ||
try: | ||
if tag.ndef and tag.ndef.is_writeable: | ||
tag.ndef.records = [ | ||
ndef.TextRecord(f"{SPOOL}:{spool}\n{FILAMENT}:{filament}\n") | ||
] | ||
return True | ||
self.status = "Tag is write protected" | ||
except Exception as ex: # pylint: disable=W0718 | ||
print(ex) | ||
self.status = "Got error while writing" | ||
return False | ||
|
||
def write_tag(self, record): | ||
"""Write the choosen records's data to the tag""" | ||
|
||
spool = record["id"] | ||
filament = record["filament"]["id"] | ||
|
||
self.status = "Written" | ||
|
||
clf = nfc.ContactlessFrontend(self.nfc_device) | ||
clf.connect( | ||
rdwr={"on-connect": lambda tag: self._on_nfc_connect(tag, spool, filament)} | ||
) | ||
clf.close() | ||
|
||
@classmethod | ||
def _get_data_from_ndef_records( | ||
cls, records: ndef.TextRecord | ||
) -> tuple[None | int, None | int]: | ||
"""Find wanted data from the NDEF records. | ||
>>> import ndef | ||
>>> record0 = ndef.TextRecord("") | ||
>>> record1 = ndef.TextRecord("SPOOL:23\\n") | ||
>>> record2 = ndef.TextRecord("FILAMENT:14\\n") | ||
>>> record3 = ndef.TextRecord("SPOOL:23\\nFILAMENT:14\\n") | ||
>>> _get_data_from_ndef_records([record0]) | ||
(None, None) | ||
>>> _get_data_from_ndef_records([record3]) | ||
('23', '14') | ||
>>> _get_data_from_ndef_records([record1]) | ||
('23', None) | ||
>>> _get_data_from_ndef_records([record2]) | ||
(None, '14') | ||
>>> _get_data_from_ndef_records([record0, record3]) | ||
('23', '14') | ||
>>> _get_data_from_ndef_records([record3, record0]) | ||
('23', '14') | ||
>>> _get_data_from_ndef_records([record1, record2]) | ||
('23', '14') | ||
>>> _get_data_from_ndef_records([record2, record1]) | ||
('23', '14') | ||
""" | ||
|
||
spool = None | ||
filament = None | ||
|
||
for record in records: | ||
if record.type == NDEF_TEXT_TYPE: | ||
for line in record.text.splitlines(): | ||
line = line.split(":") | ||
if len(line) == 2: | ||
if line[0] == SPOOL: | ||
spool = line[1] | ||
if line[0] == FILAMENT: | ||
filament = line[1] | ||
else: | ||
print(f"Read other record: {record}", flush=True) | ||
|
||
return spool, filament | ||
|
||
def on_nfc_connect(self, tag) -> bool: | ||
"""Handles a read tag""" | ||
|
||
if tag.ndef is None: | ||
print("The tag doesn't have NDEF records", flush=True) | ||
return True | ||
|
||
_spool, _filament = NfcHandler._get_data_from_ndef_records(tag.ndef.records) | ||
|
||
# if not args.clear: | ||
# if not (spool and filament): | ||
# print("Did not find spool and filament records in tag", flush=True) | ||
# if args.clear or (spool and filament): | ||
# if not spool: | ||
# spool = 0 | ||
# if not filament: | ||
# filament = 0 | ||
# set_spool_and_filament(args.url, spool, filament) | ||
|
||
# Don't let connect return until the tag is removed: | ||
return True | ||
|
||
def run(self): | ||
""" Run the NFC handler, won't return """ | ||
# Open NFC reader. Will throw an exception if it fails. | ||
clf = nfc.ContactlessFrontend(self.nfc_device) | ||
|
||
while True: | ||
clf.connect(rdwr={"on-connect": self.on_nfc_connect}) | ||
# No tag connected anymore. |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,26 @@ | ||
# SPDX-FileCopyrightText: 2024 Sebastian Andersson <[email protected]> | ||
# SPDX-License-Identifier: GPL-3.0-or-later | ||
|
||
"""Spoolman client""" | ||
|
||
import json | ||
import requests | ||
|
||
|
||
# pylint: disable=R0903 | ||
class SpoolmanClient: | ||
"""Spoolman Web Client""" | ||
|
||
def __init__(self, url: str): | ||
if url.endswith("/"): | ||
url = url[:-1] | ||
self.url = url | ||
|
||
def get_spools(self): | ||
"""Get the spools from spoolman""" | ||
url = self.url + "/api/v1/spool" | ||
response = requests.get(url, timeout=10) | ||
if response.status_code != 200: | ||
raise ValueError(f"Request to spoolman failed: {response}") | ||
records = json.loads(response.text) | ||
return records |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,21 @@ | ||
{ | ||
/// The address the web server listens to, | ||
/// use 0.0.0.0 for all IPv4 | ||
"web_address": "0.0.0.0", | ||
|
||
/// The port the web server listens to | ||
"web_port": 5001, | ||
|
||
/// Clear the spool & filament info if no tag can be read | ||
"clear-spool": false, | ||
|
||
/// Which NFC reader to use, see | ||
/// https://nfcpy.readthedocs.io/en/latest/topics/get-started.html#open-a-local-device | ||
"nfc-device": "ttyAMA0", | ||
|
||
/// URL for the moonraker installation | ||
"moonraker-url": "http://mainsailos.local", | ||
|
||
/// URL for the moonraker installation | ||
"spoolman-url": "http://mainsailos.local:7912", | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,172 @@ | ||
#!/usr/bin/env python3 | ||
|
||
# SPDX-FileCopyrightText: 2024 Sebastian Andersson <[email protected]> | ||
# SPDX-License-Identifier: GPL-3.0-or-later | ||
|
||
# TODO: Spawn a thread for the NFC handling. | ||
# TODO: While connected, periodically check if a tag should be written. | ||
# TODO: Wait in web client tell NFC handler to write tag, wait for it to be done. | ||
# TODO: When written tell web client its done. | ||
# TODO: Improve page layout. | ||
# TODO: Fix web server. | ||
|
||
"""Program to set current filament & spool in klipper, and write to tags. """ | ||
|
||
from flask import Flask, render_template | ||
import json5 | ||
import requests | ||
|
||
from lib.moonraker_web_client import MoonrakerWebClient | ||
from lib.nfc_handler import NfcHandler | ||
from lib.spoolman_client import SpoolmanClient | ||
|
||
SPOOL = "SPOOL" | ||
FILAMENT = "FILAMENT" | ||
NDEF_TEXT_TYPE = "urn:nfc:wkt:T" | ||
|
||
with open("nfc2klipper_with_webservice-config.json5", "r", encoding="utf-8") as fp: | ||
args = json5.load(fp) | ||
|
||
spoolman = SpoolmanClient(args['spoolman-url']) | ||
moonraker = MoonrakerWebClient(args['moonraker-url']) | ||
|
||
app = Flask(__name__) | ||
|
||
|
||
def set_spool_and_filament(url: str, spool: int, filament: int): | ||
"""Calls moonraker with the current spool & filament""" | ||
|
||
if "old_spool" not in set_spool_and_filament.__dict__: | ||
set_spool_and_filament.old_spool = None | ||
set_spool_and_filament.old_filament = None | ||
|
||
if ( | ||
set_spool_and_filament.old_spool == spool | ||
and set_spool_and_filament.old_filament == filament | ||
): | ||
print("Read same spool & filament", flush=True) | ||
return | ||
|
||
print(f"Sending spool #{spool}, filament #{filament} to klipper", flush=True) | ||
|
||
commands = { | ||
"commands": [ | ||
f"SET_ACTIVE_SPOOL ID={spool}", | ||
f"SET_ACTIVE_FILAMENT ID={filament}", | ||
] | ||
} | ||
|
||
# In case the post fails, we might not know if the server has received | ||
# it or not, so set them to None: | ||
set_spool_and_filament.old_spool = None | ||
set_spool_and_filament.old_filament = None | ||
|
||
try: | ||
response = requests.post( | ||
url + "/api/printer/command", timeout=10, json=commands | ||
) | ||
if response.status_code != 200: | ||
raise ValueError(f"Request to moonraker failed: {response}") | ||
except Exception as ex: # pylint: disable=W0718 | ||
print(ex) | ||
|
||
set_spool_and_filament.old_spool = spool | ||
set_spool_and_filament.old_filament = filament | ||
|
||
|
||
def get_data_from_ndef_records(records): | ||
"""Find wanted data from the NDEF records. | ||
>>> import ndef | ||
>>> record0 = ndef.TextRecord("") | ||
>>> record1 = ndef.TextRecord("SPOOL:23\\n") | ||
>>> record2 = ndef.TextRecord("FILAMENT:14\\n") | ||
>>> record3 = ndef.TextRecord("SPOOL:23\\nFILAMENT:14\\n") | ||
>>> get_data_from_ndef_records([record0]) | ||
(None, None) | ||
>>> get_data_from_ndef_records([record3]) | ||
('23', '14') | ||
>>> get_data_from_ndef_records([record1]) | ||
('23', None) | ||
>>> get_data_from_ndef_records([record2]) | ||
(None, '14') | ||
>>> get_data_from_ndef_records([record0, record3]) | ||
('23', '14') | ||
>>> get_data_from_ndef_records([record3, record0]) | ||
('23', '14') | ||
>>> get_data_from_ndef_records([record1, record2]) | ||
('23', '14') | ||
>>> get_data_from_ndef_records([record2, record1]) | ||
('23', '14') | ||
""" | ||
|
||
spool = None | ||
filament = None | ||
|
||
for record in records: | ||
if record.type == NDEF_TEXT_TYPE: | ||
for line in record.text.splitlines(): | ||
line = line.split(":") | ||
if len(line) == 2: | ||
if line[0] == SPOOL: | ||
spool = line[1] | ||
if line[0] == FILAMENT: | ||
filament = line[1] | ||
else: | ||
print(f"Read other record: {record}", flush=True) | ||
|
||
return spool, filament | ||
|
||
|
||
def on_nfc_connect(tag): | ||
"""Handles a read tag""" | ||
|
||
if tag.ndef is None: | ||
print("The tag doesn't have NDEF records", flush=True) | ||
return True | ||
|
||
spool, filament = get_data_from_ndef_records(tag.ndef.records) | ||
|
||
if not args.get("clear_spool"): | ||
if not (spool and filament): | ||
print("Did not find spool and filament records in tag", flush=True) | ||
if args.get("clear_spool") or (spool and filament): | ||
if not spool: | ||
spool = 0 | ||
if not filament: | ||
filament = 0 | ||
set_spool_and_filament(args["moonraker_url"], spool, filament) | ||
|
||
# Don't let connect return until the tag is removed: | ||
return True | ||
|
||
|
||
@app.route("/w/<int:spool>/<int:filament>") | ||
def write_tag(spool, filament): | ||
""" | ||
The web-api to write the spool & filament data to NFC/RFID tag | ||
""" | ||
print(f" write spool={spool}, filament={filament}") | ||
if filament > 1: | ||
return "Didn't find NFC tag", 503 # Service Unavailable | ||
return "OK" | ||
|
||
|
||
@app.route("/") | ||
def index(): | ||
""" | ||
Returns the main index page. | ||
""" | ||
spools = spoolman.get_spools() | ||
|
||
return render_template("index.html", spools=spools) | ||
|
||
if __name__ == "__main__": | ||
|
||
nfc_handler = NfcHandler(args["nfc-device"]) | ||
|
||
if args.get("clear_spool"): | ||
# Start by unsetting current spool & filament: | ||
moonraker.set_spool_and_filament(0, 0) | ||
|
||
app.run(args["web_address"], port=args["web_port"]) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,3 +1,5 @@ | ||
flask==3.0.3 | ||
json5==0.9.25 | ||
nfcpy==1.0.4 | ||
requests==2.31.0 | ||
npyscreen==4.10.5 | ||
requests==2.31.0 |
Oops, something went wrong.