From efa88a73bc8f7f9706c9d60b3134f26e77fffde7 Mon Sep 17 00:00:00 2001 From: Priyanshu Tripathi Date: Mon, 29 Jul 2024 06:18:22 +0400 Subject: [PATCH] Switch to charachorder.py (#125) * refactor: use charachorder.py * fix: remove pyserial from the repo * fix: bump cc.py version --- nexus/CCSerial/CCSerial.py | 261 ------------------------------------- nexus/CCSerial/__init__.py | 5 - nexus/Freqlog/Freqlog.py | 48 ++++--- requirements.txt | 2 +- 4 files changed, 24 insertions(+), 292 deletions(-) delete mode 100644 nexus/CCSerial/CCSerial.py delete mode 100644 nexus/CCSerial/__init__.py diff --git a/nexus/CCSerial/CCSerial.py b/nexus/CCSerial/CCSerial.py deleted file mode 100644 index 92a3188..0000000 --- a/nexus/CCSerial/CCSerial.py +++ /dev/null @@ -1,261 +0,0 @@ -from dataclasses import dataclass -from typing import Iterator - -from serial import Serial -from serial.tools import list_ports -from serial.tools.list_ports_common import ListPortInfo - - -@dataclass -class CCDevice: - """ - CharaChorder device - """ - name: str - device: ListPortInfo - - def __repr__(self): - return self.name + " (" + self.device.device + ")" - - def __str__(self): - return self.name + " (" + self.device.device + ")" - - -class CCSerial: - - @staticmethod - def list_devices() -> list[CCDevice]: - """ - List CharaChorder serial devices - :returns: List of CharaChorder serial devices - """ - devices: list[CCDevice] = [] - for dev in list_ports.comports(): - match dev.vid: - case 9114: # Adafruit (M0) - match dev.pid: - case 32783: - devices.append(CCDevice("CharaChorder One", dev)) - case 32796: - devices.append(CCDevice("CharaChorder Lite (M0)", dev)) - case 12346: # Espressif (S2) - match dev.pid: - case 33070: - devices.append(CCDevice("CharaChorder Lite (S2)", dev)) - case 33163: - devices.append(CCDevice("CharaChorder X", dev)) - return devices - - def __init__(self, device: CCDevice) -> None: - """ - Initialize CharaChorder serial device - :param device: Path to device (use CCSerial.get_devices()[][0]) - """ - self.ser = Serial(device.device.device, 115200, timeout=1) - - def close(self): - """ - Close serial connection, must be called after completion of all serial operations on one device - """ - self.ser.close() - - def _readline_to_list(self) -> list[str]: - """ - Read a line from the serial device and split it into a list - :return: List of strings if read was successful, empty list otherwise - """ - res = self.ser.readline().decode("utf-8") - return res.strip().split(" ") if res and res[-1] == "\n" else [] - - def get_device_id(self) -> str: - """ - Get CharaChorder device ID - :raises IOError: If serial response is invalid - :returns: Device ID - """ - try: - self.ser.write(b"ID\r\n") - res = None - while not res or len(res) == 1: # Drop serial output from chording during this time - res = self._readline_to_list() - except Exception: - self.close() - raise - if len(res) != 2 or res[0] != "ID": - raise IOError(f"Invalid response: {res}") - return res[1] - - def get_device_version(self) -> str: - """ - Get CharaChorder device version - :raises IOError: If serial response is invalid - :returns: Device version - """ - try: - self.ser.write(b"VERSION\r\n") - res = None - while not res or len(res) == 1: # Drop serial output from chording during this time - res = self._readline_to_list() - except Exception: - self.close() - raise - if len(res) != 2 or res[0] != "VERSION": - raise IOError(f"Invalid response: {res}") - return res[1] - - def get_chordmap_count(self) -> int: - """ - Get CharaChorder device chordmap count - :raises IOError: If serial response is invalid - :returns: Chordmap count - """ - try: - self.ser.write(b"CML C0\r\n") - res = None - while not res or len(res) == 1: # Drop serial output from chording during this time - res = self._readline_to_list() - except Exception: - self.close() - raise - if len(res) != 3 or res[0] != "CML" or res[1] != "C0": - raise IOError(f"Invalid response: {res}") - return int(res[2]) - - def get_chordmap_by_index(self, index: int) -> (str, str): - """ - Get chordmap from CharaChorder device by index - :param index: Chordmap index - :raises ValueError: If index is out of range - :raises IOError: If serial response is invalid - :returns: Chord (hex), Chordmap (Hexadecimal CCActionCodes List) - """ - if index < 0 or index >= self.get_chordmap_count(): - raise ValueError("Index out of range") - try: - self.ser.write(f"CML C1 {index}\r\n".encode("utf-8")) - res = None - while not res or len(res) == 1: # Drop serial output from chording during this time - res = self._readline_to_list() - except Exception: - self.close() - raise - if len(res) != 6 or res[0] != "CML" or res[1] != "C1" or res[2] != str(index) or res[3] == "0" or res[4] == "0": - raise IOError(f"Invalid response: {res}") - return res[3], res[4] - - def get_chordmap_by_chord(self, chord: str) -> str | None: - """ - Get chordmap from CharaChorder device by chord - :param chord: Chord (hex) - :raises ValueError: If chord is not a hex string - :raises IOError: If serial response is invalid - :returns: Chordmap (Hexadecimal CCActionCodes List), or None if chord was not found on device - """ - try: - int(chord, 16) - except ValueError: - raise ValueError("Chord must be a hex string") - try: - self.ser.write(f"CML C2 {chord}\r\n".encode("utf-8")) - res = None - while not res or len(res) == 1: # Drop serial output from chording during this time - res = self._readline_to_list() - except Exception: - self.close() - raise - if len(res) != 4 or res[0] != "CML" or res[1] != "C2" or res[2] != chord: - raise IOError(f"Invalid response: {res}") - return res[3] if res[3] != "0" else None - - def set_chordmap_by_chord(self, chord: str, chordmap: str) -> bool: - """ - Set chordmap on CharaChorder device by chord - :param chord: Chord (hex) - :param chordmap: Chordmap (Hexadecimal CCActionCodes List) - :raises ValueError: If chord or chordmap is not a hex string - :raises IOError: If serial response is invalid - :returns: Whether the chord was set successfully - """ - try: - int(chord, 16) - except ValueError: - raise ValueError("Chord must be a hex string") - try: - int(chordmap, 16) - except ValueError: - raise ValueError("Chordmap must be a hex string") - try: - self.ser.write(f"CML C3 {chord}\r\n".encode("utf-8")) - res = None - while not res or len(res) == 1: # Drop serial output from chording during this time - res = self._readline_to_list() - except Exception: - self.close() - raise - if len(res) != 5 or res[0] != "CML" or res[1] != "C3" or res[2] != chord: - raise IOError(f"Invalid response: {res}") - return res[4] == "0" - - def del_chordmap_by_chord(self, chord: str) -> bool: - """ - Delete chordmap from CharaChorder device by chord - :param chord: Chord (hex) - :raises ValueError: If chord is not a hex string - :raises IOError: If serial response is invalid - :returns: False if the chord was not found on the device or was not deleted, True otherwise - """ - try: - int(chord, 16) - except ValueError: - raise ValueError("Chord must be a hex string") - try: - self.ser.write(f"CML C4 {chord}\r\n".encode("utf-8")) - res = None - while not res or len(res) == 1: # Drop serial output from chording during this time - res = self._readline_to_list() - except Exception: - self.close() - raise - if len(res) != 4 or res[0] != "CML" or res[1] != "C4": - raise IOError(f"Invalid response: {res}") - return res[3] == "0" - - @staticmethod - def decode_ascii_cc_action_code(code: int) -> str: - """ - Decode CharaChorder action code - :param code: integer action code - :return: character corresponding to decoded action code - :note: only decodes ASCII characters for now (32-126) - """ - if 32 <= code <= 126: - return chr(code) - else: - raise NotImplementedError(f"Action code {code} ({hex(code)}) not supported yet") - - def list_device_chords(self) -> Iterator[str]: - """ - List all chord(map)s on CharaChorder device - :return: list of chordmaps - """ - num_chords = self.get_chordmap_count() - for i in range(num_chords): - chord_hex = self.get_chordmap_by_index(i)[1] - chord_int = [int(chord_hex[i:i + 2], 16) for i in range(0, len(chord_hex), 2)] - chord_utf8 = [] - for j, c in enumerate(chord_int): - if c < 32: # 10-bit scan code - chord_int[j + 1] = (chord_int[j] << 8) | chord_int[j + 1] - elif c == 296: # Enter - chord_utf8.append("\n") - elif c == 298 and len(chord_utf8) > 0: # Backspace - chord_utf8.pop() - elif c == 299: # Tab - chord_utf8.append("\t") - elif c == 544: # Spaceright - chord_utf8.append(" ") - elif c > 126: # TODO: support non-ASCII characters - continue - else: - chord_utf8.append(chr(c)) - yield "".join(chord_utf8).strip() diff --git a/nexus/CCSerial/__init__.py b/nexus/CCSerial/__init__.py deleted file mode 100644 index a3f1dec..0000000 --- a/nexus/CCSerial/__init__.py +++ /dev/null @@ -1,5 +0,0 @@ -"""nexus module CCSerial: interfacing with cc devices using the serial api.""" - -__all__ = ["CCSerial"] - -from .CCSerial import CCSerial diff --git a/nexus/Freqlog/Freqlog.py b/nexus/Freqlog/Freqlog.py index e77b4bb..afdc99b 100644 --- a/nexus/Freqlog/Freqlog.py +++ b/nexus/Freqlog/Freqlog.py @@ -6,13 +6,12 @@ from threading import Thread from typing import Optional +from charachorder import CharaChorder, SerialException from pynput import keyboard as kbd, mouse -from serial import SerialException from .backends import Backend, SQLiteBackend from .Definitions import ActionType, BanlistAttr, BanlistEntry, CaseSensitivity, ChordMetadata, ChordMetadataAttr, \ Defaults, WordMetadata, WordMetadataAttr -from ..CCSerial import CCSerial class Freqlog: @@ -230,21 +229,23 @@ def _get_chords(self): """ Get chords from device """ - logging.info(f"Getting {self.dev.get_chordmap_count()} chords from device") - self.chords = [] - started_logging = False # prevent early short-circuit - for chord in self.dev.list_device_chords(): - self.chords.append(chord.strip()) - if not self.is_logging: # Short circuit if logging is stopped - if started_logging: - logging.info("Stopped getting chords from device") - break + if self.device is None: + return + + with self.device: + logging.info(f"Getting {self.device.get_chordmap_count()} chords from device") + self.chords = [] + started_logging = False # prevent early short-circuit + for chord, phrase in self.device.get_chordmaps(): + self.chords.append(str(phrase).strip()) + if not self.is_logging: # Short circuit if logging is stopped + if started_logging: + logging.info("Stopped getting chords from device") + break + else: + started_logging = True else: - started_logging = True - else: - logging.info(f"Got {len(self.chords)} chords from device") - if self.dev: - self.dev.close() + logging.info(f"Got {len(self.chords)} chords from device") @staticmethod def is_backend_initialized(backend_path: str) -> bool: @@ -272,12 +273,12 @@ def __init__(self, backend_path: str, password_callback: callable, loggable: boo :raises cryptography.fernet.InvalidToken: If the password is incorrect """ logging.info("Initializing freqlog") - self.dev: CCSerial | None = None + self.device: CharaChorder | None = None self.chords: list[str] | None = None self.num_chords: int | None = None # Get serial device - devices = CCSerial.list_devices() + devices = CharaChorder.list_devices() if len(devices) == 0: logging.warning("No CharaChorder devices found") else: @@ -286,8 +287,9 @@ def __init__(self, backend_path: str, password_callback: callable, loggable: boo logging.debug(f"Other devices: {devices[1:]}") logging.info(f"Connecting to CharaChorder device at {devices[0]}") try: - self.dev = CCSerial(devices[0]) - self.num_chords = self.dev.get_chordmap_count() + self.device = devices[0] + with self.device: + self.num_chords = self.device.get_chordmap_count() except SerialException as e: logging.error(f"Failed to connect to CharaChorder device: {devices[0]}") logging.error(e) @@ -300,11 +302,7 @@ def __init__(self, backend_path: str, password_callback: callable, loggable: boo logging.info(f"Logging set to freqlog db at {backend_path}") # Asynchronously get chords from device - if self.dev: - Thread(target=self._get_chords).start() - else: # We're done with device, close it - if self.dev: - self.dev.close() + Thread(target=self._get_chords).start() self.backend: Backend = SQLiteBackend(backend_path, password_callback, upgrade_callback) self.q: Queue = Queue() diff --git a/requirements.txt b/requirements.txt index a3dffe9..5d62491 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,7 +1,7 @@ +charachorder.py~=0.5.2 pynput~=1.7.6 pyinstaller~=5.13 setuptools~=70.3 PySide6~=6.5 -pySerial~=3.5 cryptography~=42.0 requests~=2.32.1