diff --git a/ble-spaceauth-manager.py b/ble-spaceauth-manager.py index 7bfd8e3..d15b677 100644 --- a/ble-spaceauth-manager.py +++ b/ble-spaceauth-manager.py @@ -7,6 +7,8 @@ import serial.serialutil import fcntl from enum import IntEnum +from paho.mqtt.publish import single as mqtt_pub +import argparse # 7-bit C1 ANSI sequences ansi_escape = re.compile(r''' @@ -17,12 +19,21 @@ [@-~] # Final byte ''', re.VERBOSE) -do_sanity_checks = True -verbose = False +parser = argparse.ArgumentParser(description='BLE Space Authentication Helper script.') +parser.add_argument('--skip-sanity-checks', action='store_true', default=False, + help='skip matching with local coin list') +parser.add_argument('--verbose', action='store_true', default=False, help='print more output') +parser.add_argument('--topic', default='Netz39/Things/Door/Command', help='MQTT Topic to publish to') +parser.add_argument('--host', default='localhost', help='MQTT Host to publish to') +parser.add_argument('--port', default=1883, type=int, help='MQTT Host Port to publish to') +parser.add_argument('--qos', default=2, type=int, help='QOS of MQTT message') +parser.add_argument('--msg', default=b"door open", type=bytes, help='MQTT Message payload') +args = parser.parse_args() def confirm_authentication(address, battery): print("\t[%s] successfully authenticated. Remaining Battery: %s%%" % (address, battery)) + mqtt_pub(topic=args.topic, payload=args.msg, qos=args.qos, hostname=args.host) # read coins and central ids from pseudo-database @@ -111,7 +122,7 @@ def parse_status(l): for k in regs: m = re.search(pattern=regs[k], string=l) if m: - if verbose: + if args.verbose: print('\t', k, m.groups()) return k, m.groups() return None, None @@ -128,12 +139,10 @@ async def serial_fetch_line(s): async def manage_serial(s: aioserial.AioSerial): s.write(b'ble_start\r\n') - config_identity, coin_list = read_db() - - bonds = await request_bonds(s) - spacekeys = await request_spacekeys(s) - - if do_sanity_checks: + if not args.skip_sanity_checks: + config_identity, coin_list = read_db() + bonds = await request_bonds(s) + spacekeys = await request_spacekeys(s) assert len(bonds) == len(spacekeys) == len(coin_list), "number of coins does not match" for i in zip(bonds, spacekeys, coin_list): assert i[0][0] == i[1][0], "addresses must match" @@ -147,7 +156,7 @@ async def manage_serial(s: aioserial.AioSerial): line = await serial_fetch_line(s) print(line, end='', flush=True) k, v = parse_status(line) - if do_sanity_checks: + if not args.skip_sanity_checks: if k == StatusType.IDENTITY: assert v[0].upper() == config_identity[0], v if k == StatusType.AUTHENTICATED: