From 239c5b6e249f260ecbff413560cc3b94c855f4c2 Mon Sep 17 00:00:00 2001 From: David Sangrey Date: Thu, 19 Oct 2023 19:24:43 -0400 Subject: [PATCH 1/5] [2051] Config Files --- config/__init__.py | 72 +++++++++++------------- config/darwin.py | 24 ++++---- config/linux.py | 136 ++++++++++++++++++++++----------------------- config/windows.py | 98 +++++++++++++++----------------- 4 files changed, 156 insertions(+), 174 deletions(-) diff --git a/config/__init__.py b/config/__init__.py index a31cdd89e..ae08f3a03 100644 --- a/config/__init__.py +++ b/config/__init__.py @@ -1,12 +1,14 @@ """ -Code dealing with the configuration of the program. +__init__.py - Code dealing with the configuration of the program. + +Copyright (c) EDCD, All Rights Reserved +Licensed under the GNU General Public License. +See LICENSE file. Windows uses the Registry to store values in a flat manner. Linux uses a file, but for commonality it's still a flat data structure. macOS uses a 'defaults' object. """ - - __all__ = [ # defined in the order they appear in the file 'GITVERSION_FILE', @@ -40,10 +42,8 @@ import traceback import warnings from abc import abstractmethod -from typing import Any, Callable, Optional, Type, TypeVar - +from typing import Any, Callable, Optional, Type, TypeVar, Union, List import semantic_version - from constants import GITVERSION_FILE, applongname, appname # Any of these may be imported by plugins @@ -52,7 +52,7 @@ # # Major.Minor.Patch(-prerelease)(+buildmetadata) # NB: Do *not* import this, use the functions appversion() and appversion_nobuild() -_static_appversion = '5.9.5' +_static_appversion = '5.10.0-alpha0' _cached_version: Optional[semantic_version.Version] = None copyright = '© 2015-2019 Jonathan Harris, 2020-2023 EDCD' @@ -60,10 +60,10 @@ update_feed = 'https://raw.githubusercontent.com/EDCD/EDMarketConnector/releases/edmarketconnector.xml' update_interval = 8*60*60 # Providers marked to be in debug mode. Generally this is expected to switch to sending data to a log file -debug_senders: list[str] = [] +debug_senders: List[str] = [] # TRACE logging code that should actually be used. Means not spamming it # *all* if only interested in some things. -trace_on: list[str] = [] +trace_on: List[str] = [] capi_pretend_down: bool = False capi_debug_access_token: Optional[str] = None @@ -79,7 +79,6 @@ _T = TypeVar('_T') -########################################################################### def git_shorthash_from_head() -> str: """ Determine short hash for current git HEAD. @@ -91,13 +90,14 @@ def git_shorthash_from_head() -> str: shorthash: str = None # type: ignore try: - git_cmd = subprocess.Popen('git rev-parse --short HEAD'.split(), - stdout=subprocess.PIPE, - stderr=subprocess.STDOUT - ) + git_cmd = subprocess.Popen( + "git rev-parse --short HEAD".split(), + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT, + ) out, err = git_cmd.communicate() - except Exception as e: + except subprocess.CalledProcessError as e: logger.info(f"Couldn't run git command for short hash: {e!r}") else: @@ -131,7 +131,7 @@ def appversion() -> semantic_version.Version: if getattr(sys, 'frozen', False): # Running frozen, so we should have a .gitversion file # Yes, .parent because if frozen we're inside library.zip - with open(pathlib.Path(sys.path[0]).parent / GITVERSION_FILE, 'r', encoding='utf-8') as gitv: + with open(pathlib.Path(sys.path[0]).parent / GITVERSION_FILE, encoding='utf-8') as gitv: shorthash = gitv.read() else: @@ -157,23 +157,15 @@ def appversion_nobuild() -> semantic_version.Version: :return: App version without any build meta data. """ return appversion().truncate('prerelease') -########################################################################### class AbstractConfig(abc.ABC): """Abstract root class of all platform specific Config implementations.""" OUT_EDDN_SEND_STATION_DATA = 1 - # OUT_MKT_BPC = 2 # No longer supported OUT_MKT_TD = 4 OUT_MKT_CSV = 8 OUT_SHIP = 16 - # OUT_SHIP_EDS = 16 # Replaced by OUT_SHIP - # OUT_SYS_FILE = 32 # No longer supported - # OUT_STAT = 64 # No longer available - # OUT_SHIP_CORIOLIS = 128 # Replaced by OUT_SHIP - # OUT_SYS_EDSM = 256 # Now a plugin - # OUT_SYS_AUTO = 512 # Now always automatic OUT_MKT_MANUAL = 1024 OUT_EDDN_SEND_NON_STATION = 2048 OUT_EDDN_DELAY = 4096 @@ -185,7 +177,6 @@ class AbstractConfig(abc.ABC): respath_path: pathlib.Path home_path: pathlib.Path default_journal_dir_path: pathlib.Path - identifier: str __in_shutdown = False # Is the application currently shutting down ? @@ -294,7 +285,7 @@ def default_journal_dir(self) -> str: @staticmethod def _suppress_call( - func: Callable[..., _T], exceptions: Type[BaseException] | list[Type[BaseException]] = Exception, + func: Callable[..., _T], exceptions: Union[Type[BaseException], List[Type[BaseException]]] = Exception, *args: Any, **kwargs: Any ) -> Optional[_T]: if exceptions is None: @@ -303,15 +294,15 @@ def _suppress_call( if not isinstance(exceptions, list): exceptions = [exceptions] - with contextlib.suppress(*exceptions): # type: ignore # it works fine, mypy + with contextlib.suppress(*exceptions): return func(*args, **kwargs) return None def get( self, key: str, - default: list | str | bool | int | None = None - ) -> list | str | bool | int | None: + default: Union[list, str, bool, int, None] = None + ) -> Union[list, str, bool, int, None]: """ Return the data for the requested key, or a default. @@ -326,19 +317,19 @@ def get( if (a_list := self._suppress_call(self.get_list, ValueError, key, default=None)) is not None: return a_list - elif (a_str := self._suppress_call(self.get_str, ValueError, key, default=None)) is not None: + if (a_str := self._suppress_call(self.get_str, ValueError, key, default=None)) is not None: return a_str - elif (a_bool := self._suppress_call(self.get_bool, ValueError, key, default=None)) is not None: + if (a_bool := self._suppress_call(self.get_bool, ValueError, key, default=None)) is not None: return a_bool - elif (an_int := self._suppress_call(self.get_int, ValueError, key, default=None)) is not None: + if (an_int := self._suppress_call(self.get_int, ValueError, key, default=None)) is not None: return an_int - return default # type: ignore + return default @abstractmethod - def get_list(self, key: str, *, default: list | None = None) -> list: + def get_list(self, key: str, *, default: Optional[list] = None) -> list: """ Return the list referred to by the given key if it exists, or the default. @@ -347,7 +338,7 @@ def get_list(self, key: str, *, default: list | None = None) -> list: raise NotImplementedError @abstractmethod - def get_str(self, key: str, *, default: str | None = None) -> str: + def get_str(self, key: str, *, default: Optional[str] = None) -> str: """ Return the string referred to by the given key if it exists, or the default. @@ -360,7 +351,7 @@ def get_str(self, key: str, *, default: str | None = None) -> str: raise NotImplementedError @abstractmethod - def get_bool(self, key: str, *, default: bool | None = None) -> bool: + def get_bool(self, key: str, *, default: Optional[bool] = None) -> bool: """ Return the bool referred to by the given key if it exists, or the default. @@ -400,7 +391,7 @@ def get_int(self, key: str, *, default: int = 0) -> int: raise NotImplementedError @abstractmethod - def set(self, key: str, val: int | str | list[str] | bool) -> None: + def set(self, key: str, val: Union[int, str, List[str], bool]) -> None: """ Set the given key's data to the given value. @@ -462,16 +453,15 @@ def get_config(*args, **kwargs) -> AbstractConfig: from .darwin import MacConfig return MacConfig(*args, **kwargs) - elif sys.platform == "win32": # pragma: sys-platform-win32 + if sys.platform == "win32": # pragma: sys-platform-win32 from .windows import WinConfig return WinConfig(*args, **kwargs) - elif sys.platform == "linux": # pragma: sys-platform-linux + if sys.platform == "linux": # pragma: sys-platform-linux from .linux import LinuxConfig return LinuxConfig(*args, **kwargs) - else: # pragma: sys-platform-not-known - raise ValueError(f'Unknown platform: {sys.platform=}') + raise ValueError(f'Unknown platform: {sys.platform=}') config = get_config() diff --git a/config/darwin.py b/config/darwin.py index 895218a89..68d30306c 100644 --- a/config/darwin.py +++ b/config/darwin.py @@ -1,13 +1,17 @@ -"""Darwin/macOS implementation of AbstractConfig.""" +""" +darwin.py - Darwin/macOS implementation of AbstractConfig. + +Copyright (c) EDCD, All Rights Reserved +Licensed under the GNU General Public License. +See LICENSE file. +""" import pathlib import sys from typing import Any, Dict, List, Union - from Foundation import ( # type: ignore NSApplicationSupportDirectory, NSBundle, NSDocumentDirectory, NSSearchPathForDirectoriesInDomains, NSUserDefaults, NSUserDomainMask ) - from config import AbstractConfig, appname, logger assert sys.platform == 'darwin' @@ -82,7 +86,7 @@ def get_str(self, key: str, *, default: str = None) -> str: """ res = self.__raw_get(key) if res is None: - return default # type: ignore # Yes it could be None, but we're _assuming_ that people gave us a default + return default # Yes it could be None, but we're _assuming_ that people gave us a default if not isinstance(res, str): raise ValueError(f'unexpected data returned from __raw_get: {type(res)=} {res}') @@ -97,9 +101,9 @@ def get_list(self, key: str, *, default: list = None) -> list: """ res = self.__raw_get(key) if res is None: - return default # type: ignore # Yes it could be None, but we're _assuming_ that people gave us a default + return default # Yes it could be None, but we're _assuming_ that people gave us a default - elif not isinstance(res, list): + if not isinstance(res, list): raise ValueError(f'__raw_get returned unexpected type {type(res)=} {res!r}') return res @@ -114,7 +118,7 @@ def get_int(self, key: str, *, default: int = 0) -> int: if res is None: return default - elif not isinstance(res, (str, int)): + if not isinstance(res, (str, int)): raise ValueError(f'__raw_get returned unexpected type {type(res)=} {res!r}') try: @@ -122,7 +126,7 @@ def get_int(self, key: str, *, default: int = 0) -> int: except ValueError as e: logger.error(f'__raw_get returned {res!r} which cannot be parsed to an int: {e}') - return default # type: ignore # Yes it could be None, but we're _assuming_ that people gave us a default + return default # Yes it could be None, but we're _assuming_ that people gave us a default def get_bool(self, key: str, *, default: bool = None) -> bool: """ @@ -132,9 +136,9 @@ def get_bool(self, key: str, *, default: bool = None) -> bool: """ res = self.__raw_get(key) if res is None: - return default # type: ignore # Yes it could be None, but we're _assuming_ that people gave us a default + return default # Yes it could be None, but we're _assuming_ that people gave us a default - elif not isinstance(res, bool): + if not isinstance(res, bool): raise ValueError(f'__raw_get returned unexpected type {type(res)=} {res!r}') return res diff --git a/config/linux.py b/config/linux.py index 5d543d3fa..078764812 100644 --- a/config/linux.py +++ b/config/linux.py @@ -1,9 +1,15 @@ -"""Linux config implementation.""" +""" +linux.py - Linux config implementation. + +Copyright (c) EDCD, All Rights Reserved +Licensed under the GNU General Public License. +See LICENSE file. +""" import os import pathlib import sys from configparser import ConfigParser - +from typing import Optional, Union, List from config import AbstractConfig, appname, logger assert sys.platform == 'linux' @@ -13,100 +19,97 @@ class LinuxConfig(AbstractConfig): """Linux implementation of AbstractConfig.""" SECTION = 'config' - # TODO: I dislike this, would rather use a sane config file format. But here we are. + __unescape_lut = {'\\': '\\', 'n': '\n', ';': ';', 'r': '\r', '#': '#'} __escape_lut = {'\\': '\\', '\n': 'n', ';': ';', '\r': 'r'} - def __init__(self, filename: str | None = None) -> None: + def __init__(self, filename: Optional[str] = None) -> None: + """ + Initialize LinuxConfig instance. + + :param filename: Optional file name to use for configuration storage. + """ super().__init__() - # http://standards.freedesktop.org/basedir-spec/latest/ar01s03.html + + # Initialize directory paths xdg_data_home = pathlib.Path(os.getenv('XDG_DATA_HOME', default='~/.local/share')).expanduser() self.app_dir_path = xdg_data_home / appname self.app_dir_path.mkdir(exist_ok=True, parents=True) self.plugin_dir_path = self.app_dir_path / 'plugins' self.plugin_dir_path.mkdir(exist_ok=True) - self.respath_path = pathlib.Path(__file__).parent.parent - self.internal_plugin_dir_path = self.respath_path / 'plugins' self.default_journal_dir_path = None # type: ignore - self.identifier = f'uk.org.marginal.{appname.lower()}' # TODO: Unused? + # Configure the filename config_home = pathlib.Path(os.getenv('XDG_CONFIG_HOME', default='~/.config')).expanduser() - - self.filename = config_home / appname / f'{appname}.ini' - if filename is not None: - self.filename = pathlib.Path(filename) - + self.filename = pathlib.Path(filename) if filename is not None else config_home / appname / f'{appname}.ini' self.filename.parent.mkdir(exist_ok=True, parents=True) - self.config: ConfigParser | None = ConfigParser(comment_prefixes=('#',), interpolation=None) - self.config.read(self.filename) # read() ignores files that dont exist + # Initialize the configuration + self.config = ConfigParser(comment_prefixes=('#',), interpolation=None) + self.config.read(self.filename) # Ensure that our section exists. This is here because configparser will happily create files for us, but it # does not magically create sections try: - self.config[self.SECTION].get("this_does_not_exist", fallback=None) + self.config[self.SECTION].get("this_does_not_exist") except KeyError: - logger.info("Config section not found. Backing up existing file (if any) and readding a section header") + logger.info("Config section not found. Backing up existing file (if any) and re-adding a section header") if self.filename.exists(): - (self.filename.parent / f'{appname}.ini.backup').write_bytes(self.filename.read_bytes()) - + backup_filename = self.filename.parent / f'{appname}.ini.backup' + backup_filename.write_bytes(self.filename.read_bytes()) self.config.add_section(self.SECTION) - if (outdir := self.get_str('outdir')) is None or not pathlib.Path(outdir).is_dir(): + # Set 'outdir' if not specified or invalid + outdir = self.get_str('outdir') + if outdir is None or not pathlib.Path(outdir).is_dir(): self.set('outdir', self.home) def __escape(self, s: str) -> str: """ - Escape a string using self.__escape_lut. - - This does NOT support multi-character escapes. + Escape special characters in a string. - :param s: str - String to be escaped. - :return: str - The escaped string. + :param s: The input string. + :return: The escaped string. """ - out = "" - for c in s: - if c not in self.__escape_lut: - out += c - continue + escaped_chars = [] - out += '\\' + self.__escape_lut[c] + for c in s: + escaped_chars.append(self.__escape_lut.get(c, c)) - return out + return ''.join(escaped_chars) def __unescape(self, s: str) -> str: """ - Unescape a string. + Unescape special characters in a string. - :param s: str - The string to unescape. - :return: str - The unescaped string. + :param s: The input string. + :return: The unescaped string. """ - out: list[str] = [] + unescaped_chars = [] i = 0 while i < len(s): - c = s[i] - if c != '\\': - out.append(c) + current_char = s[i] + if current_char != '\\': + unescaped_chars.append(current_char) i += 1 continue - # We have a backslash, check what its escaping - if i == len(s)-1: + if i == len(s) - 1: raise ValueError('Escaped string has unescaped trailer') - unescaped = self.__unescape_lut.get(s[i+1]) + unescaped = self.__unescape_lut.get(s[i + 1]) if unescaped is None: - raise ValueError(f'Unknown escape: \\ {s[i+1]}') + raise ValueError(f'Unknown escape: \\{s[i + 1]}') - out.append(unescaped) + unescaped_chars.append(unescaped) i += 2 - return "".join(out) + return "".join(unescaped_chars) - def __raw_get(self, key: str) -> str | None: + def __raw_get(self, key: str) -> Optional[str]: """ Get a raw data value from the config file. @@ -118,7 +121,7 @@ def __raw_get(self, key: str) -> str | None: return self.config[self.SECTION].get(key) - def get_str(self, key: str, *, default: str | None = None) -> str: + def get_str(self, key: str, *, default: Optional[str] = None) -> str: """ Return the string referred to by the given key if it exists, or the default. @@ -126,29 +129,28 @@ def get_str(self, key: str, *, default: str | None = None) -> str: """ data = self.__raw_get(key) if data is None: - return default # type: ignore # Yes it could be None, but we're _assuming_ that people gave us a default + return default or "" if '\n' in data: - raise ValueError('asked for string, got list') + raise ValueError('Expected string, but got list') return self.__unescape(data) - def get_list(self, key: str, *, default: list | None = None) -> list: + def get_list(self, key: str, *, default: Optional[list] = None) -> list: """ Return the list referred to by the given key if it exists, or the default. Implements :meth:`AbstractConfig.get_list`. """ data = self.__raw_get(key) - if data is None: - return default # type: ignore # Yes it could be None, but we're _assuming_ that people gave us a default + return default or [] split = data.split('\n') if split[-1] != ';': raise ValueError('Encoded list does not have trailer sentinel') - return list(map(self.__unescape, split[:-1])) + return [self.__unescape(item) for item in split[:-1]] def get_int(self, key: str, *, default: int = 0) -> int: """ @@ -157,55 +159,47 @@ def get_int(self, key: str, *, default: int = 0) -> int: Implements :meth:`AbstractConfig.get_int`. """ data = self.__raw_get(key) - if data is None: return default try: return int(data) - except ValueError as e: - raise ValueError(f'requested {key=} as int cannot be converted to int') from e + raise ValueError(f'Failed to convert {key=} to int') from e - def get_bool(self, key: str, *, default: bool | None = None) -> bool: + def get_bool(self, key: str, *, default: Optional[bool] = None) -> bool: """ Return the bool referred to by the given key if it exists, or the default. Implements :meth:`AbstractConfig.get_bool`. """ if self.config is None: - raise ValueError('attempt to use a closed config') + raise ValueError('Attempt to use a closed config') data = self.__raw_get(key) if data is None: - return default # type: ignore # Yes it could be None, but we're _assuming_ that people gave us a default + return default or False return bool(int(data)) - def set(self, key: str, val: int | str | list[str]) -> None: + def set(self, key: str, val: Union[int, str, List[str]]) -> None: """ Set the given key's data to the given value. Implements :meth:`AbstractConfig.set`. """ if self.config is None: - raise ValueError('attempt to use a closed config') - - to_set: str | None = None + raise ValueError('Attempt to use a closed config') if isinstance(val, bool): to_set = str(int(val)) - elif isinstance(val, str): to_set = self.__escape(val) - elif isinstance(val, int): to_set = str(val) - elif isinstance(val, list): to_set = '\n'.join([self.__escape(s) for s in val] + [';']) - else: - raise ValueError(f'Unexpected type for value {type(val)=}') + raise ValueError(f'Unexpected type for value {type(val).__name__}') self.config.set(self.SECTION, key, to_set) self.save() @@ -217,7 +211,7 @@ def delete(self, key: str, *, suppress=False) -> None: Implements :meth:`AbstractConfig.delete`. """ if self.config is None: - raise ValueError('attempt to use a closed config') + raise ValueError('Attempt to delete from a closed config') self.config.remove_option(self.SECTION, key) self.save() @@ -229,7 +223,7 @@ def save(self) -> None: Implements :meth:`AbstractConfig.save`. """ if self.config is None: - raise ValueError('attempt to use a closed config') + raise ValueError('Attempt to save a closed config') with open(self.filename, 'w', encoding='utf-8') as f: self.config.write(f) @@ -241,4 +235,4 @@ def close(self) -> None: Implements :meth:`AbstractConfig.close`. """ self.save() - self.config = None + self.config = None # type: ignore diff --git a/config/windows.py b/config/windows.py index f7590a923..94a158f1d 100644 --- a/config/windows.py +++ b/config/windows.py @@ -1,6 +1,10 @@ -"""Windows config implementation.""" +""" +windows.py - Windows config implementation. -# spell-checker: words folderid deps hkey edcd +Copyright (c) EDCD, All Rights Reserved +Licensed under the GNU General Public License. +See LICENSE file. +""" import ctypes import functools import pathlib @@ -9,7 +13,6 @@ import winreg from ctypes.wintypes import DWORD, HANDLE from typing import List, Literal, Optional, Union - from config import AbstractConfig, applongname, appname, logger, update_interval assert sys.platform == 'win32' @@ -43,7 +46,8 @@ class WinConfig(AbstractConfig): """Implementation of AbstractConfig for Windows.""" def __init__(self, do_winsparkle=True) -> None: - self.app_dir_path = pathlib.Path(str(known_folder_path(FOLDERID_LocalAppData))) / appname + super().__init__() + self.app_dir_path = pathlib.Path(known_folder_path(FOLDERID_LocalAppData)) / appname # type: ignore self.app_dir_path.mkdir(exist_ok=True) self.plugin_dir_path = self.app_dir_path / 'plugins' @@ -52,19 +56,17 @@ def __init__(self, do_winsparkle=True) -> None: if getattr(sys, 'frozen', False): self.respath_path = pathlib.Path(sys.executable).parent self.internal_plugin_dir_path = self.respath_path / 'plugins' - else: self.respath_path = pathlib.Path(__file__).parent.parent self.internal_plugin_dir_path = self.respath_path / 'plugins' self.home_path = pathlib.Path.home() - journal_dir_str = known_folder_path(FOLDERID_SavedGames) - journaldir = pathlib.Path(journal_dir_str) if journal_dir_str is not None else None - self.default_journal_dir_path = None # type: ignore - if journaldir is not None: - self.default_journal_dir_path = journaldir / 'Frontier Developments' / 'Elite Dangerous' + journal_dir_path = pathlib.Path( + known_folder_path(FOLDERID_SavedGames)) / 'Frontier Developments' / 'Elite Dangerous' # type: ignore + self.default_journal_dir_path = journal_dir_path if journal_dir_path.is_dir() else None # type: ignore + REGISTRY_SUBKEY = r'Software\Marginal\EDMarketConnector' # noqa: N806 create_key_defaults = functools.partial( winreg.CreateKeyEx, key=winreg.HKEY_CURRENT_USER, @@ -72,20 +74,18 @@ def __init__(self, do_winsparkle=True) -> None: ) try: - self.__reg_handle: winreg.HKEYType = create_key_defaults( - sub_key=r'Software\Marginal\EDMarketConnector' - ) + self.__reg_handle: winreg.HKEYType = create_key_defaults(sub_key=REGISTRY_SUBKEY) if do_winsparkle: self.__setup_winsparkle() except OSError: - logger.exception('could not create required registry keys') + logger.exception('Could not create required registry keys') raise self.identifier = applongname if (outdir_str := self.get_str('outdir')) is None or not pathlib.Path(outdir_str).is_dir(): docs = known_folder_path(FOLDERID_Documents) - self.set('outdir', docs if docs is not None else self.home) + self.set("outdir", docs if docs is not None else self.home) def __setup_winsparkle(self): """Ensure the necessary Registry keys for WinSparkle are present.""" @@ -94,31 +94,29 @@ def __setup_winsparkle(self): key=winreg.HKEY_CURRENT_USER, access=winreg.KEY_ALL_ACCESS | winreg.KEY_WOW64_64KEY, ) - try: - edcd_handle: winreg.HKEYType = create_key_defaults(sub_key=r'Software\EDCD\EDMarketConnector') - winsparkle_reg: winreg.HKEYType = winreg.CreateKeyEx( - edcd_handle, sub_key='WinSparkle', access=winreg.KEY_ALL_ACCESS | winreg.KEY_WOW64_64KEY - ) + try: + with create_key_defaults(sub_key=r'Software\EDCD\EDMarketConnector') as edcd_handle: + with winreg.CreateKeyEx(edcd_handle, sub_key='WinSparkle', + access=winreg.KEY_ALL_ACCESS | winreg.KEY_WOW64_64KEY) as winsparkle_reg: + # Set WinSparkle defaults - https://github.com/vslavik/winsparkle/wiki/Registry-Settings + UPDATE_INTERVAL_NAME = 'UpdateInterval' # noqa: N806 + CHECK_FOR_UPDATES_NAME = 'CheckForUpdates' # noqa: N806 + REG_SZ = winreg.REG_SZ # noqa: N806 + + winreg.SetValueEx(winsparkle_reg, UPDATE_INTERVAL_NAME, REG_RESERVED_ALWAYS_ZERO, REG_SZ, + str(update_interval)) + + try: + winreg.QueryValueEx(winsparkle_reg, CHECK_FOR_UPDATES_NAME) + except FileNotFoundError: + # Key doesn't exist, set it to a default + winreg.SetValueEx(winsparkle_reg, CHECK_FOR_UPDATES_NAME, REG_RESERVED_ALWAYS_ZERO, REG_SZ, + '1') except OSError: - logger.exception('could not open WinSparkle handle') + logger.exception('Could not open WinSparkle handle') raise - # set WinSparkle defaults - https://github.com/vslavik/winsparkle/wiki/Registry-Settings - winreg.SetValueEx( - winsparkle_reg, 'UpdateInterval', REG_RESERVED_ALWAYS_ZERO, winreg.REG_SZ, str(update_interval) - ) - - try: - winreg.QueryValueEx(winsparkle_reg, 'CheckForUpdates') - - except FileNotFoundError: - # Key doesn't exist, set it to a default - winreg.SetValueEx(winsparkle_reg, 'CheckForUpdates', REG_RESERVED_ALWAYS_ZERO, winreg.REG_SZ, '1') - - winsparkle_reg.Close() - edcd_handle.Close() - def __get_regentry(self, key: str) -> Union[None, list, str, int]: """Access the Registry for the raw entry.""" try: @@ -127,22 +125,20 @@ def __get_regentry(self, key: str) -> Union[None, list, str, int]: # Key doesn't exist return None - # The type returned is actually as we'd expect for each of these. The casts are here for type checkers and # For programmers who want to actually know what is going on if _type == winreg.REG_SZ: return str(value) - elif _type == winreg.REG_DWORD: + if _type == winreg.REG_DWORD: return int(value) - elif _type == winreg.REG_MULTI_SZ: + if _type == winreg.REG_MULTI_SZ: return list(value) - else: - logger.warning(f'registry key {key=} returned unknown type {_type=} {value=}') - return None + logger.warning(f'Registry key {key=} returned unknown type {_type=} {value=}') + return None - def get_str(self, key: str, *, default: str | None = None) -> str: + def get_str(self, key: str, *, default: Optional[str] = None) -> str: """ Return the string referred to by the given key if it exists, or the default. @@ -152,12 +148,12 @@ def get_str(self, key: str, *, default: str | None = None) -> str: if res is None: return default # type: ignore # Yes it could be None, but we're _assuming_ that people gave us a default - elif not isinstance(res, str): + if not isinstance(res, str): raise ValueError(f'Data from registry is not a string: {type(res)=} {res=}') return res - def get_list(self, key: str, *, default: list | None = None) -> list: + def get_list(self, key: str, *, default: Optional[list] = None) -> list: """ Return the list referred to by the given key if it exists, or the default. @@ -167,7 +163,7 @@ def get_list(self, key: str, *, default: list | None = None) -> list: if res is None: return default # type: ignore # Yes it could be None, but we're _assuming_ that people gave us a default - elif not isinstance(res, list): + if not isinstance(res, list): raise ValueError(f'Data from registry is not a list: {type(res)=} {res}') return res @@ -187,7 +183,7 @@ def get_int(self, key: str, *, default: int = 0) -> int: return res - def get_bool(self, key: str, *, default: bool | None = None) -> bool: + def get_bool(self, key: str, *, default: Optional[bool] = None) -> bool: """ Return the bool referred to by the given key if it exists, or the default. @@ -195,7 +191,7 @@ def get_bool(self, key: str, *, default: bool | None = None) -> bool: """ res = self.get_int(key, default=default) # type: ignore if res is None: - return default # type: ignore # Yes it could be None, but we're _assuming_ that people gave us a default + return default # Yes it could be None, but we're _assuming_ that people gave us a default return bool(res) @@ -206,12 +202,11 @@ def set(self, key: str, val: Union[int, str, List[str], bool]) -> None: Implements :meth:`AbstractConfig.set`. """ # These are the types that winreg.REG_* below resolve to. - reg_type: Literal[1] | Literal[4] | Literal[7] + reg_type: Union[Literal[1], Literal[4], Literal[7]] if isinstance(val, str): reg_type = winreg.REG_SZ - winreg.SetValueEx(self.__reg_handle, key, REG_RESERVED_ALWAYS_ZERO, winreg.REG_SZ, val) - elif isinstance(val, int): # The original code checked for numbers.Integral, I dont think that is needed. + elif isinstance(val, int): reg_type = winreg.REG_DWORD elif isinstance(val, list): @@ -224,7 +219,6 @@ def set(self, key: str, val: Union[int, str, List[str], bool]) -> None: else: raise ValueError(f'Unexpected type for value {type(val)=}') - # Its complaining about the list, it works, tested on windows, ignored. winreg.SetValueEx(self.__reg_handle, key, REG_RESERVED_ALWAYS_ZERO, reg_type, val) # type: ignore def delete(self, key: str, *, suppress=False) -> None: From 28e10b70ef140cf23b5b01695875962f4d807bbd Mon Sep 17 00:00:00 2001 From: David Sangrey Date: Thu, 19 Oct 2023 20:06:52 -0400 Subject: [PATCH 2/5] [2051] Add Tests and gitignore File --- .gitignore | 37 ++++++++++++--- tests/EDMCLogging.py/test_logging_classvar.py | 2 +- tests/config/_old_config.py | 45 +++++++++---------- tests/config/test_config.py | 4 +- tests/journal_lock.py/test_journal_lock.py | 12 +++-- tests/killswitch.py/test_apply.py | 6 +-- tests/killswitch.py/test_killswitch.py | 4 +- 7 files changed, 64 insertions(+), 46 deletions(-) diff --git a/.gitignore b/.gitignore index 35fa75659..4283fcb0f 100644 --- a/.gitignore +++ b/.gitignore @@ -1,8 +1,18 @@ +# Ignore version file .gitversion + +# Ignore macOS DS_Store files .DS_Store + +# Ignore build artifacts build -ChangeLog.html +dist.win32/ dist.* + +# Ignore generated ChangeLog.html file +ChangeLog.html + +# Ignore files dump *.bak *.pyc @@ -11,20 +21,37 @@ dump *.pdb *.msi *.wixobj +*.zip + +# Ignore Update Things EDMarketConnector_Installer_*.exe appcast_win_*.xml appcast_mac_*.xml -EDMarketConnector.VisualElementsManifest.xml -*.zip EDMC_Installer_Config.iss +EDMarketConnector.wxs +wix/components.wxs +# Ignore Visual Elements Manifest file for Windows +EDMarketConnector.VisualElementsManifest.xml + +# Ignore IDE and editor configuration files .idea .vscode + +# Ignore virtual environments .venv/ venv/ +venv2 + +# Ignore workspace file for Visual Studio Code *.code-workspace + +# Ignore coverage reports htmlcov/ .ignored .coverage -EDMarketConnector.wxs -wix/components.wxs +pylintrc +pylint.txt + +# Ignore Submodule data directory +coriolis-data/ diff --git a/tests/EDMCLogging.py/test_logging_classvar.py b/tests/EDMCLogging.py/test_logging_classvar.py index 24ab009ee..89d3220e9 100644 --- a/tests/EDMCLogging.py/test_logging_classvar.py +++ b/tests/EDMCLogging.py/test_logging_classvar.py @@ -37,7 +37,7 @@ def test_class_logger(caplog: 'LogCaptureFixture') -> None: ClassVarLogger.set_logger(logger) ClassVarLogger.logger.debug('test') # type: ignore # its there ClassVarLogger.logger.info('test2') # type: ignore # its there - log_stuff('test3') # type: ignore # its there + log_stuff('test3') # Dont move these, it relies on the line numbres. assert 'EDMarketConnector.EDMCLogging.py:test_logging_classvar.py:38 test' in caplog.text diff --git a/tests/config/_old_config.py b/tests/config/_old_config.py index 211c1e72f..6eab451bd 100644 --- a/tests/config/_old_config.py +++ b/tests/config/_old_config.py @@ -1,4 +1,4 @@ -# type: ignore +"""Old Configuration Test File.""" import numbers import sys import warnings @@ -6,7 +6,6 @@ from os import getenv, makedirs, mkdir, pardir from os.path import dirname, expanduser, isdir, join, normpath from typing import TYPE_CHECKING, Optional, Union - from config import applongname, appname, update_interval from EDMCLogging import get_main_logger @@ -95,7 +94,7 @@ def known_folder_path(guid: uuid.UUID) -> Optional[str]: from configparser import RawConfigParser -class OldConfig(): +class OldConfig: """Object that holds all configuration data.""" OUT_EDDN_SEND_STATION_DATA = 1 @@ -139,7 +138,7 @@ def __init__(self): self.identifier = f'uk.org.marginal.{appname.lower()}' NSBundle.mainBundle().infoDictionary()['CFBundleIdentifier'] = self.identifier - self.default_journal_dir: str | None = join( + self.default_journal_dir: Optional[str] = join( NSSearchPathForDirectoriesInDomains(NSApplicationSupportDirectory, NSUserDomainMask, True)[0], 'Frontier Developments', 'Elite Dangerous' @@ -159,14 +158,13 @@ def get(self, key: str, default: Union[None, list, str] = None) -> Union[None, l if val is None: return default - elif isinstance(val, str): + if isinstance(val, str): return str(val) - elif isinstance(val, list): + if isinstance(val, list): return list(val) # make writeable - else: - return default + return default def getint(self, key: str, default: int = 0) -> int: """Look up an integer configuration value.""" @@ -202,7 +200,7 @@ def close(self) -> None: elif sys.platform == 'win32': def __init__(self): - self.app_dir = join(known_folder_path(FOLDERID_LocalAppData), appname) # type: ignore # Not going to change + self.app_dir = join(known_folder_path(FOLDERID_LocalAppData), appname) # type: ignore if not isdir(self.app_dir): mkdir(self.app_dir) @@ -223,13 +221,13 @@ def __init__(self): journaldir = known_folder_path(FOLDERID_SavedGames) if journaldir: - self.default_journal_dir: str | None = join(journaldir, 'Frontier Developments', 'Elite Dangerous') + self.default_journal_dir: Optional[str] = join(journaldir, 'Frontier Developments', 'Elite Dangerous') else: self.default_journal_dir = None self.identifier = applongname - self.hkey: ctypes.c_void_p | None = HKEY() + self.hkey: Optional[ctypes.c_void_p] = HKEY() disposition = DWORD() if RegCreateKeyEx( HKEY_CURRENT_USER, @@ -279,7 +277,7 @@ def __init__(self): RegSetValueEx(sparklekey, 'UpdateInterval', 0, 1, buf, len(buf) * 2) RegCloseKey(sparklekey) - if not self.get('outdir') or not isdir(self.get('outdir')): # type: ignore # Not going to change + if not self.get('outdir') or not isdir(self.get('outdir')): # type: ignore self.set('outdir', known_folder_path(FOLDERID_Documents) or self.home) def get(self, key: str, default: Union[None, list, str] = None) -> Union[None, list, str]: @@ -304,11 +302,10 @@ def get(self, key: str, default: Union[None, list, str] = None) -> Union[None, l if RegQueryValueEx(self.hkey, key, 0, ctypes.byref(key_type), buf, ctypes.byref(key_size)): return default - elif key_type.value == REG_MULTI_SZ: + if key_type.value == REG_MULTI_SZ: return list(ctypes.wstring_at(buf, len(buf)-2).split('\x00')) - else: - return str(buf.value) + return str(buf.value) def getint(self, key: str, default: int = 0) -> int: """Look up an integer configuration value.""" @@ -328,8 +325,7 @@ def getint(self, key: str, default: int = 0) -> int: ): return default - else: - return key_val.value + return key_val.value def set(self, key: str, val: Union[int, str, list]) -> None: """Set value on the specified configuration key.""" @@ -377,7 +373,7 @@ def __init__(self): mkdir(self.plugin_dir) self.internal_plugin_dir = join(dirname(__file__), 'plugins') - self.default_journal_dir: str | None = None + self.default_journal_dir: Optional[str] = None self.home = expanduser('~') self.respath = dirname(__file__) self.identifier = f'uk.org.marginal.{appname.lower()}' @@ -388,7 +384,7 @@ def __init__(self): self.config = RawConfigParser(comment_prefixes=('#',)) try: - with codecs.open(self.filename, 'r') as h: + with codecs.open(self.filename) as h: self.config.read_file(h) except Exception as e: @@ -407,8 +403,7 @@ def get(self, key: str, default: Union[None, list, str] = None) -> Union[None, l # so we add a spurious ';' entry in set() and remove it here assert val.split('\n')[-1] == ';', val.split('\n') return [self._unescape(x) for x in val.split('\n')[:-1]] - else: - return self._unescape(val) + return self._unescape(val) except NoOptionError: logger.debug(f'attempted to get key {key} that does not exist') @@ -437,10 +432,10 @@ def getint(self, key: str, default: int = 0) -> int: def set(self, key: str, val: Union[int, str, list]) -> None: """Set value on the specified configuration key.""" if isinstance(val, bool): - self.config.set(self.SECTION, key, val and '1' or '0') # type: ignore # Not going to change + self.config.set(self.SECTION, key, val and '1' or '0') - elif isinstance(val, str) or isinstance(val, numbers.Integral): - self.config.set(self.SECTION, key, self._escape(val)) # type: ignore # Not going to change + elif isinstance(val, (numbers.Integral, str)): + self.config.set(self.SECTION, key, self._escape(val)) elif isinstance(val, list): self.config.set(self.SECTION, key, '\n'.join([self._escape(x) for x in val] + [';'])) @@ -460,7 +455,7 @@ def save(self) -> None: def close(self) -> None: """Close the configuration.""" self.save() - self.config = None + self.config = None # type: ignore def _escape(self, val: str) -> str: """Escape a string for storage.""" diff --git a/tests/config/test_config.py b/tests/config/test_config.py index ad6bbd6f2..ae80701c3 100644 --- a/tests/config/test_config.py +++ b/tests/config/test_config.py @@ -7,8 +7,6 @@ Most of these tests are parity tests with the "old" config, and likely one day can be entirely removed. """ -from __future__ import annotations - import contextlib import itertools import pathlib @@ -81,7 +79,7 @@ def _build_test_list(static_data, random_data, random_id_name='random_test_{i}') class TestNewConfig: - """Test the new config with an array of hand picked and random data.""" + """Test the new config with an array of hand-picked and random data.""" def __update_linuxconfig(self) -> None: """On linux config uses ConfigParser, which doesn't update from disk changes. Force the update here.""" diff --git a/tests/journal_lock.py/test_journal_lock.py b/tests/journal_lock.py/test_journal_lock.py index c617d52c2..148c2cb5c 100644 --- a/tests/journal_lock.py/test_journal_lock.py +++ b/tests/journal_lock.py/test_journal_lock.py @@ -3,11 +3,9 @@ import os import pathlib import sys -from typing import Generator - +from typing import Generator, Optional import pytest from pytest import MonkeyPatch, TempdirFactory, TempPathFactory - from config import config from journal_lock import JournalLock, JournalLockResult @@ -120,7 +118,7 @@ def mock_journaldir( tmp_path_factory: TempdirFactory ) -> Generator: """Fixture for mocking config.get_str('journaldir').""" - def get_str(key: str, *, default: str | None = None) -> str: + def get_str(key: str, *, default: Optional[str] = None) -> str: """Mock config.*Config get_str to provide fake journaldir.""" if key == 'journaldir': return str(tmp_path_factory.getbasetemp()) @@ -139,10 +137,10 @@ def mock_journaldir_changing( tmp_path_factory: TempdirFactory ) -> Generator: """Fixture for mocking config.get_str('journaldir').""" - def get_str(key: str, *, default: str | None = None) -> str: + def get_str(key: str, *, default: Optional[str] = None) -> str: """Mock config.*Config get_str to provide fake journaldir.""" if key == 'journaldir': - return tmp_path_factory.mktemp("changing") + return tmp_path_factory.mktemp("changing") # type: ignore print('Other key, calling up ...') return config.get_str(key) # Call the non-mocked @@ -301,7 +299,7 @@ def test_obtain_lock_already_locked(self, mock_journaldir: TempPathFactory): # Need to release any handles on the lockfile else the sub-process # might not be able to clean up properly, and that will impact # on later tests. - jlock.journal_dir_lockfile.close() + jlock.journal_dir_lockfile.close() # type: ignore print('Telling sub-process to quit...') exit_q.put('quit') diff --git a/tests/killswitch.py/test_apply.py b/tests/killswitch.py/test_apply.py index c199ec485..63657c696 100644 --- a/tests/killswitch.py/test_apply.py +++ b/tests/killswitch.py/test_apply.py @@ -33,11 +33,11 @@ def test_apply(source: UPDATABLE_DATA, key: str, action: str, to_set: Any, resul def test_apply_errors() -> None: """_apply should fail when passed something that isn't a Sequence or MutableMapping.""" with pytest.raises(ValueError, match=r'Dont know how to'): - killswitch._apply(set(), '0', None, False) # type: ignore # Its intentional that its broken - killswitch._apply(None, '', None) # type: ignore # Its intentional that its broken + killswitch._apply(set(), '0') # type: ignore # Its intentional that its broken + killswitch._apply(None, '') # type: ignore # Its intentional that its broken with pytest.raises(ValueError, match=r'Cannot use string'): - killswitch._apply([], 'test', None, False) + killswitch._apply([], 'test') def test_apply_no_error() -> None: diff --git a/tests/killswitch.py/test_killswitch.py b/tests/killswitch.py/test_killswitch.py index cda672ac7..3b004a481 100644 --- a/tests/killswitch.py/test_killswitch.py +++ b/tests/killswitch.py/test_killswitch.py @@ -1,6 +1,6 @@ """Tests of killswitch behaviour.""" import copy -from typing import Optional +from typing import Optional, List import pytest import semantic_version @@ -85,7 +85,7 @@ def test_operator_precedence( ] ) def test_check_multiple( - names: list[str], input: killswitch.UPDATABLE_DATA, result: killswitch.UPDATABLE_DATA, expected_return: bool + names: List[str], input: killswitch.UPDATABLE_DATA, result: killswitch.UPDATABLE_DATA, expected_return: bool ) -> None: """Check that order is correct when checking multiple killswitches.""" should_return, data = TEST_SET.check_multiple_killswitches(input, *names, version='1.0.0') From cb4a26186a43f67daed506a08ebf1b588bc87f33 Mon Sep 17 00:00:00 2001 From: David Sangrey Date: Fri, 10 Nov 2023 11:22:49 -0500 Subject: [PATCH 3/5] [2051] Revert Linux Changes I can't adequately test this right now, so not touching it. --- config/linux.py | 127 ++++++++++++++++++++++++++---------------------- 1 file changed, 69 insertions(+), 58 deletions(-) diff --git a/config/linux.py b/config/linux.py index 078764812..73100800f 100644 --- a/config/linux.py +++ b/config/linux.py @@ -9,7 +9,6 @@ import pathlib import sys from configparser import ConfigParser -from typing import Optional, Union, List from config import AbstractConfig, appname, logger assert sys.platform == 'linux' @@ -19,97 +18,100 @@ class LinuxConfig(AbstractConfig): """Linux implementation of AbstractConfig.""" SECTION = 'config' - + # TODO: I dislike this, would rather use a sane config file format. But here we are. __unescape_lut = {'\\': '\\', 'n': '\n', ';': ';', 'r': '\r', '#': '#'} __escape_lut = {'\\': '\\', '\n': 'n', ';': ';', '\r': 'r'} - def __init__(self, filename: Optional[str] = None) -> None: - """ - Initialize LinuxConfig instance. - - :param filename: Optional file name to use for configuration storage. - """ + def __init__(self, filename: str | None = None) -> None: super().__init__() - - # Initialize directory paths + # http://standards.freedesktop.org/basedir-spec/latest/ar01s03.html xdg_data_home = pathlib.Path(os.getenv('XDG_DATA_HOME', default='~/.local/share')).expanduser() self.app_dir_path = xdg_data_home / appname self.app_dir_path.mkdir(exist_ok=True, parents=True) self.plugin_dir_path = self.app_dir_path / 'plugins' self.plugin_dir_path.mkdir(exist_ok=True) + self.respath_path = pathlib.Path(__file__).parent.parent + self.internal_plugin_dir_path = self.respath_path / 'plugins' self.default_journal_dir_path = None # type: ignore + self.identifier = f'uk.org.marginal.{appname.lower()}' # TODO: Unused? - # Configure the filename config_home = pathlib.Path(os.getenv('XDG_CONFIG_HOME', default='~/.config')).expanduser() - self.filename = pathlib.Path(filename) if filename is not None else config_home / appname / f'{appname}.ini' + + self.filename = config_home / appname / f'{appname}.ini' + if filename is not None: + self.filename = pathlib.Path(filename) + self.filename.parent.mkdir(exist_ok=True, parents=True) - # Initialize the configuration - self.config = ConfigParser(comment_prefixes=('#',), interpolation=None) - self.config.read(self.filename) + self.config: ConfigParser | None = ConfigParser(comment_prefixes=('#',), interpolation=None) + self.config.read(self.filename) # read() ignores files that dont exist # Ensure that our section exists. This is here because configparser will happily create files for us, but it # does not magically create sections try: - self.config[self.SECTION].get("this_does_not_exist") + self.config[self.SECTION].get("this_does_not_exist", fallback=None) except KeyError: - logger.info("Config section not found. Backing up existing file (if any) and re-adding a section header") + logger.info("Config section not found. Backing up existing file (if any) and readding a section header") if self.filename.exists(): - backup_filename = self.filename.parent / f'{appname}.ini.backup' - backup_filename.write_bytes(self.filename.read_bytes()) + (self.filename.parent / f'{appname}.ini.backup').write_bytes(self.filename.read_bytes()) + self.config.add_section(self.SECTION) - # Set 'outdir' if not specified or invalid - outdir = self.get_str('outdir') - if outdir is None or not pathlib.Path(outdir).is_dir(): + if (outdir := self.get_str('outdir')) is None or not pathlib.Path(outdir).is_dir(): self.set('outdir', self.home) def __escape(self, s: str) -> str: """ - Escape special characters in a string. + Escape a string using self.__escape_lut. - :param s: The input string. - :return: The escaped string. - """ - escaped_chars = [] + This does NOT support multi-character escapes. + :param s: str - String to be escaped. + :return: str - The escaped string. + """ + out = "" for c in s: - escaped_chars.append(self.__escape_lut.get(c, c)) + if c not in self.__escape_lut: + out += c + continue + + out += '\\' + self.__escape_lut[c] - return ''.join(escaped_chars) + return out def __unescape(self, s: str) -> str: """ - Unescape special characters in a string. + Unescape a string. - :param s: The input string. - :return: The unescaped string. + :param s: str - The string to unescape. + :return: str - The unescaped string. """ - unescaped_chars = [] + out: list[str] = [] i = 0 while i < len(s): - current_char = s[i] - if current_char != '\\': - unescaped_chars.append(current_char) + c = s[i] + if c != '\\': + out.append(c) i += 1 continue - if i == len(s) - 1: + # We have a backslash, check what its escaping + if i == len(s)-1: raise ValueError('Escaped string has unescaped trailer') - unescaped = self.__unescape_lut.get(s[i + 1]) + unescaped = self.__unescape_lut.get(s[i+1]) if unescaped is None: - raise ValueError(f'Unknown escape: \\{s[i + 1]}') + raise ValueError(f'Unknown escape: \\ {s[i+1]}') - unescaped_chars.append(unescaped) + out.append(unescaped) i += 2 - return "".join(unescaped_chars) + return "".join(out) - def __raw_get(self, key: str) -> Optional[str]: + def __raw_get(self, key: str) -> str | None: """ Get a raw data value from the config file. @@ -121,7 +123,7 @@ def __raw_get(self, key: str) -> Optional[str]: return self.config[self.SECTION].get(key) - def get_str(self, key: str, *, default: Optional[str] = None) -> str: + def get_str(self, key: str, *, default: str | None = None) -> str: """ Return the string referred to by the given key if it exists, or the default. @@ -129,28 +131,29 @@ def get_str(self, key: str, *, default: Optional[str] = None) -> str: """ data = self.__raw_get(key) if data is None: - return default or "" + return default # type: ignore # Yes it could be None, but we're _assuming_ that people gave us a default if '\n' in data: - raise ValueError('Expected string, but got list') + raise ValueError('asked for string, got list') return self.__unescape(data) - def get_list(self, key: str, *, default: Optional[list] = None) -> list: + def get_list(self, key: str, *, default: list | None = None) -> list: """ Return the list referred to by the given key if it exists, or the default. Implements :meth:`AbstractConfig.get_list`. """ data = self.__raw_get(key) + if data is None: - return default or [] + return default # type: ignore # Yes it could be None, but we're _assuming_ that people gave us a default split = data.split('\n') if split[-1] != ';': raise ValueError('Encoded list does not have trailer sentinel') - return [self.__unescape(item) for item in split[:-1]] + return list(map(self.__unescape, split[:-1])) def get_int(self, key: str, *, default: int = 0) -> int: """ @@ -159,47 +162,55 @@ def get_int(self, key: str, *, default: int = 0) -> int: Implements :meth:`AbstractConfig.get_int`. """ data = self.__raw_get(key) + if data is None: return default try: return int(data) + except ValueError as e: - raise ValueError(f'Failed to convert {key=} to int') from e + raise ValueError(f'requested {key=} as int cannot be converted to int') from e - def get_bool(self, key: str, *, default: Optional[bool] = None) -> bool: + def get_bool(self, key: str, *, default: bool | None = None) -> bool: """ Return the bool referred to by the given key if it exists, or the default. Implements :meth:`AbstractConfig.get_bool`. """ if self.config is None: - raise ValueError('Attempt to use a closed config') + raise ValueError('attempt to use a closed config') data = self.__raw_get(key) if data is None: - return default or False + return default # type: ignore # Yes it could be None, but we're _assuming_ that people gave us a default return bool(int(data)) - def set(self, key: str, val: Union[int, str, List[str]]) -> None: + def set(self, key: str, val: int | str | list[str]) -> None: """ Set the given key's data to the given value. Implements :meth:`AbstractConfig.set`. """ if self.config is None: - raise ValueError('Attempt to use a closed config') + raise ValueError('attempt to use a closed config') + + to_set: str | None = None if isinstance(val, bool): to_set = str(int(val)) + elif isinstance(val, str): to_set = self.__escape(val) + elif isinstance(val, int): to_set = str(val) + elif isinstance(val, list): to_set = '\n'.join([self.__escape(s) for s in val] + [';']) + else: - raise ValueError(f'Unexpected type for value {type(val).__name__}') + raise ValueError(f'Unexpected type for value {type(val)=}') self.config.set(self.SECTION, key, to_set) self.save() @@ -211,7 +222,7 @@ def delete(self, key: str, *, suppress=False) -> None: Implements :meth:`AbstractConfig.delete`. """ if self.config is None: - raise ValueError('Attempt to delete from a closed config') + raise ValueError('attempt to use a closed config') self.config.remove_option(self.SECTION, key) self.save() @@ -223,7 +234,7 @@ def save(self) -> None: Implements :meth:`AbstractConfig.save`. """ if self.config is None: - raise ValueError('Attempt to save a closed config') + raise ValueError('attempt to use a closed config') with open(self.filename, 'w', encoding='utf-8') as f: self.config.write(f) @@ -235,4 +246,4 @@ def close(self) -> None: Implements :meth:`AbstractConfig.close`. """ self.save() - self.config = None # type: ignore + self.config = None From 070a3989a08e343e2373e2a302386db7d984dd18 Mon Sep 17 00:00:00 2001 From: David Sangrey Date: Fri, 10 Nov 2023 11:33:04 -0500 Subject: [PATCH 4/5] [2051] Prevent Typing Reversion Thanks to @norohind for catching this one! I'm not up to date on my PEP585. --- config/__init__.py | 30 ++++++------ config/darwin.py | 10 ++-- config/windows.py | 18 ++++--- plugins/coriolis.py | 9 ++-- plugins/eddn.py | 115 ++++++++++++++++++++++---------------------- plugins/edsm.py | 70 ++++++++++++++------------- plugins/edsy.py | 6 ++- plugins/inara.py | 66 ++++++++++++------------- 8 files changed, 167 insertions(+), 157 deletions(-) diff --git a/config/__init__.py b/config/__init__.py index ae08f3a03..e86a60d16 100644 --- a/config/__init__.py +++ b/config/__init__.py @@ -9,6 +9,8 @@ Linux uses a file, but for commonality it's still a flat data structure. macOS uses a 'defaults' object. """ +from __future__ import annotations + __all__ = [ # defined in the order they appear in the file 'GITVERSION_FILE', @@ -42,7 +44,7 @@ import traceback import warnings from abc import abstractmethod -from typing import Any, Callable, Optional, Type, TypeVar, Union, List +from typing import Any, Callable, Type, TypeVar import semantic_version from constants import GITVERSION_FILE, applongname, appname @@ -54,19 +56,19 @@ # NB: Do *not* import this, use the functions appversion() and appversion_nobuild() _static_appversion = '5.10.0-alpha0' -_cached_version: Optional[semantic_version.Version] = None +_cached_version: semantic_version.Version | None = None copyright = '© 2015-2019 Jonathan Harris, 2020-2023 EDCD' update_feed = 'https://raw.githubusercontent.com/EDCD/EDMarketConnector/releases/edmarketconnector.xml' update_interval = 8*60*60 # Providers marked to be in debug mode. Generally this is expected to switch to sending data to a log file -debug_senders: List[str] = [] +debug_senders: list[str] = [] # TRACE logging code that should actually be used. Means not spamming it # *all* if only interested in some things. -trace_on: List[str] = [] +trace_on: list[str] = [] capi_pretend_down: bool = False -capi_debug_access_token: Optional[str] = None +capi_debug_access_token: str | None = None # This must be done here in order to avoid an import cycle with EDMCLogging. # Other code should use EDMCLogging.get_main_logger if os.getenv("EDMC_NO_UI"): @@ -232,7 +234,7 @@ def set_eddn_url(self, eddn_url: str): self.__eddn_url = eddn_url @property - def eddn_url(self) -> Optional[str]: + def eddn_url(self) -> str | None: """ Provide the custom EDDN URL. @@ -285,9 +287,9 @@ def default_journal_dir(self) -> str: @staticmethod def _suppress_call( - func: Callable[..., _T], exceptions: Union[Type[BaseException], List[Type[BaseException]]] = Exception, + func: Callable[..., _T], exceptions: Type[BaseException] | list[Type[BaseException]] = Exception, *args: Any, **kwargs: Any - ) -> Optional[_T]: + ) -> _T | None: if exceptions is None: exceptions = [Exception] @@ -301,8 +303,8 @@ def _suppress_call( def get( self, key: str, - default: Union[list, str, bool, int, None] = None - ) -> Union[list, str, bool, int, None]: + default: list | str | bool | int | None = None + ) -> list | str | bool | int | None: """ Return the data for the requested key, or a default. @@ -329,7 +331,7 @@ def get( return default @abstractmethod - def get_list(self, key: str, *, default: Optional[list] = None) -> list: + def get_list(self, key: str, *, default: list | None = None) -> list: """ Return the list referred to by the given key if it exists, or the default. @@ -338,7 +340,7 @@ def get_list(self, key: str, *, default: Optional[list] = None) -> list: raise NotImplementedError @abstractmethod - def get_str(self, key: str, *, default: Optional[str] = None) -> str: + def get_str(self, key: str, *, default: str | None = None) -> str: """ Return the string referred to by the given key if it exists, or the default. @@ -351,7 +353,7 @@ def get_str(self, key: str, *, default: Optional[str] = None) -> str: raise NotImplementedError @abstractmethod - def get_bool(self, key: str, *, default: Optional[bool] = None) -> bool: + def get_bool(self, key: str, *, default: bool | None = None) -> bool: """ Return the bool referred to by the given key if it exists, or the default. @@ -391,7 +393,7 @@ def get_int(self, key: str, *, default: int = 0) -> int: raise NotImplementedError @abstractmethod - def set(self, key: str, val: Union[int, str, List[str], bool]) -> None: + def set(self, key: str, val: int | str | list[str] | bool) -> None: """ Set the given key's data to the given value. diff --git a/config/darwin.py b/config/darwin.py index 68d30306c..9c15ec32d 100644 --- a/config/darwin.py +++ b/config/darwin.py @@ -5,9 +5,11 @@ Licensed under the GNU General Public License. See LICENSE file. """ +from __future__ import annotations + import pathlib import sys -from typing import Any, Dict, List, Union +from typing import Any from Foundation import ( # type: ignore NSApplicationSupportDirectory, NSBundle, NSDocumentDirectory, NSSearchPathForDirectoriesInDomains, NSUserDefaults, NSUserDomainMask @@ -52,14 +54,14 @@ def __init__(self) -> None: self.default_journal_dir_path = support_path / 'Frontier Developments' / 'Elite Dangerous' self._defaults: Any = NSUserDefaults.standardUserDefaults() - self._settings: Dict[str, Union[int, str, list]] = dict( + self._settings: dict[str, int | str | list] = dict( self._defaults.persistentDomainForName_(self.identifier) or {} ) # make writeable if (out_dir := self.get_str('out_dir')) is None or not pathlib.Path(out_dir).exists(): self.set('outdir', NSSearchPathForDirectoriesInDomains(NSDocumentDirectory, NSUserDomainMask, True)[0]) - def __raw_get(self, key: str) -> Union[None, list, str, int]: + def __raw_get(self, key: str) -> None | list | str | int: """ Retrieve the raw data for the given key. @@ -143,7 +145,7 @@ def get_bool(self, key: str, *, default: bool = None) -> bool: return res - def set(self, key: str, val: Union[int, str, List[str], bool]) -> None: + def set(self, key: str, val: int | str | list[str] | bool) -> None: """ Set the given key's data to the given value. diff --git a/config/windows.py b/config/windows.py index 94a158f1d..7fb53a372 100644 --- a/config/windows.py +++ b/config/windows.py @@ -5,6 +5,8 @@ Licensed under the GNU General Public License. See LICENSE file. """ +from __future__ import annotations + import ctypes import functools import pathlib @@ -12,7 +14,7 @@ import uuid import winreg from ctypes.wintypes import DWORD, HANDLE -from typing import List, Literal, Optional, Union +from typing import Literal from config import AbstractConfig, applongname, appname, logger, update_interval assert sys.platform == 'win32' @@ -32,7 +34,7 @@ CoTaskMemFree.argtypes = [ctypes.c_void_p] -def known_folder_path(guid: uuid.UUID) -> Optional[str]: +def known_folder_path(guid: uuid.UUID) -> str | None: """Look up a Windows GUID to actual folder path name.""" buf = ctypes.c_wchar_p() if SHGetKnownFolderPath(ctypes.create_string_buffer(guid.bytes_le), 0, 0, ctypes.byref(buf)): @@ -117,7 +119,7 @@ def __setup_winsparkle(self): logger.exception('Could not open WinSparkle handle') raise - def __get_regentry(self, key: str) -> Union[None, list, str, int]: + def __get_regentry(self, key: str) -> None | list | str | int: """Access the Registry for the raw entry.""" try: value, _type = winreg.QueryValueEx(self.__reg_handle, key) @@ -138,7 +140,7 @@ def __get_regentry(self, key: str) -> Union[None, list, str, int]: logger.warning(f'Registry key {key=} returned unknown type {_type=} {value=}') return None - def get_str(self, key: str, *, default: Optional[str] = None) -> str: + def get_str(self, key: str, *, default: str | None = None) -> str: """ Return the string referred to by the given key if it exists, or the default. @@ -153,7 +155,7 @@ def get_str(self, key: str, *, default: Optional[str] = None) -> str: return res - def get_list(self, key: str, *, default: Optional[list] = None) -> list: + def get_list(self, key: str, *, default: list | None = None) -> list: """ Return the list referred to by the given key if it exists, or the default. @@ -183,7 +185,7 @@ def get_int(self, key: str, *, default: int = 0) -> int: return res - def get_bool(self, key: str, *, default: Optional[bool] = None) -> bool: + def get_bool(self, key: str, *, default: bool | None = None) -> bool: """ Return the bool referred to by the given key if it exists, or the default. @@ -195,14 +197,14 @@ def get_bool(self, key: str, *, default: Optional[bool] = None) -> bool: return bool(res) - def set(self, key: str, val: Union[int, str, List[str], bool]) -> None: + def set(self, key: str, val: int | str | list[str] | bool) -> None: """ Set the given key's data to the given value. Implements :meth:`AbstractConfig.set`. """ # These are the types that winreg.REG_* below resolve to. - reg_type: Union[Literal[1], Literal[4], Literal[7]] + reg_type: Literal[1] | Literal[4] | Literal[7] if isinstance(val, str): reg_type = winreg.REG_SZ diff --git a/plugins/coriolis.py b/plugins/coriolis.py index 7161a4e07..283b49d8b 100644 --- a/plugins/coriolis.py +++ b/plugins/coriolis.py @@ -19,6 +19,7 @@ `build.py` TO ENSURE THE FILES ARE ACTUALLY PRESENT IN AN END-USER INSTALLATION ON WINDOWS. """ +from __future__ import annotations import base64 import gzip @@ -26,7 +27,7 @@ import json import tkinter as tk from tkinter import ttk -from typing import TYPE_CHECKING, Union, Optional +from typing import TYPE_CHECKING import myNotebook as nb # noqa: N813 # its not my fault. from EDMCLogging import get_main_logger from plug import show_error @@ -80,7 +81,7 @@ def plugin_start3(path: str) -> str: return 'Coriolis' -def plugin_prefs(parent: ttk.Notebook, cmdr: Optional[str], is_beta: bool) -> tk.Frame: +def plugin_prefs(parent: ttk.Notebook, cmdr: str | None, is_beta: bool) -> tk.Frame: """Set up plugin preferences.""" PADX = 10 # noqa: N806 @@ -130,7 +131,7 @@ def plugin_prefs(parent: ttk.Notebook, cmdr: Optional[str], is_beta: bool) -> tk return conf_frame -def prefs_changed(cmdr: Optional[str], is_beta: bool) -> None: +def prefs_changed(cmdr: str | None, is_beta: bool) -> None: """ Update URLs and override mode based on user preferences. @@ -175,7 +176,7 @@ def _get_target_url(is_beta: bool) -> str: return coriolis_config.normal_url -def shipyard_url(loadout, is_beta) -> Union[str, bool]: +def shipyard_url(loadout, is_beta) -> str | bool: """Return a URL for the current ship.""" # most compact representation string = json.dumps(loadout, ensure_ascii=False, sort_keys=True, separators=(',', ':')).encode('utf-8') diff --git a/plugins/eddn.py b/plugins/eddn.py index d2e7d01ef..ee70d6e9b 100644 --- a/plugins/eddn.py +++ b/plugins/eddn.py @@ -18,6 +18,8 @@ `build.py` TO ENSURE THE FILES ARE ACTUALLY PRESENT IN AN END-USER INSTALLATION ON WINDOWS. """ +from __future__ import annotations + import http import itertools import json @@ -37,9 +39,6 @@ Iterator, Mapping, MutableMapping, - Optional, - Dict, - List, ) from typing import OrderedDict as OrderedDictT from typing import Tuple, Union @@ -86,27 +85,27 @@ def __init__(self): self.odyssey = False # Track location to add to Journal events - self.system_address: Optional[str] = None - self.system_name: Optional[str] = None - self.coordinates: Optional[Tuple] = None - self.body_name: Optional[str] = None - self.body_id: Optional[int] = None - self.body_type: Optional[int] = None - self.station_name: Optional[str] = None - self.station_type: Optional[str] = None - self.station_marketid: Optional[str] = None + self.system_address: str | None = None + self.system_name: str | None = None + self.coordinates: tuple | None = None + self.body_name: str | None = None + self.body_id: int | None = None + self.body_type: int | None = None + self.station_name: str | None = None + self.station_type: str | None = None + self.station_marketid: str | None = None # Track Status.json data - self.status_body_name: Optional[str] = None + self.status_body_name: str | None = None # Avoid duplicates - self.marketId: Optional[str] = None - self.commodities: Optional[List[OrderedDictT[str, Any]]] = None - self.outfitting: Optional[Tuple[bool, List[str]]] = None - self.shipyard: Optional[Tuple[bool, List[Mapping[str, Any]]]] = None + self.marketId: str | None = None + self.commodities: list[OrderedDictT[str, Any]] | None = None + self.outfitting: Tuple[bool, list[str]] | None = None + self.shipyard: Tuple[bool, list[Mapping[str, Any]]] | None = None self.fcmaterials_marketid: int = 0 - self.fcmaterials: Optional[List[OrderedDictT[str, Any]]] = None + self.fcmaterials: list[OrderedDictT[str, Any]] | None = None self.fcmaterials_capi_marketid: int = 0 - self.fcmaterials_capi: Optional[List[OrderedDictT[str, Any]]] = None + self.fcmaterials_capi: list[OrderedDictT[str, Any]] | None = None # For the tkinter parent window, so we can call update_idletasks() self.parent: tk.Tk @@ -395,7 +394,7 @@ def send_message(self, msg: str) -> bool: """ logger.trace_if("plugin.eddn.send", "Sending message") should_return: bool - new_data: Dict[str, Any] + new_data: dict[str, Any] should_return, new_data = killswitch.check_killswitch('plugins.eddn.send', json.loads(msg)) if should_return: @@ -404,7 +403,7 @@ def send_message(self, msg: str) -> bool: # Even the smallest possible message compresses somewhat, so always compress encoded, compressed = text.gzip(json.dumps(new_data, separators=(',', ':')), max_size=0) - headers: Optional[Dict[str, str]] = None + headers: dict[str, str] | None = None if compressed: headers = {'Content-Encoding': 'gzip'} @@ -612,7 +611,7 @@ def __init__(self, parent: tk.Tk): self.sender = EDDNSender(self, self.eddn_url) - self.fss_signals: List[Mapping[str, Any]] = [] + self.fss_signals: list[Mapping[str, Any]] = [] def close(self): """Close down the EDDN class instance.""" @@ -636,7 +635,7 @@ def export_commodities(self, data: CAPIData, is_beta: bool) -> None: # noqa: CC :param is_beta: whether or not we're currently in beta mode """ should_return: bool - new_data: Dict[str, Any] + new_data: dict[str, Any] should_return, new_data = killswitch.check_killswitch('capi.request./market', {}) if should_return: logger.warning("capi.request./market has been disabled by killswitch. Returning.") @@ -653,7 +652,7 @@ def export_commodities(self, data: CAPIData, is_beta: bool) -> None: # noqa: CC modules, ships ) - commodities: List[OrderedDictT[str, Any]] = [] + commodities: list[OrderedDictT[str, Any]] = [] for commodity in data['lastStarport'].get('commodities') or []: # Check 'marketable' and 'not prohibited' if (category_map.get(commodity['categoryname'], True) @@ -726,7 +725,7 @@ def safe_modules_and_ships(self, data: Mapping[str, Any]) -> Tuple[dict, dict]: :param data: The raw CAPI data. :return: Sanity-checked data. """ - modules: Dict[str, Any] = data['lastStarport'].get('modules') + modules: dict[str, Any] = data['lastStarport'].get('modules') if modules is None or not isinstance(modules, dict): if modules is None: logger.debug('modules was None. FC or Damaged Station?') @@ -743,13 +742,13 @@ def safe_modules_and_ships(self, data: Mapping[str, Any]) -> Tuple[dict, dict]: # Set a safe value modules = {} - ships: Dict[str, Any] = data['lastStarport'].get('ships') + ships: dict[str, Any] = data['lastStarport'].get('ships') if ships is None or not isinstance(ships, dict): if ships is None: logger.debug('ships was None') else: - logger.error(f'ships was neither None nor a Dict! Type = {type(ships)}') + logger.error(f'ships was neither None nor a dict! Type = {type(ships)}') # Set a safe value ships = {'shipyard_list': {}, 'unavailable_list': []} @@ -769,7 +768,7 @@ def export_outfitting(self, data: CAPIData, is_beta: bool) -> None: :param is_beta: whether or not we're currently in beta mode """ should_return: bool - new_data: Dict[str, Any] + new_data: dict[str, Any] should_return, new_data = killswitch.check_killswitch('capi.request./shipyard', {}) if should_return: logger.warning("capi.request./shipyard has been disabled by killswitch. Returning.") @@ -796,7 +795,7 @@ def export_outfitting(self, data: CAPIData, is_beta: bool) -> None: modules.values() ) - outfitting: List[str] = sorted( + outfitting: list[str] = sorted( self.MODULE_RE.sub(lambda match: match.group(0).capitalize(), mod['name'].lower()) for mod in to_search ) @@ -837,7 +836,7 @@ def export_shipyard(self, data: CAPIData, is_beta: bool) -> None: :param is_beta: whether or not we are in beta mode """ should_return: bool - new_data: Dict[str, Any] + new_data: dict[str, Any] should_return, new_data = killswitch.check_killswitch('capi.request./shipyard', {}) if should_return: logger.warning("capi.request./shipyard has been disabled by killswitch. Returning.") @@ -856,7 +855,7 @@ def export_shipyard(self, data: CAPIData, is_beta: bool) -> None: ships ) - shipyard: List[Mapping[str, Any]] = sorted( + shipyard: list[Mapping[str, Any]] = sorted( itertools.chain( (ship['name'].lower() for ship in (ships['shipyard_list'] or {}).values()), (ship['name'].lower() for ship in ships['unavailable_list'] or {}), @@ -899,8 +898,8 @@ def export_journal_commodities(self, cmdr: str, is_beta: bool, entry: Mapping[st :param is_beta: whether or not we're in beta mode :param entry: the journal entry containing the commodities data """ - items: List[Mapping[str, Any]] = entry.get('Items') or [] - commodities: List[OrderedDictT[str, Any]] = sorted((OrderedDict([ + items: list[Mapping[str, Any]] = entry.get('Items') or [] + commodities: list[OrderedDictT[str, Any]] = sorted((OrderedDict([ ('name', self.canonicalise(commodity['Name'])), ('meanPrice', commodity['MeanPrice']), ('buyPrice', commodity['BuyPrice']), @@ -947,11 +946,11 @@ def export_journal_outfitting(self, cmdr: str, is_beta: bool, entry: Mapping[str :param is_beta: Whether or not we're in beta mode :param entry: The relevant journal entry """ - modules: List[Mapping[str, Any]] = entry.get('Items', []) + modules: list[Mapping[str, Any]] = entry.get('Items', []) horizons: bool = entry.get('Horizons', False) # outfitting = sorted([self.MODULE_RE.sub(lambda m: m.group(0).capitalize(), module['Name']) # for module in modules if module['Name'] != 'int_planetapproachsuite']) - outfitting: List[str] = sorted( + outfitting: list[str] = sorted( self.MODULE_RE.sub(lambda m: m.group(0).capitalize(), mod['Name']) for mod in filter(lambda m: m['Name'] != 'int_planetapproachsuite', modules) ) @@ -986,7 +985,7 @@ def export_journal_shipyard(self, cmdr: str, is_beta: bool, entry: Mapping[str, :param is_beta: Whether or not we're in beta mode :param entry: the relevant journal entry """ - ships: List[Mapping[str, Any]] = entry.get('PriceList') or [] + ships: list[Mapping[str, Any]] = entry.get('Pricelist') or [] horizons: bool = entry.get('Horizons', False) shipyard = sorted(ship['ShipType'] for ship in ships) # Don't send empty ships list - shipyard data is only guaranteed present if user has visited the shipyard. @@ -1042,7 +1041,7 @@ def send_message(self, cmdr: str, msg: MutableMapping[str, Any]) -> None: self.sender.send_message_by_id(msg_id) def standard_header( - self, game_version: Optional[str] = None, game_build: Optional[str] = None + self, game_version: str | None = None, game_build: str | None = None ) -> MutableMapping[str, Any]: """ Return the standard header for an EDDN message, given tracked state. @@ -1134,7 +1133,7 @@ def entry_augment_system_data( def export_journal_fssdiscoveryscan( self, cmdr: str, system_name: str, system_starpos: list, is_beta: bool, entry: Mapping[str, Any] - ) -> Optional[str]: + ) -> str | None: """ Send an FSSDiscoveryScan to EDDN on the correct schema. @@ -1176,7 +1175,7 @@ def export_journal_fssdiscoveryscan( def export_journal_navbeaconscan( self, cmdr: str, system_name: str, system_starpos: list, is_beta: bool, entry: Mapping[str, Any] - ) -> Optional[str]: + ) -> str | None: """ Send an NavBeaconScan to EDDN on the correct schema. @@ -1218,7 +1217,7 @@ def export_journal_navbeaconscan( def export_journal_codexentry( # noqa: CCR001 self, cmdr: str, system_starpos: list, is_beta: bool, entry: MutableMapping[str, Any] - ) -> Optional[str]: + ) -> str | None: """ Send a CodexEntry to EDDN on the correct schema. @@ -1320,7 +1319,7 @@ def export_journal_codexentry( # noqa: CCR001 def export_journal_scanbarycentre( self, cmdr: str, system_starpos: list, is_beta: bool, entry: Mapping[str, Any] - ) -> Optional[str]: + ) -> str | None: """ Send a ScanBaryCentre to EDDN on the correct schema. @@ -1374,7 +1373,7 @@ def export_journal_scanbarycentre( def export_journal_navroute( self, cmdr: str, is_beta: bool, entry: MutableMapping[str, Any] - ) -> Optional[str]: + ) -> str | None: """ Send a NavRoute to EDDN on the correct schema. @@ -1447,7 +1446,7 @@ def export_journal_navroute( def export_journal_fcmaterials( self, cmdr: str, is_beta: bool, entry: MutableMapping[str, Any] - ) -> Optional[str]: + ) -> str | None: """ Send an FCMaterials message to EDDN on the correct schema. @@ -1531,7 +1530,7 @@ def export_journal_fcmaterials( def export_capi_fcmaterials( self, data: CAPIData, is_beta: bool, horizons: bool - ) -> Optional[str]: + ) -> str | None: """ Send CAPI-sourced 'onfootmicroresources' data on `fcmaterials/1` schema. @@ -1594,7 +1593,7 @@ def export_capi_fcmaterials( def export_journal_approachsettlement( self, cmdr: str, system_name: str, system_starpos: list, is_beta: bool, entry: MutableMapping[str, Any] - ) -> Optional[str]: + ) -> str | None: """ Send an ApproachSettlement to EDDN on the correct schema. @@ -1669,7 +1668,7 @@ def export_journal_approachsettlement( def export_journal_fssallbodiesfound( self, cmdr: str, system_name: str, system_starpos: list, is_beta: bool, entry: MutableMapping[str, Any] - ) -> Optional[str]: + ) -> str | None: """ Send an FSSAllBodiesFound message to EDDN on the correct schema. @@ -1719,7 +1718,7 @@ def export_journal_fssallbodiesfound( def export_journal_fssbodysignals( self, cmdr: str, system_name: str, system_starpos: list, is_beta: bool, entry: MutableMapping[str, Any] - ) -> Optional[str]: + ) -> str | None: """ Send an FSSBodySignals message to EDDN on the correct schema. @@ -1789,7 +1788,7 @@ def enqueue_journal_fsssignaldiscovered(self, entry: MutableMapping[str, Any]) - def export_journal_fsssignaldiscovered( self, cmdr: str, system_name: str, system_starpos: list, is_beta: bool, entry: MutableMapping[str, Any] - ) -> Optional[str]: + ) -> str | None: """ Send an FSSSignalDiscovered message to EDDN on the correct schema. @@ -1892,7 +1891,7 @@ def canonicalise(self, item: str) -> str: match = self.CANONICALISE_RE.match(item) return match and match.group(1) or item - def capi_gameversion_from_host_endpoint(self, capi_host: Optional[str], capi_endpoint: str) -> str: + def capi_gameversion_from_host_endpoint(self, capi_host: str | None, capi_endpoint: str) -> str: """ Return the correct CAPI gameversion string for the given host/endpoint. @@ -1910,7 +1909,7 @@ def capi_gameversion_from_host_endpoint(self, capi_host: Optional[str], capi_end gv = 'CAPI-Legacy-' else: - # Technically incorrect, but it will inform Listeners + # Technically incorrect, but it will inform listeners logger.error(f"{capi_host=} lead to bad gameversion") gv = 'CAPI-UNKNOWN-' ####################################################################### @@ -1924,7 +1923,7 @@ def capi_gameversion_from_host_endpoint(self, capi_host: Optional[str], capi_end gv += 'shipyard' else: - # Technically incorrect, but it will inform Listeners + # Technically incorrect, but it will inform listeners logger.error(f"{capi_endpoint=} lead to bad gameversion") gv += 'UNKNOWN' ####################################################################### @@ -1943,7 +1942,7 @@ def plugin_start3(plugin_dir: str) -> str: return 'EDDN' -def plugin_app(parent: tk.Tk) -> Optional[tk.Frame]: +def plugin_app(parent: tk.Tk) -> tk.Frame | None: """ Set up any plugin-specific UI. @@ -2183,7 +2182,7 @@ def filter_localised(d: Mapping[str, Any]) -> OrderedDictT[str, Any]: """ Recursively remove any dict keys with names ending `_Localised` from a dict. - :param d: Dict to filter keys of. + :param d: dict to filter keys of. :return: The filtered dict. """ filtered: OrderedDictT[str, Any] = OrderedDict() @@ -2207,7 +2206,7 @@ def capi_filter_localised(d: Mapping[str, Any]) -> OrderedDictT[str, Any]: """ Recursively remove any dict keys for known CAPI 'localised' names. - :param d: Dict to filter keys of. + :param d: dict to filter keys of. :return: The filtered dict. """ filtered: OrderedDictT[str, Any] = OrderedDict() @@ -2234,7 +2233,7 @@ def journal_entry( # noqa: C901, CCR001 station: str, entry: MutableMapping[str, Any], state: Mapping[str, Any] -) -> Optional[str]: +) -> str | None: """ Process a new Journal entry. @@ -2491,7 +2490,7 @@ def journal_entry( # noqa: C901, CCR001 return None -def cmdr_data_legacy(data: CAPIData, is_beta: bool) -> Optional[str]: +def cmdr_data_legacy(data: CAPIData, is_beta: bool) -> str | None: """ Process new CAPI data for Legacy galaxy. @@ -2510,7 +2509,7 @@ def cmdr_data_legacy(data: CAPIData, is_beta: bool) -> Optional[str]: return cmdr_data(data, is_beta) -def cmdr_data(data: CAPIData, is_beta: bool) -> Optional[str]: # noqa: CCR001 +def cmdr_data(data: CAPIData, is_beta: bool) -> str | None: # noqa: CCR001 """ Process new CAPI data for not-Legacy galaxy (might be beta). @@ -2611,7 +2610,7 @@ def capi_is_horizons(economies: MAP_STR_ANY, modules: MAP_STR_ANY, ships: MAP_ST return economies_colony or modules_horizons or ship_horizons -def dashboard_entry(cmdr: str, is_beta: bool, entry: Dict[str, Any]) -> None: +def dashboard_entry(cmdr: str, is_beta: bool, entry: dict[str, Any]) -> None: """ Process Status.json data to track things like current Body. diff --git a/plugins/edsm.py b/plugins/edsm.py index 2f644dcb9..33af692bb 100644 --- a/plugins/edsm.py +++ b/plugins/edsm.py @@ -18,6 +18,8 @@ `build.py` TO ENSURE THE FILES ARE ACTUALLY PRESENT IN AN END-USER INSTALLATION ON WINDOWS. """ +from __future__ import annotations + import json import threading import tkinter as tk @@ -26,7 +28,7 @@ from threading import Thread from time import sleep from tkinter import ttk -from typing import TYPE_CHECKING, Any, Dict, List, Literal, Mapping, MutableMapping, Optional, Set, Tuple, Union, cast +from typing import TYPE_CHECKING, Any, Literal, Mapping, MutableMapping, cast import requests import killswitch import monitor @@ -72,27 +74,27 @@ def __init__(self): self.game_build = "" # Handle only sending Live galaxy data - self.legacy_galaxy_last_notified: Optional[datetime] = None + self.legacy_galaxy_last_notified: datetime | None = None self.session: requests.Session = requests.Session() self.session.headers['User-Agent'] = user_agent self.queue: Queue = Queue() # Items to be sent to EDSM by worker thread - self.discarded_events: Set[str] = set() # List discarded events from EDSM - self.lastlookup: Dict[str, Any] # Result of last system lookup + self.discarded_events: set[str] = set() # List discarded events from EDSM + self.lastlookup: dict[str, Any] # Result of last system lookup # Game state self.multicrew: bool = False # don't send captain's ship info to EDSM while on a crew - self.coordinates: Optional[Tuple[int, int, int]] = None + self.coordinates: tuple[int, int, int] | None = None self.newgame: bool = False # starting up - batch initial burst of events self.newgame_docked: bool = False # starting up while docked self.navbeaconscan: int = 0 # batch up burst of Scan events after NavBeaconScan - self.system_link: Optional[tk.Widget] = None - self.system_name: Optional[tk.Tk] = None - self.system_address: Optional[int] = None # Frontier SystemAddress - self.system_population: Optional[int] = None - self.station_link: Optional[tk.Widget] = None - self.station_name: Optional[str] = None - self.station_marketid: Optional[int] = None # Frontier MarketID + self.system_link: tk.Widget | None = None + self.system_name: tk.Tk | None = None + self.system_address: int | None = None # Frontier SystemAddress + self.system_population: int | None = None + self.station_link: tk.Widget | None = None + self.station_name: str | None = None + self.station_marketid: int | None = None # Frontier MarketID self.on_foot = False self._IMG_KNOWN = None @@ -100,21 +102,21 @@ def __init__(self): self._IMG_NEW = None self._IMG_ERROR = None - self.thread: Optional[threading.Thread] = None + self.thread: threading.Thread | None = None - self.log: Optional[tk.IntVar] = None - self.log_button: Optional[ttk.Checkbutton] = None + self.log: tk.IntVar | None = None + self.log_button: ttk.Checkbutton | None = None - self.label: Optional[tk.Widget] = None + self.label: tk.Widget | None = None - self.cmdr_label: Optional[nb.Label] = None - self.cmdr_text: Optional[nb.Label] = None + self.cmdr_label: nb.Label | None = None + self.cmdr_text: nb.Label | None = None - self.user_label: Optional[nb.Label] = None - self.user: Optional[nb.Entry] = None + self.user_label: nb.Label | None = None + self.user: nb.Entry | None = None - self.apikey_label: Optional[nb.Label] = None - self.apikey: Optional[nb.Entry] = None + self.apikey_label: nb.Label | None = None + self.apikey: nb.Entry | None = None this = This() @@ -277,7 +279,7 @@ def toggle_password_visibility(): this.apikey.config(show="*") # type: ignore -def plugin_prefs(parent: ttk.Notebook, cmdr: Optional[str], is_beta: bool) -> tk.Frame: +def plugin_prefs(parent: ttk.Notebook, cmdr: str | None, is_beta: bool) -> tk.Frame: """ Plugin preferences setup hook. @@ -361,7 +363,7 @@ def plugin_prefs(parent: ttk.Notebook, cmdr: Optional[str], is_beta: bool) -> tk return frame -def prefs_cmdr_changed(cmdr: Optional[str], is_beta: bool) -> None: # noqa: CCR001 +def prefs_cmdr_changed(cmdr: str | None, is_beta: bool) -> None: # noqa: CCR001 """ Handle the Commander name changing whilst Settings was open. @@ -390,7 +392,7 @@ def prefs_cmdr_changed(cmdr: Optional[str], is_beta: bool) -> None: # noqa: CCR # LANG: We have no data on the current commander this.cmdr_text['text'] = _('None') - to_set: Union[Literal['normal'], Literal['disabled']] = tk.DISABLED + to_set: Literal['normal'] | Literal['disabled'] = tk.DISABLED if cmdr and not is_beta and this.log and this.log.get(): to_set = tk.NORMAL @@ -440,9 +442,9 @@ def prefs_changed(cmdr: str, is_beta: bool) -> None: config.set('edsm_out', this.log.get()) if cmdr and not is_beta: - cmdrs: List[str] = config.get_list('edsm_cmdrs', default=[]) - usernames: List[str] = config.get_list('edsm_usernames', default=[]) - apikeys: List[str] = config.get_list('edsm_apikeys', default=[]) + cmdrs: list[str] = config.get_list('edsm_cmdrs', default=[]) + usernames: list[str] = config.get_list('edsm_usernames', default=[]) + apikeys: list[str] = config.get_list('edsm_apikeys', default=[]) if this.user and this.apikey: if cmdr in cmdrs: @@ -460,7 +462,7 @@ def prefs_changed(cmdr: str, is_beta: bool) -> None: config.set('edsm_apikeys', apikeys) -def credentials(cmdr: str) -> Optional[Tuple[str, str]]: +def credentials(cmdr: str) -> tuple[str, str] | None: """ Get credentials for the given commander, if they exist. @@ -635,7 +637,7 @@ def journal_entry( # noqa: C901, CCR001 # Update system data -def cmdr_data(data: CAPIData, is_beta: bool) -> Optional[str]: # noqa: CCR001 +def cmdr_data(data: CAPIData, is_beta: bool) -> str | None: # noqa: CCR001 """ Process new CAPI data. @@ -722,7 +724,7 @@ def worker() -> None: # noqa: CCR001 C901 :return: None """ logger.debug('Starting...') - pending: List[Mapping[str, Any]] = [] # Unsent events + pending: list[Mapping[str, Any]] = [] # Unsent events closing = False cmdr: str = "" last_game_version = "" @@ -744,7 +746,7 @@ def worker() -> None: # noqa: CCR001 C901 logger.debug(f'{this.shutting_down=}, so setting closing = True') closing = True - item: Optional[Tuple[str, str, str, Mapping[str, Any]]] = this.queue.get() + item: tuple[str, str, str, Mapping[str, Any]] | None = this.queue.get() if item: (cmdr, game_version, game_build, entry) = item logger.trace_if(CMDR_EVENTS, f'De-queued ({cmdr=}, {game_version=}, {game_build=}, {entry["event"]=})') @@ -756,7 +758,7 @@ def worker() -> None: # noqa: CCR001 C901 retrying = 0 while retrying < 3: if item is None: - item = cast(Tuple[str, str, str, Mapping[str, Any]], ("", {})) + item = cast(tuple[str, str, str, Mapping[str, Any]], ("", {})) should_skip, new_item = killswitch.check_killswitch( 'plugins.edsm.worker', item, @@ -909,7 +911,7 @@ def worker() -> None: # noqa: CCR001 C901 last_game_build = game_build -def should_send(entries: List[Mapping[str, Any]], event: str) -> bool: # noqa: CCR001 +def should_send(entries: list[Mapping[str, Any]], event: str) -> bool: # noqa: CCR001 """ Whether or not any of the given entries should be sent to EDSM. diff --git a/plugins/edsy.py b/plugins/edsy.py index 0c78a4292..a02d34248 100644 --- a/plugins/edsy.py +++ b/plugins/edsy.py @@ -18,11 +18,13 @@ `build.py` TO ENSURE THE FILES ARE ACTUALLY PRESENT IN AN END-USER INSTALLATION ON WINDOWS. """ +from __future__ import annotations + import base64 import gzip import io import json -from typing import Any, Mapping, Union +from typing import Any, Mapping def plugin_start3(plugin_dir: str) -> str: @@ -36,7 +38,7 @@ def plugin_start3(plugin_dir: str) -> str: # Return a URL for the current ship -def shipyard_url(loadout: Mapping[str, Any], is_beta: bool) -> Union[bool, str]: +def shipyard_url(loadout: Mapping[str, Any], is_beta: bool) -> bool | str: """ Construct a URL for ship loadout. diff --git a/plugins/inara.py b/plugins/inara.py index efe010df6..2a124f549 100644 --- a/plugins/inara.py +++ b/plugins/inara.py @@ -18,6 +18,7 @@ `build.py` TO ENSURE THE FILES ARE ACTUALLY PRESENT IN AN END-USER INSTALLATION ON WINDOWS. """ +from __future__ import annotations import json import threading @@ -29,9 +30,8 @@ from operator import itemgetter from threading import Lock, Thread from tkinter import ttk -from typing import TYPE_CHECKING, Any, Callable, Deque, Dict, List, Mapping, NamedTuple, Optional +from typing import TYPE_CHECKING, Any, Callable, Deque, Mapping, NamedTuple, Sequence, cast, Union from typing import OrderedDict as OrderedDictT -from typing import Sequence, Union, cast import requests import edmc_data import killswitch @@ -63,8 +63,8 @@ def _(x: str) -> str: class Credentials(NamedTuple): """Credentials holds the set of credentials required to identify an inara API payload to inara.""" - cmdr: Optional[str] - fid: Optional[str] + cmdr: str | None + fid: str | None api_key: str @@ -89,25 +89,25 @@ def __init__(self): self.parent: tk.Tk # Handle only sending Live galaxy data - self.legacy_galaxy_last_notified: Optional[datetime] = None + self.legacy_galaxy_last_notified: datetime | None = None self.lastlocation = None # eventData from the last Commander's Flight Log event self.lastship = None # eventData from the last addCommanderShip or setCommanderShip event # Cached Cmdr state - self.cmdr: Optional[str] = None - self.FID: Optional[str] = None # Frontier ID + self.cmdr: str | None = None + self.FID: str | None = None # Frontier ID self.multicrew: bool = False # don't send captain's ship info to Inara while on a crew self.newuser: bool = False # just entered API Key - send state immediately self.newsession: bool = True # starting a new session - wait for Cargo event self.undocked: bool = False # just undocked self.suppress_docked = False # Skip initial Docked event if started docked - self.cargo: Optional[List[OrderedDictT[str, Any]]] = None - self.materials: Optional[List[OrderedDictT[str, Any]]] = None + self.cargo: list[OrderedDictT[str, Any]] | None = None + self.materials: list[OrderedDictT[str, Any]] | None = None self.last_credits: int = 0 # Send credit update soon after Startup / new game - self.storedmodules: Optional[List[OrderedDictT[str, Any]]] = None - self.loadout: Optional[OrderedDictT[str, Any]] = None - self.fleet: Optional[List[OrderedDictT[str, Any]]] = None + self.storedmodules: list[OrderedDictT[str, Any]] | None = None + self.loadout: OrderedDictT[str, Any] | None = None + self.fleet: list[OrderedDictT[str, Any]] | None = None self.shipswap: bool = False # just swapped ship self.on_foot = False @@ -115,9 +115,9 @@ def __init__(self): # Main window clicks self.system_link: tk.Widget = None # type: ignore - self.system_name: Optional[str] = None # type: ignore - self.system_address: Optional[str] = None # type: ignore - self.system_population: Optional[int] = None + self.system_name: str | None = None # type: ignore + self.system_address: str | None = None # type: ignore + self.system_population: int | None = None self.station_link: tk.Widget = None # type: ignore self.station = None self.station_marketid = None @@ -129,7 +129,7 @@ def __init__(self): self.apikey: nb.Entry self.apikey_label: tk.Label - self.events: Dict[Credentials, Deque[Event]] = defaultdict(deque) + self.events: dict[Credentials, Deque[Event]] = defaultdict(deque) self.event_lock: Lock = threading.Lock() # protects events, for use when rewriting events def filter_events(self, key: Credentials, predicate: Callable[[Event], bool]) -> None: @@ -361,7 +361,7 @@ def prefs_changed(cmdr: str, is_beta: bool) -> None: ) -def credentials(cmdr: Optional[str]) -> Optional[str]: +def credentials(cmdr: str | None) -> str | None: """ Get the credentials for the current commander. @@ -383,7 +383,7 @@ def credentials(cmdr: Optional[str]) -> Optional[str]: def journal_entry( # noqa: C901, CCR001 - cmdr: str, is_beta: bool, system: str, station: str, entry: Dict[str, Any], state: Dict[str, Any] + cmdr: str, is_beta: bool, system: str, station: str, entry: dict[str, Any], state: dict[str, Any] ) -> str: """ Journal entry hook. @@ -394,7 +394,7 @@ def journal_entry( # noqa: C901, CCR001 # causing users to spam Inara with 'URL provider' queries, and we want to # stop that. should_return: bool - new_entry: Dict[str, Any] = {} + new_entry: dict[str, Any] = {} should_return, new_entry = killswitch.check_killswitch('plugins.inara.journal', entry, logger) if should_return: @@ -813,7 +813,7 @@ def journal_entry( # noqa: C901, CCR001 # Fleet if event_name == 'StoredShips': - fleet: List[OrderedDictT[str, Any]] = sorted( + fleet: list[OrderedDictT[str, Any]] = sorted( [OrderedDict({ 'shipType': x['ShipType'], 'shipGameID': x['ShipID'], @@ -860,7 +860,7 @@ def journal_entry( # noqa: C901, CCR001 # Stored modules if event_name == 'StoredModules': items = {mod['StorageSlot']: mod for mod in entry['Items']} # Impose an order - modules: List[OrderedDictT[str, Any]] = [] + modules: list[OrderedDictT[str, Any]] = [] for slot in sorted(items): item = items[slot] module: OrderedDictT[str, Any] = OrderedDict([ @@ -1088,7 +1088,7 @@ def journal_entry( # noqa: C901, CCR001 # # So we're going to do a lot of checking here and bail out if we dont like the look of ANYTHING here - to_send_data: Optional[Dict[str, Any]] = {} # This is a glorified sentinel until lower down. + to_send_data: dict[str, Any] | None = {} # This is a glorified sentinel until lower down. # On Horizons, neither of these exist on TouchDown star_system_name = entry.get('StarSystem', this.system_name) body_name = entry.get('Body', state['Body'] if state['BodyType'] == 'Planet' else None) @@ -1370,7 +1370,7 @@ def cmdr_data(data: CAPIData, is_beta): # noqa: CCR001, reanalyze me later pass -def make_loadout(state: Dict[str, Any]) -> OrderedDictT[str, Any]: # noqa: CCR001 +def make_loadout(state: dict[str, Any]) -> OrderedDictT[str, Any]: # noqa: CCR001 """ Construct an inara loadout from an event. @@ -1440,8 +1440,8 @@ def new_add_event( name: str, timestamp: str, data: EVENT_DATA, - cmdr: Optional[str] = None, - fid: Optional[str] = None + cmdr: str | None = None, + fid: str | None = None ): """ Add a journal event to the queue, to be sent to inara at the next opportunity. @@ -1470,11 +1470,11 @@ def new_add_event( this.events[key].append(Event(name, timestamp, data)) -def clean_event_list(event_list: List[Event]) -> List[Event]: +def clean_event_list(event_list: list[Event]) -> list[Event]: """ Check for killswitched events and remove or modify them as requested. - :param event_list: List of events to clean + :param event_list: list of events to clean :return: Cleaned list of events """ cleaned_events = [] @@ -1533,14 +1533,14 @@ def new_worker(): logger.debug('Done.') -def get_events(clear: bool = True) -> Dict[Credentials, List[Event]]: +def get_events(clear: bool = True) -> dict[Credentials, list[Event]]: """ Fetch a copy of all events from the current queue. :param clear: whether to clear the queues as we go, defaults to True :return: a copy of the event dictionary """ - events_copy: Dict[Credentials, List[Event]] = {} + events_copy: dict[Credentials, list[Event]] = {} with this.event_lock: for key, events in this.events.items(): @@ -1590,7 +1590,7 @@ def send_data(url: str, data: Mapping[str, Any]) -> bool: return True # Regardless of errors above, we DID manage to send it, therefore inform our caller as such -def handle_api_error(data: Mapping[str, Any], status: int, reply: Dict[str, Any]) -> None: +def handle_api_error(data: Mapping[str, Any], status: int, reply: dict[str, Any]) -> None: """ Handle API error response. @@ -1604,7 +1604,7 @@ def handle_api_error(data: Mapping[str, Any], status: int, reply: Dict[str, Any] plug.show_error(_('Error: Inara {MSG}').format(MSG=error_message)) -def handle_success_reply(data: Mapping[str, Any], reply: Dict[str, Any]) -> None: +def handle_success_reply(data: Mapping[str, Any], reply: dict[str, Any]) -> None: """ Handle successful API response. @@ -1619,7 +1619,7 @@ def handle_success_reply(data: Mapping[str, Any], reply: Dict[str, Any]) -> None handle_special_events(data_event, reply_event) -def handle_individual_error(data_event: Dict[str, Any], reply_status: int, reply_text: str) -> None: +def handle_individual_error(data_event: dict[str, Any], reply_status: int, reply_text: str) -> None: """ Handle individual API error. @@ -1638,7 +1638,7 @@ def handle_individual_error(data_event: Dict[str, Any], reply_status: int, reply )) -def handle_special_events(data_event: Dict[str, Any], reply_event: Dict[str, Any]) -> None: +def handle_special_events(data_event: dict[str, Any], reply_event: dict[str, Any]) -> None: """ Handle special events in the API response. From 473bd1cdf6b59fad4b159c8eabd2f70116f06efc Mon Sep 17 00:00:00 2001 From: David Sangrey Date: Thu, 16 Nov 2023 13:21:54 -0500 Subject: [PATCH 5/5] [2051] Replace More Old-Style Union/Opts And re-adds the missing but deprecated abstracts in config --- config/__init__.py | 13 ++++++++++- tests/config/_old_config.py | 26 ++++++++++++---------- tests/config/test_config.py | 12 +++++----- tests/journal_lock.py/test_journal_lock.py | 8 ++++--- tests/killswitch.py/test_apply.py | 6 +++-- tests/killswitch.py/test_killswitch.py | 8 +++---- 6 files changed, 45 insertions(+), 28 deletions(-) diff --git a/config/__init__.py b/config/__init__.py index e86a60d16..ca964274e 100644 --- a/config/__init__.py +++ b/config/__init__.py @@ -162,12 +162,23 @@ def appversion_nobuild() -> semantic_version.Version: class AbstractConfig(abc.ABC): - """Abstract root class of all platform specific Config implementations.""" + """ + Abstract root class of all platform specific Config implementations. + + Commented lines are no longer supported or replaced. + """ OUT_EDDN_SEND_STATION_DATA = 1 + # OUT_MKT_BPC = 2 # No longer supported OUT_MKT_TD = 4 OUT_MKT_CSV = 8 OUT_SHIP = 16 + # OUT_SHIP_EDS = 16 # Replaced by OUT_SHIP + # OUT_SYS_FILE = 32 # No longer supported + # OUT_STAT = 64 # No longer available + # OUT_SHIP_CORIOLIS = 128 # Replaced by OUT_SHIP + # OUT_SYS_EDSM = 256 # Now a plugin + # OUT_SYS_AUTO = 512 # Now always automatic OUT_MKT_MANUAL = 1024 OUT_EDDN_SEND_NON_STATION = 2048 OUT_EDDN_DELAY = 4096 diff --git a/tests/config/_old_config.py b/tests/config/_old_config.py index 6eab451bd..22f0b18f0 100644 --- a/tests/config/_old_config.py +++ b/tests/config/_old_config.py @@ -1,11 +1,13 @@ """Old Configuration Test File.""" +from __future__ import annotations + import numbers import sys import warnings from configparser import NoOptionError from os import getenv, makedirs, mkdir, pardir from os.path import dirname, expanduser, isdir, join, normpath -from typing import TYPE_CHECKING, Optional, Union +from typing import TYPE_CHECKING from config import applongname, appname, update_interval from EDMCLogging import get_main_logger @@ -80,7 +82,7 @@ RegDeleteValue.restype = LONG RegDeleteValue.argtypes = [HKEY, LPCWSTR] - def known_folder_path(guid: uuid.UUID) -> Optional[str]: + def known_folder_path(guid: uuid.UUID) -> str | None: """Look up a Windows GUID to actual folder path name.""" buf = ctypes.c_wchar_p() if SHGetKnownFolderPath(ctypes.create_string_buffer(guid.bytes_le), 0, 0, ctypes.byref(buf)): @@ -138,7 +140,7 @@ def __init__(self): self.identifier = f'uk.org.marginal.{appname.lower()}' NSBundle.mainBundle().infoDictionary()['CFBundleIdentifier'] = self.identifier - self.default_journal_dir: Optional[str] = join( + self.default_journal_dir: str | None = join( NSSearchPathForDirectoriesInDomains(NSApplicationSupportDirectory, NSUserDomainMask, True)[0], 'Frontier Developments', 'Elite Dangerous' @@ -152,7 +154,7 @@ def __init__(self): if not self.get('outdir') or not isdir(str(self.get('outdir'))): self.set('outdir', NSSearchPathForDirectoriesInDomains(NSDocumentDirectory, NSUserDomainMask, True)[0]) - def get(self, key: str, default: Union[None, list, str] = None) -> Union[None, list, str]: + def get(self, key: str, default: None | list | str = None) -> None | list | str: """Look up a string configuration value.""" val = self.settings.get(key) if val is None: @@ -179,7 +181,7 @@ def getint(self, key: str, default: int = 0) -> int: logger.debug('The exception type is ...', exc_info=e) return default - def set(self, key: str, val: Union[int, str, list]) -> None: + def set(self, key: str, val: int | str | list) -> None: """Set value on the specified configuration key.""" self.settings[key] = val @@ -221,13 +223,13 @@ def __init__(self): journaldir = known_folder_path(FOLDERID_SavedGames) if journaldir: - self.default_journal_dir: Optional[str] = join(journaldir, 'Frontier Developments', 'Elite Dangerous') + self.default_journal_dir: str | None = join(journaldir, 'Frontier Developments', 'Elite Dangerous') else: self.default_journal_dir = None self.identifier = applongname - self.hkey: Optional[ctypes.c_void_p] = HKEY() + self.hkey: ctypes.c_void_p | None = HKEY() disposition = DWORD() if RegCreateKeyEx( HKEY_CURRENT_USER, @@ -280,7 +282,7 @@ def __init__(self): if not self.get('outdir') or not isdir(self.get('outdir')): # type: ignore self.set('outdir', known_folder_path(FOLDERID_Documents) or self.home) - def get(self, key: str, default: Union[None, list, str] = None) -> Union[None, list, str]: + def get(self, key: str, default: None | list | str = None) -> None | list | str: """Look up a string configuration value.""" key_type = DWORD() key_size = DWORD() @@ -327,7 +329,7 @@ def getint(self, key: str, default: int = 0) -> int: return key_val.value - def set(self, key: str, val: Union[int, str, list]) -> None: + def set(self, key: str, val: int | str | list) -> None: """Set value on the specified configuration key.""" if isinstance(val, str): buf = ctypes.create_unicode_buffer(val) @@ -373,7 +375,7 @@ def __init__(self): mkdir(self.plugin_dir) self.internal_plugin_dir = join(dirname(__file__), 'plugins') - self.default_journal_dir: Optional[str] = None + self.default_journal_dir: str | None = None self.home = expanduser('~') self.respath = dirname(__file__) self.identifier = f'uk.org.marginal.{appname.lower()}' @@ -394,7 +396,7 @@ def __init__(self): if not self.get('outdir') or not isdir(self.get('outdir')): # type: ignore # Not going to change self.set('outdir', expanduser('~')) - def get(self, key: str, default: Union[None, list, str] = None) -> Union[None, list, str]: + def get(self, key: str, default: None | list | str = None) -> None | list | str: """Look up a string configuration value.""" try: val = self.config.get(self.SECTION, key) @@ -429,7 +431,7 @@ def getint(self, key: str, default: int = 0) -> int: return default - def set(self, key: str, val: Union[int, str, list]) -> None: + def set(self, key: str, val: int | str | list) -> None: """Set value on the specified configuration key.""" if isinstance(val, bool): self.config.set(self.SECTION, key, val and '1' or '0') diff --git a/tests/config/test_config.py b/tests/config/test_config.py index ae80701c3..e839afc7a 100644 --- a/tests/config/test_config.py +++ b/tests/config/test_config.py @@ -13,7 +13,7 @@ import random import string import sys -from typing import Any, Iterable, List, cast +from typing import Any, Iterable, cast import pytest from pytest import mark @@ -28,12 +28,12 @@ from config import config # noqa: E402 -def _fuzz_list(length: int) -> List[str]: +def _fuzz_list(length: int) -> list[str]: out = [] for _ in range(length): out.append(_fuzz_generators[str](random.randint(0, 1337))) - return cast(List[str], out) + return cast(list[str], out) _fuzz_generators = { # Type annotating this would be a nightmare. @@ -70,7 +70,7 @@ def _get_fuzz(_type: Any, num_values=50, value_length=(0, 10)) -> list: big_int = int(0xFFFFFFFF) # 32 bit int -def _make_params(args: List[Any], id_name: str = 'random_test_{i}') -> list: +def _make_params(args: list[Any], id_name: str = 'random_test_{i}') -> list: return [pytest.param(x, id=id_name.format(i=i)) for i, x in enumerate(args)] @@ -115,7 +115,7 @@ def test_string(self, string: str) -> None: config.delete(name) @mark.parametrize("lst", _build_test_list(list_tests, _get_fuzz(list))) - def test_list(self, lst: List[str]) -> None: + def test_list(self, lst: list[str]) -> None: """Save a list and then ask for it back.""" name = f'list_test_{ hash("".join(lst)) }' config.set(name, lst) @@ -214,7 +214,7 @@ def test_string(self, string: str) -> None: assert res == string @mark.parametrize("lst", _build_test_list(list_tests, _get_fuzz(list))) - def test_list(self, lst: List[str]) -> None: + def test_list(self, lst: list[str]) -> None: """Save a list though the old config, recall it using the new config.""" lst = [x.replace("\r", "") for x in lst] # OldConfig on linux fails to store these correctly if sys.platform == 'win32': diff --git a/tests/journal_lock.py/test_journal_lock.py b/tests/journal_lock.py/test_journal_lock.py index 148c2cb5c..649140c87 100644 --- a/tests/journal_lock.py/test_journal_lock.py +++ b/tests/journal_lock.py/test_journal_lock.py @@ -1,9 +1,11 @@ """Tests for journal_lock.py code.""" +from __future__ import annotations + import multiprocessing as mp import os import pathlib import sys -from typing import Generator, Optional +from typing import Generator import pytest from pytest import MonkeyPatch, TempdirFactory, TempPathFactory from config import config @@ -118,7 +120,7 @@ def mock_journaldir( tmp_path_factory: TempdirFactory ) -> Generator: """Fixture for mocking config.get_str('journaldir').""" - def get_str(key: str, *, default: Optional[str] = None) -> str: + def get_str(key: str, *, default: str | None = None) -> str: """Mock config.*Config get_str to provide fake journaldir.""" if key == 'journaldir': return str(tmp_path_factory.getbasetemp()) @@ -137,7 +139,7 @@ def mock_journaldir_changing( tmp_path_factory: TempdirFactory ) -> Generator: """Fixture for mocking config.get_str('journaldir').""" - def get_str(key: str, *, default: Optional[str] = None) -> str: + def get_str(key: str, *, default: str | None = None) -> str: """Mock config.*Config get_str to provide fake journaldir.""" if key == 'journaldir': return tmp_path_factory.mktemp("changing") # type: ignore diff --git a/tests/killswitch.py/test_apply.py b/tests/killswitch.py/test_apply.py index 63657c696..ab9bb2679 100644 --- a/tests/killswitch.py/test_apply.py +++ b/tests/killswitch.py/test_apply.py @@ -1,6 +1,8 @@ """Test the apply functions used by killswitch to modify data.""" +from __future__ import annotations + import copy -from typing import Any, Optional +from typing import Any import pytest @@ -61,7 +63,7 @@ def test_apply_no_error() -> None: (False, 0), (str((1 << 63)-1), (1 << 63)-1), (True, 1), (str(1 << 1337), 1 << 1337) ] ) -def test_get_int(input: str, expected: Optional[int]) -> None: +def test_get_int(input: str, expected: int | None) -> None: """Check that _get_int doesn't throw when handed bad data.""" assert expected == killswitch._get_int(input) diff --git a/tests/killswitch.py/test_killswitch.py b/tests/killswitch.py/test_killswitch.py index 3b004a481..3c681ded7 100644 --- a/tests/killswitch.py/test_killswitch.py +++ b/tests/killswitch.py/test_killswitch.py @@ -1,7 +1,7 @@ """Tests of killswitch behaviour.""" -import copy -from typing import Optional, List +from __future__ import annotations +import copy import pytest import semantic_version @@ -34,7 +34,7 @@ ], ) def test_killswitch( - input: killswitch.UPDATABLE_DATA, kill: str, should_pass: bool, result: Optional[killswitch.UPDATABLE_DATA], + input: killswitch.UPDATABLE_DATA, kill: str, should_pass: bool, result: killswitch.UPDATABLE_DATA | None, version: str ) -> None: """Simple killswitch tests.""" @@ -85,7 +85,7 @@ def test_operator_precedence( ] ) def test_check_multiple( - names: List[str], input: killswitch.UPDATABLE_DATA, result: killswitch.UPDATABLE_DATA, expected_return: bool + names: list[str], input: killswitch.UPDATABLE_DATA, result: killswitch.UPDATABLE_DATA, expected_return: bool ) -> None: """Check that order is correct when checking multiple killswitches.""" should_return, data = TEST_SET.check_multiple_killswitches(input, *names, version='1.0.0')