diff --git a/pymux/__init__.py b/pymux/__init__.py index baffc48..e69de29 100644 --- a/pymux/__init__.py +++ b/pymux/__init__.py @@ -1 +0,0 @@ -from __future__ import unicode_literals diff --git a/pymux/__main__.py b/pymux/__main__.py index a95c19f..fb64663 100644 --- a/pymux/__main__.py +++ b/pymux/__main__.py @@ -1,8 +1,7 @@ """ Make sure `python -m pymux` works. """ -from __future__ import unicode_literals from .entry_points.run_pymux import run -if __name__ == '__main__': +if __name__ == "__main__": run() diff --git a/pymux/arrangement.py b/pymux/arrangement.py index 7c3ef58..39feffa 100644 --- a/pymux/arrangement.py +++ b/pymux/arrangement.py @@ -7,51 +7,49 @@ An arrangement consists of a list of windows. And a window has a list of panes, arranged by ordering them in HSplit/VSplit instances. """ -from __future__ import unicode_literals - -from ptterm import Terminal -from prompt_toolkit.application.current import get_app, set_app -from prompt_toolkit.buffer import Buffer - import math import os -import weakref -import six +from enum import Enum +from typing import Dict, List, Optional, Union +from weakref import WeakKeyDictionary, ref + +from prompt_toolkit.application import Application, get_app, get_app_or_none, set_app +from prompt_toolkit.buffer import Buffer +from ptterm import Terminal -__all__ = ( - 'LayoutTypes', - 'Pane', - 'HSplit', - 'VSplit', - 'Window', - 'Arrangement', -) +__all__ = [ + "LayoutTypes", + "Pane", + "HSplit", + "VSplit", + "Window", + "Arrangement", +] -class LayoutTypes: +class LayoutTypes(Enum): # The values are in lowercase with dashes, because that is what users can # use at the command line. - EVEN_HORIZONTAL = 'even-horizontal' - EVEN_VERTICAL = 'even-vertical' - MAIN_HORIZONTAL = 'main-horizontal' - MAIN_VERTICAL = 'main-vertical' - TILED = 'tiled' - - _ALL = [EVEN_HORIZONTAL, EVEN_VERTICAL, MAIN_HORIZONTAL, MAIN_VERTICAL, TILED] + EVEN_HORIZONTAL = "EVEN-HORIZONTAL" + EVEN_VERTICAL = "EVEN-VERTICAL" + MAIN_HORIZONTAL = "MAIN-HORIZONTAL" + MAIN_VERTICAL = "MAIN-VERTICAL" + TILED = "TILED" -class Pane(object): +class Pane: """ One pane, containing one process and a search buffer for going into copy mode or displaying the help. """ - _pane_counter = 1000 # Start at 1000, to be sure to not confuse this with pane indexes. - def __init__(self, terminal=None): - assert isinstance(terminal, Terminal) + _pane_counter = ( + 1000 # Start at 1000, to be sure to not confuse this with pane indexes. + ) + def __init__(self, terminal: Terminal) -> None: self.terminal = terminal - self.chosen_name = None + self.chosen_name: Optional[str] = None # Displayed the clock instead of this pane content. self.clock_mode = False @@ -66,16 +64,15 @@ def __init__(self, terminal=None): # get_tokens_for_line, that returns the token list with color # information for each line. self.scroll_buffer = Buffer(read_only=True) - self.copy_get_tokens_for_line = lambda lineno: [] self.display_scroll_buffer = False - self.scroll_buffer_title = '' + self.scroll_buffer_title = "" @property def process(self): return self.terminal.process @property - def name(self): + def name(self) -> str: """ The name for the window as displayed in the title bar and status bar. """ @@ -88,23 +85,23 @@ def name(self): if name: return os.path.basename(name) - return '' + return "" - def enter_copy_mode(self): + def enter_copy_mode(self) -> None: """ Suspend the process, and copy the screen content to the `scroll_buffer`. That way the user can search through the history and copy/paste. """ self.terminal.enter_copy_mode() - def focus(self): + def focus(self) -> None: """ Focus this pane. """ get_app().layout.focus(self.terminal) -class _WeightsDictionary(weakref.WeakKeyDictionary): +class _WeightsDictionary(WeakKeyDictionary): """ Dictionary for the weights: weak keys, but defaults to 1. @@ -114,10 +111,11 @@ class _WeightsDictionary(weakref.WeakKeyDictionary): This dictionary maps the child (another HSplit/VSplit or Pane), to the size. (Integer.) """ + def __getitem__(self, key): try: # (Don't use 'super' here. This is a classobj in Python2.) - return weakref.WeakKeyDictionary.__getitem__(self, key) + return WeakKeyDictionary.__getitem__(self, key) except KeyError: return 1 @@ -127,6 +125,7 @@ class _Split(list): Base class for horizontal and vertical splits. (This is a higher level split than prompt_toolkit.layout.HSplit.) """ + def __init__(self, *a, **kw): list.__init__(self, *a, **kw) @@ -138,30 +137,31 @@ def __hash__(self): return id(self) def __repr__(self): - return '%s(%s)' % (self.__class__.__name__, list.__repr__(self)) + return "%s(%s)" % (self.__class__.__name__, list.__repr__(self)) class HSplit(_Split): - """ Horizontal split. """ + """Horizontal split.""" class VSplit(_Split): - """ Horizontal split. """ + """Horizontal split.""" -class Window(object): +class Window: """ Pymux window. """ + _window_counter = 1000 # Start here, to avoid confusion with window index. - def __init__(self, index=0): + def __init__(self, index: int = 0) -> None: self.index = index - self.root = HSplit() - self._active_pane = None - self._prev_active_pane = None - self.chosen_name = None - self.previous_selected_layout = None + self.root: Union[VSplit, HSplit] = HSplit() + self._active_pane: Optional[Pane] = None + self._prev_active_pane: Optional["ref[Pane]"] = None + self.chosen_name: Optional[str] = None + self.previous_selected_layout: Optional[LayoutTypes] = None #: When true, the current pane is zoomed in. self.zoom = False @@ -173,50 +173,51 @@ def __init__(self, index=0): Window._window_counter += 1 self.window_id = Window._window_counter - def invalidation_hash(self): + def invalidation_hash(self) -> str: """ Return a hash (string) that can be used to determine when the layout has to be rebuild. """ -# if not self.root: -# return '' + # if not self.root: + # return '' - def _hash_for_split(split): + def _hash_for_split(split: Union[HSplit, VSplit]) -> str: result = [] for item in split: if isinstance(item, (VSplit, HSplit)): result.append(_hash_for_split(item)) elif isinstance(item, Pane): - result.append('p%s' % item.pane_id) + result.append("p%s" % item.pane_id) if isinstance(split, HSplit): - return 'HSplit(%s)' % (','.join(result)) + return "HSplit(%s)" % (",".join(result)) else: - return 'VSplit(%s)' % (','.join(result)) + return "VSplit(%s)" % (",".join(result)) - return '' % ( - self.window_id, self.zoom, _hash_for_split(self.root)) + return "" % ( + self.window_id, + self.zoom, + _hash_for_split(self.root), + ) @property - def active_pane(self): + def active_pane(self) -> Optional[Pane]: """ The current active :class:`.Pane`. """ return self._active_pane @active_pane.setter - def active_pane(self, value): - assert isinstance(value, Pane) - + def active_pane(self, value: Pane) -> None: # Remember previous active pane. if self._active_pane: - self._prev_active_pane = weakref.ref(self._active_pane) + self._prev_active_pane = ref(self._active_pane) self.zoom = False self._active_pane = value @property - def previous_active_pane(self): + def previous_active_pane(self) -> Optional[Pane]: """ The previous active :class:`.Pane` or `None` if unknown. """ @@ -226,9 +227,10 @@ def previous_active_pane(self): # window. if p and p in self.panes: return p + return None @property - def name(self): + def name(self) -> str: """ The name for this window as it should be displayed in the status bar. """ @@ -240,15 +242,12 @@ def name(self): if pane: return pane.name - return '' + return "" - def add_pane(self, pane, vsplit=False): + def add_pane(self, pane: Pane, vsplit: bool = False) -> None: """ Add another pane to this Window. """ - assert isinstance(pane, Pane) - assert isinstance(vsplit, bool) - split_cls = VSplit if vsplit else HSplit if self.active_pane is None: @@ -272,12 +271,10 @@ def add_pane(self, pane, vsplit=False): self.active_pane = pane self.zoom = False - def remove_pane(self, pane): + def remove_pane(self, pane: Pane) -> None: """ Remove pane from this Window. """ - assert isinstance(pane, Pane) - if pane in self.panes: # When this pane was focused, switch to previous active or next in order. if pane == self.active_pane: @@ -305,8 +302,8 @@ def remove_pane(self, pane): p = p2 @property - def panes(self): - " List with all panes from this Window. " + def panes(self) -> List[Pane]: + "List with all panes from this Window." result = [] for s in self.splits: @@ -317,8 +314,8 @@ def panes(self): return result @property - def splits(self): - " Return a list with all HSplit/VSplit instances. " + def splits(self) -> List[Union[HSplit, VSplit]]: + "Return a list with all HSplit/VSplit instances." result = [] def collect(split): @@ -332,37 +329,44 @@ def collect(split): return result def _get_parent(self, item): - " The HSplit/VSplit that contains the active pane. " + "The HSplit/VSplit that contains the active pane." for s in self.splits: if item in s: return s @property - def has_panes(self): - " True when this window contains at least one pane. " + def has_panes(self) -> bool: + "True when this window contains at least one pane." return len(self.panes) > 0 @property def active_process(self): - " Return `Process` that should receive user input. " + "Return `Process` that should receive user input." p = self.active_pane if p is not None: return p.process - def focus_next(self, count=1): - " Focus the next pane. " + def focus_next(self, count=1) -> None: + "Focus the next pane." panes = self.panes if panes: - self.active_pane = panes[(panes.index(self.active_pane) + count) % len(panes)] + self.active_pane = panes[ + (panes.index(self.active_pane or panes[0]) + count) % len(panes) + ] else: self.active_pane = None # No panes left. - def focus_previous(self): - " Focus the previous pane. " + def focus_previous(self) -> None: + "Focus the previous pane." self.focus_next(count=-1) - def rotate(self, count=1, with_pane_before_only=False, with_pane_after_only=False): + def rotate( + self, + count: int = 1, + with_pane_before_only: bool = False, + with_pane_after_only: bool = False, + ) -> None: """ Rotate panes. When `with_pane_before_only` or `with_pane_after_only` is True, only rotate @@ -370,7 +374,7 @@ def rotate(self, count=1, with_pane_before_only=False, with_pane_after_only=Fals """ # Create (split, index, pane, weight) tuples. items = [] - current_pane_index = None + current_pane_index: Optional[int] = None for s in self.splits: for index, item in enumerate(s): @@ -380,11 +384,12 @@ def rotate(self, count=1, with_pane_before_only=False, with_pane_after_only=Fals current_pane_index = len(items) - 1 # Only before after? Reduce list of panes. - if with_pane_before_only: - items = items[current_pane_index - 1:current_pane_index + 1] + if current_pane_index is not None: + if with_pane_before_only: + items = items[current_pane_index - 1 : current_pane_index + 1] - elif with_pane_after_only: - items = items[current_pane_index:current_pane_index + 2] + elif with_pane_after_only: + items = items[current_pane_index : current_pane_index + 2] # Rotate positions. for i, triple in enumerate(items): @@ -395,12 +400,10 @@ def rotate(self, count=1, with_pane_before_only=False, with_pane_after_only=Fals split[index] = new_item split.weights[new_item] = weight - def select_layout(self, layout_type): + def select_layout(self, layout_type: LayoutTypes) -> None: """ Select one of the predefined layouts. """ - assert layout_type in LayoutTypes._ALL - # When there is only one pane, always choose EVEN_HORIZONTAL, # Otherwise, we create VSplit/HSplit instances with an empty list of # children. @@ -417,22 +420,26 @@ def select_layout(self, layout_type): # main-horizontal. elif layout_type == LayoutTypes.MAIN_HORIZONTAL: - self.root = HSplit([ - self.active_pane, - VSplit([p for p in self.panes if p != self.active_pane]) - ]) + self.root = HSplit( + [ + self.active_pane, + VSplit([p for p in self.panes if p != self.active_pane]), + ] + ) # main-vertical. elif layout_type == LayoutTypes.MAIN_VERTICAL: - self.root = VSplit([ - self.active_pane, - HSplit([p for p in self.panes if p != self.active_pane]) - ]) + self.root = VSplit( + [ + self.active_pane, + HSplit([p for p in self.panes if p != self.active_pane]), + ] + ) # tiled. elif layout_type == LayoutTypes.TILED: panes = self.panes - column_count = math.ceil(len(panes) ** .5) + column_count = math.ceil(len(panes) ** 0.5) rows = HSplit() current_row = VSplit() @@ -450,7 +457,7 @@ def select_layout(self, layout_type): self.previous_selected_layout = layout_type - def select_next_layout(self, count=1): + def select_next_layout(self, count: int = 1) -> None: """ Select next layout. (Cycle through predefined layouts.) """ @@ -459,10 +466,10 @@ def select_next_layout(self, count=1): if len(self.panes) == 2: all_layouts = [LayoutTypes.EVEN_HORIZONTAL, LayoutTypes.EVEN_VERTICAL] else: - all_layouts = LayoutTypes._ALL + all_layouts = list(LayoutTypes) # Get index of current layout. - layout = self.previous_selected_layout or LayoutTypes._ALL[-1] + layout = self.previous_selected_layout or list(LayoutTypes)[-1] try: index = all_layouts.index(layout) except ValueError: @@ -472,32 +479,36 @@ def select_next_layout(self, count=1): new_layout = all_layouts[(index + count) % len(all_layouts)] self.select_layout(new_layout) - def select_previous_layout(self): + def select_previous_layout(self) -> None: self.select_next_layout(count=-1) - def change_size_for_active_pane(self, up=0, right=0, down=0, left=0): + def change_size_for_active_pane( + self, up: int = 0, right: int = 0, down: int = 0, left: int = 0 + ) -> None: """ Increase the size of the current pane in any of the four directions. """ child = self.active_pane - self.change_size_for_pane(child, up=up, right=right, down=down, left=left) + if child is not None: + self.change_size_for_pane(child, up=up, right=right, down=down, left=left) - def change_size_for_pane(self, pane, up=0, right=0, down=0, left=0): + def change_size_for_pane(self, pane: Pane, up=0, right=0, down=0, left=0): """ Increase the size of the current pane in any of the four directions. Positive values indicate an increase, negative values a decrease. """ - assert isinstance(pane, Pane) def find_split_and_child(split_cls, is_before): - " Find the split for which we will have to update the weights. " + "Find the split for which we will have to update the weights." child = pane split = self._get_parent(child) def found(): - return isinstance(split, split_cls) and ( - not is_before or split.index(child) > 0) and ( - is_before or split.index(child) < len(split) - 1) + return ( + isinstance(split, split_cls) + and (not is_before or split.index(child) > 0) + and (is_before or split.index(child) < len(split) - 1) + ) while split and not found(): child = split @@ -506,7 +517,7 @@ def found(): return split, child # split can be None! def handle_side(split_cls, is_before, amount, trying_other_side=False): - " Increase weights on one side. (top/left/right/bottom). " + "Increase weights on one side. (top/left/right/bottom)." if amount: split, child = find_split_and_child(split_cls, is_before) @@ -532,49 +543,54 @@ def handle_side(split_cls, is_before, amount, trying_other_side=False): # case it's logical to move the left border to the right # instead. if not trying_other_side: - handle_side(split_cls, not is_before, -amount, - trying_other_side=True) + handle_side( + split_cls, not is_before, -amount, trying_other_side=True + ) handle_side(VSplit, True, left) handle_side(VSplit, False, right) handle_side(HSplit, True, up) handle_side(HSplit, False, down) - def get_pane_index(self, pane): - " Return the index of the given pane. ValueError if not found. " - assert isinstance(pane, Pane) + def get_pane_index(self, pane: Pane): + "Return the index of the given pane. ValueError if not found." return self.panes.index(pane) -class Arrangement(object): +class Arrangement: """ Arrangement class for one Pymux session. This contains the list of windows and the layout of the panes for each window. All the clients share the same Arrangement instance, but they can have different windows active. """ - def __init__(self): - self.windows = [] + + def __init__(self) -> None: + self.windows: List[Window] = [] self.base_index = 0 - self._active_window_for_cli = weakref.WeakKeyDictionary() - self._prev_active_window_for_cli = weakref.WeakKeyDictionary() + self._active_window_for_cli: "WeakKeyDictionary[Application, Window]" = ( + WeakKeyDictionary() + ) + self._prev_active_window_for_cli: "WeakKeyDictionary[Application, Window]" = ( + WeakKeyDictionary() + ) # The active window of the last CLI. Used as default when a new session # is attached. - self._last_active_window = None + self._last_active_window: Optional[Window] = None - def invalidation_hash(self): + def invalidation_hash(self) -> str: """ When this changes, the layout needs to be rebuild. """ if not self.windows: - return '' + return "" w = self.get_active_window() return w.invalidation_hash() - def get_active_window(self): + def get_active_window(self) -> Window: """ The current active :class:`.Window`. """ @@ -583,11 +599,12 @@ def get_active_window(self): try: return self._active_window_for_cli[app] except KeyError: - self._active_window_for_cli[app] = self._last_active_window or self.windows[0] + self._active_window_for_cli[app] = ( + self._last_active_window or self.windows[0] + ) return self.windows[0] - def set_active_window(self, window): - assert isinstance(window, Window) + def set_active_window(self, window: Window) -> None: app = get_app() previous = self.get_active_window() @@ -595,19 +612,17 @@ def set_active_window(self, window): self._active_window_for_cli[app] = window self._last_active_window = window - def set_active_window_from_pane_id(self, pane_id): + def set_active_window_from_pane_id(self, pane_id: int) -> None: """ Make the window with this pane ID the active Window. """ - assert isinstance(pane_id, int) - for w in self.windows: for p in w.panes: if p.pane_id == pane_id: self.set_active_window(w) - def get_previous_active_window(self): - " The previous active Window or None if unknown. " + def get_previous_active_window(self) -> Optional[Window]: + "The previous active Window or None if unknown." app = get_app() try: @@ -616,12 +631,14 @@ def get_previous_active_window(self): return None def get_window_by_index(self, index): - " Return the Window with this index or None if not found. " + "Return the Window with this index or None if not found." for w in self.windows: if w.index == index: return w - def create_window(self, pane, name=None, set_active=True): + def create_window( + self, pane: Pane, name: Optional[str] = None, set_active: bool = True + ) -> None: """ Create a new window that contains just this pane. @@ -629,9 +646,6 @@ def create_window(self, pane, name=None, set_active=True): :param name: If given, name for the new window. :param set_active: When True, focus the new window. """ - assert isinstance(pane, Pane) - assert name is None or isinstance(name, six.text_type) - # Take the first available index. taken_indexes = [w.index for w in self.windows] @@ -647,7 +661,7 @@ def create_window(self, pane, name=None, set_active=True): # Sort windows by index. self.windows = sorted(self.windows, key=lambda w: w.index) - app = get_app(return_none=True) + app = get_app_or_none() if app is not None and set_active: self.set_active_window(w) @@ -658,32 +672,28 @@ def create_window(self, pane, name=None, set_active=True): assert w.active_pane == pane assert w._get_parent(pane) - def move_window(self, window, new_index): + def move_window(self, window: Window, new_index: int) -> None: """ Move window to a new index. """ - assert isinstance(window, Window) - assert isinstance(new_index, int) - window.index = new_index # Sort windows by index. self.windows = sorted(self.windows, key=lambda w: w.index) - def get_active_pane(self): + def get_active_pane(self) -> Optional[Pane]: """ The current :class:`.Pane` from the current window. """ w = self.get_active_window() if w is not None: return w.active_pane + return None - def remove_pane(self, pane): + def remove_pane(self, pane: Pane) -> None: """ Remove a :class:`.Pane`. (Look in all windows.) """ - assert isinstance(pane, Pane) - for w in self.windows: w.remove_pane(pane) @@ -697,19 +707,21 @@ def remove_pane(self, pane): self.windows.remove(w) - def focus_previous_window(self): + def focus_previous_window(self) -> None: w = self.get_active_window() - self.set_active_window(self.windows[ - (self.windows.index(w) - 1) % len(self.windows)]) + self.set_active_window( + self.windows[(self.windows.index(w) - 1) % len(self.windows)] + ) - def focus_next_window(self): + def focus_next_window(self) -> None: w = self.get_active_window() - self.set_active_window(self.windows[ - (self.windows.index(w) + 1) % len(self.windows)]) + self.set_active_window( + self.windows[(self.windows.index(w) + 1) % len(self.windows)] + ) - def break_pane(self, set_active=True): + def break_pane(self, set_active: bool = True) -> None: """ When the current window has multiple panes, remove the pane from this window and put it in a new window. @@ -720,17 +732,18 @@ def break_pane(self, set_active=True): if len(w.panes) > 1: pane = w.active_pane - self.get_active_window().remove_pane(pane) - self.create_window(pane, set_active=set_active) + if pane is not None: + self.get_active_window().remove_pane(pane) + self.create_window(pane, set_active=set_active) - def rotate_window(self, count=1): - " Rotate the panes in the active window. " + def rotate_window(self, count: int = 1) -> None: + "Rotate the panes in the active window." w = self.get_active_window() w.rotate(count=count) @property - def has_panes(self): - " True when any of the windows has a :class:`.Pane`. " + def has_panes(self) -> bool: + "True when any of the windows has a :class:`.Pane`." for w in self.windows: if w.has_panes: return True diff --git a/pymux/client/__init__.py b/pymux/client/__init__.py index dacbf9e..a602a09 100644 --- a/pymux/client/__init__.py +++ b/pymux/client/__init__.py @@ -1,3 +1,2 @@ -from __future__ import unicode_literals from .base import Client from .defaults import create_client, list_clients diff --git a/pymux/client/base.py b/pymux/client/base.py index 8443360..b9635a4 100644 --- a/pymux/client/base.py +++ b/pymux/client/base.py @@ -1,16 +1,13 @@ -from __future__ import unicode_literals +from abc import ABC from prompt_toolkit.output import ColorDepth -from abc import ABCMeta -from six import with_metaclass - __all__ = [ - 'Client', + "Client", ] -class Client(with_metaclass(ABCMeta, object)): +class Client(ABC): def run_command(self, command, pane_id=None): """ Ask the server to run this command. diff --git a/pymux/client/defaults.py b/pymux/client/defaults.py index 929e874..d236f77 100644 --- a/pymux/client/defaults.py +++ b/pymux/client/defaults.py @@ -1,24 +1,28 @@ -from __future__ import unicode_literals from prompt_toolkit.utils import is_windows + __all__ = [ - 'create_client', - 'list_clients', + "create_client", + "list_clients", ] def create_client(socket_name): if is_windows(): from .windows import WindowsClient + return WindowsClient(socket_name) else: from .posix import PosixClient + return PosixClient(socket_name) def list_clients(): if is_windows(): from .windows import list_clients + return list_clients() else: from .posix import list_clients + return list_clients() diff --git a/pymux/client/posix.py b/pymux/client/posix.py index 53b8662..e18b70c 100644 --- a/pymux/client/posix.py +++ b/pymux/client/posix.py @@ -1,13 +1,3 @@ -from __future__ import unicode_literals - -from prompt_toolkit.eventloop.select import select_fds -from prompt_toolkit.input.posix_utils import PosixStdinReader -from prompt_toolkit.input.vt100 import raw_mode, cooked_mode -from prompt_toolkit.output.vt100 import _get_size, Vt100_Output -from prompt_toolkit.output import ColorDepth - -from pymux.utils import nonblocking - import getpass import glob import json @@ -16,14 +6,20 @@ import socket import sys import tempfile -from .base import Client +from select import select -INPUT_TIMEOUT = .5 +from prompt_toolkit.input.posix_utils import PosixStdinReader +from prompt_toolkit.input.vt100 import cooked_mode, raw_mode +from prompt_toolkit.output import ColorDepth +from prompt_toolkit.output.vt100 import Vt100_Output, _get_size +from pymux.utils import nonblocking -__all__ = ( - 'PosixClient', - 'list_clients', -) +from .base import Client + +__all__ = [ + "PosixClient", + "list_clients", +] class PosixClient(Client): @@ -46,7 +42,7 @@ def __init__(self, socket_name): # decoding otherwise. (Also don't pass errors='ignore', because # that doesn't work for parsing mouse input escape sequences, which # consist of a fixed number of bytes.) - self._stdin_reader = PosixStdinReader(sys.stdin.fileno(), errors='replace') + self._stdin_reader = PosixStdinReader(sys.stdin.fileno(), errors="replace") def run_command(self, command, pane_id=None): """ @@ -54,47 +50,45 @@ def run_command(self, command, pane_id=None): :param pane_id: Optional identifier of the current pane. """ - self._send_packet({ - 'cmd': 'run-command', - 'data': command, - 'pane_id': pane_id - }) + self._send_packet({"cmd": "run-command", "data": command, "pane_id": pane_id}) - def attach(self, detach_other_clients=False, color_depth=ColorDepth.DEPTH_8_BIT): + def attach( + self, detach_other_clients: bool = False, color_depth=ColorDepth.DEPTH_8_BIT + ): """ Attach client user interface. """ - assert isinstance(detach_other_clients, bool) - self._send_size() - self._send_packet({ - 'cmd': 'start-gui', - 'detach-others': detach_other_clients, - 'color-depth': color_depth, - 'term': os.environ.get('TERM', ''), - 'data': '' - }) + self._send_packet( + { + "cmd": "start-gui", + "detach-others": detach_other_clients, + "color-depth": color_depth, + "term": os.environ.get("TERM", ""), + "data": "", + } + ) with raw_mode(sys.stdin.fileno()): - data_buffer = b'' + data_buffer = b"" stdin_fd = sys.stdin.fileno() socket_fd = self.socket.fileno() - current_timeout = INPUT_TIMEOUT # Timeout, used to flush escape sequences. try: + def winch_handler(signum, frame): self._send_size() signal.signal(signal.SIGWINCH, winch_handler) while True: - r = select_fds([stdin_fd, socket_fd], current_timeout) + r, _, _ = select([stdin_fd, socket_fd], [], []) if socket_fd in r: # Received packet from server. data = self.socket.recv(1024) - if data == b'': + if data == b"": # End of file. Connection closed. # Reset terminal o = Vt100_Output.from_pty(sys.stdout) @@ -107,20 +101,15 @@ def winch_handler(signum, frame): else: data_buffer += data - while b'\0' in data_buffer: - pos = data_buffer.index(b'\0') + while b"\0" in data_buffer: + pos = data_buffer.index(b"\0") self._process(data_buffer[:pos]) - data_buffer = data_buffer[pos + 1:] + data_buffer = data_buffer[pos + 1 :] elif stdin_fd in r: # Got user input. self._process_stdin() - current_timeout = INPUT_TIMEOUT - else: - # Timeout. (Tell the server to flush the vt100 Escape.) - self._send_packet({'cmd': 'flush-input'}) - current_timeout = None finally: signal.signal(signal.SIGWINCH, signal.SIG_IGN) @@ -128,32 +117,32 @@ def _process(self, data_buffer): """ Handle incoming packet from server. """ - packet = json.loads(data_buffer.decode('utf-8')) + packet = json.loads(data_buffer.decode("utf-8")) - if packet['cmd'] == 'out': + if packet["cmd"] == "out": # Call os.write manually. In Python2.6, sys.stdout.write doesn't use UTF-8. - os.write(sys.stdout.fileno(), packet['data'].encode('utf-8')) + os.write(sys.stdout.fileno(), packet["data"].encode("utf-8")) - elif packet['cmd'] == 'suspend': + elif packet["cmd"] == "suspend": # Suspend client process to background. - if hasattr(signal, 'SIGTSTP'): + if hasattr(signal, "SIGTSTP"): os.kill(os.getpid(), signal.SIGTSTP) - elif packet['cmd'] == 'mode': + elif packet["cmd"] == "mode": # Set terminal to raw/cooked. - action = packet['data'] + action = packet["data"] - if action == 'raw': + if action == "raw": cm = raw_mode(sys.stdin.fileno()) cm.__enter__() self._mode_context_managers.append(cm) - elif action == 'cooked': + elif action == "cooked": cm = cooked_mode(sys.stdin.fileno()) cm.__enter__() self._mode_context_managers.append(cm) - elif action == 'restore' and self._mode_context_managers: + elif action == "restore" and self._mode_context_managers: cm = self._mode_context_managers.pop() cm.__exit__() @@ -167,35 +156,34 @@ def _process_stdin(self): # Send input in chunks of 4k. step = 4056 for i in range(0, len(data), step): - self._send_packet({ - 'cmd': 'in', - 'data': data[i:i + step], - }) + self._send_packet( + { + "cmd": "in", + "data": data[i : i + step], + } + ) def _send_packet(self, data): - " Send to server. " - data = json.dumps(data).encode('utf-8') + "Send to server." + data = json.dumps(data).encode("utf-8") # Be sure that our socket is blocking, otherwise, the send() call could # raise `BlockingIOError` if the buffer is full. self.socket.setblocking(1) - self.socket.send(data + b'\0') + self.socket.send(data + b"\0") def _send_size(self): - " Report terminal size to server. " + "Report terminal size to server." rows, cols = _get_size(sys.stdout.fileno()) - self._send_packet({ - 'cmd': 'size', - 'data': [rows, cols] - }) + self._send_packet({"cmd": "size", "data": [rows, cols]}) def list_clients(): """ List all the servers that are running. """ - p = '%s/pymux.sock.%s.*' % (tempfile.gettempdir(), getpass.getuser()) + p = "%s/pymux.sock.%s.*" % (tempfile.gettempdir(), getpass.getuser()) for path in glob.glob(p): try: yield PosixClient(path) diff --git a/pymux/client/windows.py b/pymux/client/windows.py index c22ab7d..9d2641e 100644 --- a/pymux/client/windows.py +++ b/pymux/client/windows.py @@ -1,23 +1,21 @@ -from __future__ import unicode_literals - +import json +import os +import sys +from asyncio import ensure_future, get_event_loop from ctypes import byref, windll from ctypes.wintypes import DWORD -from prompt_toolkit.eventloop import ensure_future, From -from prompt_toolkit.eventloop import get_event_loop + from prompt_toolkit.input.win32 import Win32Input from prompt_toolkit.output import ColorDepth from prompt_toolkit.output.win32 import Win32Output from prompt_toolkit.win32_types import STD_OUTPUT_HANDLE -import json -import os -import sys from ..pipes.win32_client import PipeClient from .base import Client __all__ = [ - 'WindowsClient', - 'list_clients', + "WindowsClient", + "list_clients", ] # See: https://msdn.microsoft.com/pl-pl/library/windows/desktop/ms686033(v=vs.85).aspx @@ -29,32 +27,35 @@ class WindowsClient(Client): def __init__(self, pipe_name): self._input = Win32Input() self._hconsole = windll.kernel32.GetStdHandle(STD_OUTPUT_HANDLE) - self._data_buffer = b'' + self._data_buffer = b"" self.pipe = PipeClient(pipe_name) - def attach(self, detach_other_clients=False, color_depth=ColorDepth.DEPTH_8_BIT): - assert isinstance(detach_other_clients, bool) + def attach( + self, detach_other_clients: bool = False, color_depth=ColorDepth.DEPTH_8_BIT + ): self._send_size() - self._send_packet({ - 'cmd': 'start-gui', - 'detach-others': detach_other_clients, - 'color-depth': color_depth, - 'term': os.environ.get('TERM', ''), - 'data': '' - }) + self._send_packet( + { + "cmd": "start-gui", + "detach-others": detach_other_clients, + "color-depth": color_depth, + "term": os.environ.get("TERM", ""), + "data": "", + } + ) f = ensure_future(self._start_reader()) with self._input.attach(self._input_ready): # Run as long as we have a connection with the server. get_event_loop().run_until_complete(f) # Run forever. - def _start_reader(self): + async def _start_reader(self): """ Read messages from the Win32 pipe server and handle them. """ while True: - message = yield From(self.pipe.read_message()) + message = await self.pipe.read_message() self._process(message) def _process(self, data_buffer): @@ -63,24 +64,26 @@ def _process(self, data_buffer): """ packet = json.loads(data_buffer) - if packet['cmd'] == 'out': + if packet["cmd"] == "out": # Call os.write manually. In Python2.6, sys.stdout.write doesn't use UTF-8. original_mode = DWORD(0) windll.kernel32.GetConsoleMode(self._hconsole, byref(original_mode)) - windll.kernel32.SetConsoleMode(self._hconsole, DWORD( - ENABLE_PROCESSED_INPUT | ENABLE_VIRTUAL_TERMINAL_PROCESSING)) + windll.kernel32.SetConsoleMode( + self._hconsole, + DWORD(ENABLE_PROCESSED_INPUT | ENABLE_VIRTUAL_TERMINAL_PROCESSING), + ) try: - os.write(sys.stdout.fileno(), packet['data'].encode('utf-8')) + os.write(sys.stdout.fileno(), packet["data"].encode("utf-8")) finally: windll.kernel32.SetConsoleMode(self._hconsole, original_mode) - elif packet['cmd'] == 'suspend': + elif packet["cmd"] == "suspend": # Suspend client process to background. pass - elif packet['cmd'] == 'mode': + elif packet["cmd"] == "mode": pass # # Set terminal to raw/cooked. @@ -103,25 +106,24 @@ def _process(self, data_buffer): def _input_ready(self): keys = self._input.read_keys() if keys: - self._send_packet({ - 'cmd': 'in', - 'data': ''.join(key_press.data for key_press in keys), - }) + self._send_packet( + { + "cmd": "in", + "data": "".join(key_press.data for key_press in keys), + } + ) def _send_packet(self, data): - " Send to server. " + "Send to server." data = json.dumps(data) ensure_future(self.pipe.write_message(data)) def _send_size(self): - " Report terminal size to server. " + "Report terminal size to server." output = Win32Output(sys.stdout) rows, cols = output.get_size() - self._send_packet({ - 'cmd': 'size', - 'data': [rows, cols] - }) + self._send_packet({"cmd": "size", "data": [rows, cols]}) def list_clients(): diff --git a/pymux/commands/aliases.py b/pymux/commands/aliases.py index 9984641..a46e333 100644 --- a/pymux/commands/aliases.py +++ b/pymux/commands/aliases.py @@ -2,47 +2,43 @@ Aliases for all commands. (On purpose kept compatible with tmux.) """ -from __future__ import unicode_literals - -__all__ = ( - 'ALIASES', -) +__all__ = ["ALIASES"] ALIASES = { - 'bind': 'bind-key', - 'breakp': 'break-pane', - 'clearhist': 'clear-history', - 'confirm': 'confirm-before', - 'detach': 'detach-client', - 'display': 'display-message', - 'displayp': 'display-panes', - 'killp': 'kill-pane', - 'killw': 'kill-window', - 'last': 'last-window', - 'lastp': 'last-pane', - 'lextl': 'next-layout', - 'lsk': 'list-keys', - 'lsp': 'list-panes', - 'movew': 'move-window', - 'neww': 'new-window', - 'next': 'next-window', - 'pasteb': 'paste-buffer', - 'prev': 'previous-window', - 'prevl': 'previous-layout', - 'rename': 'rename-session', - 'renamew': 'rename-window', - 'resizep': 'resize-pane', - 'rotatew': 'rotate-window', - 'selectl': 'select-layout', - 'selectp': 'select-pane', - 'selectw': 'select-window', - 'send': 'send-keys', - 'set': 'set-option', - 'setw': 'set-window-option', - 'source': 'source-file', - 'splitw': 'split-window', - 'suspendc': 'suspend-client', - 'swapp': 'swap-pane', - 'unbind': 'unbind-key', + "bind": "bind-key", + "breakp": "break-pane", + "clearhist": "clear-history", + "confirm": "confirm-before", + "detach": "detach-client", + "display": "display-message", + "displayp": "display-panes", + "killp": "kill-pane", + "killw": "kill-window", + "last": "last-window", + "lastp": "last-pane", + "lextl": "next-layout", + "lsk": "list-keys", + "lsp": "list-panes", + "movew": "move-window", + "neww": "new-window", + "next": "next-window", + "pasteb": "paste-buffer", + "prev": "previous-window", + "prevl": "previous-layout", + "rename": "rename-session", + "renamew": "rename-window", + "resizep": "resize-pane", + "rotatew": "rotate-window", + "selectl": "select-layout", + "selectp": "select-pane", + "selectw": "select-window", + "send": "send-keys", + "set": "set-option", + "setw": "set-window-option", + "source": "source-file", + "splitw": "split-window", + "suspendc": "suspend-client", + "swapp": "swap-pane", + "unbind": "unbind-key", } diff --git a/pymux/commands/commands.py b/pymux/commands/commands.py index 6169aad..17ef697 100644 --- a/pymux/commands/commands.py +++ b/pymux/commands/commands.py @@ -1,92 +1,98 @@ -from __future__ import unicode_literals -import docopt import os import re import shlex -import six +from typing import TYPE_CHECKING, Any, Callable, Dict, List, Optional, TypeVar +import docopt from prompt_toolkit.application.current import get_app from prompt_toolkit.document import Document from prompt_toolkit.key_binding.vi_state import InputMode - from pymux.arrangement import LayoutTypes from pymux.commands.aliases import ALIASES from pymux.commands.utils import wrap_argument from pymux.format import format_pymux_string -from pymux.key_mappings import pymux_key_to_prompt_toolkit_key_sequence, prompt_toolkit_key_to_vt100_key -from pymux.layout import focus_right, focus_left, focus_up, focus_down +from pymux.key_mappings import ( + prompt_toolkit_key_to_vt100_key, + pymux_key_to_prompt_toolkit_key_sequence, +) +from pymux.layout import focus_down, focus_left, focus_right, focus_up from pymux.log import logger from pymux.options import SetOptionError -__all__ = ( - 'call_command_handler', - 'get_documentation_for_command', - 'get_option_flags_for_command', - 'handle_command', - 'has_command_handler', -) +if TYPE_CHECKING: + from pymux.main import Pymux + +__all__ = [ + "call_command_handler", + "get_documentation_for_command", + "get_option_flags_for_command", + "handle_command", + "has_command_handler", +] -COMMANDS_TO_HANDLERS = {} # Global mapping of pymux commands to their handlers. -COMMANDS_TO_HELP = {} -COMMANDS_TO_OPTION_FLAGS = {} +_VariablesList = List[str] +_VariablesDict = Dict[str, Any] +_PymuxHandler = Callable[["Pymux", _VariablesList], None] +_PymuxDictHandler = Callable[["Pymux", _VariablesDict], None] +# Global mapping of pymux commands to their handlers. +COMMANDS_TO_HANDLERS: Dict[str, _PymuxHandler] = {} -def has_command_handler(command): +COMMANDS_TO_HELP: Dict[str, str] = {} +COMMANDS_TO_OPTION_FLAGS: Dict[str, List[str]] = {} + + +def has_command_handler(command: str) -> bool: return command in COMMANDS_TO_HANDLERS -def get_documentation_for_command(command): - """ Return the help text for this command, or None if the command is not - known. """ +def get_documentation_for_command(command: str) -> Optional[str]: + """ + Return the help text for this command, or None if the command is not known. + """ if command in COMMANDS_TO_HELP: - return 'Usage: %s %s' % (command, COMMANDS_TO_HELP.get(command, '')) + return "Usage: %s %s" % (command, COMMANDS_TO_HELP.get(command, "")) + + return None -def get_option_flags_for_command(command): - " Return a list of options (-x flags) for this command. " +def get_option_flags_for_command(command: str) -> List[str]: + "Return a list of options (-x flags) for this command." return COMMANDS_TO_OPTION_FLAGS.get(command, []) -def handle_command(pymux, input_string): +def handle_command(pymux: "Pymux", input_string: str) -> None: """ Handle command. """ - assert isinstance(input_string, six.text_type) - input_string = input_string.strip() - logger.info('handle command: %s %s.', input_string, type(input_string)) + logger.info("handle command: %s %s.", input_string, type(input_string)) - if input_string and not input_string.startswith('#'): # Ignore comments. + if input_string and not input_string.startswith("#"): # Ignore comments. try: - if six.PY2: - # In Python2.6, shlex doesn't work with unicode input at all. - # In Python2.7, shlex tries to encode using ASCII. - parts = shlex.split(input_string.encode('utf-8')) - parts = [p.decode('utf-8') for p in parts] - else: - parts = shlex.split(input_string) + parts = shlex.split(input_string) except ValueError as e: # E.g. missing closing quote. - pymux.show_message('Invalid command %s: %s' % (input_string, e)) + pymux.show_message("Invalid command %s: %s" % (input_string, e)) else: call_command_handler(parts[0], pymux, parts[1:]) -def call_command_handler(command, pymux, arguments): +def call_command_handler( + command: str, pymux: "Pymux", arguments: _VariablesList +) -> None: """ Execute command. :param arguments: List of options. """ - assert isinstance(arguments, list) - # Resolve aliases. command = ALIASES.get(command, command) try: handler = COMMANDS_TO_HANDLERS[command] except KeyError: - pymux.show_message('Invalid command: %s' % (command,)) + pymux.show_message("Invalid command: %s" % (command,)) else: try: handler(pymux, arguments) @@ -94,7 +100,10 @@ def call_command_handler(command, pymux, arguments): pymux.show_message(e.message) -def cmd(name, options=''): +_F = TypeVar("_F", bound=_PymuxDictHandler) + + +def cmd(name: str, options: str = "") -> Callable[[_F], _F]: """ Decorator for all commands. @@ -104,45 +113,35 @@ def cmd(name, options=''): # Validate options. if options: try: - docopt.docopt('Usage:\n %s %s' % (name, options, ), []) + docopt.docopt( + "Usage:\n %s %s" + % ( + name, + options, + ), + [], + ) except SystemExit: pass - def decorator(func): - def command_wrapper(pymux, arguments): + def decorator(func: _F) -> _F: + def command_wrapper(pymux: "Pymux", arguments: _VariablesList) -> None: # Hack to make the 'bind-key' option work. # (bind-key expects a variable number of arguments.) - if name == 'bind-key' and '--' not in arguments: + if name == "bind-key" and "--" not in arguments: # Insert a double dash after the first non-option. for i, p in enumerate(arguments): - if not p.startswith('-'): - arguments.insert(i + 1, '--') + if not p.startswith("-"): + arguments.insert(i + 1, "--") break # Parse options. try: - # Python 2 workaround: pass bytes to docopt. - # From the following, only the bytes version returns the right - # output in Python 2: - # docopt.docopt('Usage:\n app ...', [b'a', b'b']) - # docopt.docopt('Usage:\n app ...', [u'a', u'b']) - # https://github.com/docopt/docopt/issues/30 - # (Not sure how reliable this is...) - if six.PY2: - arguments = [a.encode('utf-8') for a in arguments] - - received_options = docopt.docopt( - 'Usage:\n %s %s' % (name, options), - arguments, - help=False) # Don't interpret the '-h' option as help. - - # Make sure that all the received options from docopt are - # unicode objects. (Docopt returns 'str' for Python2.) - for k, v in received_options.items(): - if isinstance(v, six.binary_type): - received_options[k] = v.decode('utf-8') + received_options: Dict[str, str] = docopt.docopt( + "Usage:\n %s %s" % (name, options), arguments, help=False + ) # Don't interpret the '-h' option as help. except SystemExit: - raise CommandException('Usage: %s %s' % (name, options)) + raise CommandException("Usage: %s %s" % (name, options)) # Call handler. func(pymux, received_options) @@ -154,70 +153,76 @@ def command_wrapper(pymux, arguments): COMMANDS_TO_HELP[name] = options # Get list of option flags. - flags = re.findall(r'-[a-zA-Z0-9]\b', options) + flags = re.findall(r"-[a-zA-Z0-9]\b", options) COMMANDS_TO_OPTION_FLAGS[name] = flags return func + return decorator class CommandException(Exception): - " When raised from a command handler, this message will be shown. " - def __init__(self, message): + "When raised from a command handler, this message will be shown." + + def __init__(self, message: str) -> None: self.message = message + # # The actual commands. # -@cmd('break-pane', options='[-d]') -def break_pane(pymux, variables): - dont_focus_window = variables['-d'] +@cmd("break-pane", options="[-d]") +def break_pane(pymux: "Pymux", variables: _VariablesDict) -> None: + dont_focus_window = variables["-d"] pymux.arrangement.break_pane(set_active=not dont_focus_window) pymux.invalidate() -@cmd('select-pane', options='(-L|-R|-U|-D|-t )') -def select_pane(pymux, variables): - - if variables['-t']: - pane_id = variables[''] +@cmd("select-pane", options="(-L|-R|-U|-D|-t )") +def select_pane(pymux: "Pymux", variables: _VariablesDict) -> None: + if variables["-t"]: + pane_id = variables[""] w = pymux.arrangement.get_active_window() - if pane_id == ':.+': + if pane_id == ":.+": w.focus_next() - elif pane_id == ':.-': + elif pane_id == ":.-": w.focus_previous() else: # Select pane by index. try: - pane_id = int(pane_id[1:]) - w.active_pane = w.panes[pane_id] + pane_id_int = int(pane_id[1:]) + w.active_pane = w.panes[pane_id_int] except (IndexError, ValueError): - raise CommandException('Invalid pane.') + raise CommandException("Invalid pane.") else: - if variables['-L']: h = focus_left - if variables['-U']: h = focus_up - if variables['-D']: h = focus_down - if variables['-R']: h = focus_right + if variables["-L"]: + h = focus_left + if variables["-U"]: + h = focus_up + if variables["-D"]: + h = focus_down + if variables["-R"]: + h = focus_right h(pymux) -@cmd('select-window', options='(-t )') -def select_window(pymux, variables): +@cmd("select-window", options="(-t )") +def select_window(pymux: "Pymux", variables: _VariablesDict) -> None: """ Select a window. E.g: select-window -t :3 """ - window_id = variables[''] + window_id = variables[""] def invalid_window(): - raise CommandException('Invalid window: %s' % window_id) + raise CommandException("Invalid window: %s" % window_id) - if window_id.startswith(':'): + if window_id.startswith(":"): try: number = int(window_id[1:]) except ValueError: @@ -232,16 +237,16 @@ def invalid_window(): invalid_window() -@cmd('move-window', options='(-t )') -def move_window(pymux, variables): +@cmd("move-window", options="(-t )") +def move_window(pymux: "Pymux", variables: _VariablesDict) -> None: """ Move window to a new index. """ - dst_window = variables[''] + dst_window = variables[""] try: new_index = int(dst_window) except ValueError: - raise CommandException('Invalid window index: %r' % (dst_window, )) + raise CommandException("Invalid window index: %r" % (dst_window,)) # Check first whether the index was not yet taken. if pymux.arrangement.get_window_by_index(new_index): @@ -252,49 +257,49 @@ def move_window(pymux, variables): pymux.arrangement.move_window(w, new_index) -@cmd('rotate-window', options='[-D|-U]') -def rotate_window(pymux, variables): - if variables['-D']: +@cmd("rotate-window", options="[-D|-U]") +def rotate_window(pymux: "Pymux", variables: _VariablesDict) -> None: + if variables["-D"]: pymux.arrangement.rotate_window(count=-1) else: pymux.arrangement.rotate_window() -@cmd('swap-pane', options='(-D|-U)') -def swap_pane(pymux, variables): - pymux.arrangement.get_active_window().rotate(with_pane_after_only=variables['-U']) +@cmd("swap-pane", options="(-D|-U)") +def swap_pane(pymux: "Pymux", variables: _VariablesDict) -> None: + pymux.arrangement.get_active_window().rotate(with_pane_after_only=variables["-U"]) -@cmd('kill-pane') -def kill_pane(pymux, variables): +@cmd("kill-pane") +def kill_pane(pymux: "Pymux", variables: _VariablesDict) -> None: pane = pymux.arrangement.get_active_pane() pymux.kill_pane(pane) -@cmd('kill-window') -def kill_window(pymux, variables): - " Kill all panes in the current window. " +@cmd("kill-window") +def kill_window(pymux: "Pymux", variables: _VariablesDict) -> None: + "Kill all panes in the current window." for pane in pymux.arrangement.get_active_window().panes: pymux.kill_pane(pane) -@cmd('suspend-client') -def suspend_client(pymux, variables): +@cmd("suspend-client") +def suspend_client(pymux: "Pymux", variables: _VariablesDict) -> None: connection = pymux.get_connection() if connection: connection.suspend_client_to_background() -@cmd('clock-mode') -def clock_mode(pymux, variables): +@cmd("clock-mode") +def clock_mode(pymux: "Pymux", variables: _VariablesDict) -> None: pane = pymux.arrangement.get_active_pane() if pane: pane.clock_mode = not pane.clock_mode -@cmd('last-pane') -def last_pane(pymux, variables): +@cmd("last-pane") +def last_pane(pymux: "Pymux", variables: _VariablesDict) -> None: w = pymux.arrangement.get_active_window() prev_active_pane = w.previous_active_pane @@ -302,111 +307,116 @@ def last_pane(pymux, variables): w.active_pane = prev_active_pane -@cmd('next-layout') -def next_layout(pymux, variables): - " Select next layout. " +@cmd("next-layout") +def next_layout(pymux: "Pymux", variables: _VariablesDict) -> None: + "Select next layout." pane = pymux.arrangement.get_active_window() if pane: pane.select_next_layout() -@cmd('previous-layout') -def previous_layout(pymux, variables): - " Select previous layout. " +@cmd("previous-layout") +def previous_layout(pymux: "Pymux", variables: _VariablesDict) -> None: + "Select previous layout." pane = pymux.arrangement.get_active_window() if pane: pane.select_previous_layout() -@cmd('new-window', options='[(-n )] [(-c )] []') -def new_window(pymux, variables): - executable = variables[''] - start_directory = variables[''] - name = variables[''] +@cmd("new-window", options="[(-n )] [(-c )] []") +def new_window(pymux: "Pymux", variables: _VariablesDict) -> None: + executable = variables[""] + start_directory = variables[""] + name = variables[""] pymux.create_window(executable, start_directory=start_directory, name=name) -@cmd('next-window') -def next_window(pymux, variables): - " Focus the next window. " +@cmd("next-window") +def next_window(pymux: "Pymux", variables: _VariablesDict) -> None: + "Focus the next window." pymux.arrangement.focus_next_window() -@cmd('last-window') -def _(pymux, variables): - " Go to previous active window. " +@cmd("last-window") +def _(pymux: "Pymux", variables: _VariablesDict) -> None: + "Go to previous active window." w = pymux.arrangement.get_previous_active_window() if w: pymux.arrangement.set_active_window(w) -@cmd('previous-window') -def previous_window(pymux, variables): - " Focus the previous window. " +@cmd("previous-window") +def previous_window(pymux: "Pymux", variables: _VariablesDict) -> None: + "Focus the previous window." pymux.arrangement.focus_previous_window() -@cmd('select-layout', options='') -def select_layout(pymux, variables): - layout_type = variables[''] +@cmd("select-layout", options="") +def select_layout(pymux: "Pymux", variables: _VariablesDict) -> None: + layout_type = variables[""] - if layout_type in LayoutTypes._ALL: - pymux.arrangement.get_active_window().select_layout(layout_type) + try: + layout_type_obj: LayoutTypes = LayoutTypes(layout_type_obj) + except ValueError: + pymux.arrangement.get_active_window().select_layout(layout_type_obj) else: - raise CommandException('Invalid layout type.') + raise CommandException("Invalid layout type.") -@cmd('rename-window', options='') -def rename_window(pymux, variables): +@cmd("rename-window", options="") +def rename_window(pymux: "Pymux", variables: _VariablesDict) -> None: """ Rename the active window. """ - pymux.arrangement.get_active_window().chosen_name = variables[''] + pymux.arrangement.get_active_window().chosen_name = variables[""] -@cmd('rename-pane', options='') -def rename_pane(pymux, variables): +@cmd("rename-pane", options="") +def rename_pane(pymux: "Pymux", variables: _VariablesDict) -> None: """ Rename the active pane. """ - pymux.arrangement.get_active_pane().chosen_name = variables[''] + pymux.arrangement.get_active_pane().chosen_name = variables[""] -@cmd('rename-session', options='') -def rename_session(pymux, variables): +@cmd("rename-session", options="") +def rename_session(pymux: "Pymux", variables: _VariablesDict) -> None: """ Rename this session. """ - pymux.session_name = variables[''] + pymux.session_name = variables[""] -@cmd('split-window', options='[-v|-h] [(-c )] []') -def split_window(pymux, variables): +@cmd("split-window", options="[-v|-h] [(-c )] []") +def split_window(pymux: "Pymux", variables: _VariablesDict) -> None: """ Split horizontally or vertically. """ - executable = variables[''] - start_directory = variables[''] + executable = variables[""] + start_directory = variables[""] # The tmux definition of horizontal is the opposite of prompt_toolkit. - pymux.add_process(executable, vsplit=variables['-h'], - start_directory=start_directory) + pymux.add_process( + executable, vsplit=variables["-h"], start_directory=start_directory + ) -@cmd('resize-pane', options="[(-L )] [(-U )] [(-D )] [(-R )] [-Z]") -def resize_pane(pymux, variables): +@cmd( + "resize-pane", options="[(-L )] [(-U )] [(-D )] [(-R )] [-Z]" +) +def resize_pane(pymux: "Pymux", variables: _VariablesDict) -> None: """ Resize/zoom the active pane. """ try: - left = int(variables[''] or 0) - right = int(variables[''] or 0) - up = int(variables[''] or 0) - down = int(variables[''] or 0) + left = int(variables[""] or 0) + right = int(variables[""] or 0) + up = int(variables[""] or 0) + down = int(variables[""] or 0) except ValueError: - raise CommandException('Expecting an integer.') + raise CommandException("Expecting an integer.") w = pymux.arrangement.get_active_window() @@ -414,47 +424,50 @@ def resize_pane(pymux, variables): w.change_size_for_active_pane(up=up, right=right, down=down, left=left) # Zoom in/out. - if variables['-Z']: + if variables["-Z"]: w.zoom = not w.zoom -@cmd('detach-client') -def detach_client(pymux, variables): +@cmd("detach-client") +def detach_client(pymux: "Pymux", variables: _VariablesDict) -> None: """ Detach client. """ pymux.detach_client(get_app()) -@cmd('confirm-before', options='[(-p )] ') -def confirm_before(pymux, variables): +@cmd("confirm-before", options="[(-p )] ") +def confirm_before(pymux: "Pymux", variables: _VariablesDict) -> None: client_state = pymux.get_client_state() - client_state.confirm_text = variables[''] or '' - client_state.confirm_command = variables[''] + client_state.confirm_text = variables[""] or "" + client_state.confirm_command = variables[""] -@cmd('command-prompt', options='[(-p )] [(-I )] []') -def command_prompt(pymux, variables): +@cmd("command-prompt", options="[(-p )] [(-I )] []") +def command_prompt(pymux: "Pymux", variables: _VariablesDict) -> None: """ Enter command prompt. """ client_state = pymux.get_client_state() - if variables['']: + if variables[""]: # When a 'command' has been given. - client_state.prompt_text = variables[''] or '(%s)' % variables[''].split()[0] - client_state.prompt_command = variables[''] + client_state.prompt_text = ( + variables[""] or "(%s)" % variables[""].split()[0] + ) + client_state.prompt_command = variables[""] client_state.prompt_mode = True - client_state.prompt_buffer.reset(Document( - format_pymux_string(pymux, variables[''] or ''))) + client_state.prompt_buffer.reset( + Document(format_pymux_string(pymux, variables[""] or "")) + ) get_app().layout.focus(client_state.prompt_buffer) else: # Show the ':' prompt. - client_state.prompt_text = '' - client_state.prompt_command = '' + client_state.prompt_text = "" + client_state.prompt_command = "" get_app().layout.focus(client_state.command_buffer) @@ -462,8 +475,8 @@ def command_prompt(pymux, variables): get_app().vi_state.input_mode = InputMode.INSERT -@cmd('send-prefix') -def send_prefix(pymux, variables): +@cmd("send-prefix") +def send_prefix(pymux: "Pymux", variables: _VariablesDict) -> None: """ Send prefix to active pane. """ @@ -474,72 +487,72 @@ def send_prefix(pymux, variables): process.write_input(vt100_data) -@cmd('bind-key', options='[-n] [--] [...]') -def bind_key(pymux, variables): +@cmd("bind-key", options="[-n] [--] [...]") +def bind_key(pymux: "Pymux", variables: _VariablesDict) -> None: """ Bind a key sequence. -n: Not necessary to use the prefix. """ - key = variables[''] - command = variables[''] - arguments = variables[''] - needs_prefix = not variables['-n'] + key = variables[""] + command = variables[""] + arguments = variables[""] + needs_prefix = not variables["-n"] try: pymux.key_bindings_manager.add_custom_binding( - key, command, arguments, needs_prefix=needs_prefix) + key, command, arguments, needs_prefix=needs_prefix + ) except ValueError: - raise CommandException('Invalid key: %r' % (key, )) + raise CommandException("Invalid key: %r" % (key,)) -@cmd('unbind-key', options='[-n] ') -def unbind_key(pymux, variables): +@cmd("unbind-key", options="[-n] ") +def unbind_key(pymux: "Pymux", variables: _VariablesDict) -> None: """ Remove key binding. """ - key = variables[''] - needs_prefix = not variables['-n'] + key = variables[""] + needs_prefix = not variables["-n"] - pymux.key_bindings_manager.remove_custom_binding( - key, needs_prefix=needs_prefix) + pymux.key_bindings_manager.remove_custom_binding(key, needs_prefix=needs_prefix) -@cmd('send-keys', options='...') -def send_keys(pymux, variables): +@cmd("send-keys", options="...") +def send_keys(pymux: "Pymux", variables: _VariablesDict) -> None: """ Send key strokes to the active process. """ pane = pymux.arrangement.get_active_pane() if pane.display_scroll_buffer: - raise CommandException('Cannot send keys. Pane is in copy mode.') + raise CommandException("Cannot send keys. Pane is in copy mode.") - for key in variables['']: + for key in variables[""]: # Translate key from pymux key to prompt_toolkit key. try: keys_sequence = pymux_key_to_prompt_toolkit_key_sequence(key) except ValueError: - raise CommandException('Invalid key: %r' % (key, )) + raise CommandException("Invalid key: %r" % (key,)) # Translate prompt_toolkit key to VT100 key. for k in keys_sequence: pane.process.write_key(k) -@cmd('copy-mode', options='[-u]') -def copy_mode(pymux, variables): +@cmd("copy-mode", options="[-u]") +def copy_mode(pymux: "Pymux", variables: _VariablesDict) -> None: """ Enter copy mode. """ - go_up = variables['-u'] # Go in copy mode and page-up directly. + go_up = variables["-u"] # Go in copy mode and page-up directly. # TODO: handle '-u' pane = pymux.arrangement.get_active_pane() pane.enter_copy_mode() -@cmd('paste-buffer') -def paste_buffer(pymux, variables): +@cmd("paste-buffer") +def paste_buffer(pymux: "Pymux", variables: _VariablesDict) -> None: """ Paste clipboard content into buffer. """ @@ -547,25 +560,24 @@ def paste_buffer(pymux, variables): pane.process.write_input(get_app().clipboard.get_data().text, paste=True) -@cmd('source-file', options='') -def source_file(pymux, variables): +@cmd("source-file", options="") +def source_file(pymux: "Pymux", variables: _VariablesDict) -> None: """ Source configuration file. """ - filename = os.path.expanduser(variables['']) + filename = os.path.expanduser(variables[""]) try: - with open(filename, 'rb') as f: + with open(filename, "r") as f: for line in f: - line = line.decode('utf-8') handle_command(pymux, line) except IOError as e: - raise CommandException('IOError: %s' % (e, )) + raise CommandException("IOError: %s" % (e,)) -@cmd('set-option', options='