Skip to content

Commit

Permalink
fixes #470
Browse files Browse the repository at this point in the history
  • Loading branch information
fishroot committed Nov 25, 2018
1 parent 9e5905a commit 312b3c7
Show file tree
Hide file tree
Showing 6 changed files with 203 additions and 157 deletions.
2 changes: 1 addition & 1 deletion nemoa/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
https://en.wikipedia.org/wiki/Structured_data_analysis_(statistics)
"""
__version__ = '0.5.292'
__version__ = '0.5.294'
__status__ = 'Development'
__description__ = 'Network-based Modeling and Data Analysis'
__url__ = 'https://frootlab.github.io/nemoa'
Expand Down
18 changes: 15 additions & 3 deletions nemoa/base/__test__.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import tempfile
import datetime
from pathlib import Path
from unittest import skipIf
from nemoa.base import entity, binary, check, env, literal, this, tty
from nemoa.base import nbase, ndict
from nemoa.test import ModuleTestCase, Case
Expand All @@ -20,6 +21,7 @@
#

osname = env.get_osname()
ttylib = tty.get_lib().__name__

#
# Test Cases
Expand Down Expand Up @@ -648,9 +650,19 @@ class TestTTY(ModuleTestCase):
def test_get_lib(self) -> None:
self.assertIsInstance(tty.get_lib(), Module)

def test_Getch(self) -> None:
obj = tty.Getch() if callable(tty.Getch) else None
self.assertIsInstance(obj, tty.GetchBase)
def test_get_class(self) -> None:
self.assertIsSubclass(tty.get_class(), tty.TTYBase)

def test_get_instance(self) -> None:
self.assertIsInstance(tty.get_instance(), tty.TTYBase)

@skipIf(ttylib != 'msvcrt', 'Requires Windows')
def test_TTYMsvcrt(self) -> None:
pass

@skipIf(ttylib != 'termios', 'Requires Unix')
def test_TTYTermios(self) -> None:
pass

class TestLiteral(ModuleTestCase):
"""Testcase for the module nemoa.base.literal."""
Expand Down
254 changes: 155 additions & 99 deletions nemoa/base/tty.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,156 +12,210 @@
from queue import Empty, Queue
from threading import Thread
from nemoa.base import entity, env
from nemoa.types import Module, OptModule, ClassVar
from nemoa.types import Any, Module, ClassVar, Exc, ExcType, Traceback, Method
from nemoa.types import OptStr

def get_lib() -> OptModule:
"""Get module for tty I/O control.
#
# TTY Classes
#

Depending on the plattform the module within the standard library, which is
required for tty I/O control differs. The module :py:mod:`termios` provides
an interface to the POSIX calls for tty I/O control. The module
:py:mod:`msvcrt` provides access to some useful capabilities on Windows
platforms.
class TTYBase(ABC):
"""Abstract base class for text terminals."""

Returns:
Reference to module for tty I/O control or None, if the module could
not be determined.
"""
libs = ['msvcrt', 'termios']
for name in libs:
ref = entity.get_module(name)
if ref:
return ref
return None
_ttylib: Module
_cur_attr: Any

class GetchBase(ABC):
"""Abstract base class for Getch classes."""
def __init__(self, mode: OptStr = None) -> None:
"""Modify terminal attributes."""
self._ttylib = get_lib()
self._cur_attr = self.get_attr()
if mode:
self.set_mode(mode)

ttylib: Module
def __enter__(self) -> 'TTYBase':
return self

def __init__(self) -> None:
"""Initialize instance."""
self.ttylib = get_lib()
self.start()
def __exit__(self, cls: ExcType, obj: Exc, tb: Traceback) -> None:
"""Reset current terminal attributes."""
self.reset()

def __del__(self) -> None:
"""Release resources required for handling :meth:`.getch` requests."""
self.stop()
self.ttylib = None
"""Reset current terminal attributes."""
self.reset()
if hasattr(self, 'ttylib'):
del self._ttylib

def reset(self) -> None:
"""Reset current terminal attributes to it's initial value."""
if hasattr(self, 'cur_attr'):
self.set_attr(self._cur_attr)

@abstractmethod
def start(self) -> None:
"""Start handling of :meth:`.getch` requests."""
def get_attr(self) -> Any:
"""Get current terminal attributes."""
raise NotImplementedError()

@abstractmethod
def getch(self) -> str:
"""Get character from TTY."""
def set_attr(self, attr: Any) -> None:
"""Set current terminal attributes."""
raise NotImplementedError()

@abstractmethod
def stop(self) -> None:
"""Stop handling of :meth:`.getch` requests."""
def set_mode(self, mode: str) -> None:
"""Set current terminal mode."""
raise NotImplementedError()

class GetchMsvcrt(GetchBase):
@abstractmethod
def getch(self) -> str:
"""Get character from TTY."""
raise NotImplementedError()

class TTYMsvcrt(TTYBase):
"""Windows/msvcrt implementation of Getch.
This implementation supports Microsoft Windows by using the Microsoft Visual
C/C++ Runtime Library for Python :py:mod:`msvcrt`.
"""

encoding: ClassVar[str] = env.get_encoding()
_encoding: ClassVar[str] = env.get_encoding()

def start(self) -> None:
"""Start handling of :meth:`.getch` requests."""
pass
def get_attr(self) -> Any:
"""Get attributes of current terminal."""
return None

def getch(self) -> str:
"""Get character from tty."""
if not isinstance(self.ttylib, Module):
return ''
if not getattr(self.ttylib, 'kbhit')():
return ''
return str(getattr(self.ttylib, 'getch')(), self.encoding)
def set_attr(self, attr: Any) -> None:
"""Set attributes of current terminal."""
pass

def stop(self) -> None:
"""Stop handling of :meth:`.getch` requests."""
def set_mode(self, mode: str) -> None:
"""Set mode of current terminal."""
pass

class GetchTermios(GetchBase):
def getch(self) -> str:
"""Get character from TTY."""
return str(getattr(self._ttylib, 'getch')(), self._encoding)

class TTYTermios(TTYBase):
"""Unix/Termios implementation of Getch.
This implementation supports Unix-like systems by using the Unix Terminal
I/O API for Python :py:mod:`termios`.
"""

buffer: Queue
runsignal: bool
time: float
curterm: list
fdesc: int
thread: Thread

def start(self) -> None:
"""Change terminal mode and start reading stdin to buffer."""
# Get current tty attributes
tcgetattr = getattr(self.ttylib, 'tcgetattr')
self.fdesc = sys.stdin.fileno()
self.curterm = tcgetattr(self.fdesc)

# Modify lflag from current TTY attributes
# to set terminal to unbuffered mode (not waiting for Enter)
newattr = tcgetattr(self.fdesc)
if isinstance(newattr[3], int):
ECHO = getattr(self.ttylib, 'ECHO')
ICANON = getattr(self.ttylib, 'ICANON')
newattr[3] = newattr[3] & ~ICANON & ~ECHO
tcsetattr = getattr(self.ttylib, 'tcsetattr')
TCSAFLUSH = getattr(self.ttylib, 'TCSAFLUSH')
tcsetattr(self.fdesc, TCSAFLUSH, newattr)
_fd: int
_tcgetattr: Method
_tcsetattr: Method
_buffer: Queue
_resume: bool
_thread: Thread
_time: float

def __init__(self, mode: OptStr = None) -> None:
self._fd = sys.stdin.fileno()
super().__init__(mode)

def get_attr(self) -> Any:
"""Get attributes of current terminal."""
try:
return self._tcgetattr(self._fd)
except AttributeError:
self._tcgetattr = getattr(self._ttylib, 'tcgetattr')
return self._tcgetattr(self._fd)

def set_attr(self, attr: Any) -> None:
"""Set attributes of current terminal."""
TCSAFLUSH = getattr(self._ttylib, 'TCSAFLUSH')
try:
return self._tcsetattr(self._fd, TCSAFLUSH, attr)
except AttributeError:
self._tcsetattr = getattr(self._ttylib, 'tcsetattr')
self._tcsetattr(self._fd, TCSAFLUSH, attr)

def set_mode(self, mode: str) -> None:
"""Set mode of current terminal."""
# Buffered terminal for 'line'-mode:
# Echo Chars; Wait for Enter
if mode == 'line':
# Modify lflag from current TTY attributes
attr = self._cur_attr.copy()
if isinstance(attr[3], int):
ECHO = getattr(self._ttylib, 'ECHO')
ICANON = getattr(self._ttylib, 'ICANON')
attr[3] = attr[3] | ICANON | ECHO
self.set_attr(attr)
# Unbufered terminal for 'key'-mode:
# No Echo; Don't wait for Enter
elif mode == 'key':
# Modify lflag from current TTY attributes
attr = self._cur_attr.copy()
if isinstance(attr[3], int):
ECHO = getattr(self._ttylib, 'ECHO')
ICANON = getattr(self._ttylib, 'ICANON')
attr[3] = attr[3] & ~ICANON & ~ECHO
self.set_attr(attr)

def start_getch(self) -> None:
"""Start handling of :meth:`.getch` requests."""
# Initialize buffer and start thread for reading stdio to buffer
def buffer(attr: dict) -> None:
while attr['resume']:
attr['buffer'].put(sys.stdin.read(1))
self.resume = True
self.buffer = Queue()
self.thread = Thread(
while attr['_resume']:
attr['_buffer'].put(sys.stdin.read(1))

self._resume = True
self._buffer = Queue()
self._thread = Thread(
target=buffer, args=(self.__dict__, ), daemon=True)
self.thread.start()
self._thread.start()

# Update time
self.time = time.time()
self._time = time.time()

def getch(self) -> str:
"""Return single Character from buffer."""
now = time.time()
if now < self.time + .1: # Wait for 100 milliseconds
if now < self._time + .1: # Wait for 100 milliseconds
return ''

# Update time
self.time = now
self._time = now

try:
return self.buffer.get_nowait()
return self._buffer.get_nowait()
except Empty:
return ''

def stop(self) -> None:
def stop_getch(self) -> None:
"""Stop handling of :meth:`.getch` requests."""
# Reset terminal mode to previous tty attributes
TCSAFLUSH = getattr(self.ttylib, 'TCSAFLUSH')
tcsetattr = getattr(self.ttylib, 'tcsetattr')
tcsetattr(self.fdesc, TCSAFLUSH, self.curterm)
self._resume = False # Stop thread from reading characters

#
# Functions
#

def get_lib() -> Module:
"""Get module for TTY I/O control.
# Stop thread from reading characters
self.resume = False
Depending on the plattform the module within the standard library, which is
required for tty I/O control differs. The module :py:mod:`termios` provides
an interface to the POSIX calls for tty I/O control. The module
:py:mod:`msvcrt` provides access to some useful capabilities on Windows
platforms.
Returns:
Reference to module for tty I/O control or None, if the module could
not be determined.
"""
libs = ['msvcrt', 'termios']
for name in libs:
module = entity.get_module(name)
if module:
return module
raise ImportError("no module for TTY I/O could be imported")

def getch_class() -> GetchBase:
def get_class() -> type:
"""Get platform specific class to handle getch() requests.
This implementation supports Microsoft Windows by using the Microsoft Visual
Expand All @@ -170,13 +224,15 @@ def getch_class() -> GetchBase:
"""
# Get platform specific tty I/O module.
ref = get_lib()
if not ref:
raise ImportError("no module for tty I/O could be imported")
cname = 'Getch' + ref.__name__.capitalize()
module = get_lib()
mname = module.__name__
cname = 'TTY' + mname.capitalize()
if not cname in globals() or not callable(globals()[cname]):
raise RuntimeError(
f"tty I/O module '{ref.__name__}' is not supported")
raise RuntimeError(f"TTY I/O module '{mname}' is not supported")
return globals()[cname]

Getch: GetchBase = getch_class()
def get_instance() -> TTYBase:
"""Get current terminal instance."""
if not '_tty' in globals():
globals()['_tty'] = get_class()()
return globals()['_tty']
Loading

0 comments on commit 312b3c7

Please sign in to comment.