Skip to content

Commit

Permalink
WIP: Add web service to nfc2klipper for writing tags
Browse files Browse the repository at this point in the history
  • Loading branch information
bofh69 committed Oct 20, 2024
1 parent cb3f3d0 commit 0452b61
Show file tree
Hide file tree
Showing 10 changed files with 429 additions and 10 deletions.
1 change: 0 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
Expand Down
Empty file added lib/__init__.py
Empty file.
30 changes: 30 additions & 0 deletions lib/moonraker_web_client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
# SPDX-FileCopyrightText: 2024 Sebastian Andersson <[email protected]>
# SPDX-License-Identifier: GPL-3.0-or-later

"""Moonraker Web Client"""

import requests


# pylint: disable=R0903
class MoonrakerWebClient:
"""Moonraker Web Client"""

def __init__(self, url: str):
self.url = url

def set_spool_and_filament(self, spool: int, filament: int):
"""Calls moonraker with the current spool & filament"""

commands = {
"commands": [
f"SET_ACTIVE_SPOOL ID={spool}",
f"SET_ACTIVE_FILAMENT ID={filament}",
]
}

response = requests.post(
self.url + "/api/printer/command", timeout=10, json=commands
)
if response.status_code != 200:
raise ValueError(f"Request to moonraker failed: {response}")
167 changes: 167 additions & 0 deletions lib/nfc_handler.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,167 @@
# SPDX-FileCopyrightText: 2024 Sebastian Andersson <[email protected]>
# SPDX-License-Identifier: GPL-3.0-or-later

""" NFC tag handling """

import time
from threading import Lock, Event

import ndef
import nfc


SPOOL = "SPOOL"
FILAMENT = "FILAMENT"
NDEF_TEXT_TYPE = "urn:nfc:wkt:T"


# pylint: disable=R0902
class NfcHandler:
"""NFC Tag handling"""

def __init__(self, nfc_device: str):
self.status = ""
self.nfc_device = nfc_device
self.on_nfc_no_tag_present = None
self.on_nfc_tag_present = None
self.should_stop_event = Event()
self.write_lock = Lock()
self.write_event = Event()
self.write_spool = None
self.write_filament = None

def set_no_tag_present_callback(self, on_nfc_no_tag_present):
"""Sets a callback that will be called when no tag is present"""
self.on_nfc_no_tag_present = on_nfc_no_tag_present

def set_tag_present_callback(self, on_nfc_tag_present):
"""Sets a callback that will be called when a tag has been read"""
self.on_nfc_tag_present = on_nfc_tag_present

def _write_to_nfc_tag(self, tag, spool: int, filament: int) -> bool:
"""Write given spool/filament ids to the tag"""
try:
if tag.ndef and tag.ndef.is_writeable:
tag.ndef.records = [
ndef.TextRecord(f"{SPOOL}:{spool}\n{FILAMENT}:{filament}\n")
]
return True
self.status = "Tag is write protected"
except Exception as ex: # pylint: disable=W0718
print(ex)
self.status = "Got error while writing"
return False

@classmethod
def get_data_from_ndef_records(cls, records: ndef.TextRecord):
"""Find wanted data from the NDEF records.
>>> import ndef
>>> record0 = ndef.TextRecord("")
>>> record1 = ndef.TextRecord("SPOOL:23\\n")
>>> record2 = ndef.TextRecord("FILAMENT:14\\n")
>>> record3 = ndef.TextRecord("SPOOL:23\\nFILAMENT:14\\n")
>>> NfcHandler.get_data_from_ndef_records([record0])
(None, None)
>>> NfcHandler.get_data_from_ndef_records([record3])
('23', '14')
>>> NfcHandler.get_data_from_ndef_records([record1])
('23', None)
>>> NfcHandler.get_data_from_ndef_records([record2])
(None, '14')
>>> NfcHandler.get_data_from_ndef_records([record0, record3])
('23', '14')
>>> NfcHandler.get_data_from_ndef_records([record3, record0])
('23', '14')
>>> NfcHandler.get_data_from_ndef_records([record1, record2])
('23', '14')
>>> NfcHandler.get_data_from_ndef_records([record2, record1])
('23', '14')
"""

spool = None
filament = None

for record in records:
if record.type == NDEF_TEXT_TYPE:
for line in record.text.splitlines():
line = line.split(":")
if len(line) == 2:
if line[0] == SPOOL:
spool = line[1]
if line[0] == FILAMENT:
filament = line[1]
else:
print(f"Read other record: {record}", flush=True)

return spool, filament

def on_nfc_connect(self, tag) -> bool:
"""Handles a read tag"""

if tag.ndef is None:
print("The tag doesn't have NDEF records", flush=True)
if self.on_nfc_no_tag_present:
self.on_nfc_no_tag_present()
return True

spool, filament = NfcHandler.get_data_from_ndef_records(tag.ndef.records)

if self.on_nfc_tag_present:
self.on_nfc_tag_present(spool, filament)

# Don't let connect return until the tag is removed:
return True

def _set_write_info(self, spool, filament):
if self.write_lock.acquire(): # pylint: disable=R1732
self.write_spool = spool
self.write_filament = filament
self.write_event.clear()
self.write_lock.release()

def write_to_tag(self, spool: int, filament: int) -> bool:
"""Writes spool & filament info to tag. Returns true if worked."""

self._set_write_info(spool, filament)

if self.write_event.wait(timeout=30):
return True

self._set_write_info(None, None)

return False

def stop(self):
"""Call to stop the handler"""
self.should_stop_event.set()

def run(self):
"""Run the NFC handler, won't return"""
# Open NFC reader. Will throw an exception if it fails.
with nfc.ContactlessFrontend(self.nfc_device) as clf:
while not self.should_stop_event.is_set():
tag = clf.connect(rdwr={"on-connect": lambda tag: False})
if tag:
if self.write_lock.acquire(): # pylint: disable=R1732
if self.write_spool:
if self._write_to_nfc_tag(
tag, self.write_spool, self.write_filament
):
self.write_event.set()
self.write_lock.write_spool = None
self.write_lock.write_filament = None
self.write_lock.release()
if tag.ndef is None:
if self.on_nfc_no_tag_present:
self.on_nfc_no_tag_present()
else:
if self.on_nfc_tag_present:
spool, filament = NfcHandler.get_data_from_ndef_records(
tag.ndef.records
)
self.on_nfc_tag_present(spool, filament)
while clf.connect(rdwr={"on-connect": lambda tag: False}):
time.sleep(0.1)
else:
time.sleep(0.1)
26 changes: 26 additions & 0 deletions lib/spoolman_client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
# SPDX-FileCopyrightText: 2024 Sebastian Andersson <[email protected]>
# SPDX-License-Identifier: GPL-3.0-or-later

"""Spoolman client"""

import json
import requests


# pylint: disable=R0903
class SpoolmanClient:
"""Spoolman Web Client"""

def __init__(self, url: str):
if url.endswith("/"):
url = url[:-1]
self.url = url

def get_spools(self):
"""Get the spools from spoolman"""
url = self.url + "/api/v1/spool"
response = requests.get(url, timeout=10)
if response.status_code != 200:
raise ValueError(f"Request to spoolman failed: {response}")
records = json.loads(response.text)
return records
21 changes: 21 additions & 0 deletions nfc2klipper_with_webservice-config.json5
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
{
/// The address the web server listens to,
/// use 0.0.0.0 for all IPv4
"web_address": "0.0.0.0",

/// The port the web server listens to
"web_port": 5001,

/// Clear the spool & filament info if no tag can be read
"clear-spool": false,

/// Which NFC reader to use, see
/// https://nfcpy.readthedocs.io/en/latest/topics/get-started.html#open-a-local-device
"nfc-device": "ttyAMA0",

/// URL for the moonraker installation
"moonraker-url": "http://mainsailos.local",

/// URL for the moonraker installation
"spoolman-url": "http://mainsailos.local:7912",
}
124 changes: 124 additions & 0 deletions nfc2klipper_with_webservice.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
#!/usr/bin/env python3

# SPDX-FileCopyrightText: 2024 Sebastian Andersson <[email protected]>
# SPDX-License-Identifier: GPL-3.0-or-later

"""Program to set current filament & spool in klipper, and write to tags. """

import threading

from flask import Flask, render_template
import json5

from lib.moonraker_web_client import MoonrakerWebClient
from lib.nfc_handler import NfcHandler
from lib.spoolman_client import SpoolmanClient

SPOOL = "SPOOL"
FILAMENT = "FILAMENT"
NDEF_TEXT_TYPE = "urn:nfc:wkt:T"

with open("nfc2klipper_with_webservice-config.json5", "r", encoding="utf-8") as fp:
args = json5.load(fp)

spoolman = SpoolmanClient(args["spoolman-url"])
moonraker = MoonrakerWebClient(args["moonraker-url"])
nfc_handler = NfcHandler(args["nfc-device"])


app = Flask(__name__)


def set_spool_and_filament(spool: int, filament: int):
"""Calls moonraker with the current spool & filament"""

if "old_spool" not in set_spool_and_filament.__dict__:
set_spool_and_filament.old_spool = None
set_spool_and_filament.old_filament = None

if (
set_spool_and_filament.old_spool == spool
and set_spool_and_filament.old_filament == filament
):
print("Read same spool & filament", flush=True)
return

print(f"Sending spool #{spool}, filament #{filament} to klipper", flush=True)

# In case the post fails, we might not know if the server has received
# it or not, so set them to None:
set_spool_and_filament.old_spool = None
set_spool_and_filament.old_filament = None

try:
moonraker.set_spool_and_filament(spool, filament)
except Exception as ex: # pylint: disable=W0718
print(ex)
return

set_spool_and_filament.old_spool = spool
set_spool_and_filament.old_filament = filament


@app.route("/w/<int:spool>/<int:filament>")
def write_tag(spool, filament):
"""
The web-api to write the spool & filament data to NFC/RFID tag
"""
print(f" write spool={spool}, filament={filament}")
if nfc_handler.write_to_tag(spool, filament):
return "OK"
return ("Failed to write to tag", 502)


@app.route("/")
def index():
"""
Returns the main index page.
"""
spools = spoolman.get_spools()

return render_template("index.html", spools=spools)


def on_nfc_tag_present(spool, filament):
"""Handles a read tag"""

if not args.get("clear_spool"):
if not (spool and filament):
print("Did not find spool and filament records in tag", flush=True)
if args.get("clear_spool") or (spool and filament):
if not spool:
spool = 0
if not filament:
filament = 0
set_spool_and_filament(spool, filament)


def on_nfc_no_tag_present():
"""Called when no tag is present (or tag without data)"""
if args.get("clear_spool"):
set_spool_and_filament(0, 0)


if __name__ == "__main__":

if args.get("clear_spool"):
# Start by unsetting current spool & filament:
set_spool_and_filament(0, 0)

nfc_handler.set_no_tag_present_callback(on_nfc_no_tag_present)
nfc_handler.set_tag_present_callback(on_nfc_tag_present)

thread = threading.Thread(target=nfc_handler.run)
thread.daemon = True
print("Starting nfc-handler")
thread.start()

print("Starting web server")
try:
app.run(args["web_address"], port=args["web_port"])
except Exception:
nfc_handler.stop()
thread.join()
raise
4 changes: 3 additions & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
flask==3.0.3
json5==0.9.25
nfcpy==1.0.4
requests==2.31.0
npyscreen==4.10.5
requests==2.31.0
Loading

0 comments on commit 0452b61

Please sign in to comment.