diff --git a/lib/hidapi/hidapi.py b/lib/hidapi/hidapi.py index 3ba31e055..b9ccaf297 100644 --- a/lib/hidapi/hidapi.py +++ b/lib/hidapi/hidapi.py @@ -40,6 +40,8 @@ logger = logging.getLogger(__name__) +ACTION_ADD = "add" +ACTION_REMOVE = "remove" # Global handle to hidapi _hidapi = None @@ -209,10 +211,10 @@ def run(self): current_devices = {tuple(dev.items()): dev for dev in _enumerate_devices()} for key, device in self.prev_devices.items(): if key not in current_devices: - self.device_callback("remove", device) + self.device_callback(ACTION_REMOVE, device) for key, device in current_devices.items(): if key not in self.prev_devices: - self.device_callback("add", device) + self.device_callback(ACTION_ADD, device) self.prev_devices = current_devices sleep(self.polling_delay) @@ -220,7 +222,7 @@ def run(self): # The filterfn is used to determine whether this is a device of interest to Solaar. # It is given the bus id, vendor id, and product id and returns a dictionary # with the required hid_driver and usb_interface and whether this is a receiver or device. -def _match(action, device, filterfn): +def _match(action: str, device, filterfn): vid = device["vendor_id"] pid = device["product_id"] @@ -264,7 +266,7 @@ def _match(action, device, filterfn): return isDevice = filter.get("isDevice") - if action == "add": + if action == ACTION_ADD: d_info = DeviceInfo( path=device["path"].decode(), bus_id=bus_id, @@ -282,7 +284,7 @@ def _match(action, device, filterfn): ) return d_info - elif action == "remove": + elif action == ACTION_REMOVE: d_info = DeviceInfo( path=device["path"].decode(), bus_id=None, @@ -314,11 +316,11 @@ def find_paired_node_wpid(receiver_path, index): def monitor_glib(callback, filterfn): def device_callback(action, device): # print(f"device_callback({action}): {device}") - if action == "add": + if action == ACTION_ADD: d_info = _match(action, device, filterfn) if d_info: GLib.idle_add(callback, action, d_info) - elif action == "remove": + elif action == ACTION_REMOVE: # Removed devices will be detected by Solaar directly pass @@ -335,7 +337,7 @@ def enumerate(filterfn): :returns: a list of matching ``DeviceInfo`` tuples. """ for device in _enumerate_devices(): - d_info = _match("add", device, filterfn) + d_info = _match(ACTION_ADD, device, filterfn) if d_info: yield d_info diff --git a/lib/hidapi/udev.py b/lib/hidapi/udev.py index 082f565bd..e0e6281c4 100644 --- a/lib/hidapi/udev.py +++ b/lib/hidapi/udev.py @@ -44,6 +44,9 @@ logger = logging.getLogger(__name__) +ACTION_ADD = "add" +ACTION_REMOVE = "remove" + fileopen = open # @@ -73,12 +76,13 @@ def exit(): # The filterfn is used to determine whether this is a device of interest to Solaar. # It is given the bus id, vendor id, and product id and returns a dictionary # with the required hid_driver and usb_interface and whether this is a receiver or device. -def _match(action, device, filterfn): +def _match(action: str, device, filterfn): if logger.isEnabledFor(logging.DEBUG): logger.debug(f"Dbus event {action} {device}") hid_device = device.find_parent("hid") if hid_device is None: # only HID devices are of interest to Solaar return + hid_id = hid_device.properties.get("HID_ID") if not hid_id: return # there are reports that sometimes the id isn't set up right so be defensive @@ -114,7 +118,7 @@ def _match(action, device, filterfn): interface_number = filter.get("usb_interface") isDevice = filter.get("isDevice") - if action == "add": + if action == ACTION_ADD: hid_driver_name = hid_device.properties.get("DRIVER") intf_device = device.find_parent("usb", "usb_interface") usb_interface = None if intf_device is None else intf_device.attributes.asint("bInterfaceNumber") @@ -152,7 +156,7 @@ def _match(action, device, filterfn): ) return d_info - elif action == "remove": + elif action == ACTION_REMOVE: d_info = DeviceInfo( path=device.device_node, bus_id=None, @@ -224,11 +228,11 @@ def _process_udev_event(monitor, condition, cb, filterfn): if event: action, device = event # print ("***", action, device) - if action == "add": + if action == ACTION_ADD: d_info = _match(action, device, filterfn) if d_info: GLib.idle_add(cb, action, d_info) - elif action == "remove": + elif action == ACTION_REMOVE: # the GLib notification does _not_ match! pass return True diff --git a/lib/logitech_receiver/hidpp20.py b/lib/logitech_receiver/hidpp20.py index b54dc1e9a..9f7fd8fe4 100644 --- a/lib/logitech_receiver/hidpp20.py +++ b/lib/logitech_receiver/hidpp20.py @@ -15,11 +15,14 @@ ## with this program; if not, write to the Free Software Foundation, Inc., ## 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. +from __future__ import annotations + import logging import socket import struct import threading +from enum import IntEnum from typing import Any from typing import List from typing import Optional @@ -55,10 +58,10 @@ class Device(Protocol): - def feature_request(self, feature: FEATURE) -> Any: + def feature_request(self, feature, function, *params, no_reply=False) -> Any: ... - def request(self) -> Any: + def request(self, **kwargs) -> Any: ... @property @@ -651,7 +654,7 @@ def __repr__(self): class Gesture: - def __init__(self, device, low, high, next_index, next_diversion_index): + def __init__(self, device: Device, low, high, next_index, next_diversion_index): self._device = device self.id = low self.gesture = GESTURE[low] @@ -725,7 +728,7 @@ def __repr__(self): class Param: - def __init__(self, device, low, high, next_param_index): + def __init__(self, device: Device, low, high, next_param_index): self._device = device self.id = low self.param = PARAM[low] @@ -773,7 +776,7 @@ def __int__(self): class Spec: - def __init__(self, device, low, high): + def __init__(self, device: Device, low, high): self._device = device self.id = low self.spec = SPEC[low] @@ -984,7 +987,7 @@ def __str__(self): class LEDEffectInfo: # an effect that a zone can do - def __init__(self, feature, function, device, zindex, eindex): + def __init__(self, feature, function, device: Device, zindex, eindex): info = device.feature_request(feature, function, zindex, eindex, 0x00) self.zindex, self.index, self.ID, self.capabilities, self.period = struct.unpack("!BBHHH", info[0:8]) @@ -1008,7 +1011,7 @@ def __str__(self): class LEDZoneInfo: # effects that a zone can do - def __init__(self, feature, function, offset, effect_function, device, index): + def __init__(self, feature, function, offset, effect_function, device: Device, index): info = device.feature_request(feature, function, index, 0xFF, 0x00) self.location, self.count = struct.unpack("!HB", info[1 + offset : 4 + offset]) self.index = index @@ -1057,28 +1060,42 @@ def __init__(self, device): self.zones.append(LEDZoneInfo(FEATURE.RGB_EFFECTS, 0x00, 1, 0x00, device, i)) -ButtonBehaviors = common.NamedInts(MacroExecute=0x0, MacroStop=0x1, MacroStopAll=0x2, Send=0x8, Function=0x9) -ButtonMappingTypes = common.NamedInts(No_Action=0x0, Button=0x1, Modifier_And_Key=0x2, Consumer_Key=0x3) -ButtonFunctions = common.NamedInts( - No_Action=0x0, - Tilt_Left=0x1, - Tilt_Right=0x2, - Next_DPI=0x3, - Previous_DPI=0x4, - Cycle_DPI=0x5, - Default_DPI=0x6, - Shift_DPI=0x7, - Next_Profile=0x8, - Previous_Profile=0x9, - Cycle_Profile=0xA, - G_Shift=0xB, - Battery_Status=0xC, - Profile_Select=0xD, - Mode_Switch=0xE, - Host_Button=0xF, - Scroll_Down=0x10, - Scroll_Up=0x11, -) +class ButtonBehavior(IntEnum): + MACRO_EXECUTE = 0x0 + MACRO_STOP = 0x1 + MACRO_STOP_ALL = 0x2 + SEND = 0x8 + FUNCTION = 0x9 + + +class ButtonMappingType(IntEnum): + NO_ACTION = 0x0 + BUTTON = 0x1 + MODIFIER_AND_KEY = 0x2 + CONSUMER_KEY = 0x3 + + +class ButtonFunctions(IntEnum): + NO_ACTION = 0x0 + TILT_LEFT = 0x1 + TILT_RIGHT = 0x2 + NEXT_DPI = 0x3 + PREVIOUS_DPI = 0x4 + CYCLE_DPI = 0x5 + DEFAULT_DPI = 0x6 + SHIFT_DPI = 0x7 + NEXT_PROFILE = 0x8 + PREVIOUS_PROFILE = 0x9 + CYCLE_PROFILE = 0xA + G_SHIFT = 0xB + BATTERY_STATUS = 0xC + PROFILE_SELECT = 0xD + MODE_SWITCH = 0xE + HOST_BUTTON = 0xF + SCROLL_DOWN = 0x10 + SCROLL_UP = 0x11 + + ButtonButtons = special_keys.MOUSE_BUTTONS ButtonModifiers = special_keys.modifiers ButtonKeys = special_keys.USB_HID_KEYCODES @@ -1088,8 +1105,8 @@ def __init__(self, device): class Button: """A button mapping""" - def __init__(self, **kwargs): - self.behavior = None + def __init__(self, behavior: int, **kwargs): + self.behavior = behavior for key, val in kwargs.items(): setattr(self, key, val) @@ -1103,54 +1120,61 @@ def to_yaml(cls, dumper, data): return dumper.represent_mapping("!Button", data.__dict__, flow_style=True) @classmethod - def from_bytes(cls, bytes): - behavior = ButtonBehaviors[bytes[0] >> 4] - if behavior == ButtonBehaviors.MacroExecute or behavior == ButtonBehaviors.MacroStop: - sector = ((bytes[0] & 0x0F) << 8) + bytes[1] - address = (bytes[2] << 8) + bytes[3] - result = cls(behavior=behavior, sector=sector, address=address) - elif behavior == ButtonBehaviors.Send: - mapping_type = ButtonMappingTypes[bytes[1]] - if mapping_type == ButtonMappingTypes.Button: - value = ButtonButtons[(bytes[2] << 8) + bytes[3]] - result = cls(behavior=behavior, type=mapping_type, value=value) - elif mapping_type == ButtonMappingTypes.Modifier_And_Key: - modifiers = bytes[2] - value = ButtonKeys[bytes[3]] - result = cls(behavior=behavior, type=mapping_type, modifiers=modifiers, value=value) - elif mapping_type == ButtonMappingTypes.Consumer_Key: - value = ButtonConsumerKeys[(bytes[2] << 8) + bytes[3]] - result = cls(behavior=behavior, type=mapping_type, value=value) - elif mapping_type == ButtonMappingTypes.No_Action: - result = cls(behavior=behavior, type=mapping_type) - elif behavior == ButtonBehaviors.Function: - value = ButtonFunctions[bytes[1]] if ButtonFunctions[bytes[1]] is not None else bytes[1] - data = bytes[3] - result = cls(behavior=behavior, value=value, data=data) + def from_bytes(cls, bytes_) -> Button: + result = None + try: + behavior = ButtonBehavior(bytes_[0] >> 4) + except ValueError: + behavior = None + + if behavior == ButtonBehavior.MACRO_EXECUTE or behavior == ButtonBehavior.MACRO_STOP: + sector = ((bytes_[0] & 0x0F) << 8) + bytes_[1] + address = (bytes_[2] << 8) + bytes_[3] + result = cls(behavior=behavior.value, sector=sector, address=address) + elif behavior == ButtonBehavior.SEND: + mapping_type = ButtonMappingType(bytes_[1]) + if mapping_type == ButtonMappingType.BUTTON: + value = ButtonButtons[(bytes_[2] << 8) + bytes_[3]] + result = cls(behavior=behavior.value, type=mapping_type.value, value=value) + elif mapping_type == ButtonMappingType.MODIFIER_AND_KEY: + modifiers = bytes_[2] + value = ButtonKeys[bytes_[3]] + result = cls(behavior=behavior.value, type=mapping_type.value, modifiers=modifiers, value=value) + elif mapping_type == ButtonMappingType.CONSUMER_KEY: + value = ButtonConsumerKeys[(bytes_[2] << 8) + bytes_[3]] + result = cls(behavior=behavior.value, type=mapping_type.value, value=value) + elif mapping_type == ButtonMappingType.NO_ACTION: + result = cls(behavior=behavior.value, type=mapping_type.value) + else: + raise ValueError("Unsupported ButtonMapping") + elif behavior == ButtonBehavior.FUNCTION: + value = ButtonFunctions(bytes_[1]).value if ButtonFunctions(bytes_[1]) is not None else bytes_[1] + data = bytes_[3] + result = cls(behavior=behavior.value, value=value, data=data) else: - result = cls(behavior=bytes[0] >> 4, bytes=bytes) + result = cls(behavior=bytes_[0] >> 4, bytes=bytes_) return result def to_bytes(self): - bytes = common.int2bytes(self.behavior << 4, 1) if self.behavior is not None else None - if self.behavior == ButtonBehaviors.MacroExecute or self.behavior == ButtonBehaviors.MacroStop: - bytes = common.int2bytes((self.behavior << 12) + self.sector, 2) + common.int2bytes(self.address, 2) - elif self.behavior == ButtonBehaviors.Send: - bytes += common.int2bytes(self.type, 1) - if self.type == ButtonMappingTypes.Button: - bytes += common.int2bytes(self.value, 2) - elif self.type == ButtonMappingTypes.Modifier_And_Key: - bytes += common.int2bytes(self.modifiers, 1) - bytes += common.int2bytes(self.value, 1) - elif self.type == ButtonMappingTypes.Consumer_Key: - bytes += common.int2bytes(self.value, 2) - elif self.type == ButtonMappingTypes.No_Action: - bytes += b"\xff\xff" - elif self.behavior == ButtonBehaviors.Function: - bytes += common.int2bytes(self.value, 1) + b"\xff" + (common.int2bytes(self.data, 1) if self.data else b"\x00") + bytes_ = common.int2bytes(self.behavior << 4, 1) if self.behavior is not None else None + if self.behavior == ButtonBehavior.MACRO_EXECUTE or self.behavior == ButtonBehavior.MACRO_STOP: + bytes_ = common.int2bytes((self.behavior << 12) + self.sector, 2) + common.int2bytes(self.address, 2) + elif self.behavior == ButtonBehavior.SEND: + bytes_ += common.int2bytes(self.type, 1) + if self.type == ButtonMappingType.BUTTON: + bytes_ += common.int2bytes(self.value, 2) + elif self.type == ButtonMappingType.MODIFIER_AND_KEY: + bytes_ += common.int2bytes(self.modifiers, 1) + bytes_ += common.int2bytes(self.value, 1) + elif self.type == ButtonMappingType.CONSUMER_KEY: + bytes_ += common.int2bytes(self.value, 2) + elif self.type == ButtonMappingType.NO_ACTION: + bytes_ += b"\xff\xff" + elif self.behavior == ButtonBehavior.FUNCTION: + bytes_ += common.int2bytes(self.value, 1) + b"\xff" + (common.int2bytes(self.data, 1) if self.data else b"\x00") else: - bytes = self.bytes if self.bytes else b"\xff\xff\xff\xff" - return bytes + bytes_ = self.bytes if self.bytes else b"\xff\xff\xff\xff" + return bytes_ def __repr__(self): return "%s{%s}" % ( @@ -1371,7 +1395,7 @@ def show(self): yaml.add_representer(OnboardProfiles, OnboardProfiles.to_yaml) -def feature_request(device, feature, function=0x00, *params, no_reply=False): +def feature_request(device: Device, feature, function=0x00, *params, no_reply=False): if device.online and device.features: if feature in device.features: feature_index = device.features[feature] @@ -1427,7 +1451,7 @@ def get_firmware(self, device): fw.append(fw_info) return tuple(fw) - def get_ids(self, device): + def get_ids(self, device: Device): """Reads a device's ids (unit and model numbers)""" ids = device.feature_request(FEATURE.DEVICE_FW_VERSION) if ids: diff --git a/lib/logitech_receiver/settings.py b/lib/logitech_receiver/settings.py index c9b816d87..c79a5a6e4 100644 --- a/lib/logitech_receiver/settings.py +++ b/lib/logitech_receiver/settings.py @@ -108,7 +108,7 @@ def range(self): assert hasattr(self, "_device") if self._validator.kind == KIND.range: - return (self._validator.min_value, self._validator.max_value) + return self._validator.min_value, self._validator.max_value def _pre_read(self, cached, key=None): if self.persist and self._value is None and getattr(self._device, "persister", None): @@ -626,7 +626,16 @@ class FeatureRW: default_read_fnid = 0x00 default_write_fnid = 0x10 - def __init__(self, feature, read_fnid=0x00, write_fnid=0x10, prefix=b"", suffix=b"", read_prefix=b"", no_reply=False): + def __init__( + self, + feature, + read_fnid=default_read_fnid, + write_fnid=default_write_fnid, + prefix=b"", + suffix=b"", + read_prefix=b"", + no_reply=False, + ): assert isinstance(feature, NamedInt) self.feature = feature self.read_fnid = read_fnid @@ -1488,7 +1497,7 @@ def handler(self, device, n): # Called on notification events from the device if n.sub_id < 0x40 and device.features.get_feature(n.sub_id) == hidpp20_constants.FEATURE.REPROG_CONTROLS_V4: if n.address == 0x00: cids = struct.unpack("!HHHH", n.data[:8]) - ## generalize to list of keys + # generalize to list of keys if not self.initiating_key: # no initiating key pressed for k in self.keys: if int(k.key) in cids: # initiating key that was pressed diff --git a/lib/logitech_receiver/settings_templates.py b/lib/logitech_receiver/settings_templates.py index f259384c0..f9b0db084 100644 --- a/lib/logitech_receiver/settings_templates.py +++ b/lib/logitech_receiver/settings_templates.py @@ -19,6 +19,7 @@ import struct import traceback +from enum import Enum from time import time from solaar.i18n import _ @@ -727,14 +728,24 @@ def build(cls, setting_class, device): return cls(choices, key_byte_count=2, byte_count=2, extra_default=0) if choices else None +class State(Enum): + IDLE = "idle" + PRESSED = "pressed" + MOVED = "moved" + + class DpiSlidingXY(settings.RawXYProcessing): + def __init__(self, device, name=""): + super().__init__(device, name) + self.fsmState = None + def activate_action(self): self.dpiSetting = next(filter(lambda s: s.name == "dpi" or s.name == "dpi_extended", self.device.settings), None) self.dpiChoices = list(self.dpiSetting.choices) self.otherDpiIdx = self.device.persister.get("_dpi-sliding", -1) if self.device.persister else -1 if not isinstance(self.otherDpiIdx, int) or self.otherDpiIdx < 0 or self.otherDpiIdx >= len(self.dpiChoices): self.otherDpiIdx = self.dpiChoices.index(self.dpiSetting.read()) - self.fsmState = "idle" + self.fsmState = State.IDLE self.dx = 0.0 self.movingDpiIdx = None @@ -754,23 +765,23 @@ def displayNewDpi(self, newDpiIdx): def press_action(self, key): # start tracking self.starting = True - if self.fsmState == "idle": - self.fsmState = "pressed" + if self.fsmState == State.IDLE: + self.fsmState = State.PRESSED self.dx = 0.0 # While in 'moved' state, the index into 'dpiChoices' of the currently selected DPI setting self.movingDpiIdx = None def release_action(self): # adjust DPI and stop tracking - if self.fsmState == "pressed": # Swap with other DPI + if self.fsmState == State.PRESSED: # Swap with other DPI thisIdx = self.dpiChoices.index(self.dpiSetting.read()) newDpiIdx, self.otherDpiIdx = self.otherDpiIdx, thisIdx if self.device.persister: self.device.persister["_dpi-sliding"] = self.otherDpiIdx self.setNewDpi(newDpiIdx) self.displayNewDpi(newDpiIdx) - elif self.fsmState == "moved": # Set DPI according to displacement + elif self.fsmState == State.MOVED: # Set DPI according to displacement self.setNewDpi(self.movingDpiIdx) - self.fsmState = "idle" + self.fsmState = State.IDLE def move_action(self, dx, dy): if self.device.features.get_feature_version(_F.REPROG_CONTROLS_V4) >= 5 and self.starting: @@ -778,11 +789,11 @@ def move_action(self, dx, dy): return currDpi = self.dpiSetting.read() self.dx += float(dx) / float(currDpi) * 15.0 # yields a more-or-less DPI-independent dx of about 5/cm - if self.fsmState == "pressed": + if self.fsmState == State.PRESSED: if abs(self.dx) >= 1.0: - self.fsmState = "moved" + self.fsmState = State.MOVED self.movingDpiIdx = self.dpiChoices.index(currDpi) - elif self.fsmState == "moved": + elif self.fsmState == State.MOVED: currIdx = self.dpiChoices.index(self.dpiSetting.read()) newMovingDpiIdx = min(max(currIdx + int(self.dx), 0), len(self.dpiChoices) - 1) if newMovingDpiIdx != self.movingDpiIdx: @@ -793,7 +804,7 @@ def move_action(self, dx, dy): class MouseGesturesXY(settings.RawXYProcessing): def activate_action(self): self.dpiSetting = next(filter(lambda s: s.name == "dpi" or s.name == "dpi_extended", self.device.settings), None) - self.fsmState = "idle" + self.fsmState = State.IDLE self.initialize_data() def initialize_data(self): @@ -804,13 +815,13 @@ def initialize_data(self): def press_action(self, key): self.starting = True - if self.fsmState == "idle": - self.fsmState = "pressed" + if self.fsmState == State.IDLE: + self.fsmState = State.PRESSED self.initialize_data() self.data = [key.key] def release_action(self): - if self.fsmState == "pressed": + if self.fsmState == State.PRESSED: # emit mouse gesture notification self.push_mouse_event() if logger.isEnabledFor(logging.INFO): @@ -818,10 +829,10 @@ def release_action(self): payload = struct.pack("!" + (len(self.data) * "h"), *self.data) notification = base.HIDPPNotification(0, 0, 0, 0, payload) diversion.process_notification(self.device, notification, _F.MOUSE_GESTURE) - self.fsmState = "idle" + self.fsmState = State.IDLE def move_action(self, dx, dy): - if self.fsmState == "pressed": + if self.fsmState == State.PRESSED: now = time() * 1000 # time_ns() / 1e6 if self.device.features.get_feature_version(_F.REPROG_CONTROLS_V4) >= 5 and self.starting: self.starting = False # hack to ignore strange first movement report from MX Master 3S