From 312b3c7f66cc4bed7085e6a346b913fde50bb89b Mon Sep 17 00:00:00 2001 From: fishroot Date: Sun, 25 Nov 2018 19:19:40 +0100 Subject: [PATCH] fixes #470 --- nemoa/__init__.py | 2 +- nemoa/base/__test__.py | 18 ++- nemoa/base/tty.py | 254 +++++++++++++++++++++------------- nemoa/model/morphisms/base.py | 23 +-- nemoa/session/classes/base.py | 59 ++------ nemoa/test.py | 4 + 6 files changed, 203 insertions(+), 157 deletions(-) diff --git a/nemoa/__init__.py b/nemoa/__init__.py index 37dd550a..783a7440 100644 --- a/nemoa/__init__.py +++ b/nemoa/__init__.py @@ -11,7 +11,7 @@ https://en.wikipedia.org/wiki/Structured_data_analysis_(statistics) """ -__version__ = '0.5.292' +__version__ = '0.5.294' __status__ = 'Development' __description__ = 'Network-based Modeling and Data Analysis' __url__ = 'https://frootlab.github.io/nemoa' diff --git a/nemoa/base/__test__.py b/nemoa/base/__test__.py index 1fa13a0e..54d8aaff 100644 --- a/nemoa/base/__test__.py +++ b/nemoa/base/__test__.py @@ -9,6 +9,7 @@ import tempfile import datetime from pathlib import Path +from unittest import skipIf from nemoa.base import entity, binary, check, env, literal, this, tty from nemoa.base import nbase, ndict from nemoa.test import ModuleTestCase, Case @@ -20,6 +21,7 @@ # osname = env.get_osname() +ttylib = tty.get_lib().__name__ # # Test Cases @@ -648,9 +650,19 @@ class TestTTY(ModuleTestCase): def test_get_lib(self) -> None: self.assertIsInstance(tty.get_lib(), Module) - def test_Getch(self) -> None: - obj = tty.Getch() if callable(tty.Getch) else None - self.assertIsInstance(obj, tty.GetchBase) + def test_get_class(self) -> None: + self.assertIsSubclass(tty.get_class(), tty.TTYBase) + + def test_get_instance(self) -> None: + self.assertIsInstance(tty.get_instance(), tty.TTYBase) + + @skipIf(ttylib != 'msvcrt', 'Requires Windows') + def test_TTYMsvcrt(self) -> None: + pass + + @skipIf(ttylib != 'termios', 'Requires Unix') + def test_TTYTermios(self) -> None: + pass class TestLiteral(ModuleTestCase): """Testcase for the module nemoa.base.literal.""" diff --git a/nemoa/base/tty.py b/nemoa/base/tty.py index 2dc9c028..c47fe3cb 100644 --- a/nemoa/base/tty.py +++ b/nemoa/base/tty.py @@ -12,60 +12,65 @@ from queue import Empty, Queue from threading import Thread from nemoa.base import entity, env -from nemoa.types import Module, OptModule, ClassVar +from nemoa.types import Any, Module, ClassVar, Exc, ExcType, Traceback, Method +from nemoa.types import OptStr -def get_lib() -> OptModule: - """Get module for tty I/O control. +# +# TTY Classes +# - Depending on the plattform the module within the standard library, which is - required for tty I/O control differs. The module :py:mod:`termios` provides - an interface to the POSIX calls for tty I/O control. The module - :py:mod:`msvcrt` provides access to some useful capabilities on Windows - platforms. +class TTYBase(ABC): + """Abstract base class for text terminals.""" - Returns: - Reference to module for tty I/O control or None, if the module could - not be determined. - - """ - libs = ['msvcrt', 'termios'] - for name in libs: - ref = entity.get_module(name) - if ref: - return ref - return None + _ttylib: Module + _cur_attr: Any -class GetchBase(ABC): - """Abstract base class for Getch classes.""" + def __init__(self, mode: OptStr = None) -> None: + """Modify terminal attributes.""" + self._ttylib = get_lib() + self._cur_attr = self.get_attr() + if mode: + self.set_mode(mode) - ttylib: Module + def __enter__(self) -> 'TTYBase': + return self - def __init__(self) -> None: - """Initialize instance.""" - self.ttylib = get_lib() - self.start() + def __exit__(self, cls: ExcType, obj: Exc, tb: Traceback) -> None: + """Reset current terminal attributes.""" + self.reset() def __del__(self) -> None: - """Release resources required for handling :meth:`.getch` requests.""" - self.stop() - self.ttylib = None + """Reset current terminal attributes.""" + self.reset() + if hasattr(self, 'ttylib'): + del self._ttylib + + def reset(self) -> None: + """Reset current terminal attributes to it's initial value.""" + if hasattr(self, 'cur_attr'): + self.set_attr(self._cur_attr) @abstractmethod - def start(self) -> None: - """Start handling of :meth:`.getch` requests.""" + def get_attr(self) -> Any: + """Get current terminal attributes.""" raise NotImplementedError() @abstractmethod - def getch(self) -> str: - """Get character from TTY.""" + def set_attr(self, attr: Any) -> None: + """Set current terminal attributes.""" raise NotImplementedError() @abstractmethod - def stop(self) -> None: - """Stop handling of :meth:`.getch` requests.""" + def set_mode(self, mode: str) -> None: + """Set current terminal mode.""" raise NotImplementedError() -class GetchMsvcrt(GetchBase): + @abstractmethod + def getch(self) -> str: + """Get character from TTY.""" + raise NotImplementedError() + +class TTYMsvcrt(TTYBase): """Windows/msvcrt implementation of Getch. This implementation supports Microsoft Windows by using the Microsoft Visual @@ -73,25 +78,25 @@ class GetchMsvcrt(GetchBase): """ - encoding: ClassVar[str] = env.get_encoding() + _encoding: ClassVar[str] = env.get_encoding() - def start(self) -> None: - """Start handling of :meth:`.getch` requests.""" - pass + def get_attr(self) -> Any: + """Get attributes of current terminal.""" + return None - def getch(self) -> str: - """Get character from tty.""" - if not isinstance(self.ttylib, Module): - return '' - if not getattr(self.ttylib, 'kbhit')(): - return '' - return str(getattr(self.ttylib, 'getch')(), self.encoding) + def set_attr(self, attr: Any) -> None: + """Set attributes of current terminal.""" + pass - def stop(self) -> None: - """Stop handling of :meth:`.getch` requests.""" + def set_mode(self, mode: str) -> None: + """Set mode of current terminal.""" pass -class GetchTermios(GetchBase): + def getch(self) -> str: + """Get character from TTY.""" + return str(getattr(self._ttylib, 'getch')(), self._encoding) + +class TTYTermios(TTYBase): """Unix/Termios implementation of Getch. This implementation supports Unix-like systems by using the Unix Terminal @@ -99,69 +104,118 @@ class GetchTermios(GetchBase): """ - buffer: Queue - runsignal: bool - time: float - curterm: list - fdesc: int - thread: Thread - - def start(self) -> None: - """Change terminal mode and start reading stdin to buffer.""" - # Get current tty attributes - tcgetattr = getattr(self.ttylib, 'tcgetattr') - self.fdesc = sys.stdin.fileno() - self.curterm = tcgetattr(self.fdesc) - - # Modify lflag from current TTY attributes - # to set terminal to unbuffered mode (not waiting for Enter) - newattr = tcgetattr(self.fdesc) - if isinstance(newattr[3], int): - ECHO = getattr(self.ttylib, 'ECHO') - ICANON = getattr(self.ttylib, 'ICANON') - newattr[3] = newattr[3] & ~ICANON & ~ECHO - tcsetattr = getattr(self.ttylib, 'tcsetattr') - TCSAFLUSH = getattr(self.ttylib, 'TCSAFLUSH') - tcsetattr(self.fdesc, TCSAFLUSH, newattr) + _fd: int + _tcgetattr: Method + _tcsetattr: Method + _buffer: Queue + _resume: bool + _thread: Thread + _time: float + + def __init__(self, mode: OptStr = None) -> None: + self._fd = sys.stdin.fileno() + super().__init__(mode) + def get_attr(self) -> Any: + """Get attributes of current terminal.""" + try: + return self._tcgetattr(self._fd) + except AttributeError: + self._tcgetattr = getattr(self._ttylib, 'tcgetattr') + return self._tcgetattr(self._fd) + + def set_attr(self, attr: Any) -> None: + """Set attributes of current terminal.""" + TCSAFLUSH = getattr(self._ttylib, 'TCSAFLUSH') + try: + return self._tcsetattr(self._fd, TCSAFLUSH, attr) + except AttributeError: + self._tcsetattr = getattr(self._ttylib, 'tcsetattr') + self._tcsetattr(self._fd, TCSAFLUSH, attr) + + def set_mode(self, mode: str) -> None: + """Set mode of current terminal.""" + # Buffered terminal for 'line'-mode: + # Echo Chars; Wait for Enter + if mode == 'line': + # Modify lflag from current TTY attributes + attr = self._cur_attr.copy() + if isinstance(attr[3], int): + ECHO = getattr(self._ttylib, 'ECHO') + ICANON = getattr(self._ttylib, 'ICANON') + attr[3] = attr[3] | ICANON | ECHO + self.set_attr(attr) + # Unbufered terminal for 'key'-mode: + # No Echo; Don't wait for Enter + elif mode == 'key': + # Modify lflag from current TTY attributes + attr = self._cur_attr.copy() + if isinstance(attr[3], int): + ECHO = getattr(self._ttylib, 'ECHO') + ICANON = getattr(self._ttylib, 'ICANON') + attr[3] = attr[3] & ~ICANON & ~ECHO + self.set_attr(attr) + + def start_getch(self) -> None: + """Start handling of :meth:`.getch` requests.""" # Initialize buffer and start thread for reading stdio to buffer def buffer(attr: dict) -> None: - while attr['resume']: - attr['buffer'].put(sys.stdin.read(1)) - self.resume = True - self.buffer = Queue() - self.thread = Thread( + while attr['_resume']: + attr['_buffer'].put(sys.stdin.read(1)) + + self._resume = True + self._buffer = Queue() + self._thread = Thread( target=buffer, args=(self.__dict__, ), daemon=True) - self.thread.start() + self._thread.start() # Update time - self.time = time.time() + self._time = time.time() def getch(self) -> str: """Return single Character from buffer.""" now = time.time() - if now < self.time + .1: # Wait for 100 milliseconds + if now < self._time + .1: # Wait for 100 milliseconds return '' # Update time - self.time = now + self._time = now try: - return self.buffer.get_nowait() + return self._buffer.get_nowait() except Empty: return '' - def stop(self) -> None: + def stop_getch(self) -> None: """Stop handling of :meth:`.getch` requests.""" - # Reset terminal mode to previous tty attributes - TCSAFLUSH = getattr(self.ttylib, 'TCSAFLUSH') - tcsetattr = getattr(self.ttylib, 'tcsetattr') - tcsetattr(self.fdesc, TCSAFLUSH, self.curterm) + self._resume = False # Stop thread from reading characters + +# +# Functions +# + +def get_lib() -> Module: + """Get module for TTY I/O control. - # Stop thread from reading characters - self.resume = False + Depending on the plattform the module within the standard library, which is + required for tty I/O control differs. The module :py:mod:`termios` provides + an interface to the POSIX calls for tty I/O control. The module + :py:mod:`msvcrt` provides access to some useful capabilities on Windows + platforms. + + Returns: + Reference to module for tty I/O control or None, if the module could + not be determined. + + """ + libs = ['msvcrt', 'termios'] + for name in libs: + module = entity.get_module(name) + if module: + return module + raise ImportError("no module for TTY I/O could be imported") -def getch_class() -> GetchBase: +def get_class() -> type: """Get platform specific class to handle getch() requests. This implementation supports Microsoft Windows by using the Microsoft Visual @@ -170,13 +224,15 @@ def getch_class() -> GetchBase: """ # Get platform specific tty I/O module. - ref = get_lib() - if not ref: - raise ImportError("no module for tty I/O could be imported") - cname = 'Getch' + ref.__name__.capitalize() + module = get_lib() + mname = module.__name__ + cname = 'TTY' + mname.capitalize() if not cname in globals() or not callable(globals()[cname]): - raise RuntimeError( - f"tty I/O module '{ref.__name__}' is not supported") + raise RuntimeError(f"TTY I/O module '{mname}' is not supported") return globals()[cname] -Getch: GetchBase = getch_class() +def get_instance() -> TTYBase: + """Get current terminal instance.""" + if not '_tty' in globals(): + globals()['_tty'] = get_class()() + return globals()['_tty'] diff --git a/nemoa/model/morphisms/base.py b/nemoa/model/morphisms/base.py index 00fad33c..db6369b1 100644 --- a/nemoa/model/morphisms/base.py +++ b/nemoa/model/morphisms/base.py @@ -25,14 +25,20 @@ def get(self, key, *args, **kwds): # algorithms if key == 'algorithm': return self._get_algorithm(*args, **kwds) - if key == 'algorithms': return self._get_algorithms( - attribute = 'about', *args, **kwds) - - if key == 'data': return self._get_data(*args, **kwds) - if key == 'epoch': return self._get_epoch() - if key == 'estimatetime': return self._get_estimatetime() - if key == 'progress': return self._get_progress() - if key == 'model': return self._get_model() + if key == 'algorithms': + return self._get_algorithms( + attribute = 'about', *args, **kwds) + + if key == 'data': + return self._get_data(*args, **kwds) + if key == 'epoch': + return self._get_epoch() + if key == 'estimatetime': + return self._get_estimatetime() + if key == 'progress': + return self._get_progress() + if key == 'model': + return self._get_model() if key in self._buffer: return self._buffer[key] @@ -257,7 +263,6 @@ def optimize(self, config = None, **kwds): retval &= self.model.network.initialize(self.model.system) except KeyboardInterrupt: retval = False - finally: nemoa.set('shell', 'buffmode', 'line') return retval diff --git a/nemoa/session/classes/base.py b/nemoa/session/classes/base.py index 687db4d9..0df64c6e 100644 --- a/nemoa/session/classes/base.py +++ b/nemoa/session/classes/base.py @@ -259,7 +259,7 @@ def _get_list_workspaces(self, base=None): return sorted(workspaces) - def _get_shell(self, key = None, *args, **kwds): + def _get_shell(self, key=None, *args, **kwds): """Get shell attribute.""" if key == 'inkey': @@ -467,7 +467,8 @@ def _get_workspace(self): def log(self, *args, **kwds): """Log message to file and console output.""" - if not args: return True + if not args: + return True mode = self._get_mode() obj = args[0] @@ -630,36 +631,6 @@ def _init_logging(self): return True - # def _init_exception_handler(self): - # """Initialize exception handler.""" - # - # from functools import wraps - # import sys - # - # def bypass(func): - # - # @wraps(func) - # def wrapper(*args, **kwds): - # print('hook from Python') - # - # exc_info = sys.exc_info() - # msg = exc_info[1] - # log.exception(msg, exc_info=exc_info) - # return func(*args, **kwds) - # - # return wrapper - # # - # # # pipe exceptions to logfile - # # def hook(*args, **kwds): - # # exc_info = sys.exc_info() - # # msg = exc_info[1] - # # log.exception(msg, exc_info=exc_info) - # # return sys.__excepthook__(*args, **kwds) - # - # sys.excepthook = bypass(sys.excepthook) - # - # return True - def run(self, script = None, *args, **kwds): """Run python script.""" @@ -709,8 +680,10 @@ def run(self, script = None, *args, **kwds): def set(self, key, *args, **kwds): """Set configuration parameters and env vars.""" - if key == 'shell': return self._set_shell(*args, **kwds) - if key == 'mode': return self._set_mode(*args, **kwds) + if key == 'shell': + return self._set_shell(*args, **kwds) + if key == 'mode': + return self._set_mode(*args, **kwds) if key == 'workspace': return self._set_workspace(*args, **kwds) @@ -726,22 +699,18 @@ def _set_shell(self, key, *args, **kwds): def _set_shell_buffmode(self, mode='line'): """Set current key buffer mode.""" + terminal = tty.get_instance() # type: ignore - curmode = self._get_shell_buffmode() - - if mode == curmode: + if mode == 'key': + terminal.set_mode('key') + terminal.start_getch() return True - if curmode == 'line' and mode == 'key': - if not self._buffer.get('inkey', None): - self._buffer['inkey'] = tty.Getch() # type: ignore - self._buffer['inkey'].start() + if mode == 'line': + terminal.stop_getch() + terminal.set_mode('line') return True - if curmode == 'key' and mode == 'line': - self._buffer['inkey'].stop() - del self._buffer['inkey'] - return True return False def _set_mode(self, mode = None, *args, **kwds): diff --git a/nemoa/test.py b/nemoa/test.py index bee0591f..13692614 100644 --- a/nemoa/test.py +++ b/nemoa/test.py @@ -41,6 +41,10 @@ class Case(NamedTuple): class BaseTestCase(TestCase): """Custom testcase.""" + def assertIsSubclass(self, cls: type, supercls: type) -> None: + """Assert that a class is a subclass of another.""" + self.assertTrue(issubclass(cls, supercls)) + def assertAllIn(self, func: AnyFunc, cases: Cases) -> None: """Assert that all function evaluations are in the given values.""" for case in cases: