-
Notifications
You must be signed in to change notification settings - Fork 2
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
The start of a web service to write NFC tags
- Loading branch information
Showing
6 changed files
with
302 additions
and
1 deletion.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,31 @@ | ||
#!/usr/bin/env python3 | ||
|
||
# SPDX-FileCopyrightText: 2024 Sebastian Andersson <[email protected]> | ||
# SPDX-License-Identifier: GPL-3.0-or-later | ||
|
||
"""Moonraker Web Client""" | ||
|
||
import requests | ||
|
||
class MoonrakerWebClient: | ||
"""Moonraker Web Client""" | ||
|
||
def __init__(self, url : str): | ||
self.url = url | ||
|
||
|
||
def set_spool_and_filament(spool: int, filament: int): | ||
"""Calls moonraker with the current spool & filament""" | ||
|
||
commands = { | ||
"commands": [ | ||
f"SET_ACTIVE_SPOOL ID={spool}", | ||
f"SET_ACTIVE_FILAMENT ID={filament}", | ||
] | ||
} | ||
|
||
response = requests.post( | ||
self.url + "/api/printer/command", timeout=10, json=commands | ||
) | ||
if response.status_code != 200: | ||
raise ValueError(f"Request to moonraker failed: {response}") |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,31 @@ | ||
#!/usr/bin/env python3 | ||
|
||
# SPDX-FileCopyrightText: 2024 Sebastian Andersson <[email protected]> | ||
# SPDX-License-Identifier: GPL-3.0-or-later | ||
|
||
"""Spoolman client""" | ||
|
||
import json | ||
import requests | ||
|
||
class SpoolmanClient: | ||
""" Spoolman Web Client """ | ||
|
||
def __init__(self, url : str): | ||
if url.endswith("/"): | ||
url = url[:-1] | ||
self.url = url | ||
|
||
def record_to_text(record): | ||
"""Translate a json spool object to a readable string""" | ||
return f"#{record['id']} {record['filament']['vendor']['name']} - {record['filament']['name']}" | ||
|
||
|
||
def get_spools(self): | ||
""" Get the spools from spoolman """ | ||
url = self.url + "/api/v1/spool" | ||
response = requests.get(url, timeout=10) | ||
if response.status_code != 200: | ||
raise ValueError(f"Request to spoolman failed: {response}") | ||
records = json.loads(response.text) | ||
return records |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,21 @@ | ||
{ | ||
/// The address the web server listens to, | ||
/// use 0.0.0.0 for all IPv4 | ||
"web_address": "0.0.0.0", | ||
|
||
/// The port the web server listens to | ||
"web_port": 5001, | ||
|
||
/// Clear the spool & filament info if no tag can be read | ||
"clear-spool": false, | ||
|
||
/// Which NFC reader to use, see | ||
/// https://nfcpy.readthedocs.io/en/latest/topics/get-started.html#open-a-local-device | ||
"nfc-device": "ttyAMA0", | ||
|
||
/// URL for the moonraker installation | ||
"moonraker-url": "http://mainsailos.local", | ||
|
||
/// URL for the moonraker installation | ||
"spoolman-url": "http://mainsailos.local:7912", | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,172 @@ | ||
#!/usr/bin/env python3 | ||
|
||
# SPDX-FileCopyrightText: 2024 Sebastian Andersson <[email protected]> | ||
# SPDX-License-Identifier: GPL-3.0-or-later | ||
|
||
"""Program to set current filament & spool in klipper.""" | ||
|
||
from flask import Flask, render_template | ||
import json5 | ||
import nfc | ||
import requests | ||
from SpoolmanClient import SpoolmanClient | ||
|
||
SPOOL = "SPOOL" | ||
FILAMENT = "FILAMENT" | ||
NDEF_TEXT_TYPE = "urn:nfc:wkt:T" | ||
|
||
with open("nfc2klipper_with_webservice-config.json5", "r", encoding="utf-8") as fp: | ||
args = json5.load(fp) | ||
|
||
spoolman = SpoolmanClient(args['spoolman-url']) | ||
|
||
app = Flask(__name__) | ||
|
||
|
||
def set_spool_and_filament(url: str, spool: int, filament: int): | ||
"""Calls moonraker with the current spool & filament""" | ||
|
||
if "old_spool" not in set_spool_and_filament.__dict__: | ||
set_spool_and_filament.old_spool = None | ||
set_spool_and_filament.old_filament = None | ||
|
||
if ( | ||
set_spool_and_filament.old_spool == spool | ||
and set_spool_and_filament.old_filament == filament | ||
): | ||
print("Read same spool & filament", flush=True) | ||
return | ||
|
||
print(f"Sending spool #{spool}, filament #{filament} to klipper", flush=True) | ||
|
||
commands = { | ||
"commands": [ | ||
f"SET_ACTIVE_SPOOL ID={spool}", | ||
f"SET_ACTIVE_FILAMENT ID={filament}", | ||
] | ||
} | ||
|
||
# In case the post fails, we might not know if the server has received | ||
# it or not, so set them to None: | ||
set_spool_and_filament.old_spool = None | ||
set_spool_and_filament.old_filament = None | ||
|
||
try: | ||
response = requests.post( | ||
url + "/api/printer/command", timeout=10, json=commands | ||
) | ||
if response.status_code != 200: | ||
raise ValueError(f"Request to moonraker failed: {response}") | ||
except Exception as ex: # pylint: disable=W0718 | ||
print(ex) | ||
|
||
set_spool_and_filament.old_spool = spool | ||
set_spool_and_filament.old_filament = filament | ||
|
||
|
||
def get_data_from_ndef_records(records): | ||
"""Find wanted data from the NDEF records. | ||
>>> import ndef | ||
>>> record0 = ndef.TextRecord("") | ||
>>> record1 = ndef.TextRecord("SPOOL:23\\n") | ||
>>> record2 = ndef.TextRecord("FILAMENT:14\\n") | ||
>>> record3 = ndef.TextRecord("SPOOL:23\\nFILAMENT:14\\n") | ||
>>> get_data_from_ndef_records([record0]) | ||
(None, None) | ||
>>> get_data_from_ndef_records([record3]) | ||
('23', '14') | ||
>>> get_data_from_ndef_records([record1]) | ||
('23', None) | ||
>>> get_data_from_ndef_records([record2]) | ||
(None, '14') | ||
>>> get_data_from_ndef_records([record0, record3]) | ||
('23', '14') | ||
>>> get_data_from_ndef_records([record3, record0]) | ||
('23', '14') | ||
>>> get_data_from_ndef_records([record1, record2]) | ||
('23', '14') | ||
>>> get_data_from_ndef_records([record2, record1]) | ||
('23', '14') | ||
""" | ||
|
||
spool = None | ||
filament = None | ||
|
||
for record in records: | ||
if record.type == NDEF_TEXT_TYPE: | ||
for line in record.text.splitlines(): | ||
line = line.split(":") | ||
if len(line) == 2: | ||
if line[0] == SPOOL: | ||
spool = line[1] | ||
if line[0] == FILAMENT: | ||
filament = line[1] | ||
else: | ||
print(f"Read other record: {record}", flush=True) | ||
|
||
return spool, filament | ||
|
||
|
||
def on_nfc_connect(tag): | ||
"""Handles a read tag""" | ||
|
||
if tag.ndef is None: | ||
print("The tag doesn't have NDEF records", flush=True) | ||
return True | ||
|
||
spool, filament = get_data_from_ndef_records(tag.ndef.records) | ||
|
||
if not args.get("clear_spool"): | ||
if not (spool and filament): | ||
print("Did not find spool and filament records in tag", flush=True) | ||
if args.get("clear_spool") or (spool and filament): | ||
if not spool: | ||
spool = 0 | ||
if not filament: | ||
filament = 0 | ||
set_spool_and_filament(args["moonraker_url"], spool, filament) | ||
|
||
# Don't let connect return until the tag is removed: | ||
return True | ||
|
||
|
||
@app.route("/w/<int:spool>/<int:filament>") | ||
def write_tag(spool, filament): | ||
""" | ||
The web-api to write the spool & filament data to NFC/RFID tag | ||
""" | ||
import time | ||
|
||
print(f" write spool={spool}, filament={filament}") | ||
time.sleep(5) | ||
if filament > 1: | ||
return "Didn't find NFC tag", 503 # Service Unavailable | ||
return "OK" | ||
|
||
|
||
@app.route("/") | ||
def index(): | ||
""" | ||
Returns the main index page. | ||
""" | ||
spools = spoolman.get_spools() | ||
|
||
return render_template("index.html", spools=spools) | ||
|
||
app.run(args["web_address"], port=args["web_port"]) | ||
|
||
if __name__ == "__main2__": | ||
|
||
# Open NFC reader. Will throw an exception if it fails. | ||
clf = nfc.ContactlessFrontend(args["nfc_device"]) | ||
|
||
if args.get("clear_spool"): | ||
# Start by unsetting current spool & filament: | ||
set_spool_and_filament(args["moonraker_url"], 0, 0) | ||
|
||
while True: | ||
clf.connect(rdwr={"on-connect": on_nfc_connect}) | ||
# No tag connected anymore. | ||
if args.get("clear_spool"): | ||
set_spool_and_filament(args["moonraker_url"], 0, 0) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,3 +1,5 @@ | ||
flask==3.0.3 | ||
json5==0.9.25 | ||
nfcpy==1.0.4 | ||
requests==2.31.0 | ||
npyscreen==4.10.5 | ||
requests==2.31.0 |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,44 @@ | ||
<!DOCTYPE html> | ||
<html lang="en"> | ||
<head> | ||
<meta charset="UTF-8"> | ||
<meta name="viewport" content="width=device-width, initial-scale=1, shrink-to-fit=no"> | ||
<link href="https://cdn.jsdelivr.net/npm/[email protected]/dist/css/bootstrap.min.css" rel="stylesheet" integrity="sha384-EVSTQN3/azprG1Anm3QDgpJLIm9Nao0Yz1ztcQTwFspd3yD65VohhpuuCOmLASjC" crossorigin="anonymous"> | ||
|
||
<title> NFC2Klipper </title> | ||
</head> | ||
<script> | ||
function x(spool, filament) { | ||
document.getElementById("status").textContent = "Writing to NFC..." | ||
fetch("/w/" + spool + "/" + filament) | ||
.then((result) => { | ||
if (!result.ok) { | ||
result.text().then((text) => { | ||
document.getElementById("status").textContent = "Failed to write to NFC tag: " + text; | ||
}); | ||
throw new Error("Could not write tag"); | ||
} | ||
return result.text(); | ||
}) | ||
.then((text) => { | ||
document.getElementById("status").textContent = "Wrote to NFC tag"; | ||
}) | ||
.catch((error) => { | ||
console.error("Failed to request NFC writing:", error); | ||
}); | ||
return false; | ||
} | ||
</script> | ||
<body> | ||
<h1> NFC22Klipper </h1> | ||
<div id="status"> </div> | ||
<ul> | ||
{% for spool in spools %} | ||
<li> | ||
<a onclick="x({{spool['spool_id']}}, {{spool['filament_id']}})" href="#">[write]</a> | ||
{{spool['id']}}: {{spool['filament']['vendor']['name']}} - {{spool['filament']['name']}} | ||
</li> | ||
{% endfor %} | ||
</ul> | ||
</body> | ||
</html> |