diff --git a/mtda-cli b/mtda-cli index ba3e92b7..b041fc2c 100755 --- a/mtda-cli +++ b/mtda-cli @@ -44,8 +44,8 @@ class Application: if isinstance(result, str): print(result) else: - print("Device command '{0}' failed!".format( - " ".join(args)), file=sys.stderr) + print(f"Device command '{' '.join(args)}' failed!", + file=sys.stderr) def console_clear(self, args): self.client().console_clear() @@ -139,7 +139,7 @@ class Application: new_status = server.target_status() if previous_status != new_status: server.console_print( - "\r\n*** Target is now %s ***\r\n" % (new_status)) + f"\r\n*** Target is now {new_status} ***\r\n") elif c == 'q': self.screen.capture_stop() self.exiting = True @@ -153,8 +153,7 @@ class Application: new_status, writing, written = server.storage_status() if new_status != previous_status: server.console_print( - "\r\n*** Storage now connected to " - "%s ***\r\n" % (new_status)) + f"\r\n*** Storage now connected to {new_status} ***\r\n") elif c == 't': server.toggle_timestamps() elif c == 'u': @@ -179,7 +178,7 @@ class Application: r = requests.post(url=endpoint, data=data) server = self.client() server.console_print( - "\r\n*** console buffer posted to %s ***\r\n" % (r.text)) + f"\r\n*** console buffer posted to {r.text} ***\r\n") def console_prompt(self, args): data = self.client().console_prompt(args.prompt) @@ -320,11 +319,11 @@ class Application: def _human_readable_size(self, size): if size < 1024*1024: - return "{:d} KiB".format(int(size/1024)) + return f"{int(size / 1024):d} KiB" elif size < 1024*1024*1024: - return "{:d} MiB".format(int(size/1024/1024)) + return f"{int(size / 1024 / 1024):d} MiB" else: - return "{:.2f} GiB".format(size/1024/1024/1024) + return f"{size / 1024 / 1024 / 1024:.2f} GiB" def _storage_write_cb(self, imgname, totalread, inputsize, totalwritten, imagesize): @@ -370,7 +369,7 @@ class Application: sys.stdout.flush() except Exception as e: msg = e.msg if hasattr(e, 'msg') else str(e) - print("\n'storage write' failed! ({})".format(msg), + print(f"\n'storage write' failed! ({msg})", file=sys.stderr) result = 1 return result @@ -410,7 +409,7 @@ class Application: tgt_status = client.target_status() uptime = "" if tgt_status == "ON": - uptime = " (up %s)" % self.target_uptime() + uptime = f" (up {self.target_uptime()})" try: remote_version = client.agent_version() except (zerorpc.RemoteError) as e: @@ -427,11 +426,11 @@ class Application: socket.gethostname(), host.version, "")) print("Remote : %s (%s)%30s\r" % ( remote, remote_version, "")) - print("Prefix key: : ctrl-%s\r" % (prefix_key)) - print("Session : %s\r" % (session)) + print(f"Prefix key: : ctrl-{prefix_key}\r") + print(f"Session : {session}\r") print("Target : %-6s%s%s\r" % (tgt_status, locked, uptime)) print("Storage on : %-6s%s\r" % (storage_status, locked)) - print("Storage writes : %s (%s)\r" % (written, writing)) + print(f"Storage writes : {written} ({writing})\r") # Print status of the USB ports ports = client.usb_ports() @@ -442,7 +441,7 @@ class Application: # Print video stream details url = client.video_url() if url is not None: - print("Video stream : %s\r" % (url)) + print(f"Video stream : {url}\r") def target_off(self, args=None): status = self.client().target_off() @@ -506,7 +505,7 @@ class Application: def print_version(self): agent = MultiTenantDeviceAccess() - print("MTDA version: %s" % agent.version) + print(f"MTDA version: {agent.version}") def main(self): parser = ArgumentParser( diff --git a/mtda-config b/mtda-config index 4b91e194..dd094208 100755 --- a/mtda-config +++ b/mtda-config @@ -31,7 +31,7 @@ class Config: def __save_as_config(self, config): with open("./.config", "w") as config_file: for key in config: - config_file.write("{}={}\n".format(key, config[key])) + config_file.write(f"{key}={config[key]}\n") def __save_as_ini(self, config, output_file=None): if not output_file: @@ -54,15 +54,14 @@ class Config: config_name = template.group(2).lower() # Write the module_name if module_name != current_module: - config_file.write("\n[{}]\n".format(module_name)) + config_file.write(f"\n[{module_name}]\n") current_module = module_name - config_file.write(" {} = {}\n" - .format(config_name, config[key])) + config_file.write(f" {config_name} = {config[key]}\n") def save_config(self, nodes=[], output_file=None): config_entries = {} for node in nodes: - config = "CONFIG_{}".format(node.item.name) + config = f"CONFIG_{node.item.name}" config_entries[config] = node.item.str_value logging.debug("save_config: %s=%s", config, config_entries[config]) @@ -86,8 +85,7 @@ class Config: if module_name not in skip_list: # Handle invalid cases later (config, value) = line.split("=") - config = "CONFIG_{}_{}".format(module_name, - config.strip()).upper() + config = f"CONFIG_{module_name}_{config.strip()}".upper() config_entries[config] = value.strip() return config_entries @@ -353,7 +351,7 @@ class ConfigApp: "Q/q - quit | Esc - previous screen | Arrow keys - navigation" ) footer_right_text = urwid.Text( - "Kconfig: {}".format(self.kconfig_file), align="right" + f"Kconfig: {self.kconfig_file}", align="right" ) footer_column = urwid.Columns([footer_left_text, footer_right_text]) footer_column = urwid.AttrMap(footer_column, "footer") @@ -377,7 +375,7 @@ def start_config(): "-v", "--version", action="version", - version="mtda config generator {}".format(mtda.__version__), + version=f"mtda config generator {mtda.__version__}", ) parser.add_argument( "-d", @@ -413,12 +411,12 @@ def start_config(): if args.config: if not os.path.exists(args.config): - print("Cannot access config file [ {} ]\n".format(args.config)) + print(f"Cannot access config file [ {args.config} ]\n") parser.print_help() sys.exit(1) if not os.path.exists(kconfig_file): - print("Cannot access Kconfig file [ {} ]\n".format(kconfig_file)) + print(f"Cannot access Kconfig file [ {kconfig_file} ]\n") parser.print_help() sys.exit(1) diff --git a/mtda-service b/mtda-service index 464b390e..a0d26431 100755 --- a/mtda-service +++ b/mtda-service @@ -84,11 +84,11 @@ class Application: name = self.agent.name info = zeroconf.ServiceInfo( type_=deviceType, - name='{}.{}'.format(name, deviceType), + name=f'{name}.{deviceType}', addresses=[socket.inet_aton(self._ip())], port=int(self.agent.ctrlport), properties=props, - server='{}.local.'.format(name)) + server=f'{name}.local.') try: zc.register_service(info) @@ -113,7 +113,7 @@ class Application: def print_version(self): agent = MultiTenantDeviceAccess() - print("MTDA version: %s" % agent.version) + print(f"MTDA version: {agent.version}") def main(self): config = None diff --git a/mtda-systemd-helper b/mtda-systemd-helper index bf23c27f..97d91f91 100755 --- a/mtda-systemd-helper +++ b/mtda-systemd-helper @@ -48,8 +48,7 @@ class Application: sys.exit(0) if config is not None and os.path.exists(config) is False: - print('could not find config file: ' - '{}'.format(config), file=sys.stderr) + print(f'could not find config file: {config}', file=sys.stderr) return 1 self.agent = MultiTenantDeviceAccess() diff --git a/mtda/assistant/homekit.py b/mtda/assistant/homekit.py index 39e8b50e..502a67bc 100644 --- a/mtda/assistant/homekit.py +++ b/mtda/assistant/homekit.py @@ -43,13 +43,13 @@ def get_relay(self, status=None): return "1" if status == "ON" else 0 def relay_changed(self, status): - self.mtda.debug(3, "mtda.assistant.homekit.relay_changed(%s)" % status) + self.mtda.debug(3, f"mtda.assistant.homekit.relay_changed({status})") result = self.get_relay(status) self.relay_on.set_value(result) - self.mtda.debug(3, "mtda.assistant.homekit.relay_changed(): " - "%s" % str(result)) + self.mtda.debug(3, "mtda.assistant.homekit." + f"relay_changed(): {str(result)}") return result def set_relay(self, state): @@ -63,8 +63,8 @@ def set_relay(self, state): self.mtda.target_off('homekit') result = self.get_relay() - self.mtda.debug(3, "mtda.assistant.homekit.set_relay(): " - "%s" % str(result)) + self.mtda.debug(3, "mtda.assistant.homekit." + f"set_relay(): {str(result)}") return result def get_relay_in_use(self, state): @@ -76,8 +76,8 @@ def setup_message(self): pincode = self.driver.state.pincode.decode() result = self.mtda.env_set('homekit-setup-code', pincode, 'homekit') - self.mtda.debug(3, "mtda.assistant.homekit.setup_message(): " - "%s" % str(result)) + self.mtda.debug(3, "mtda.assistant.homekit." + f"setup_message(): {str(result)}") return result @@ -103,8 +103,8 @@ def configure(self, conf): dir = os.path.dirname(self.state) os.makedirs(dir, mode=0o755, exist_ok=True) - self.mtda.debug(3, "mtda.assistant.homekit.configure(): " - "%s" % str(result)) + self.mtda.debug(3, "mtda.assistant.homekit." + f"configure(): {str(result)}") return result def probe(self): diff --git a/mtda/client.py b/mtda/client.py index b7f16f8a..58f0ea91 100644 --- a/mtda/client.py +++ b/mtda/client.py @@ -55,7 +55,7 @@ def __init__(self, host=None, session=None, config_files=None, if name.endswith("'s"): name = name.replace("'s", "") elif USER is not None and HOST is not None: - name = "%s@%s" % (USER, HOST) + name = f"{USER}@{HOST}" else: name = "mtda" self._session = os.getenv('MTDA_SESSION', name) @@ -167,7 +167,7 @@ def storage_mount(self, part=None): def storage_network(self, remote): cmd = '/usr/sbin/nbd-client' if os.path.exists(cmd) is False: - raise RuntimeError('{} not found'.format(cmd)) + raise RuntimeError(f'{cmd} not found') rdev = self._impl.storage_network() if rdev is None: @@ -256,7 +256,7 @@ def storage_write_image(self, path, callback=None): import xml.etree.ElementTree as ET bmap = ET.fromstring(bmap) - print("Discovered bmap file '{}'".format(bmap_path)) + print(f"Discovered bmap file '{bmap_path}'") bmapDict = self.parseBmap(bmap, bmap_path) self._impl.storage_bmap_dict(bmapDict, self._session) image_size = bmapDict['ImageSize'] @@ -311,8 +311,7 @@ def parseBmap(self, bmap, bmap_path): "chksum": child.attrib["chksum"] }) except Exception: - print("Error parsing '%s', probably not a bmap 2.0 file" - % bmap_path) + print(f"Error parsing '{bmap_path}', probably not a bmap 2.0 file") return None return bmapDict @@ -507,7 +506,7 @@ def bmap(self, path): def copy(self): if os.path.exists(self._path) is False: - raise IOError('{}: image not found!'.format(self._path)) + raise IOError(f'{self._path}: image not found!') image = open(self._path, 'rb') try: diff --git a/mtda/console/docker.py b/mtda/console/docker.py index f4d0c331..8912ea4a 100644 --- a/mtda/console/docker.py +++ b/mtda/console/docker.py @@ -36,7 +36,7 @@ def configure(self, conf, role='console'): def probe(self): self.mtda.debug(3, "console.docker.probe()") result = self.docker.variant == "docker" - self.mtda.debug(3, "console.docker.probe(): {}".format(result)) + self.mtda.debug(3, f"console.docker.probe(): {result}") return result def open(self): @@ -55,7 +55,7 @@ def open(self): self.mtda.debug(4, "console.docker.open(): already opened") self._opened = result - self.mtda.debug(3, "console.docker.open(): {}".format(result)) + self.mtda.debug(3, f"console.docker.open(): {result}") return result def close(self): @@ -68,7 +68,7 @@ def close(self): self._socket = None self._opened = False - self.mtda.debug(3, "console.docker.close(): {}".format(result)) + self.mtda.debug(3, f"console.docker.close(): {result}") return result """ Return number of pending bytes to read""" @@ -81,12 +81,12 @@ def pending(self): fcntl.ioctl(self._fd, termios.FIONREAD, avail, 1) result = avail[0] - self.mtda.debug(3, "console.docker.pending(): {}".format(result)) + self.mtda.debug(3, f"console.docker.pending(): {result}") return result """ Read bytes from the console""" def read(self, n=1): - self.mtda.debug(3, "console.docker.read({})".format(n)) + self.mtda.debug(3, f"console.docker.read({n})") result = None if self._opened is True: @@ -101,18 +101,18 @@ def read(self, n=1): if result is None: result = b'' - self.mtda.debug(3, "console.docker.read(): {}".format(result)) + self.mtda.debug(3, f"console.docker.read(): {result}") return result """ Write to the console""" def write(self, data): - self.mtda.debug(3, "console.docker.write(data={})".format(data)) + self.mtda.debug(3, f"console.docker.write(data={data})") result = None if self._opened is True: result = os.write(self._fd, data) - self.mtda.debug(3, "console.docker.write(): {}".format(result)) + self.mtda.debug(3, f"console.docker.write(): {result}") return result diff --git a/mtda/console/logger.py b/mtda/console/logger.py index 01fbaa5d..220a5989 100644 --- a/mtda/console/logger.py +++ b/mtda/console/logger.py @@ -130,7 +130,7 @@ def run(self, cmd): # Send requested command self._clear() - self.write("%s\n" % (cmd)) + self.write(f"{cmd}\n") # Wait for the command to complete self.rx_cond.wait_for(self._matchprompt) @@ -165,10 +165,10 @@ def _match_any(self): result = False line = self._tail(False) if line is not None and self._what in line: - self.mtda.debug(2, "matched '%s'" % str(self._what)) + self.mtda.debug(2, f"matched '{str(self._what)}'") result = True - self.mtda.debug(3, "console.logger._match_any: %s" % str(result)) + self.mtda.debug(3, f"console.logger._match_any: {str(result)}") return result def wait(self, what, timeout=None): @@ -183,7 +183,7 @@ def wait(self, what, timeout=None): if result is True: self.rx_lock.release() - self.mtda.debug(3, "console.logger.wait: %s" % str(result)) + self.mtda.debug(3, f"console.logger.wait: {str(result)}") return result def write(self, data, raw=False): @@ -194,8 +194,8 @@ def write(self, data, raw=False): data = bytes(data, "utf-8") self.console.write(data) except IOError as e: - print("write error on the console ({0})!".format( - e.strerror), file=sys.stderr) + print(f"write error on the console ({e.strerror})!", + file=sys.stderr) def reset_timer(self): self.basetime = 0 @@ -245,7 +245,7 @@ def process_rx(self, data): newdata.extend(b'\r') if self.timestamps is True: elapsed = now - self.basetime - timestr = "[%4.6f] " % elapsed + timestr = f"[{elapsed:4.6f}] " newdata.extend(timestr.encode("utf-8")) linefeeds = linefeeds + 1 data = newdata @@ -334,8 +334,7 @@ def pause(self): self.rx_active.clear() result = self.console.close() - self.mtda.debug(3, "console.logger.pause(): " - "{}".format(result)) + self.mtda.debug(3, f"console.logger.pause(): {result}") return result def resume(self): @@ -346,8 +345,7 @@ def resume(self): if result is True: self.rx_active.set() - self.mtda.debug(3, "console.logger.resume(): " - "{}".format(result)) + self.mtda.debug(3, f"console.logger.resume(): {result}") return result @property diff --git a/mtda/console/qemu.py b/mtda/console/qemu.py index 3173b07e..8b84f144 100644 --- a/mtda/console/qemu.py +++ b/mtda/console/qemu.py @@ -32,7 +32,7 @@ def configure(self, conf, role='console'): def probe(self): self.mtda.debug(3, "console.qemu.probe()") result = self.qemu.variant == "qemu" - self.mtda.debug(3, "console.qemu.probe(): %s" % str(result)) + self.mtda.debug(3, f"console.qemu.probe(): {str(result)}") return result def open(self): @@ -50,7 +50,7 @@ def open(self): else: self.mtda.debug(4, "console.qemu.open(): already opened") - self.mtda.debug(3, "console.qemu.open(): %s" % str(result)) + self.mtda.debug(3, f"console.qemu.open(): {str(result)}") return result def close(self): @@ -58,7 +58,7 @@ def close(self): result = True - self.mtda.debug(3, "console.qemu.close(): %s" % str(result)) + self.mtda.debug(3, f"console.qemu.close(): {str(result)}") return result """ Return number of pending bytes to read""" @@ -71,7 +71,7 @@ def pending(self): fcntl.ioctl(self.rx, termios.FIONREAD, avail, 1) result = avail[0] - self.mtda.debug(3, "console.qemu.pending(): %s" % str(result)) + self.mtda.debug(3, f"console.qemu.pending(): {str(result)}") return result """ Read bytes from the console""" @@ -88,18 +88,18 @@ def read(self, n=1): if result is None: result = b'' - self.mtda.debug(3, "console.qemu.read(): %s" % str(result)) + self.mtda.debug(3, f"console.qemu.read(): {str(result)}") return result """ Write to the console""" def write(self, data): - self.mtda.debug(3, "console.qemu.write(data=%s)" % str(data)) + self.mtda.debug(3, f"console.qemu.write(data={str(data)})") result = None if self.opened is True: result = self.tx.write(data) - self.mtda.debug(3, "console.qemu.write(): %s" % str(result)) + self.mtda.debug(3, f"console.qemu.write(): {str(result)}") return result diff --git a/mtda/console/remote.py b/mtda/console/remote.py index eb121479..da52e451 100644 --- a/mtda/console/remote.py +++ b/mtda/console/remote.py @@ -30,7 +30,7 @@ def __init__(self, host, port, screen): def connect(self): context = zmq.Context() socket = context.socket(zmq.SUB) - socket.connect("tcp://%s:%s" % (self.host, self.port)) + socket.connect(f"tcp://{self.host}:{self.port}") socket.setsockopt(zmq.SUBSCRIBE, self.topic) self.context = context self.socket = socket diff --git a/mtda/console/screen.py b/mtda/console/screen.py index b7e08b81..7321a3c3 100644 --- a/mtda/console/screen.py +++ b/mtda/console/screen.py @@ -32,7 +32,7 @@ def capture_data(self, data): delta = time.monotonic_ns() - self.capture_time delta = delta / 1000000000.0 data = json.dumps(data.decode("latin-1")) - entry = '[%0.6f, "o", %s]\n' % (delta, data) + entry = f'[{delta:0.6f}, "o", {data}]\n' self.capture_fd.write(entry) def capture_enabled(self): diff --git a/mtda/console/serial.py b/mtda/console/serial.py index 8ec1e47e..56e91bab 100644 --- a/mtda/console/serial.py +++ b/mtda/console/serial.py @@ -44,11 +44,10 @@ def configure_systemd(self, dir): result = None if self.port is not None: - dropin = os.path.join(dir, 'auto-dep-{}.conf'.format(self.role)) + dropin = os.path.join(dir, f'auto-dep-{self.role}.conf') SystemdDeviceUnit.create_device_dependency(dropin, self.port) - self.mtda.debug(3, "console.serial.configure_systemd(): " - "{}".format(result)) + self.mtda.debug(3, f"console.serial.configure_systemd(): {result}") return result def probe(self): @@ -62,10 +61,10 @@ def probe(self): self.ser.port = self.port self.ser.baudrate = self.rate else: - self.mtda.debug(1, "console.serial.probe(): " - "{} does not exist".format(self.port)) + self.mtda.debug(1, "console.serial." + f"probe(): {self.port} does not exist") - self.mtda.debug(3, "console.serial.probe(): {}".format(result)) + self.mtda.debug(3, f"console.serial.probe(): {result}") return result def open(self): @@ -82,7 +81,7 @@ def open(self): result = self.opened - self.mtda.debug(3, "console.serial.open(): %s" % str(result)) + self.mtda.debug(3, f"console.serial.open(): {str(result)}") return result def close(self): @@ -99,7 +98,7 @@ def close(self): self.mtda.debug(4, "console.serial.close(): already closed") result = self.opened is False - self.mtda.debug(3, "console.serial.close(): {}".format(result)) + self.mtda.debug(3, f"console.serial.close(): {result}") return result """ Return number of pending bytes to read""" @@ -110,7 +109,7 @@ def pending(self): if self.ser is not None and self.opened is True: result = self.ser.inWaiting() - self.mtda.debug(3, "console.serial.pending(): %s" % str(result)) + self.mtda.debug(3, f"console.serial.pending(): {str(result)}") return result """ Read bytes from the console""" @@ -121,18 +120,18 @@ def read(self, n=1): if self.ser is not None and self.opened is True: result = self.ser.read(n) - self.mtda.debug(3, "console.serial.read(): %s" % str(result)) + self.mtda.debug(3, f"console.serial.read(): {str(result)}") return result """ Write to the console""" def write(self, data): - self.mtda.debug(3, "console.serial.write(data=%s)" % str(data)) + self.mtda.debug(3, f"console.serial.write(data={str(data)})") result = None if self.ser is not None and self.opened is True: result = self.ser.write(data) - self.mtda.debug(3, "console.serial.write(): %s" % str(result)) + self.mtda.debug(3, f"console.serial.write(): {str(result)}") return result diff --git a/mtda/console/telnet.py b/mtda/console/telnet.py index d0173e6c..651f80de 100644 --- a/mtda/console/telnet.py +++ b/mtda/console/telnet.py @@ -45,7 +45,7 @@ def configure(self, conf, role='console'): def probe(self): self.mtda.debug(3, "console.telnet.probe()") result = True - self.mtda.debug(3, "console.telnet.probe(): %s" % str(result)) + self.mtda.debug(3, f"console.telnet.probe(): {str(result)}") return result def open(self): @@ -61,7 +61,7 @@ def open(self): except OSError: result = False - self.mtda.debug(3, "console.telnet.open(): %s" % str(result)) + self.mtda.debug(3, f"console.telnet.open(): {str(result)}") return result def close(self): @@ -74,7 +74,7 @@ def close(self): self.telnet = None result = (self.opened is False) - self.mtda.debug(3, "console.telnet.close(): %s" % str(result)) + self.mtda.debug(3, f"console.telnet.close(): {str(result)}") return result """ Return number of pending bytes to read""" @@ -86,7 +86,7 @@ def pending(self): else: result = False - self.mtda.debug(3, "console.telnet.pending(): %s" % str(result)) + self.mtda.debug(3, f"console.telnet.pending(): {str(result)}") return result """ Read bytes from the console""" @@ -118,7 +118,7 @@ def read(self, n=1): except socket.timeout: self.mtda.debug(2, "console.telnet.read(): timeout!") - self.mtda.debug(3, "console.telnet.read(): %d bytes" % len(data)) + self.mtda.debug(3, f"console.telnet.read(): {len(data)} bytes") return data """ Write to the console""" @@ -131,7 +131,7 @@ def write(self, data): self.mtda.debug(2, "console.telnet.write(): not connected!") result = None - self.mtda.debug(3, "console.telnet.write(): %s" % str(result)) + self.mtda.debug(3, f"console.telnet.write(): {str(result)}") return result diff --git a/mtda/console/usbf.py b/mtda/console/usbf.py index 670f14e4..c911c022 100644 --- a/mtda/console/usbf.py +++ b/mtda/console/usbf.py @@ -31,7 +31,7 @@ def configure(self, conf, role='console'): self.port = "/dev/ttyGS0" if role == "console" else "/dev/ttyGS1" result = Composite.configure(role, conf) - self.mtda.debug(3, "console.usbf.configure(): {}".format(result)) + self.mtda.debug(3, f"console.usbf.configure(): {result}") return result def configure_systemd(self, dir): diff --git a/mtda/discovery.py b/mtda/discovery.py index 678650f3..60d2d88c 100644 --- a/mtda/discovery.py +++ b/mtda/discovery.py @@ -50,7 +50,7 @@ def listen(self): def lookup(self, name): result = None - name = '{}.{}'.format(name, self.domain) + name = f'{name}.{self.domain}' info = self.zeroconf.get_service_info(self.domain, name) if info is not None: result = socket.inet_ntoa(info.addresses[0]) diff --git a/mtda/keyboard/hid.py b/mtda/keyboard/hid.py index 087f1a7b..7ff82f08 100644 --- a/mtda/keyboard/hid.py +++ b/mtda/keyboard/hid.py @@ -38,10 +38,10 @@ def configure(self, conf): result = Composite.configure('keyboard', conf) if 'device' in conf: self.dev = conf['device'] - self.mtda.debug(4, "keyboard.hid.configure(): " - "will use {}".format(self.dev)) + self.mtda.debug(4, "keyboard.hid." + f"configure(): will use {self.dev}") - self.mtda.debug(3, "keyboard.hid.configure(): {}".format(result)) + self.mtda.debug(3, f"keyboard.hid.configure(): {result}") return result def probe(self): @@ -50,10 +50,10 @@ def probe(self): result = True if self.dev is None: result = False - self.mtda.debug(1, "keyboard.hid.probe(): " - "{} not configured".format(self.dev)) + self.mtda.debug(1, "keyboard.hid." + f"probe(): {self.dev} not configured") - self.mtda.debug(3, "keyboard.hid.probe(): {}".format(result)) + self.mtda.debug(3, f"keyboard.hid.probe(): {result}") return result def write_report(self, report): @@ -66,7 +66,7 @@ def write_report(self, report): else: result = 0 - self.mtda.debug(3, "keyboard.hid.write_report(): {}".format(result)) + self.mtda.debug(3, f"keyboard.hid.write_report(): {result}") return result def idle(self): @@ -77,20 +77,18 @@ def idle(self): self.fd.close() self.fd = None - self.mtda.debug(3, "keyboard.hid.idle(): {}".format(result)) + self.mtda.debug(3, f"keyboard.hid.idle(): {result}") return result def press(self, key, mod=0x00, repeat=1): self.mtda.debug(3, "keyboard.hid.press()") if os.path.exists(self.dev) is False: - self.mtda.debug(1, "keyboard.hid.press(): " - "{} not found".format(self.dev)) + self.mtda.debug(1, f"keyboard.hid.press(): {self.dev} not found") return False if self.fd is None: - self.mtda.debug(4, "keyboard.hid.press(): " - "opening {}".format(self.dev)) + self.mtda.debug(4, f"keyboard.hid.press(): opening {self.dev}") self.fd = open(self.dev, mode="r+b", buffering=0) NULL_CHAR = chr(0) diff --git a/mtda/keyboard/qemu.py b/mtda/keyboard/qemu.py index 146535c8..f63b7faa 100644 --- a/mtda/keyboard/qemu.py +++ b/mtda/keyboard/qemu.py @@ -33,7 +33,7 @@ def probe(self): self.mtda.debug(1, "keyboard.qemu.probe(): " "a qemu power controller is required") - self.mtda.debug(3, "keyboard.qemu.probe(): %s" % str(result)) + self.mtda.debug(3, f"keyboard.qemu.probe(): {str(result)}") return result def idle(self): @@ -58,7 +58,7 @@ def press(self, key, repeat=1): while repeat > 0: repeat = repeat - 1 key = symbols[key] if key in symbols else key - self.qemu.cmd("sendkey {}".format(key)) + self.qemu.cmd(f"sendkey {key}") time.sleep(0.1) return result diff --git a/mtda/main.py b/mtda/main.py index 8b6ff239..6b83259e 100644 --- a/mtda/main.py +++ b/mtda/main.py @@ -126,7 +126,7 @@ def command(self, args, session=None): if self.power_locked(session) is False: result = self.power.command(args) - self.mtda.debug(3, "main.command(): %s" % str(result)) + self.mtda.debug(3, f"main.command(): {str(result)}") return result def _composite_start(self): @@ -139,13 +139,13 @@ def _composite_start(self): status, _, _ = self.storage_status() enabled = status == CONSTS.STORAGE.ON_TARGET self.mtda.debug(3, "main._composite_start(): " - "with storage? {}".format(enabled)) + f"with storage? {enabled}") Composite.storage_toggle(enabled) result = Composite.install() elif self.keyboard is not None: result = Composite.install() - self.mtda.debug(3, "main._composite_start(): {}".format(result)) + self.mtda.debug(3, f"main._composite_start(): {result}") return result def _composite_stop(self): @@ -157,7 +157,7 @@ def _composite_stop(self): from mtda.support.usb import Composite Composite.remove() - self.mtda.debug(3, "main._composite_stop(): {}".format(result)) + self.mtda.debug(3, f"main._composite_stop(): {result}") return result def config_set_power_timeout(self, timeout, session=None): @@ -169,8 +169,7 @@ def config_set_power_timeout(self, timeout, session=None): self._power_expiry = None self._session_check() - self.mtda.debug(3, "main.config_set_power_timeout(): " - "{}".format(result)) + self.mtda.debug(3, f"main.config_set_power_timeout(): {result}") return result def config_set_session_timeout(self, timeout, session=None): @@ -190,8 +189,7 @@ def config_set_session_timeout(self, timeout, session=None): self._sessions[s] = now + timeout self._session_check() - self.mtda.debug(3, "main.config_set_session_timeout(): " - "{}".format(result)) + self.mtda.debug(3, f"main.config_set_session_timeout(): {result}") return result def console_prefix_key(self): @@ -224,7 +222,7 @@ def console_getkey(self): result = self.console_input.getkey() except AttributeError: print("Initialize the console using console_init first") - self.mtda.debug(3, "main.console_getkey(): %s" % str(result)) + self.mtda.debug(3, f"main.console_getkey(): {str(result)}") return result def console_init(self): @@ -244,7 +242,7 @@ def console_clear(self, session=None): else: result = None - self.mtda.debug(3, "main.console_clear(): %s" % str(result)) + self.mtda.debug(3, f"main.console_clear(): {str(result)}") return result def console_dump(self, session=None): @@ -259,7 +257,7 @@ def console_dump(self, session=None): if self.console_logger is not None: result = self.console_logger.dump() - self.mtda.debug(3, "main.console_dump(): %s" % str(result)) + self.mtda.debug(3, f"main.console_dump(): {str(result)}") return result def console_flush(self, session=None): @@ -274,7 +272,7 @@ def console_flush(self, session=None): if self.console_logger is not None: result = self.console_logger.flush() - self.mtda.debug(3, "main.console_flush(): %s" % str(result)) + self.mtda.debug(3, f"main.console_flush(): {str(result)}") return result def console_head(self, session=None): @@ -285,7 +283,7 @@ def console_head(self, session=None): if self.console_logger is not None: result = self.console_logger.head() - self.mtda.debug(3, "main.console_head(): %s" % str(result)) + self.mtda.debug(3, f"main.console_head(): {str(result)}") return result def console_lines(self, session=None): @@ -296,7 +294,7 @@ def console_lines(self, session=None): if self.console_logger is not None: result = self.console_logger.lines() - self.mtda.debug(3, "main.console_lines(): %s" % str(result)) + self.mtda.debug(3, f"main.console_lines(): {str(result)}") return result def console_locked(self, session=None): @@ -305,7 +303,7 @@ def console_locked(self, session=None): self._session_check(session) result = self._check_locked(session) - self.mtda.debug(3, "main.console_locked(): %s" % str(result)) + self.mtda.debug(3, f"main.console_locked(): {str(result)}") return result def console_print(self, data, session=None): @@ -316,7 +314,7 @@ def console_print(self, data, session=None): if self.console_logger is not None: result = self.console_logger.print(data) - self.mtda.debug(3, "main.console_print(): %s" % str(result)) + self.mtda.debug(3, f"main.console_print(): {str(result)}") return result def console_prompt(self, newPrompt=None, session=None): @@ -328,7 +326,7 @@ def console_prompt(self, newPrompt=None, session=None): self.console_logger is not None: result = self.console_logger.prompt(newPrompt) - self.mtda.debug(3, "main.console_prompt(): %s" % str(result)) + self.mtda.debug(3, f"main.console_prompt(): {str(result)}") return result def console_remote(self, host, screen): @@ -347,7 +345,7 @@ def console_remote(self, host, screen): else: self.console_output = None - self.mtda.debug(3, "main.console_remote(): %s" % str(result)) + self.mtda.debug(3, f"main.console_remote(): {str(result)}") return result def console_run(self, cmd, session=None): @@ -359,7 +357,7 @@ def console_run(self, cmd, session=None): self.console_logger is not None: result = self.console_logger.run(cmd) - self.mtda.debug(3, "main.console_run(): %s" % str(result)) + self.mtda.debug(3, f"main.console_run(): {str(result)}") return result def console_send(self, data, raw=False, session=None): @@ -371,7 +369,7 @@ def console_send(self, data, raw=False, session=None): self.console_logger is not None: result = self.console_logger.write(data, raw) - self.mtda.debug(3, "main.console_send(): %s" % str(result)) + self.mtda.debug(3, f"main.console_send(): {str(result)}") return result def console_tail(self, session=None): @@ -382,7 +380,7 @@ def console_tail(self, session=None): self.console_logger is not None: result = self.console_logger.tail() - self.mtda.debug(3, "main.console_tail(): %s" % str(result)) + self.mtda.debug(3, f"main.console_tail(): {str(result)}") return result def console_toggle(self, session=None): @@ -395,7 +393,7 @@ def console_toggle(self, session=None): if self.monitor_output is not None: self.monitor_output.toggle() - self.mtda.debug(3, "main.console_toggle(): %s" % str(result)) + self.mtda.debug(3, f"main.console_toggle(): {str(result)}") return result def console_wait(self, what, timeout=None, session=None): @@ -406,12 +404,12 @@ def console_wait(self, what, timeout=None, session=None): if session is not None and timeout is None: timeout = CONSTS.RPC.TIMEOUT self.warn('console_wait() without timeout, ' - 'using default ({})'.format(timeout)) + f'using default ({timeout})') if self.console_locked(session) is False and \ self.console_logger is not None: result = self.console_logger.wait(what, timeout) - self.mtda.debug(3, "main.console_wait(): %s" % str(result)) + self.mtda.debug(3, f"main.console_wait(): {str(result)}") return result def debug(self, level, msg): @@ -420,7 +418,7 @@ def debug(self, level, msg): prefix = "# " else: prefix = "# debug%d: " % level - msg = str(msg).replace("\n", "\n%s ... " % prefix) + msg = str(msg).replace("\n", f"\n{prefix} ... ") lines = msg.splitlines() sys.stderr.buffer.write(prefix.encode("utf-8")) for line in lines: @@ -429,10 +427,10 @@ def debug(self, level, msg): sys.stderr.buffer.flush() def warn(self, msg): - print('warning: {}'.format(msg), file=sys.stderr) + print(f'warning: {msg}', file=sys.stderr) def error(self, msg): - print('error: {}'.format(msg), file=sys.stderr) + print(f'error: {msg}', file=sys.stderr) def env_get(self, name, default=None, session=None): self.mtda.debug(3, "env_get()") @@ -441,7 +439,7 @@ def env_get(self, name, default=None, session=None): if name in self.env: result = self.env[name] - self.mtda.debug(3, "env_get(): %s" % str(result)) + self.mtda.debug(3, f"env_get(): {str(result)}") return result def env_set(self, name, value, session=None): @@ -456,9 +454,9 @@ def env_set(self, name, value, session=None): old_value = value self.env[name] = value - self.env["_%s" % name] = old_value + self.env[f"_{name}"] = old_value - self.mtda.debug(3, "env_set(): %s" % str(result)) + self.mtda.debug(3, f"env_set(): {str(result)}") return result def keyboard_write(self, what, session=None): @@ -501,7 +499,7 @@ def keyboard_write(self, what, session=None): what = what[1:] self.keyboard.write(key) - self.mtda.debug(3, "main.keyboard_write(): {}".format(result)) + self.mtda.debug(3, f"main.keyboard_write(): {result}") return result def monitor_remote(self, host, screen): @@ -522,7 +520,7 @@ def monitor_remote(self, host, screen): else: self.monitor_output = None - self.mtda.debug(3, "main.monitor_remote(): %s" % str(result)) + self.mtda.debug(3, f"main.monitor_remote(): {str(result)}") return result def monitor_send(self, data, raw=False, session=None): @@ -534,7 +532,7 @@ def monitor_send(self, data, raw=False, session=None): self.monitor_logger is not None: result = self.monitor_logger.write(data, raw) - self.mtda.debug(3, "main.monitor_send(): %s" % str(result)) + self.mtda.debug(3, f"main.monitor_send(): {str(result)}") return result def monitor_wait(self, what, timeout=None, session=None): @@ -550,7 +548,7 @@ def monitor_wait(self, what, timeout=None, session=None): self.monitor_logger is not None: result = self.monitor_logger.wait(what, timeout) - self.mtda.debug(3, "main.monitor_wait(): %s" % str(result)) + self.mtda.debug(3, f"main.monitor_wait(): {str(result)}") return result def pastebin_api_key(self): @@ -570,7 +568,7 @@ def power_locked(self, session=None): else: result = self._check_locked(session) - self.mtda.debug(3, "main.power_locked(): %s" % str(result)) + self.mtda.debug(3, f"main.power_locked(): {str(result)}") return result def publish(self, topic, data): @@ -595,7 +593,7 @@ def storage_bytes_written(self, session=None): self._session_check(session) result = self._writer.written - self.mtda.debug(3, "main.storage_bytes_written(): %s" % str(result)) + self.mtda.debug(3, f"main.storage_bytes_written(): {str(result)}") return result def storage_compression(self, compression, session=None): @@ -608,7 +606,7 @@ def storage_compression(self, compression, session=None): result = self._writer.compression.value self._writer.compression = compression - self.mtda.debug(3, "main.storage_compression(): %s" % str(result)) + self.mtda.debug(3, f"main.storage_compression(): {str(result)}") return result def storage_bmap_dict(self, bmapDict, session=None): @@ -620,7 +618,7 @@ def storage_bmap_dict(self, bmapDict, session=None): else: self.storage.setBmap(bmapDict) result = True - self.mtda.debug(3, "main.storage_bmap_dict()(): %s" % str(result)) + self.mtda.debug(3, f"main.storage_bmap_dict()(): {str(result)}") def storage_close(self, session=None): self.mtda.debug(3, "main.storage_close()") @@ -644,7 +642,7 @@ def storage_close(self, session=None): if self.storage is not None: self.storage_locked() - self.mtda.debug(3, "main.storage_close(): %s" % str(result)) + self.mtda.debug(3, f"main.storage_close(): {str(result)}") return result def storage_locked(self, session=None): @@ -681,7 +679,7 @@ def storage_locked(self, session=None): result = True if result is True: - self.mtda.debug(4, "storage_locked(): {}".format(reason)) + self.mtda.debug(4, f"storage_locked(): {reason}") event = CONSTS.STORAGE.LOCKED else: event = CONSTS.STORAGE.UNLOCKED @@ -692,7 +690,7 @@ def storage_locked(self, session=None): self._storage_locked = result self._storage_event(event, reason) - self.mtda.debug(3, "main.storage_locked(): {}".format(result)) + self.mtda.debug(3, f"main.storage_locked(): {result}") return result def storage_mount(self, part=None, session=None): @@ -712,7 +710,7 @@ def storage_mount(self, part=None, session=None): if self.storage is not None: self.storage_locked() - self.mtda.debug(3, "main.storage_mount(): %s" % str(result)) + self.mtda.debug(3, f"main.storage_mount(): {str(result)}") return result def storage_update(self, dst, offset, session=None): @@ -726,10 +724,9 @@ def storage_update(self, dst, offset, session=None): try: result = self.storage.update(dst, offset) except (FileNotFoundError, IOError) as e: - self.mtda.debug(1, "main.storage_update(): " - "%s" % str(e.args[0])) + self.mtda.debug(1, f"main.storage_update(): {str(e.args[0])}") - self.mtda.debug(3, "main.storage_update(): %s" % str(result)) + self.mtda.debug(3, f"main.storage_update(): {str(result)}") return result def storage_network(self, session=None): @@ -749,7 +746,7 @@ def storage_network(self, session=None): with open(conf, 'w') as f: f.write('[mtda-storage]\n') f.write('authfile = /etc/nbd-server/allow\n') - f.write('exportname = {}\n'.format(file)) + f.write(f'exportname = {file}\n') f.close() cmd = ['systemctl', 'restart', 'nbd-server'] @@ -761,7 +758,7 @@ def storage_network(self, session=None): self._storage_event(CONSTS.STORAGE.ON_NETWORK) result = True - self.mtda.debug(3, "main.storage_network(): {}".format(result)) + self.mtda.debug(3, f"main.storage_network(): {result}") return result def storage_open(self, session=None): @@ -799,7 +796,7 @@ def storage_status(self, session=None): self._writer.writing, self._writer.written) - self.mtda.debug(3, "main.storage_status(): %s" % str(result)) + self.mtda.debug(3, f"main.storage_status(): {str(result)}") return result def storage_to_host(self, session=None): @@ -814,7 +811,7 @@ def storage_to_host(self, session=None): self.error('cannot switch storage to host: locked') result = False - self.mtda.debug(3, "main.storage_to_host(): {}".format(result)) + self.mtda.debug(3, f"main.storage_to_host(): {result}") return result def storage_to_target(self, session=None): @@ -830,7 +827,7 @@ def storage_to_target(self, session=None): self.error('cannot switch storage to target: locked') result = False - self.mtda.debug(3, "main.storage_to_target(): %s" % str(result)) + self.mtda.debug(3, f"main.storage_to_target(): {str(result)}") return result def storage_swap(self, session=None): @@ -848,7 +845,7 @@ def storage_swap(self, session=None): result, writing, written = self.storage_status(session) return result - self.mtda.debug(3, "main.storage_swap(): %s" % str(result)) + self.mtda.debug(3, f"main.storage_swap(): {str(result)}") return result def storage_write(self, data, session=None): @@ -883,7 +880,7 @@ def storage_write(self, data, session=None): self.error('storage_write failed: write or decompression error') result = -1 - self.mtda.debug(3, "main.storage_write(): %s" % str(result)) + self.mtda.debug(3, f"main.storage_write(): {str(result)}") return result def systemd_configure(self): @@ -935,7 +932,7 @@ def toggle_timestamps(self): print("no console configured/found!", file=sys.stderr) result = None - self.mtda.debug(3, "main.toggle_timestamps(): %s" % str(result)) + self.mtda.debug(3, f"main.toggle_timestamps(): {str(result)}") return result def target_lock(self, session): @@ -946,12 +943,12 @@ def target_lock(self, session): owner = self.target_owner() if owner is None or owner == session: self._lock_owner = session - self._session_event("LOCKED %s" % session) + self._session_event(f"LOCKED {session}") result = True else: result = False - self.mtda.debug(3, "main.target_lock(): %s" % str(result)) + self.mtda.debug(3, f"main.target_lock(): {str(result)}") return result def target_locked(self, session): @@ -1006,7 +1003,7 @@ def _parse_script(self, script): if script is not None: result = script.replace("... ", " ") - self.mtda.debug(3, "main._parse_script(): %s" % str(result)) + self.mtda.debug(3, f"main._parse_script(): {str(result)}") return result def exec_power_on_script(self): @@ -1015,11 +1012,11 @@ def exec_power_on_script(self): result = None if self.power_on_script: self.mtda.debug(4, "exec_power_on_script(): " - "%s" % self.power_on_script) + f"{self.power_on_script}") env = self._env_for_script() result = exec(self.power_on_script, env) - self.mtda.debug(3, "main.exec_power_on_script(): %s" % str(result)) + self.mtda.debug(3, f"main.exec_power_on_script(): {str(result)}") return result def _target_on(self, session=None): @@ -1058,7 +1055,7 @@ def _target_on(self, session=None): if self.storage is not None: self.storage_locked() - self.mtda.debug(3, "main._target_on(): {}".format(result)) + self.mtda.debug(3, f"main._target_on(): {result}") return result def target_on(self, session=None): @@ -1073,7 +1070,7 @@ def target_on(self, session=None): if self.power_locked(session) is False: result = self._target_on(session) - self.mtda.debug(3, "main.target_on(): {}".format(result)) + self.mtda.debug(3, f"main.target_on(): {result}") return result def exec_power_off_script(self): @@ -1120,7 +1117,7 @@ def _target_off(self, session=None): if self.storage is not None: self.storage_locked() - self.mtda.debug(3, "main._target_off(): {}".format(result)) + self.mtda.debug(3, f"main._target_off(): {result}") return result def target_off(self, session=None): @@ -1135,7 +1132,7 @@ def target_off(self, session=None): if self.power_locked(session) is False: result = self._target_off(session) - self.mtda.debug(3, "main.target_off(): {}".format(result)) + self.mtda.debug(3, f"main.target_off(): {result}") return result def _target_status(self, session=None): @@ -1146,7 +1143,7 @@ def _target_status(self, session=None): else: result = self.power.status() - self.mtda.debug(3, "main._target_status(): {}".format(result)) + self.mtda.debug(3, f"main._target_status(): {result}") return result def target_status(self, session=None): @@ -1155,7 +1152,7 @@ def target_status(self, session=None): with self._power_lock: result = self._target_status(session) - self.mtda.debug(3, "main.target_status(): {}".format(result)) + self.mtda.debug(3, f"main.target_status(): {result}") return result def target_toggle(self, session=None): @@ -1175,7 +1172,7 @@ def target_toggle(self, session=None): else: result = CONSTS.POWER.LOCKED - self.mtda.debug(3, "main.target_toggle(): {}".format(result)) + self.mtda.debug(3, f"main.target_toggle(): {result}") return result def target_unlock(self, session): @@ -1185,11 +1182,11 @@ def target_unlock(self, session): self._session_check(session) with self._session_lock: if self.target_owner() == session: - self._session_event("UNLOCKED %s" % session) + self._session_event(f"UNLOCKED {session}") self._lock_owner = None result = True - self.mtda.debug(3, "main.target_unlock(): %s" % str(result)) + self.mtda.debug(3, f"main.target_unlock(): {str(result)}") return result def target_uptime(self, session=None): @@ -1199,7 +1196,7 @@ def target_uptime(self, session=None): if self._uptime > 0: result = time.monotonic() - self._uptime - self.mtda.debug(3, "main.target_uptime(): %s" % str(result)) + self.mtda.debug(3, f"main.target_uptime(): {str(result)}") return result def usb_find_by_class(self, className, session=None): @@ -1299,14 +1296,13 @@ def usb_toggle(self, ndx, session=None): print("invalid USB switch #" + str(ndx), file=sys.stderr) def video_url(self, host="", opts=None): - self.mtda.debug(3, "main.video_url(host={0}, " - "opts={1})".format(host, opts)) + self.mtda.debug(3, f"main.video_url(host={host}, opts={opts})") result = None if self.video is not None: result = self.video.url(host, opts) - self.mtda.debug(3, "main.video_url(): %s" % str(result)) + self.mtda.debug(3, f"main.video_url(): {str(result)}") return result def load_config(self, remote=None, is_server=False, config_files=None): @@ -1315,8 +1311,7 @@ def load_config(self, remote=None, is_server=False, config_files=None): if config_files is None: config_files = os.getenv('MTDA_CONFIG', self.config_files) - self.mtda.debug(2, "main.load_config(): " - "config_files={}".format(config_files)) + self.mtda.debug(2, f"main.load_config(): config_files={config_files}") self.remote = os.getenv('MTDA_REMOTE', remote) self.is_remote = remote is not None @@ -1343,14 +1338,14 @@ def load_config(self, remote=None, is_server=False, config_files=None): if parser.has_section(sub): try: postconf = None - hook = 'post_configure_{}'.format(sub) + hook = f'post_configure_{sub}' if hasattr(self, hook): postconf = getattr(self, hook) self.load_subsystem(sub, parser, postconf) except configparser.NoOptionError: - self.error("variant not defined for '{}'!".format(sub)) + self.error(f"variant not defined for '{sub}'!") except ImportError: - self.error("{} could not be found/loaded!".format(sub)) + self.error(f"{sub} could not be found/loaded!") # configure additional components if parser.has_section('environment'): self.load_environment(parser) @@ -1391,13 +1386,12 @@ def load_environment(self, parser): for opt in parser.options('environment'): value = parser.get('environment', opt) - self.mtda.debug(4, "main.load_environment(): " - "%s => %s" % (opt, value)) + self.mtda.debug(4, f"main.load_environment(): {opt} => {value}") self.env_set(opt, value) def load_subsystem(self, subsystem, parser, postconf=None): variant = parser.get(subsystem, 'variant') - mod = importlib.import_module("mtda.{}.{}".format(subsystem, variant)) + mod = importlib.import_module(f"mtda.{subsystem}.{variant}") factory = getattr(mod, 'instantiate') instance = factory(self) setattr(self, subsystem, instance) @@ -1458,8 +1452,8 @@ def load_remote_config(self, parser): watcher = mtda.discovery.Watcher(CONSTS.MDNS.TYPE) ip = watcher.lookup(self.remote) if ip is not None: - self.debug(2, "resolved '{}' ({}) " - "using Zeroconf".format(self.remote, ip)) + self.debug(2, f"resolved '{self.remote}' " + f"({ip}) using Zeroconf") self.remote = ip else: self.remote = None @@ -1482,7 +1476,7 @@ def load_timeouts_config(self, parser): self._power_timeout = self._power_timeout * 60 self._session_timeout = self._session_timeout * 60 - self.mtda.debug(3, "main.load_timeouts_config: %s" % str(result)) + self.mtda.debug(3, f"main.load_timeouts_config: {str(result)}") return result def load_ui_config(self, parser): @@ -1529,8 +1523,8 @@ def load_usb_port_config(self, parser, section): except configparser.NoOptionError: print('usb switch variant not defined!', file=sys.stderr) except ImportError: - print('usb switch "%s" could not be found/loaded!' % ( - variant), file=sys.stderr) + print(f'usb switch "{variant}" could not be found/loaded!', + file=sys.stderr) def load_www_config(self, parser): self.mtda.debug(3, "main.load_www_config()") @@ -1539,18 +1533,18 @@ def load_www_config(self, parser): self._www.configure(dict(parser.items('www'))) def notify(self, what, info): - self.mtda.debug(3, "main.notify({},{})".format(what, info)) + self.mtda.debug(3, f"main.notify({what},{info})") result = None if self.socket is not None: import zmq with self._socket_lock: self.socket.send(CONSTS.CHANNEL.EVENTS, flags=zmq.SNDMORE) - self.socket.send_string("{} {}".format(what, info)) + self.socket.send_string(f"{what} {info}") if self._www is not None: self._www.notify(what, info) - self.mtda.debug(3, "main.notify: {}".format(result)) + self.mtda.debug(3, f"main.notify: {result}") return result def start(self): @@ -1584,7 +1578,7 @@ def start(self): import zmq context = zmq.Context() socket = context.socket(zmq.PUB) - socket.bind("tcp://*:%s" % self.conport) + socket.bind(f"tcp://*:{self.conport}") else: socket = None self.socket = socket @@ -1694,7 +1688,7 @@ def stop(self): self.socket = None def _session_check(self, session=None): - self.mtda.debug(3, "main._session_check(%s)" % str(session)) + self.mtda.debug(3, f"main._session_check({str(session)})") events = [] now = time.monotonic() @@ -1705,7 +1699,7 @@ def _session_check(self, session=None): # Register new session if session is not None: if session not in self._sessions: - events.append("ACTIVE %s" % session) + events.append(f"ACTIVE {session}") self._sessions[session] = now + self._session_timeout # Check for inactive sessions @@ -1716,7 +1710,7 @@ def _session_check(self, session=None): if left <= 0: inactive.append(s) for s in inactive: - events.append("INACTIVE %s" % s) + events.append(f"INACTIVE {s}") self._sessions.pop(s, "") # Check if we should arm the auto power-off timer @@ -1741,7 +1735,7 @@ def _session_check(self, session=None): if session == self._lock_owner: self._lock_expiry = now + self._lock_timeout elif now >= self._lock_expiry: - events.append("UNLOCKED %s" % self._lock_owner) + events.append(f"UNLOCKED {self._lock_owner}") self._lock_owner = None # Send event sessions generated above @@ -1754,17 +1748,17 @@ def _session_check(self, session=None): self.mtda.debug(2, "device powered down after {} seconds of " "inactivity".format(self._power_timeout)) - self.mtda.debug(3, "main._session_check: %s" % str(result)) + self.mtda.debug(3, f"main._session_check: {str(result)}") return result def _session_event(self, info): - self.mtda.debug(3, "main._session_event(%s)" % str(info)) + self.mtda.debug(3, f"main._session_event({str(info)})") result = None if info is not None: self.notify(CONSTS.EVENTS.SESSION, info) - self.mtda.debug(3, "main._session_event: %s" % str(result)) + self.mtda.debug(3, f"main._session_event: {str(result)}") return result def _check_locked(self, session): diff --git a/mtda/power/docker.py b/mtda/power/docker.py index c729ac1e..b41d491e 100644 --- a/mtda/power/docker.py +++ b/mtda/power/docker.py @@ -53,11 +53,11 @@ def probe(self): self._client.images.pull(distro, tag=version) result = self._start() - self.mtda.debug(3, "power.docker.probe(): {}".format(result)) + self.mtda.debug(3, f"power.docker.probe(): {result}") return result def _start(self, image=None): - self.mtda.debug(3, "power.docker._start({})".format(image)) + self.mtda.debug(3, f"power.docker._start({image})") if image is None: image = self._image @@ -80,7 +80,7 @@ def _start(self, image=None): hostname=self._name, name=self._name) - self.mtda.debug(3, "power.docker._start(): {}".format(result)) + self.mtda.debug(3, f"power.docker._start(): {result}") return result def _stop(self): @@ -100,11 +100,10 @@ def _stop(self): "container {} has vanished".format(cid)) pass except docker.errors.APIError as e: - self.mtda.debug(1, "power.docker._stop(): " - "{}".format(e)) + self.mtda.debug(1, f"power.docker._stop(): {e}") result = False - self.mtda.debug(3, "power.docker._stop(): {}".format(result)) + self.mtda.debug(3, f"power.docker._stop(): {result}") return result def command(self, args): @@ -123,8 +122,7 @@ def _import_close(self): result = True self._proc = None - self.mtda.debug(3, "power.docker._import_close(): " - "{}".format(result)) + self.mtda.debug(3, f"power.docker._import_close(): {result}") return result def import_close(self): @@ -144,8 +142,7 @@ def _import_open(self): stdout=subprocess.PIPE) result = self._proc.stdin - self.mtda.debug(3, "power.docker._import_open(): " - "{}".format(result)) + self.mtda.debug(3, f"power.docker._import_open(): {result}") return result def import_open(self): @@ -164,15 +161,14 @@ def _on(self): self._container.start() self._container = self._client.containers.get(cid) except docker.errors.NotFound: - self.mtda.debug(1, "power.docker._on(): " - "container {} has vanished".format(cid)) + self.mtda.debug(1, "power.docker." + f"_on(): container {cid} has vanished") result = False except docker.errors.APIError as e: - self.mtda.debug(1, "power.docker._on(): " - "{}".format(e)) + self.mtda.debug(1, f"power.docker._on(): {e}") result = False - self.mtda.debug(3, "power.docker._on(): {}".format(result)) + self.mtda.debug(3, f"power.docker._on(): {result}") return result def on(self): @@ -188,15 +184,14 @@ def _off(self): self._container.stop() self._container = self._client.containers.get(cid) except docker.errors.NotFound: - self.mtda.debug(1, "power.docker._off(): " - "container {} has vanished".format(cid)) + self.mtda.debug(1, "power.docker." + f"_off(): container {cid} has vanished") result = False except docker.errors.APIError as e: - self.mtda.debug(1, "power.docker._off(): " - "{}".format(e)) + self.mtda.debug(1, f"power.docker._off(): {e}") result = False - self.mtda.debug(3, "power.docker._off(): {}".format(result)) + self.mtda.debug(3, f"power.docker._off(): {result}") return result def off(self): @@ -213,7 +208,7 @@ def _socket(self): 'stderr': True, 'stream': True}) - self.mtda.debug(3, "power.docker._socket(): {}".format(result)) + self.mtda.debug(3, f"power.docker._socket(): {result}") return result def socket(self): @@ -231,10 +226,10 @@ def _status(self): elif status == "created" or status == "exited": result = self.POWER_OFF else: - self.mtda.debug(1, "power.docker._status(): " - "unknown status: {}".format(status)) + self.mtda.debug(1, "power.docker." + f"_status(): unknown status: {status}") - self.mtda.debug(3, "power.docker._status(): {}".format(result)) + self.mtda.debug(3, f"power.docker._status(): {result}") return result def status(self): diff --git a/mtda/power/gpio.py b/mtda/power/gpio.py index 88912ef5..f81e79e8 100644 --- a/mtda/power/gpio.py +++ b/mtda/power/gpio.py @@ -34,7 +34,7 @@ def configure(self, conf): for gpio in conf['gpio'].split(','): self.gpiopair.append(itemgetter(0, 1)(gpio.split('@'))) - self.mtda.debug(3, "power.gpio.configure(): {}".format(result)) + self.mtda.debug(3, f"power.gpio.configure(): {result}") return result def probe(self): @@ -55,19 +55,18 @@ def probe(self): for line in self.lines: try: if line.is_used() is False: - self.mtda.debug(3, "power.gpiochip{}@pin{} is free for " - "use" .format(chip, pin)) + self.mtda.debug(3, f"power.gpiochip{chip}@pin{pin} " + "is free for use") line.request(consumer='mtda', type=gpiod.LINE_REQ_DIR_OUT) else: raise ValueError("gpiochip{}@pin{} is in use by other " "service" .format(chip, pin)) except OSError: - self.mtda.debug(3, "line {} is not configured correctly" - .format(line)) + self.mtda.debug(3, f"line {line} is not configured correctly") result = True - self.mtda.debug(3, "power.gpio.probe(): {}".format(result)) + self.mtda.debug(3, f"power.gpio.probe(): {result}") return result def command(self, args): @@ -75,7 +74,7 @@ def command(self, args): result = False - self.mtda.debug(3, "power.gpio.command(): {}".format(result)) + self.mtda.debug(3, f"power.gpio.command(): {result}") return result def on(self): @@ -87,7 +86,7 @@ def on(self): line.set_value(1) result = self.status() == self.POWER_ON - self.mtda.debug(3, "power.gpio.on(): {}".format(result)) + self.mtda.debug(3, f"power.gpio.on(): {result}") return result def off(self): @@ -99,7 +98,7 @@ def off(self): line.set_value(0) result = self.status() == self.POWER_OFF - self.mtda.debug(3, "power.gpio.off(): {}".format(result)) + self.mtda.debug(3, f"power.gpio.off(): {result}") return result def status(self): @@ -114,15 +113,14 @@ def status(self): value = self.POWER_ON else: value = self.POWER_OFF - self.mtda.debug(3, "power.gpio.status: " - "line {} is {}" .format(line, value)) + self.mtda.debug(3, f"power.gpio.status: line {line} is {value}") if first is True: first = False result = value elif value != result: result = self.POWER_UNSURE - self.mtda.debug(3, "power.gpio.status(): {}".format(result)) + self.mtda.debug(3, f"power.gpio.status(): {result}") return result diff --git a/mtda/power/qemu.py b/mtda/power/qemu.py index 6f79d434..81cc5923 100644 --- a/mtda/power/qemu.py +++ b/mtda/power/qemu.py @@ -82,8 +82,8 @@ def configure(self, conf): self.watchdog = conf['watchdog'] n = 0 while True: - key = 'storage.{}'.format(n) - sizekey = 'storage.{}.size'.format(n) + key = f'storage.{n}' + sizekey = f'storage.{n}.size' if key in conf: path = os.path.realpath(conf[key]) size = int(conf[sizekey]) if sizekey in conf else 16 @@ -97,11 +97,11 @@ def probe(self): if self.executable is None: raise ValueError("qemu executable not specified!") - result = os.system("%s --version" % self.executable) + result = os.system(f"{self.executable} --version") if result != 0: - raise ValueError("could not execute %s!" % self.executable) + raise ValueError(f"could not execute {self.executable}!") if self.swtpm is not None and os.path.exists(self.swtpm) is False: - raise ValueError("swtpm (%s) could not be found!" % self.swtpm) + raise ValueError(f"swtpm ({self.swtpm}) could not be found!") def getpid(self, pidfile, timeout=30): result = 0 @@ -154,27 +154,27 @@ def start(self): options += " -serial pipe:/tmp/qemu-serial" options += " -device e1000,netdev=net0" options += " -netdev user,id=net0," - options += "hostfwd=tcp::2222-:22,hostname={0}".format(self.hostname) + options += f"hostfwd=tcp::2222-:22,hostname={self.hostname}" options += " -device qemu-xhci" options += " -vnc :0,websocket=on" # extra options if self.bios is not None: - options += " -bios %s" % self.bios + options += f" -bios {self.bios}" if self.cpu is not None: - options += " -cpu %s" % self.cpu + options += f" -cpu {self.cpu}" if self.smp is not None: if self.smp == 0: - options += " -smp %s" % multiprocessing.cpu_count() + options += f" -smp {multiprocessing.cpu_count()}" else: - options += " -smp %s" % self.smp + options += f" -smp {self.smp}" if self.machine is not None: - options += " -machine %s" % self.machine + options += f" -machine {self.machine}" if self.pflash_ro is not None: if pathlib.Path(self.pflash_ro).is_file(): if os.access(self.pflash_ro, os.R_OK): options += " -drive if=pflash,format=raw," - options += "readonly=on,file=%s" % self.pflash_ro + options += f"readonly=on,file={self.pflash_ro}" else: raise ValueError("Read-only pflash file (%s) " "cannot be read." % self.pflash_ro) @@ -184,7 +184,7 @@ def start(self): if self.pflash_rw is not None: try: options += " -drive if=pflash,format=raw," - options += "file=%s" % self.pflash_rw + options += f"file={self.pflash_rw}" if pathlib.Path(self.pflash_rw).is_file(): if not os.access(self.pflash_rw, os.W_OK): raise ValueError("Writeable pflash file (%s) has no " @@ -207,13 +207,13 @@ def start(self): "%s" % (self.pflash_rw, e)) if len(self.drives) > 0: for drv, size in self.drives: - options += " -drive file={},media=disk,format=raw".format(drv) + options += f" -drive file={drv},media=disk,format=raw" if os.path.exists(drv) is False: sparse = pathlib.Path(drv) sparse.touch() os.truncate(str(sparse), size*1024*1024*1024) if self.watchdog is not None: - options += " -device {},id=watchdog0".format(self.watchdog) + options += f" -device {self.watchdog},id=watchdog0" # swtpm options if self.swtpm is not None: @@ -224,7 +224,7 @@ def start(self): + " socket -d" + " --tpmstate dir=/tmp/qemu-swtpm" + " --ctrl type=unixio,path=/tmp/qemu-swtpm/sock" - + " --pid file=%s --tpm2" % pidfile.name) + + f" --pid file={pidfile.name} --tpm2") if result == 0: self.pidOfSwTpm = self.getpid(pidfile.name) self.mtda.debug(2, "power.qemu.start(): " @@ -246,7 +246,7 @@ def start(self): if self.mtda.www is not None and os.path.exists(self.websockify): # bind on the same address as our web service host = self.mtda.www.host - cmd = self.websockify + " -D {}:5901 {}:5900".format(host, host) + cmd = self.websockify + f" -D {host}:5901 {host}:5900" result = os.system(cmd) if result == 0: self.pidOfWebsockify = self.getproc(cmd) @@ -255,8 +255,8 @@ def start(self): "[{0}]".format(self.pidOfWebsockify)) with tempfile.NamedTemporaryFile() as pidfile: - options += " -pidfile {0}".format(pidfile.name) - result = os.system("%s %s" % (self.executable, options)) + options += f" -pidfile {pidfile.name}" + result = os.system(f"{self.executable} {options}") if result == 0: self.pidOfQemu = self.getpid(pidfile.name) self.mtda.debug(2, "power.qemu.start(): " @@ -272,15 +272,13 @@ def start(self): def kill(self, name, pid, timeout=3): tries = timeout if psutil.pid_exists(pid): - self.mtda.debug(2, "terminating {0} " - "[{1}] using SIGTERM".format(name, pid)) + self.mtda.debug(2, f"terminating {name} [{pid}] using SIGTERM") os.kill(pid, signal.SIGTERM) while tries > 0 and psutil.pid_exists(pid): time.sleep(1) tries = tries - 1 if psutil.pid_exists(pid): - self.mtda.debug(2, "terminating {0} " - "[{1}] using SIGKILL".format(name, pid)) + self.mtda.debug(2, f"terminating {name} [{pid}] using SIGKILL") os.kill(pid, signal.SIGKILL) return psutil.pid_exists(pid) @@ -329,7 +327,7 @@ def monitor_command_output(self): if output.endswith("(qemu) "): output = output[:-7] - self.mtda.debug(3, "power.qemu.monitor_command_output(): %s" % output) + self.mtda.debug(3, f"power.qemu.monitor_command_output(): {output}") return output def _cmd(self, what): @@ -350,7 +348,7 @@ def _cmd(self, what): # provide response from the monitor output = self.monitor_command_output() - self.mtda.debug(3, "power.qemu._cmd(): %s" % str(output)) + self.mtda.debug(3, f"power.qemu._cmd(): {str(output)}") return output def cmd(self, what): @@ -360,7 +358,7 @@ def cmd(self, what): result = self._cmd(what) self.lock.release() - self.mtda.debug(3, "power.qemu.cmd(): %s" % str(result)) + self.mtda.debug(3, f"power.qemu.cmd(): {str(result)}") return result def command(self, args): @@ -369,7 +367,7 @@ def command(self, args): result = self.cmd(" ".join(args)) result = "\n".join(result.splitlines()[1:]) - self.mtda.debug(3, "power.qemu.command(): %s" % str(result)) + self.mtda.debug(3, f"power.qemu.command(): {str(result)}") return result def on(self): @@ -408,9 +406,9 @@ def status(self): break if result == self.POWER_UNSURE: - self.mtda.debug(1, "unknown power status: %s" % str(status)) + self.mtda.debug(1, f"unknown power status: {str(status)}") - self.mtda.debug(3, "power.qemu.status(): %s" % str(result)) + self.mtda.debug(3, f"power.qemu.status(): {str(result)}") return result def usb_ids(self): @@ -420,7 +418,7 @@ def usb_ids(self): for line in lines: line = line.strip() if line.startswith("Device "): - self.mtda.debug(2, "power.qemu.usb_ids(): {0}".format(line)) + self.mtda.debug(2, f"power.qemu.usb_ids(): {line}") match = re.findall(r'ID: (\S+)$', line) if match: results.append(match[0]) @@ -433,8 +431,8 @@ def usb_add(self, id, file): self.lock.acquire() if id not in self.usb_ids(): - self.mtda.debug(2, "power.qemu.usb_add(): " - "adding '{0}' as '{1}'".format(file, id)) + self.mtda.debug(2, "power.qemu." + f"usb_add(): adding '{file}' as '{id}'") cmdstr = "drive_add 0 if=none,id={0},file={1}" output = self._cmd(cmdstr.format(id, file)) added = False @@ -458,7 +456,7 @@ def usb_add(self, id, file): "usb-storage '{0}' could not be added " "({1})!".format(id, reason)) - self.mtda.debug(3, "power.qemu.usb_add(): %s" % str(result)) + self.mtda.debug(3, f"power.qemu.usb_add(): {str(result)}") self.lock.release() return result @@ -469,17 +467,17 @@ def usb_rm(self, id): self.lock.acquire() if id in self.usb_ids(): - self._cmd("device_del {0}".format(id)) + self._cmd(f"device_del {id}") result = (id not in self.usb_ids()) if result: - self.mtda.debug(2, "power.qemu.usb_rm(): " - "usb-storage '{0}' removed".format(id)) + self.mtda.debug(2, "power.qemu." + f"usb_rm(): usb-storage '{id}' removed") else: self.mtda.debug(1, "power.qemu.usb_rm(): " "usb-storage '{0}' could not be " "removed!".format(id)) - self.mtda.debug(3, "power.qemu.usb_rm(): %s" % str(result)) + self.mtda.debug(3, f"power.qemu.usb_rm(): {str(result)}") self.lock.release() return result diff --git a/mtda/power/shellcmd.py b/mtda/power/shellcmd.py index f772b2ce..d90756f3 100644 --- a/mtda/power/shellcmd.py +++ b/mtda/power/shellcmd.py @@ -37,13 +37,13 @@ def configure(self, conf): def probe(self): if self.on_cmd is None: raise ValueError("on-cmd not specified") - self.mtda.debug(3, "on-cmd: {}".format(self.on_cmd)) + self.mtda.debug(3, f"on-cmd: {self.on_cmd}") if self.off_cmd is None: raise ValueError("off-cmd not specified") - self.mtda.debug(3, "off-cmd: {}".format(self.off_cmd)) + self.mtda.debug(3, f"off-cmd: {self.off_cmd}") if self.check_on is None: raise ValueError("check-on not specified") - self.mtda.debug(3, "check_on: {}".format(self.check_on)) + self.mtda.debug(3, f"check_on: {self.check_on}") def command(self, args): return False @@ -64,7 +64,7 @@ def off(self): def status(self): proc = subprocess.run(self.check_on, shell=True, capture_output=True) - self.mtda.debug(3, "check_on: {}".format(proc.returncode)) + self.mtda.debug(3, f"check_on: {proc.returncode}") if proc.returncode == 0: return self.POWER_ON elif proc.returncode == 1: diff --git a/mtda/power/usbrelay.py b/mtda/power/usbrelay.py index 908e29da..38e2345b 100644 --- a/mtda/power/usbrelay.py +++ b/mtda/power/usbrelay.py @@ -37,13 +37,13 @@ def probe(self): raise ValueError("usbrelay: 'lines' not configured!") if os.path.exists(self.exe) is False: - raise ValueError("{0} was not found!".format(self.exe)) + raise ValueError(f"{self.exe} was not found!") # Make sure all configured lines were detected statuses = self._get_lines() for line in self.lines: if line not in statuses: - raise ValueError("usbrelay: {0} not detected!".format(line)) + raise ValueError(f"usbrelay: {line} not detected!") def command(self, args): return False @@ -68,8 +68,8 @@ def status(self): value = self.POWER_ON else: value = self.POWER_OFF - self.mtda.debug(3, "power.usbrelay.status: " - "line {0} is {1}".format(line, value)) + self.mtda.debug(3, "power.usbrelay." + f"status: line {line} is {value}") if first is True: first = False result = value @@ -104,7 +104,7 @@ def _set_lines(self, value): try: for line in self.lines: subprocess.check_call( - [self.exe, "{0}={1}".format(line, value)]) + [self.exe, f"{line}={value}"]) except subprocess.CalledProcessError: return False return True diff --git a/mtda/scripts/__init__.py b/mtda/scripts/__init__.py index 28043f2a..0a00db9c 100644 --- a/mtda/scripts/__init__.py +++ b/mtda/scripts/__init__.py @@ -17,7 +17,7 @@ def load_device_scripts(variant, env): try: scripts = importlib.import_module("mtda.scripts." + variant) for op in ops.keys(): - name = "{}_{}".format(variant, op.replace('-', '_')) + name = f"{variant}_{op.replace('-', '_')}" if hasattr(scripts, name) is False: continue method = getattr(scripts, name) @@ -32,14 +32,13 @@ def load_device_scripts(variant, env): def op_handler(name): if name in ops and variant in ops[name]: - mtda.debug(2, "calling '{}' device script".format(name)) + mtda.debug(2, f"calling '{name}' device script") mtda.env_set(name, '0') result = ops[name][variant]() - mtda.env_set(name, '{}'.format(result)) + mtda.env_set(name, f'{result}') else: if variant != 'unknown': - mtda.debug(1, "no '{}' script provided " - "for '{}'".format(name, variant)) + mtda.debug(1, f"no '{name}' script provided for '{variant}'") mtda.env_set(name, 'not supported') diff --git a/mtda/scripts/qemu.py b/mtda/scripts/qemu.py index 326edd18..e47e0af2 100644 --- a/mtda/scripts/qemu.py +++ b/mtda/scripts/qemu.py @@ -97,7 +97,7 @@ def qemu_power_on(): def qemu_select_item(target, tries=10): - mtda.debug(1, "wanting to select '{}'".format(target)) + mtda.debug(1, f"wanting to select '{target}'") output = "" while target not in output and tries > 0: diff --git a/mtda/storage/docker.py b/mtda/storage/docker.py index e72bed86..78e3453f 100644 --- a/mtda/storage/docker.py +++ b/mtda/storage/docker.py @@ -34,7 +34,7 @@ def close(self): self._handle = None result = self._docker.import_close() - self.mtda.debug(3, "storage.docker.close(): {}".format(result)) + self.mtda.debug(3, f"storage.docker.close(): {result}") return result def configure(self, conf): @@ -52,7 +52,7 @@ def open(self): self._handle = self._docker.import_open() result = self._handle is not None - self.mtda.debug(3, "storage.docker.open(): {}".format(result)) + self.mtda.debug(3, f"storage.docker.open(): {result}") return result def probe(self): @@ -64,7 +64,7 @@ def probe(self): self.mtda.debug(1, "storage.docker.probe(): " "docker power controller required!") - self.mtda.debug(3, "storage.docker.probe(): {}".format(result)) + self.mtda.debug(3, f"storage.docker.probe(): {result}") self._lock.release() return result @@ -93,7 +93,7 @@ def write(self, data): if self._handle is not None: result = self._handle.write(data) - self.mtda.debug(3, "storage.docker.write(): {}".format(result)) + self.mtda.debug(3, f"storage.docker.write(): {result}") return result diff --git a/mtda/storage/helpers/image.py b/mtda/storage/helpers/image.py index cd1f142d..8d7a1a40 100644 --- a/mtda/storage/helpers/image.py +++ b/mtda/storage/helpers/image.py @@ -64,7 +64,7 @@ def _close(self): except subprocess.CalledProcessError: result = False - self.mtda.debug(3, "storage.helpers.image._close(): %s" % str(result)) + self.mtda.debug(3, f"storage.helpers.image._close(): {str(result)}") return result def close(self): @@ -73,7 +73,7 @@ def close(self): with self.lock: result = self._close() - self.mtda.debug(3, "storage.helpers.image.close(): %s" % str(result)) + self.mtda.debug(3, f"storage.helpers.image.close(): {str(result)}") return result def _mountpoint(self, path=""): @@ -127,7 +127,7 @@ def _umount(self): self.device = None self.isloop = False - self.mtda.debug(3, "storage.helpers.image._umount(): %s" % str(result)) + self.mtda.debug(3, f"storage.helpers.image._umount(): {str(result)}") Image._is_storage_mounted = result is False return result @@ -136,7 +136,7 @@ def umount(self): with self.lock: result = self._umount() - self.mtda.debug(3, "storage.helpers.image.umount(): %s" % str(result)) + self.mtda.debug(3, f"storage.helpers.image.umount(): {str(result)}") return result def _get_partitions(self): @@ -150,9 +150,9 @@ def _get_partitions(self): device = os.path.join( "/run", "user", str(os.getuid()), "mtda", "storage", "0") os.makedirs(device, exist_ok=True) - cmd = "/usr/bin/partitionfs -s {0} {1}".format(self.file, device) - self.mtda.debug(2, "storage.helpers.image._get_partitions(): " - "{0}".format(cmd)) + cmd = f"/usr/bin/partitionfs -s {self.file} {device}" + self.mtda.debug(2, "storage.helpers" + f".image._get_partitions(): {cmd}") result = (os.system(cmd)) == 0 if result: self.device = device + "/" @@ -170,8 +170,8 @@ def _get_partitions(self): self.device = device self.isloop = True - self.mtda.debug(3, "storage.helpers.image._get_partitions(): " - "%s" % str(result)) + self.mtda.debug(3, "storage.helpers.image." + f"_get_partitions(): {str(result)}") return result def _mount_part(self, path): @@ -183,7 +183,7 @@ def _mount_part(self, path): if os.path.exists(path) is False: self.mtda.debug(1, "storage.helpers.image._mount_part(): " - "{0} does not exist!".format(path)) + f"{path} does not exist!") elif os.path.ismount(mountpoint) is False: os.makedirs(mountpoint, exist_ok=True) if pathlib.Path(path).is_block_device(): @@ -199,17 +199,18 @@ def _mount_part(self, path): elif 'FAT (32 bit)' in fstype: cmd = ["/usr/bin/fusefat", path, mountpoint] else: - self.mtda.debug(1, "storage.helpers.image._mount_part(): " - "{0}".format(fstype)) - self.mtda.debug(1, "storage.helpers.image._mount_part(): " + self.mtda.debug(1, "storage.helpers.image." + f"_mount_part(): {fstype}") + self.mtda.debug(1, "storage.helpers.image." + "_mount_part(): " "file-system not supported") if cmd: cmd = " ".join(cmd) self.mtda.debug(2, "storage.helpers.image._mount_part(): " "mounting {0} on {1}" .format(path, mountpoint)) - self.mtda.debug(2, "storage.helpers.image._mount_part(): " - "{0}".format(cmd)) + self.mtda.debug(2, "storage.helpers.image." + f"_mount_part(): {cmd}") result = (os.system(cmd)) == 0 if result is False: @@ -218,15 +219,15 @@ def _mount_part(self, path): self.mtda.debug(1, "storage.helpers.image._mount_part(): " "{0} is a mount point".format(mountpoint)) - self.mtda.debug(3, "storage.helpers.image._mount_part(): " - "%s" % str(result)) + self.mtda.debug(3, "storage.helpers.image." + f"_mount_part(): {str(result)}") return result def _part_dev(self, path, part): if path[-1:].isdigit(): - return "{0}p{1}".format(path, part) + return f"{path}p{part}" else: - return "{0}{1}".format(path, part) + return f"{path}{part}" def mount(self, part=None): self.mtda.debug(3, "storage.helpers.image.mount()") @@ -234,7 +235,8 @@ def mount(self, part=None): result = self._mount_impl(part) if result is True: Image._is_storage_mounted = True - self.mtda.debug(3, "storage.helpers.image.mount(): %s" % str(result)) + self.mtda.debug(3, "storage.helpers.image." + f"mount(): {str(result)}") return result def _mount_impl(self, part=None): @@ -268,7 +270,7 @@ def open(self): self.mtda.debug(3, "storage.helpers.image.open()") with self.lock: result = self._open_impl() - self.mtda.debug(3, "storage.helpers.image.open(): %s" % str(result)) + self.mtda.debug(3, f"storage.helpers.image.open(): {str(result)}") return result def _open_impl(self): @@ -281,7 +283,7 @@ def _open_impl(self): def path(self): self.mtda.debug(3, "storage.helpers.image.path()") result = self.file - self.mtda.debug(3, "storage.helpers.image.open(): {}".format(result)) + self.mtda.debug(3, f"storage.helpers.image.open(): {result}") return result def status(self): @@ -289,7 +291,7 @@ def status(self): with self.lock: result = self._status() - self.mtda.debug(3, "storage.helpers.image.status(): %s" % str(result)) + self.mtda.debug(3, f"storage.helpers.image.status(): {str(result)}") return result def _get_hasher_by_name(self): @@ -321,7 +323,7 @@ def tell(self): if self.handle is not None: result = self.handle.tell() - self.mtda.debug(3, "storage.helpers.image.tell(): %s" % str(result)) + self.mtda.debug(3, f"storage.helpers.image.tell(): {str(result)}") return result def _locate(self, dst): @@ -337,7 +339,7 @@ def _locate(self, dst): result = path break - self.mtda.debug(3, "storage.helpers.image._locate(): %s" % str(result)) + self.mtda.debug(3, f"storage.helpers.image._locate(): {str(result)}") return result def update(self, dst, offset): @@ -360,8 +362,8 @@ def update(self, dst, offset): self.mtda.debug(1, "storage.helpers.image.update(): " "shared storage already opened!") - self.mtda.debug(3, "storage.helpers.image.update(): " - "%s" % str(result)) + self.mtda.debug(3, "storage.helpers.image." + f"update(): {str(result)}") return result def write(self, data): @@ -377,7 +379,7 @@ def write(self, data): # No bmap result = self.handle.write(data) - self.mtda.debug(3, "storage.helpers.image.write(): %s" % str(result)) + self.mtda.debug(3, f"storage.helpers.image.write(): {str(result)}") return result def _write_with_bmap(self, data): diff --git a/mtda/storage/qemu.py b/mtda/storage/qemu.py index 78ff6b19..d1ec2bbc 100644 --- a/mtda/storage/qemu.py +++ b/mtda/storage/qemu.py @@ -45,7 +45,7 @@ def configure(self, conf): if 'name' in conf: self.name = conf['name'] - self.mtda.debug(3, "storage.qemu.configure(): %s" % str(result)) + self.mtda.debug(3, f"storage.qemu.configure(): {str(result)}") self.lock.release() return result @@ -60,7 +60,7 @@ def _add(self): "usb storage could not be added!") result = False - self.mtda.debug(3, "storage.qemu._add(): %s" % str(result)) + self.mtda.debug(3, f"storage.qemu._add(): {str(result)}") return result def _rm(self): @@ -75,7 +75,7 @@ def _rm(self): self.mtda.debug(1, "storage.qemu._rm(): " "usb storage could not be removed!") - self.mtda.debug(3, "storage.qemu._rm(): %s" % str(result)) + self.mtda.debug(3, f"storage.qemu._rm(): {str(result)}") return result """ Get file used by the USB Function driver""" @@ -88,7 +88,7 @@ def probe(self): self.mtda.debug(1, "storage.qemu.probe(): " "qemu power controller required!") - self.mtda.debug(3, "storage.qemu.probe(): %s" % str(result)) + self.mtda.debug(3, f"storage.qemu.probe(): {str(result)}") self.lock.release() return result @@ -104,7 +104,7 @@ def to_host(self): if result is True: self.mode = CONSTS.STORAGE.ON_HOST - self.mtda.debug(3, "storage.qemu.to_host(): %s" % str(result)) + self.mtda.debug(3, f"storage.qemu.to_host(): {str(result)}") self.lock.release() return result @@ -121,7 +121,7 @@ def to_target(self): if result is True: self.mode = CONSTS.STORAGE.ON_TARGET - self.mtda.debug(3, "storage.qemu.to_target(): %s" % str(result)) + self.mtda.debug(3, f"storage.qemu.to_target(): {str(result)}") self.lock.release() return result @@ -131,7 +131,7 @@ def _status(self): result = self.mode - self.mtda.debug(3, "storage.qemu.status(): %s" % str(result)) + self.mtda.debug(3, f"storage.qemu.status(): {str(result)}") return result diff --git a/mtda/storage/samsung.py b/mtda/storage/samsung.py index 83a976f0..781bc1ac 100644 --- a/mtda/storage/samsung.py +++ b/mtda/storage/samsung.py @@ -40,9 +40,9 @@ def configure(self, conf): ]) except subprocess.CalledProcessError: result = False - self.mtda.debug(2, "storage.samsung.configure(): serial: %s" % - self.serial) - self.mtda.debug(3, "storage.samsung.configure(): %s" % str(result)) + self.mtda.debug(2, "storage.samsung." + f"configure(): serial: {self.serial}") + self.mtda.debug(3, f"storage.samsung.configure(): {str(result)}") return result """ Check presence of the sdmux""" @@ -57,7 +57,7 @@ def probe(self): except subprocess.CalledProcessError: result = False - self.mtda.debug(3, "storage.samsung.probe(): %s" % str(result)) + self.mtda.debug(3, f"storage.samsung.probe(): {str(result)}") return result """ Attach the SD card to the host""" @@ -72,7 +72,7 @@ def to_host(self): except subprocess.CalledProcessError: result = False - self.mtda.debug(3, "storage.samsung.to_host(): %s" % str(result)) + self.mtda.debug(3, f"storage.samsung.to_host(): {str(result)}") return result """ Attach the SD card to the target""" @@ -91,7 +91,7 @@ def to_target(self): except subprocess.CalledProcessError: result = False - self.mtda.debug(3, "storage.samsung.to_target(): %s" % str(result)) + self.mtda.debug(3, f"storage.samsung.to_target(): {str(result)}") self.lock.release() return result @@ -115,7 +115,7 @@ def _status(self): self.mtda.debug(1, "storage.samsung.status(): sd-mux-ctrl failed!") result = CONSTS.STORAGE.UNKNOWN - self.mtda.debug(3, "storage.samsung.status(): %s" % str(result)) + self.mtda.debug(3, f"storage.samsung.status(): {str(result)}") return result diff --git a/mtda/storage/usbf.py b/mtda/storage/usbf.py index a0611936..544ffe54 100644 --- a/mtda/storage/usbf.py +++ b/mtda/storage/usbf.py @@ -36,12 +36,11 @@ def cleanup(self): result = None if self.loop is not None: - self.mtda.debug(2, "storage.usbf.cleanup(): " - "removing {}".format(self.loop)) + self.mtda.debug(2, f"storage.usbf.cleanup(): removing {self.loop}") cmd = ['/usr/sbin/losetup', '-d', self.loop] self.loop = subprocess.check_call(cmd) - self.mtda.debug(3, "storage.usbf.cleanup(): {}".format(result)) + self.mtda.debug(3, f"storage.usbf.cleanup(): {result}") return result """ Configure this storage controller from the provided configuration""" @@ -72,7 +71,7 @@ def configure(self, conf): if self.file is not None: result = Composite.configure('storage', conf) - self.mtda.debug(3, "storage.usbf.configure(): {}".format(result)) + self.mtda.debug(3, f"storage.usbf.configure(): {result}") return result def configure_systemd(self, dir): @@ -100,13 +99,13 @@ def probe(self): if os.path.exists(self.file) is True: result = True else: - self.mtda.debug(1, "storage.usbf.probe(): " - "{} not found!".format(self.file)) + self.mtda.debug(1, "storage.usbf." + f"probe(): {self.file} not found!") else: self.mtda.debug(1, "storage.usbf.probe(): " "file not configured!") - self.mtda.debug(3, "storage.usbf.probe(): {}".format(result)) + self.mtda.debug(3, f"storage.usbf.probe(): {result}") return result """ Attach the shared storage device to the host""" @@ -117,7 +116,7 @@ def to_host(self): self.mode = CONSTS.STORAGE.ON_HOST result = True - self.mtda.debug(3, "storage.usbf.to_host(): {}".format(result)) + self.mtda.debug(3, f"storage.usbf.to_host(): {result}") self.lock.release() return result @@ -134,7 +133,7 @@ def to_target(self): self.mode = CONSTS.STORAGE.ON_TARGET self.lock.release() - self.mtda.debug(3, "storage.usbf.to_target(): {}".format(result)) + self.mtda.debug(3, f"storage.usbf.to_target(): {result}") return result """ Determine where the shared storage device is attached""" @@ -143,7 +142,7 @@ def _status(self): result = self.mode - self.mtda.debug(3, "storage.usbf.status(): {}".format(result)) + self.mtda.debug(3, f"storage.usbf.status(): {result}") return result diff --git a/mtda/storage/writer.py b/mtda/storage/writer.py index 1dbeaa18..da68d43f 100644 --- a/mtda/storage/writer.py +++ b/mtda/storage/writer.py @@ -39,8 +39,7 @@ def compression(self): result = self._compression - self.mtda.debug(3, "storage.writer.compression.get(): " - "%s" % str(result)) + self.mtda.debug(3, f"storage.writer.compression.get(): {str(result)}") return result @compression.setter @@ -63,8 +62,7 @@ def compression(self, compression): self._compression = compression result = compression - self.mtda.debug(3, "storage.writer.compression.set(): " - "%s" % str(result)) + self.mtda.debug(3, f"storage.writer.compression.set(): {str(result)}") @property def failed(self): @@ -80,7 +78,7 @@ def put(self, chunk, block=True, timeout=None): # if thread is started and put data is not empty if len(chunk) > 0 and self._exiting is False: self._writing = True - self.mtda.debug(3, "storage.writer.put(): %s" % str(result)) + self.mtda.debug(3, f"storage.writer.put(): {str(result)}") return result def start(self): @@ -91,7 +89,7 @@ def start(self): daemon=True, name='writer') self._thread.start() - self.mtda.debug(3, "storage.writer.start(): %s" % str(result)) + self.mtda.debug(3, f"storage.writer.start(): {str(result)}") return result def stop(self): @@ -111,7 +109,7 @@ def stop(self): self._thread = None self._zdec = None - self.mtda.debug(3, "storage.writer.stop(): %s" % str(result)) + self.mtda.debug(3, f"storage.writer.stop(): {str(result)}") return result def worker(self): @@ -129,7 +127,7 @@ def worker(self): try: self._write(chunk) except Exception as e: - self.mtda.debug(1, "storage.writer.worker(): {}".format(e)) + self.mtda.debug(1, f"storage.writer.worker(): {e}") self._failed = True self._writing = False pass @@ -138,7 +136,7 @@ def worker(self): self.mtda.debug(1, "storage.writer.worker(): " "write or decompression error!") - self.mtda.debug(3, "storage.writer.worker(): %s" % str(result)) + self.mtda.debug(3, f"storage.writer.worker(): {str(result)}") return result def write_raw(self, data, session=None): @@ -148,10 +146,10 @@ def write_raw(self, data, session=None): try: result = self.storage.write(data) except OSError as e: - self.mtda.debug(1, "storage.writer.write_raw(): {}".format(e)) + self.mtda.debug(1, f"storage.writer.write_raw(): {e}") raise - self.mtda.debug(3, "storage.writer.write_raw(): %s" % str(result)) + self.mtda.debug(3, f"storage.writer.write_raw(): {str(result)}") return result def write_gz(self, data, session=None): @@ -170,10 +168,10 @@ def write_gz(self, data, session=None): cont = len(data) > 0 result = self.storage.write(uncompressed) except (OSError, zlib.error) as e: - self.mtda.debug(1, "storage.writer.write_gz(): {}".format(e)) + self.mtda.debug(1, f"storage.writer.write_gz(): {e}") raise - self.mtda.debug(3, "storage.writer.write_gz(): %s" % str(result)) + self.mtda.debug(3, f"storage.writer.write_gz(): {str(result)}") return result def write_bz2(self, data): @@ -194,10 +192,10 @@ def write_bz2(self, data): except EOFError: result = 0 except OSError as e: - self.mtda.debug(1, "storage.writer.write_bz2(): {}".format(e)) + self.mtda.debug(1, f"storage.writer.write_bz2(): {e}") raise - self.mtda.debug(3, "storage.writer.write_bz2(): %s" % str(result)) + self.mtda.debug(3, f"storage.writer.write_bz2(): {str(result)}") return result def write_zst(self, data): @@ -211,10 +209,10 @@ def write_zst(self, data): try: result = self._zdec.write(data) except OSError as e: - self.mtda.debug(1, "storage.writer.write_zst(): {}".format(e)) + self.mtda.debug(1, f"storage.writer.write_zst(): {e}") raise - self.mtda.debug(3, "storage.writer.write_zst(): %s" % str(result)) + self.mtda.debug(3, f"storage.writer.write_zst(): {str(result)}") return result def write_xz(self, data): @@ -235,10 +233,10 @@ def write_xz(self, data): except EOFError: result = 0 except OSError as e: - self.mtda.debug(1, "storage.writer.write_xz(): {}".format(e)) + self.mtda.debug(1, f"storage.writer.write_xz(): {e}") raise - self.mtda.debug(3, "storage.writer.write_xz(): %s" % str(result)) + self.mtda.debug(3, f"storage.writer.write_xz(): {str(result)}") return result @property diff --git a/mtda/support/usb.py b/mtda/support/usb.py index 0454429d..fd74091e 100644 --- a/mtda/support/usb.py +++ b/mtda/support/usb.py @@ -34,12 +34,12 @@ def debug(level, msg): Composite.mtda.debug(level, msg) def configure(what, conf): - Composite.debug(3, "composite.configure('{}')".format(what)) + Composite.debug(3, f"composite.configure('{what}')") with Composite.lock: result = Composite._configure(what, conf) - Composite.debug(3, "composite.configure(): {}".format(result)) + Composite.debug(3, f"composite.configure(): {result}") return result def _configure(what, conf): @@ -53,8 +53,8 @@ def _configure(what, conf): if what in Composite.functions: Composite.functions[what]['configured'] = True Composite.functions[what]['enabled'] = True - Composite.debug(2, "composite.configure(): " - "{} configured & enabled".format(what)) + Composite.debug(2, "composite." + f"configure(): {what} configured & enabled") else: Composite.debug(1, "composite.configure(): " "not supported") @@ -80,7 +80,7 @@ def install(): with Composite.lock: result = Composite._install() - Composite.debug(3, "composite.install(): {}".format(result)) + Composite.debug(3, f"composite.install(): {result}") return result def _install(): @@ -128,8 +128,8 @@ def _install(): write(lun + "ro", "0") write(lun + "nofua", "0") write(lun + "file", file) - Composite.debug(2, "composite.install(): " - "storage device/file: {}".format(file)) + Composite.debug(2, "composite." + f"install(): storage device/file: {file}") Composite._installed = Composite._enable() return Composite._installed @@ -139,7 +139,7 @@ def remove(): with Composite.lock: result = Composite._remove() - Composite.debug(3, "composite.remove(): {}".format(result)) + Composite.debug(3, f"composite.remove(): {result}") return result def _remove(): @@ -170,8 +170,8 @@ def _create_functions(): continue if function['enabled'] is False: continue - Composite.debug(2, "composite._create_functions: " - "registering {}".format(name)) + Composite.debug(2, "composite." + f"_create_functions: registering {name}") path = Composite.path + "/functions/" + name if not os.path.exists(path): os.makedirs(path) diff --git a/mtda/usb/gpio.py b/mtda/usb/gpio.py index e599ff52..204de0f1 100644 --- a/mtda/usb/gpio.py +++ b/mtda/usb/gpio.py @@ -47,7 +47,7 @@ def configure(self, conf): self.gpiopair.append(itemgetter(0, 1)(gpio.split('@'))) result = True - self.mtda.debug(3, "usb.gpio.configure(): %s" % str(result)) + self.mtda.debug(3, f"usb.gpio.configure(): {str(result)}") return result def probe(self): @@ -65,18 +65,17 @@ def probe(self): for line in self.lines: try: if line.is_used() is False: - self.mtda.debug(3, "power.gpiochip{}@pin{} is free for " - "use" .format(chip, pin)) + self.mtda.debug(3, f"power.gpiochip{chip}@pin{pin} " + "is free for use") line.request(consumer='mtda', type=gpiod.LINE_REQ_DIR_OUT) else: raise ValueError("gpiochip{}@pin{} is in use by other " "service" .format(chip, pin)) except OSError: - self.mtda.debug(3, "line {} is not configured correctly" - .format(line)) + self.mtda.debug(3, f"line {line} is not configured correctly") result = True - self.mtda.debug(3, "usb.gpio.probe(): {}" .format(result)) + self.mtda.debug(3, f"usb.gpio.probe(): {result}") return result def on(self): @@ -85,7 +84,7 @@ def on(self): for line in self.lines: line.set_value(self.enable) result = self.status() == self.POWERED_ON - self.mtda.debug(3, "usb.gpio.on(): {}" .format(result)) + self.mtda.debug(3, f"usb.gpio.on(): {result}") return result def off(self): @@ -94,7 +93,7 @@ def off(self): for line in self.lines: line.set_value(self.disable) result = self.status() == self.POWERED_OFF - self.mtda.debug(3, "usb.gpio.off(): {}" .format(result)) + self.mtda.debug(3, f"usb.gpio.off(): {result}") return result def status(self): @@ -108,8 +107,7 @@ def status(self): else: result = self.POWERED_OFF - self.mtda.debug(3, "usb.gpio.status():" - "line {} is {}" .format(line, value)) + self.mtda.debug(3, f"usb.gpio.status():line {line} is {value}") return result def toggle(self): @@ -122,7 +120,7 @@ def toggle(self): self.on() result = self.POWERED_ON - self.mtda.debug(3, "usb.gpio.toggle(): %s" % str(result)) + self.mtda.debug(3, f"usb.gpio.toggle(): {str(result)}") return result diff --git a/mtda/usb/qemu_mass_storage.py b/mtda/usb/qemu_mass_storage.py index 1b2c841e..bfa4ed70 100644 --- a/mtda/usb/qemu_mass_storage.py +++ b/mtda/usb/qemu_mass_storage.py @@ -42,8 +42,7 @@ def configure(self, conf): if 'name' in conf: self.name = conf['name'] - self.mtda.debug(3, "usb.qemu_mass_storage.configure(): " - "%s" % str(result)) + self.mtda.debug(3, f"usb.qemu_mass_storage.configure(): {str(result)}") self.lock.release() return result @@ -55,7 +54,7 @@ def probe(self): self.mtda.debug(1, "usb.qemu_mass_storage.probe(): " "qemu power controller required!") - self.mtda.debug(3, "usb.qemu_mass_storage.probe(): %s" % str(result)) + self.mtda.debug(3, f"usb.qemu_mass_storage.probe(): {str(result)}") return result def on(self): @@ -70,7 +69,7 @@ def on(self): "usb storage could not be added!") result = False - self.mtda.debug(3, "usb.qemu_mass_storage.add(): %s" % str(result)) + self.mtda.debug(3, f"usb.qemu_mass_storage.add(): {str(result)}") self.lock.release() return result @@ -87,7 +86,7 @@ def off(self): self.mtda.debug(1, "usb.qemu_mass_storage.off(): " "usb storage could not be removed!") - self.mtda.debug(3, "usb.qemu_mass_storage.off(): %s" % str(result)) + self.mtda.debug(3, f"usb.qemu_mass_storage.off(): {str(result)}") self.lock.release() return result @@ -99,7 +98,7 @@ def status(self): if self.id is None: result = self.POWERED_OFF - self.mtda.debug(3, "usb.qemu_mass_storage.status(): %s" % str(result)) + self.mtda.debug(3, f"usb.qemu_mass_storage.status(): {str(result)}") self.lock.release() return result @@ -113,7 +112,7 @@ def toggle(self): self.on() result = self.POWERED_ON - self.mtda.debug(3, "usb.qemu_mass_storage.toggle(): %s" % str(result)) + self.mtda.debug(3, f"usb.qemu_mass_storage.toggle(): {str(result)}") return result diff --git a/mtda/utils.py b/mtda/utils.py index 76e02e34..e23e5d97 100644 --- a/mtda/utils.py +++ b/mtda/utils.py @@ -28,5 +28,5 @@ def create_device_dependency(dropin_path: str, device_path: str): device = device_path[1:].replace('-', '\\x2d').replace('/', '-') with open(dropin_path, 'w') as f: f.write('[Unit]\n') - f.write('Wants={}.device\n'.format(device)) - f.write('After={}.device\n'.format(device)) + f.write(f'Wants={device}.device\n') + f.write(f'After={device}.device\n') diff --git a/mtda/video/mjpg_streamer.py b/mtda/video/mjpg_streamer.py index 5cbd6d1b..565e836a 100644 --- a/mtda/video/mjpg_streamer.py +++ b/mtda/video/mjpg_streamer.py @@ -70,15 +70,15 @@ def probe(self): raise ValueError("mjpg_streamer executable not specified!") result = True - if os.system("%s --version" % self.executable) != 0: - raise ValueError("could not execute %s!" % self.executable) + if os.system(f"{self.executable} --version") != 0: + raise ValueError(f"could not execute {self.executable}!") if os.path.exists(self.dev) is False: self.mtda.debug(1, "video.mjpg_streamer.probe(): " "video device (%s) not found!" % self.dev) result = False - self.mtda.debug(3, "video.mjpg_streamer.probe(): %s" % str(result)) + self.mtda.debug(3, f"video.mjpg_streamer.probe(): {str(result)}") return result def getpid(self): @@ -105,7 +105,7 @@ def start(self): options += self.output.format(self.port, self.www) options += " -b" - result = os.system("%s %s" % (self.executable, options)) + result = os.system(f"{self.executable} {options}") if result == 0: self.pid = self.getpid() self.mtda.debug(2, "video.mjpg_streamer.start(): " @@ -121,15 +121,13 @@ def start(self): def kill(self, name, pid, timeout=3): tries = timeout if psutil.pid_exists(pid): - self.mtda.debug(2, "terminating {0} " - "[{1}] using SIGTERM".format(name, pid)) + self.mtda.debug(2, f"terminating {name} [{pid}] using SIGTERM") os.kill(pid, signal.SIGTERM) while tries > 0 and psutil.pid_exists(pid): time.sleep(1) tries = tries - 1 if psutil.pid_exists(pid): - self.mtda.debug(2, "terminating {0} " - "[{1}] using SIGKILL".format(name, pid)) + self.mtda.debug(2, f"terminating {name} [{pid}] using SIGKILL") os.kill(pid, signal.SIGKILL) return psutil.pid_exists(pid) @@ -146,19 +144,19 @@ def stop(self): self.pid = None self.lock.release() - self.mtda.debug(3, "video.mjpg_streamer.stop(): %s" % str(result)) + self.mtda.debug(3, f"video.mjpg_streamer.stop(): {str(result)}") return result def url(self, host="", opts=None): - self.mtda.debug(3, "video.mjpg_streamer.url(host='%s')" % str(host)) + self.mtda.debug(3, f"video.mjpg_streamer.url(host='{str(host)}')") if host is None or host == "": host = socket.getfqdn() - self.mtda.debug(3, "video.mjpg_streamer.url: " - "using host='%s'" % str(host)) - result = "http://{0}:{1}/?action=stream".format(host, self.port) + self.mtda.debug(3, "video.mjpg_streamer." + f"url: using host='{str(host)}'") + result = f"http://{host}:{self.port}/?action=stream" - self.mtda.debug(3, "video.mjpg_streamer.url(): %s" % str(result)) + self.mtda.debug(3, f"video.mjpg_streamer.url(): {str(result)}") return result diff --git a/mtda/video/qemu.py b/mtda/video/qemu.py index 232782f6..f7d8ae2b 100644 --- a/mtda/video/qemu.py +++ b/mtda/video/qemu.py @@ -30,7 +30,7 @@ def configure(self, conf): if 'sink' in conf: self.sink = conf['sink'] - self.mtda.debug(3, "video.qemu.configure(): " "%s" % str(result)) + self.mtda.debug(3, f"video.qemu.configure(): {str(result)}") return result @property @@ -45,7 +45,7 @@ def probe(self): self.mtda.debug(1, "video.qemu.probe(): " "qemu power controller required!") - self.mtda.debug(3, "video.qemu.probe(): %s" % str(result)) + self.mtda.debug(3, f"video.qemu.probe(): {str(result)}") return result def start(self): @@ -55,19 +55,18 @@ def stop(self): return True def url(self, host="", opts=None): - self.mtda.debug(3, "video.qemu.url(host={0}, " - "opts={1}".format(host, opts)) + self.mtda.debug(3, f"video.qemu.url(host={host}, opts={opts}") if host is None or host == "": host = socket.getfqdn() - self.mtda.debug(3, "video.qemu.url: using host='%s'" % str(host)) - result = 'gst-pipeline: rfbsrc host={0} ! {1}'.format(host, self.sink) + self.mtda.debug(3, f"video.qemu.url: using host='{str(host)}'") + result = f'gst-pipeline: rfbsrc host={host} ! {self.sink}' if opts is not None: if 'sink' in opts: if 'name' in opts['sink']: - result += ' name="{0}"'.format(opts['sink']['name']) + result += f" name=\"{opts['sink']['name']}\"" - self.mtda.debug(3, "video.qemu.url(): %s" % str(result)) + self.mtda.debug(3, f"video.qemu.url(): {str(result)}") return result diff --git a/mtda/video/ustreamer.py b/mtda/video/ustreamer.py index 2fe61484..bd751213 100644 --- a/mtda/video/ustreamer.py +++ b/mtda/video/ustreamer.py @@ -40,8 +40,8 @@ def configure(self, conf): if 'device' in conf: self.dev = conf['device'] - self.mtda.debug(4, 'video.ustreamer.configure(): ' - 'will use %s' % str(self.dev)) + self.mtda.debug(4, 'video.ustreamer.' + f'configure(): will use {str(self.dev)}') if 'executable' in conf: self.executable = conf['executable'] if 'port' in conf: @@ -72,7 +72,7 @@ def probe(self): self.mtda.debug(1, 'error calling %s: %s', self.executable, str(e)) result = False - self.mtda.debug(3, 'video.ustreamer.probe(): %s' % str(result)) + self.mtda.debug(3, f'video.ustreamer.probe(): {str(result)}') return result def start(self): @@ -111,15 +111,15 @@ def stop(self): return True def url(self, host="", opts=None): - self.mtda.debug(3, "video.ustreamer.url(host='%s')" % str(host)) + self.mtda.debug(3, f"video.ustreamer.url(host='{str(host)}')") if host is None or host == "": host = socket.getfqdn() - self.mtda.debug(3, "video.ustreamer.url: " - "using host='%s'" % str(host)) - result = "http://{0}:{1}/?action=stream".format(host, self.port) + self.mtda.debug(3, "video.ustreamer." + f"url: using host='{str(host)}'") + result = f"http://{host}:{self.port}/?action=stream" - self.mtda.debug(3, 'video.ustreamer.url(): %s' % str(result)) + self.mtda.debug(3, f'video.ustreamer.url(): {str(result)}') return result